Compare commits

..

3 Commits

Author SHA1 Message Date
panda f0fcea03bb 1.新增gitignore文件
2.新启动脚本制作
2026-01-30 11:41:05 +08:00
panda 838453b88f V2.1客户信息修改、项目批量停用、项目批量修改、材料批量修改功能上线 2026-01-28 15:26:48 +08:00
panda 98944ecbdc 新增并注册项目信息修改 2026-01-27 14:07:41 +08:00
10 changed files with 930 additions and 205 deletions
+179
View File
@@ -0,0 +1,179 @@
### Python template
# Byte-compiled / optimized / DLL files
__pycache__/
*.py[cod]
*$py.class
# C extensions
*.so
# Distribution / packaging
.Python
build/
develop-eggs/
dist/
downloads/
eggs/
.eggs/
lib/
lib64/
parts/
sdist/
var/
wheels/
share/python-wheels/
*.egg-info/
.installed.cfg
*.egg
MANIFEST
# PyInstaller
# Usually these files are written by a python script from a template
# before PyInstaller builds the exe, so as to inject date/other infos into it.
*.manifest
*.spec
# Installer logs
pip-log.txt
pip-delete-this-directory.txt
# Unit test / coverage reports
htmlcov/
.tox/
.nox/
.coverage
.coverage.*
.cache
nosetests.xml
coverage.xml
*.cover
*.py,cover
.hypothesis/
.pytest_cache/
cover/
# Translations
*.mo
*.pot
# Django stuff:
*.log
local_settings.py
db.sqlite3
db.sqlite3-journal
# Flask stuff:
instance/
.webassets-cache
# Scrapy stuff:
.scrapy
# Sphinx documentation
docs/_build/
# PyBuilder
.pybuilder/
target/
# Jupyter Notebook
.ipynb_checkpoints
# IPython
profile_default/
ipython_config.py
# pyenv
# For a library or package, you might want to ignore these files since the code is
# intended to run in multiple environments; otherwise, check them in:
# .python-version
# pipenv
# According to pypa/pipenv#598, it is recommended to include Pipfile.lock in version control.
# However, in case of collaboration, if having platform-specific dependencies or dependencies
# having no cross-platform support, pipenv may install dependencies that don't work, or not
# install all needed dependencies.
#Pipfile.lock
# poetry
# Similar to Pipfile.lock, it is generally recommended to include poetry.lock in version control.
# This is especially recommended for binary packages to ensure reproducibility, and is more
# commonly ignored for libraries.
# https://python-poetry.org/docs/basic-usage/#commit-your-poetrylock-file-to-version-control
#poetry.lock
# pdm
# Similar to Pipfile.lock, it is generally recommended to include pdm.lock in version control.
#pdm.lock
# pdm stores project-wide configurations in .pdm.toml, but it is recommended to not include it
# in version control.
# https://pdm.fming.dev/latest/usage/project/#working-with-version-control
.pdm.toml
.pdm-python
.pdm-build/
# PEP 582; used by e.g. github.com/David-OConnor/pyflow and github.com/pdm-project/pdm
__pypackages__/
# Celery stuff
celerybeat-schedule
celerybeat.pid
# SageMath parsed files
*.sage.py
# Environments
.env
.venv
env/
venv/
ENV/
env.bak/
venv.bak/
# Spyder project settings
.spyderproject
.spyproject
# Rope project settings
.ropeproject
# mkdocs documentation
/site
# mypy
.mypy_cache/
.dmypy.json
dmypy.json
# Pyre type checker
.pyre/
# pytype static type analyzer
.pytype/
# Cython debug symbols
cython_debug/
# PyCharm
# JetBrains specific template is maintained in a separate JetBrains.gitignore that can
# be found at https://github.com/github/gitignore/blob/main/Global/JetBrains.gitignore
# and can be added to the global gitignore or merged into this file. For a more nuclear
# option (not recommended) you can uncomment the following to ignore the entire idea folder.
#.idea/
### 默认 template
# IntelliJ 文件
.idea
*.iml
out
gen
# 数据文件
*.csv
*.xlsx
*.xls
# 环境文件
.vscode
.conda
+119 -9
View File
@@ -9,12 +9,15 @@ F6 后台执行模块
- 车辆信息管理
- 项目信息批量启停
- 材料信息批量修改
- 项目信息批量修改
依赖:
- requests: HTTP 请求
- pandas: Excel 文件处理
- threading: 后台任务处理
"""
import logging
import traceback
import requests
from urllib.parse import quote
import pandas as pd
@@ -36,6 +39,7 @@ from app.tasks.delete_tasks import (
from app.tasks.material_tasks import (
batch_disable_projects,
batch_modify_materials,
batch_modify_projects
)
from app.tasks.customer_tasks import modify_customer_info_background
from app.tasks.bi_tasks import bi_task_background
@@ -43,6 +47,8 @@ from app.tasks.bi_tasks import bi_task_background
# 简道云 API 实例,用于调用简道云 API
api_instance = API()
logger = logging.getLogger('app')
class F6PluginModule:
"""
@@ -170,8 +176,38 @@ class F6PluginModule:
else:
print("'msg':'文件上传格式错误'")
return {'msg': '文件上传格式错误'}
elif action == 'delete_cars':
pass
elif action == 'disable_project':
df1 = pd.read_excel(save_path, sheet_name=0)
if "项目编码" in df1.columns[0]: # 校验表头名字
print('文件校验成功')
return {'msg': f'{save_path}', 'check': ''}
else:
print("'msg':'文件上传格式错误'")
return {'msg': '文件上传格式错误'}
elif action == 'batch_modify_materials':
df2 = pd.read_excel(save_path, sheet_name=0)
required_columns = {'原材料编码', '新材料编码', '品牌', '名称', '规格'}
actual_columns = set(df2.columns)
if required_columns.issubset(actual_columns):
print('文件校验成功')
return {'msg': f'{save_path}', 'check': ''}
else:
missing = required_columns - actual_columns
print(f"文件上传格式错误:缺少列 {missing}")
return {'msg': '文件上传格式错误'}
elif action == 'batch_modify_projects':
df3 = pd.read_excel(save_path, sheet_name=0)
required_columns = {'原项目编码', '新项目编码', '项目名称', '业务分类', '销项税率', '项目说明',
}
actual_columns = set(df3.columns)
if required_columns.issubset(actual_columns):
print('文件校验成功')
return {'msg': f'{save_path}', 'check': ''}
else:
missing = required_columns - actual_columns
print(f"文件上传格式错误:缺少列 {missing}")
return {'msg': '文件上传格式错误'}
else:
pass
@@ -409,6 +445,7 @@ class F6PluginModule:
Returns:
Dict[str, str]: 包含执行状态的字典,{'msg': '正在执行', 'msg_details': '正在执行,请稍后看结果'}
"""
try:
entry_data = api_instance.entry_data_get(data=data, replace=True)
print('执行 项目批量停用/启用')
username = entry_data['data']['账号']
@@ -419,30 +456,41 @@ class F6PluginModule:
login_response = F6Module.login_in(username, password, company_name)
if login_response is None:
logger.error(f"F6系统登录失败,用户名: {username}")
return {'msg': '登录失败'}
try:
df = pd.read_excel(save_path, sheet_name=0, dtype='string')
except Exception as e:
logger.error(f"读取Excel文件失败: {save_path}, 错误: {str(e)}, 堆栈: {traceback.format_exc()}")
return {'msg': f'读取Excel文件失败: {str(e)},文件路径:{save_path}'}
cookies = requests.utils.dict_from_cookiejar(login_response.cookies)
logger.info("当前登录cookies:{}".format(cookies))
try:
thread = threading.Thread(target=batch_disable_projects,
args=(data, cookies, df, save_path, option))
thread.start()
except Exception as e:
logger.error(f"创建线程失败: {str(e)}, 堆栈: {traceback.format_exc()}")
print(f'创建线程失败: {str(e)}')
return {'msg': f'创建后台线程失败: {str(e)}'}
return {'msg': '正在执行', 'msg_details': '正在执行,请稍后看结果'}
except KeyError as e:
logger.error(f"数据字段缺失: {str(e)}, 堆栈: {traceback.format_exc()}")
return {'msg': f'数据字段缺失: {str(e)}'}
except Exception as e:
logger.error(f"项目批量启停任务执行失败: {str(e)}, 堆栈: {traceback.format_exc()}")
return {'msg': f'执行失败: {str(e)}'}
@staticmethod
def disable_material(data: Dict[str, Any]) -> Dict[str, str]:
def modify_material(data: Dict[str, Any]) -> Dict[str, str]:
"""
材料批量启停
材料批量修改
从简道云获取材料批量启停请求,读取 Excel 文件,并在后台线程中批量启停材料。
从简道云获取材料批量修改请求,读取 Excel 文件,并在后台线程中批量修改材料。
立即返回"正在执行"的提示,实际创建在后台线程中执行。
Args:
@@ -451,38 +499,100 @@ class F6PluginModule:
Returns:
Dict[str, str]: 包含执行状态的字典,{'msg': '正在执行', 'msg_details': '正在执行,请稍后看结果'}
"""
try:
entry_data = api_instance.entry_data_get(data=data, replace=True)
print('执行 材料批量停用/启用')
print('执行 材料信息批量修改')
username = entry_data['data']['账号']
password = entry_data['data']['密码']
company_name = entry_data['data']['公司名称']
save_path = entry_data['data']['文件保存地址']
option = entry_data['data']['项目材料批量操作']
login_response = F6Module.login_in(username, password, company_name)
if login_response is None:
logger.error(f"F6系统登录失败,用户名: {username}")
return {'msg': '登录失败'}
try:
df = pd.read_excel(save_path, sheet_name=0, dtype='string')
except Exception as e:
logger.error(f"读取Excel文件失败: {save_path}, 错误: {str(e)}, 堆栈: {traceback.format_exc()}")
return {'msg': f'读取Excel文件失败: {str(e)},文件路径:{save_path}'}
cookies = requests.utils.dict_from_cookiejar(login_response.cookies)
try:
thread = threading.Thread(target=batch_modify_materials,
args=(data, cookies, df, save_path, option))
args=(data, cookies, df, save_path))
thread.start()
except Exception as e:
logger.error(f"创建线程失败: {str(e)}, 堆栈: {traceback.format_exc()}")
print(f'创建线程失败: {str(e)}')
return {'msg': f'创建后台线程失败: {str(e)}'}
return {'msg': '正在执行', 'msg_details': '正在执行,请稍后看结果'}
except KeyError as e:
logger.error(f"数据字段缺失: {str(e)}, 堆栈: {traceback.format_exc()}")
return {'msg': f'数据字段缺失: {str(e)}'}
except Exception as e:
logger.error(f"材料批量修改任务执行失败: {str(e)}, 堆栈: {traceback.format_exc()}")
return {'msg': f'执行失败: {str(e)}'}
@staticmethod
def modify_project(data: Dict[str, Any]) -> Dict[str, str]:
"""
项目批量修改
从简道云获取项目批量修改请求,读取 Excel 文件,并在后台线程中批量修改项目。
立即返回"正在执行"的提示,实际创建在后台线程中执行。
Args:
data: 包含表单ID(api_key)、表单ID(entry_id)、数据ID(data_id)的字典
Returns:
Dict[str, str]: 包含执行状态的字典,{'msg': '正在执行', 'msg_details': '正在执行,请稍后看结果'}
"""
try:
entry_data = api_instance.entry_data_get(data=data, replace=True)
print('执行 项目信息批量修改')
username = entry_data['data']['账号']
password = entry_data['data']['密码']
company_name = entry_data['data']['公司名称']
save_path = entry_data['data']['文件保存地址']
login_response = F6Module.login_in(username, password, company_name)
if login_response is None:
logger.error(f"F6系统登录失败,用户名: {username}")
return {'msg': '登录失败'}
try:
df = pd.read_excel(save_path, sheet_name=0, dtype='string')
except Exception as e:
logger.error(f"读取Excel文件失败: {save_path}, 错误: {str(e)}, 堆栈: {traceback.format_exc()}")
return {'msg': f'读取Excel文件失败: {str(e)},文件路径:{save_path}'}
cookies = requests.utils.dict_from_cookiejar(login_response.cookies)
try:
thread = threading.Thread(target=batch_modify_projects,
args=(data, cookies, df, save_path))
thread.start()
except Exception as e:
logger.error(f"创建线程失败: {str(e)}, 堆栈: {traceback.format_exc()}")
print(f'创建线程失败: {str(e)}')
return {'msg': f'创建后台线程失败: {str(e)}'}
return {'msg': '正在执行', 'msg_details': '正在执行,请稍后看结果'}
except KeyError as e:
logger.error(f"数据字段缺失: {str(e)}, 堆栈: {traceback.format_exc()}")
return {'msg': f'数据字段缺失: {str(e)}'}
except Exception as e:
logger.error(f"项目批量修改任务执行失败: {str(e)}, 堆栈: {traceback.format_exc()}")
return {'msg': f'执行失败: {str(e)}'}
@staticmethod
def bi_task(data: Dict[str, Any]) -> Dict[str, str]:
"""
BI任务
BI任务(示例)
从简道云获取BI任务请求,读取 Excel 文件(如果需要),并在后台线程中执行BI任务。
立即返回"正在执行"的提示,实际执行在后台线程中完成。
+2
View File
@@ -30,6 +30,7 @@ from app.tasks.customer_tasks import modify_customer_info_background
from app.tasks.bi_tasks import bi_task_background
from app.tasks.material_tasks import ( \
batch_modify_projects,
batch_modify_materials,
batch_disable_projects
)
@@ -51,4 +52,5 @@ __all__ = [
# 项目材料任务
'batch_disable_projects',
'batch_modify_materials',
'batch_modify_projects',
]
+13 -4
View File
@@ -137,7 +137,7 @@ def execute_failure_handler(data: Dict[str, Any]):
api_instance.data_batch_create(pay_load)
def get_operate_org_id(cookies: Dict[str, str]) -> Optional[str]:
def get_operate_org_id(cookies: Dict[str, str], data: Dict[str, str] = None) -> Optional[str]:
"""
获取操作门店ID
@@ -145,6 +145,7 @@ def get_operate_org_id(cookies: Dict[str, str]) -> Optional[str]:
Args:
cookies: 用户登录 F6 系统的 cookies 信息
data:数据id等
Returns:
Optional[str]: 门店ID,如果获取失败返回 None
@@ -156,13 +157,22 @@ def get_operate_org_id(cookies: Dict[str, str]) -> Optional[str]:
try:
org_res = requests.get(url=org_url, cookies=cookies)
logger.info(org_res.json())
org_data = org_res.json().get("data", {})
org_list = org_data.get("list", [])
if not org_list or len(org_list) == 0:
logger.error("未获取到门店信息")
return None
if data:
entry_data = api_instance.entry_data_get(data=data, replace=True)
org_name = entry_data.get("data", {}).get("门店名称")
operate_org_id = [
item["orgId"]
for item in org_list
if item.get("abbrName") == org_name
]
else:
operate_org_id = org_list[0].get("orgId")
if not operate_org_id:
logger.error("门店ID为空")
@@ -217,6 +227,7 @@ def get_card_list(
if extract_func is None:
def default_extract(card_item: Dict) -> Optional[str]:
return card_item.get("idCustomer")
extract_func = default_extract
# 分页获取所有会员卡数据
@@ -240,5 +251,3 @@ def get_card_list(
except Exception as e:
logger.error(f"获取会员卡列表时发生错误: {e}")
return card_list
+422 -55
View File
@@ -11,11 +11,9 @@ import logging
import traceback
import requests
import time
from typing import Dict, Any, List, Optional
from datetime import datetime
from typing import Dict, Any
from tqdm import tqdm
from app.tasks.common import update_jiandaoyun, approve_workflow, get_operate_org_id, get_card_list, \
execute_failure_handler
from app.tasks.common import update_jiandaoyun, approve_workflow, get_operate_org_id
import pandas as pd
import os
@@ -25,7 +23,7 @@ logger = logging.getLogger('app')
def batch_disable_projects(data: Dict[str, Any], cookies: Dict[str, str], df: pd.DataFrame, save_path: str,
option) -> None:
"""
项目批量停后台任务
项目批量停后台任务
在后台线程中批量停用项目,从 Excel 文件中读取项目编码。
执行完成后会更新简道云表单并自动提交工作流。
@@ -45,53 +43,105 @@ def batch_disable_projects(data: Dict[str, Any], cookies: Dict[str, str], df: pd
- 执行完成后会自动删除上传的文件
- 执行结果会更新到简道云表单
"""
logger.info(f"开始执行项目批量启停任务,操作类型: {option}, 文件路径: {save_path}, 数据行数: {len(df)}")
if option == "批量启用":
type_ = 0 # 1 停用,0启用
ob_type = 1
else:
type_ = 1
ob_type = 0
logger.info(f"操作类型设置完成: type_={type_}, ob_type={ob_type}")
df = df.where(pd.notnull(df), None)
# 获取门店id
org_id = get_operate_org_id(data)
logger.info("正在获取门店ID...")
try:
org_id = get_operate_org_id(cookies)
logger.info(f"门店ID获取成功: {org_id}")
except Exception as e:
logger.error(f"获取门店ID失败: {str(e)}, 堆栈信息: {traceback.format_exc()}")
raise
# 获取项目信息
logger.info("开始获取项目列表...")
json_data = {
'param': '',
'name': '',
'customCode': '',
'currentPage': 1,
'pageSize': 100,
'isDel': -type_,
'isDel': ob_type,
'customInvoiceCategory': 0,
'idOwnOrg': org_id,
}
try:
response = requests.post(
'https://ids-goods.f6car.cn/f6-ids-goods/service/getServiceList',
cookies=cookies,
json=json_data,
)
time.sleep(1)
response.raise_for_status()
all_project_list = []
total_pages = response.json().get("data", {}).get("totalPages", "")
total_pages = response.json().get("data", {}).get("totalPages", 0)
logger.info(f"获取项目列表响应: {response.json()}")
try:
total_pages = int(total_pages)
except (ValueError, TypeError):
logger.error(f"无法解析总页数: {total_pages}, 类型: {type(total_pages)}")
total_pages = 0
logger.info(f"项目列表总页数: {total_pages}")
for page in tqdm(range(1, total_pages + 1)):
json_data['currentPage'] = str(page)
json_data['currentPage'] = page
try:
response = requests.post(
'https://ids-goods.f6car.cn/f6-ids-goods/service/getServiceList',
cookies=cookies,
json=json_data,
)
time.sleep(1)
response.raise_for_status()
project_list = response.json().get("data", {}).get("records", [])
all_project_list.extend(project_list)
logger.debug(f"{page}页获取到{len(project_list)}条项目")
except requests.exceptions.RequestException as e:
logger.error(f"获取第{page}页项目列表失败: {str(e)}, 响应状态码: {getattr(e.response, 'status_code', 'N/A')}")
raise
logger.info(f"项目列表获取完成,总计: {len(all_project_list)}条项目")
except requests.exceptions.RequestException as e:
logger.error(f"获取项目列表失败: {str(e)}, 堆栈信息: {traceback.format_exc()}")
raise
# 遍历获取到的项目信息停用文件中的项目
code_list = df.iloc[:, 0].dropna().astype(str).tolist()
res_data_list = []
logger.info(f"Excel文件中待处理的项目编码数量: {len(code_list)}")
results = []
consecutive_failures = 0 # 连续失败计数器
MAX_CONSECUTIVE_FAILURES = 100 # 最大连续失败次数
success_count = 0
failure_count = 0
logger.info("开始处理项目启停操作...")
for item in tqdm(all_project_list):
custom_code = item.get("customCode")
if not custom_code or str(custom_code) not in code_list or not code_list:
if not code_list or not custom_code or str(custom_code) not in code_list:
continue
logger.debug(f"正在处理项目编码: {custom_code}")
info_id = item.get("infoId")
pk_id = item.get("pkId")
if not info_id or not pk_id:
logger.warning(f"项目编码 {custom_code} 缺少必要字段: infoId={info_id}, pkId={pk_id}")
results.append({'项目编码': custom_code, '状态': '缺少必要字段(infoId或pkId)'})
failure_count += 1
consecutive_failures += 1
if consecutive_failures >= MAX_CONSECUTIVE_FAILURES:
results.append({'项目编码': '系统保护', '状态': '连续100次失败已终止'})
logger.warning(f"连续失败{MAX_CONSECUTIVE_FAILURES}次,已中止任务执行")
break
continue
json_data = {
"orgIdList": [
org_id,
@@ -103,28 +153,70 @@ def batch_disable_projects(data: Dict[str, Any], cookies: Dict[str, str], df: pd
"idOwnOrg": org_id
}
try:
logger.debug(f"发送启停请求,项目编码: {custom_code}, 操作类型: {type_}")
response = requests.post(
'https://ids-goods.f6car.cn/f6-ids-goods/service/editAttributeByType',
cookies=cookies,
json=json_data,
)
res_data_list.append(response.json())
time.sleep(2)
response.raise_for_status() # 抛出HTTP错误
results.append({'材料编码': custom_code, '状态': '停用/启用成功'})
# 检查业务响应码
resp_data = response.json()
if resp_data.get("code") == 200:
results.append({'项目编码': custom_code, '状态': '停用/启用成功'})
success_count += 1
consecutive_failures = 0 # 成功时重置计数器
logger.info(f"项目编码 {custom_code} 启停操作成功")
else:
msg = resp_data.get("message", "未知错误")
results.append({'项目编码': custom_code, '状态': f'停用/启用失败: {msg}'})
failure_count += 1
consecutive_failures += 1
logger.error(f"项目编码 {custom_code} 启停操作失败: {msg}, 响应数据: {resp_data}")
except requests.exceptions.RequestException as e:
results.append({'材料编码': custom_code, '状态': f'停用/启用失败: {str(e)}'})
pass
error_msg = str(e)
results.append({'项目编码': custom_code, '状态': f'停用/启用失败: {error_msg}'})
failure_count += 1
consecutive_failures += 1
logger.error(f"项目编码 {custom_code} 启停操作请求异常: {error_msg}, 堆栈信息: {traceback.format_exc()}")
# 检查连续失败次数
if consecutive_failures >= MAX_CONSECUTIVE_FAILURES:
results.append({'项目编码': '系统保护', '状态': '连续100次失败已终止'})
logger.warning(f"连续失败{MAX_CONSECUTIVE_FAILURES}次,已中止任务执行")
break
logger.info(f"项目启停处理完成,成功: {success_count}条, 失败: {failure_count}条, 总计: {len(results)}")
print({'msg': '已执行', 'msg_details': f'{results}'})
logger.info(f"停用/启用结果: {results}")
os.remove(save_path)
print(f'{save_path}已删除')
# 调用api回写改掉 执行明细与执行状态
msg = update_jiandaoyun(data, f'{results}')
# 删除文件
logger.info(f"准备删除文件: {save_path}")
try:
os.remove(save_path)
logger.info(f"文件删除成功: {save_path}")
print(f'{save_path}已删除')
except Exception as e:
logger.error(f"删除文件失败: {save_path}, 错误信息: {str(e)}, 堆栈信息: {traceback.format_exc()}")
# 调用api回写改掉 执行明细与执行状态
logger.info("开始回写简道云表单...")
try:
msg = update_jiandaoyun(data, f'{results}')
logger.info(f"简道云表单回写结果: {msg}")
if msg.get('msg'):
logger.info("开始自动提交工作流...")
approve_workflow(data)
logger.info("工作流提交成功")
print('表单已自动提交至下一步')
else:
logger.warning(f"简道云表单回写失败: {msg}")
except Exception as e:
logger.error(f"回写简道云表单失败: {str(e)}, 堆栈信息: {traceback.format_exc()}")
logger.info(f"项目批量启停任务执行完成,操作类型: {option}")
def batch_modify_materials(data: Dict[str, Any], cookies: Dict[str, str], df: pd.DataFrame, save_path: str) -> None:
@@ -147,6 +239,7 @@ def batch_modify_materials(data: Dict[str, Any], cookies: Dict[str, str], df: pd
- 执行完成后会自动删除上传的文件
- 执行结果会更新到简道云表单
"""
logger.info(f"开始执行材料批量修改任务,文件: {save_path},行数: {len(df)}")
def safe_str(val):
"""将值转为字符串,NaN 返回空字符串"""
@@ -159,7 +252,13 @@ def batch_modify_materials(data: Dict[str, Any], cookies: Dict[str, str], df: pd
return not (pd.isna(new_val) or (isinstance(new_val, str) and new_val.strip() == ""))
# 获取门店id
org_id = get_operate_org_id(data)
logger.info("正在获取门店ID...")
try:
org_id = get_operate_org_id(cookies)
logger.info(f"门店ID获取成功: {org_id}")
except Exception as e:
logger.error(f"获取门店ID失败: {str(e)}, 堆栈信息: {traceback.format_exc()}")
raise
# 第一步:获取所有材料列表(分页)
json_data = {
@@ -182,6 +281,7 @@ def batch_modify_materials(data: Dict[str, Any], cookies: Dict[str, str], df: pd
'getThirdPlatformCode': 0,
}
try:
response = requests.post(
'https://ids-goods.f6car.com/f6-ids-goods/part/getExactPartStockInfo',
cookies=cookies,
@@ -189,24 +289,37 @@ def batch_modify_materials(data: Dict[str, Any], cookies: Dict[str, str], df: pd
)
response.raise_for_status()
total_pages = response.json().get("data", {}).get("totalPages", 0)
logger.info(f"材料列表页数: {total_pages}")
all_materials_list = []
total_pages = int(total_pages)
for page in tqdm(range(1, total_pages + 1), desc="获取材料列表"):
json_data['currentPage'] = page
try:
resp = requests.post(
'https://ids-goods.f6car.com/f6-ids-goods/part/getExactPartStockInfo',
cookies=cookies,
json=json_data,
)
time.sleep(1)
resp.raise_for_status()
records = resp.json().get("data", {}).get("records", [])
all_materials_list.extend(records)
except requests.exceptions.RequestException as e:
logger.error(f"获取第{page}页材料列表失败: {str(e)}, 响应状态码: {getattr(e.response, 'status_code', 'N/A')}")
raise
logger.info(f"材料列表获取完成,总计: {len(all_materials_list)}")
except requests.exceptions.RequestException as e:
logger.error(f"获取材料列表失败: {str(e)}, 堆栈信息: {traceback.format_exc()}")
raise
# 第二步:构建 update_map(只存原始值,不预处理)
update_map = {}
skipped_count = 0
for _, row in df.iterrows():
orig_code = row.iloc[0] # 原材料编码
if pd.isna(orig_code) or str(orig_code).strip() == "":
skipped_count += 1
continue
orig_code = str(orig_code).strip()
update_map[orig_code] = {
@@ -215,9 +328,16 @@ def batch_modify_materials(data: Dict[str, Any], cookies: Dict[str, str], df: pd
"new_name": row.iloc[3],
"new_spec": row.iloc[4],
}
logger.info(f"待更新材料数: {len(update_map)},跳过空编码行: {skipped_count}")
# 第三步:遍历材料,按需更新
results = []
consecutive_failures = 0 # 连续失败计数器
MAX_CONSECUTIVE_FAILURES = 100 # 最大连续失败次数
success_count = 0
failure_count = 0
skip_count = 0
for item in tqdm(all_materials_list, desc="处理材料更新"):
custom_code = item.get("customCode")
if not custom_code or str(custom_code) not in update_map:
@@ -225,7 +345,15 @@ def batch_modify_materials(data: Dict[str, Any], cookies: Dict[str, str], df: pd
part_id = item.get("partId")
if not part_id:
results.append({'材料编码': custom_code, '状态': '缺少 partId'})
error_msg = '缺少 partId'
results.append({'材料编码': custom_code, '状态': error_msg})
failure_count += 1
consecutive_failures += 1
logger.warning(f"材料编码 {custom_code} 跳过/失败: {error_msg}")
if consecutive_failures >= MAX_CONSECUTIVE_FAILURES:
results.append({'材料编码': '系统保护', '状态': '连续100次失败已终止'})
logger.warning(f"连续失败{MAX_CONSECUTIVE_FAILURES}次,已中止任务执行")
break
continue
try:
@@ -240,13 +368,30 @@ def batch_modify_materials(data: Dict[str, Any], cookies: Dict[str, str], df: pd
params=params,
cookies=cookies,
)
time.sleep(1)
if materials_response.status_code != 200:
results.append({'材料编码': custom_code, '状态': f'获取明细失败: {materials_response.status_code}'})
error_msg = f'获取明细失败: {materials_response.status_code}'
results.append({'材料编码': custom_code, '状态': error_msg})
failure_count += 1
consecutive_failures += 1
logger.error(f"材料编码 {custom_code} {error_msg}, 响应内容: {materials_response.text[:200]}")
if consecutive_failures >= MAX_CONSECUTIVE_FAILURES:
results.append({'材料编码': '系统保护', '状态': '连续100次失败已终止'})
logger.warning(f"连续失败{MAX_CONSECUTIVE_FAILURES}次,已中止任务执行")
break
continue
detail = materials_response.json().get("data")
if not detail:
results.append({'材料编码': custom_code, '状态': '明细为空'})
error_msg = '明细为空'
results.append({'材料编码': custom_code, '状态': error_msg})
failure_count += 1
consecutive_failures += 1
logger.error(f"材料编码 {custom_code} {error_msg}, 响应JSON: {materials_response.json()}")
if consecutive_failures >= MAX_CONSECUTIVE_FAILURES:
results.append({'材料编码': '系统保护', '状态': '连续100次失败已终止'})
logger.warning(f"连续失败{MAX_CONSECUTIVE_FAILURES}次,已中止任务执行")
break
continue
updates = update_map[str(custom_code)]
@@ -277,33 +422,84 @@ def batch_modify_materials(data: Dict[str, Any], cookies: Dict[str, str], df: pd
cookies=cookies,
json=detail
)
time.sleep(2)
if update_resp.status_code == 200 and update_resp.json().get("code") == 200:
results.append({'材料编码': custom_code, '状态': '修改成功'})
success_count += 1
consecutive_failures = 0 # 成功时重置计数器
else:
msg = update_resp.json().get("message", "未知错误")
results.append({'材料编码': custom_code, '状态': f'修改失败: {msg}'})
error_msg = f'修改失败: {msg}'
results.append({'材料编码': custom_code, '状态': error_msg})
failure_count += 1
consecutive_failures += 1
logger.error(f"材料编码 {custom_code} {error_msg}, 响应数据: {update_resp.json()}")
except requests.exceptions.RequestException as e:
results.append({'材料编码': custom_code, '状态': f'请求异常: {str(e)}'})
error_msg = f'请求异常: {str(e)}'
results.append({'材料编码': custom_code, '状态': error_msg})
failure_count += 1
consecutive_failures += 1
logger.error(f"材料编码 {custom_code} {error_msg}, 堆栈信息: {traceback.format_exc()}")
except Exception as e:
results.append({'材料编码': custom_code, '状态': f'内部错误: {str(e)}'})
error_msg = f'内部错误: {str(e)}'
results.append({'材料编码': custom_code, '状态': error_msg})
failure_count += 1
consecutive_failures += 1
logger.error(f"材料编码 {custom_code} {error_msg}, 堆栈信息: {traceback.format_exc()}")
# 检查连续失败次数
if consecutive_failures >= MAX_CONSECUTIVE_FAILURES:
results.append({'材料编码': '系统保护', '状态': '连续100次失败已终止'})
logger.warning(f"连续失败{MAX_CONSECUTIVE_FAILURES}次,已中止任务执行")
break
# 统计汇总(总行数 / 待处理 / 成功 / 失败 / 跳过)
total_rows = len(df)
to_process = len(update_map)
# results 里可能包含 “缺少 partId” 等失败信息;这里跳过数按空编码行计数即可
skip_count = skipped_count
summary = {
"总行数": total_rows,
"待处理": to_process,
"成功": success_count,
"失败": failure_count,
"跳过": skip_count,
}
results.insert(0, {"汇总": summary})
logger.info(f"材料修改汇总: {summary}")
# 第四步:清理与回写
print({'msg': '已执行', 'msg_details': results})
logger.info(f"材料批量修改结果: {results}")
# 结果回写包含汇总 + 明细
logger.info("材料批量修改完成,开始回写结果")
# 删除文件
logger.info(f"准备删除文件: {save_path}")
try:
os.remove(save_path)
logger.info(f"文件删除成功: {save_path}")
print(f'{save_path} 已删除')
except Exception as e:
logger.error(f"删除文件失败: {e}")
logger.error(f"删除文件失败: {save_path}, 错误信息: {str(e)}, 堆栈信息: {traceback.format_exc()}")
# 回写简道云
logger.info("开始回写简道云表单...")
try:
msg = update_jiandaoyun(data, str(results))
logger.info(f"简道云表单回写结果: {msg}")
if msg.get('msg'):
logger.info("开始自动提交工作流...")
approve_workflow(data)
logger.info("工作流提交成功")
print('表单已自动提交至下一步')
else:
logger.warning(f"简道云表单回写失败: {msg}")
except Exception as e:
logger.error(f"回写简道云表单失败: {str(e)}, 堆栈信息: {traceback.format_exc()}")
logger.info("材料批量修改任务执行完成")
def batch_modify_projects(data: Dict[str, Any], cookies: Dict[str, str], df: pd.DataFrame, save_path: str) -> None:
@@ -317,16 +513,18 @@ def batch_modify_projects(data: Dict[str, Any], cookies: Dict[str, str], df: pd.
data: 包含表单ID(api_key)、表单ID(entry_id)、数据ID(data_id)的字典
cookies: 用户登录 F6 系统的 cookies 信息
df: Excel 文件读取的内容,DataFrame 格式,列顺序为:
[0:原项目编码, 1:新项目编码, 2:新项目名称, 3:业务分类,
4:销项税率, 5:项目说明, 6:车辆分类, 7:工时单价, 8:工时]
[0:原项目编码, 1:新项目编码, 2:新项目名称, 3:业务分类, 4:销项税率, 5:项目说明]
save_path: Excel 文件保存的地址,执行完成后会删除此文件
注意:
- 无效的项目编码(None、空字符串)会被跳过
- Excel 中某字段为空(NaN 或空字符串)时,保留原始项目中的对应字段
- 如果新项目名称与系统已有项目名称重复,则跳过处理
- 如果Excel中多个行的新项目名称重复,只执行第一条数据,后续重复的会被跳过
- 执行完成后会自动删除上传的文件
- 执行结果会更新到简道云表单
"""
logger.info(f"开始执行项目批量修改任务,文件: {save_path},行数: {len(df)}")
def safe_str(val):
"""将值转为字符串,NaN 返回空字符串"""
@@ -347,20 +545,43 @@ def batch_modify_projects(data: Dict[str, Any], cookies: Dict[str, str], df: pd.
"""判断是否应该用 new_val 更新:非 NaN 且非空字符串"""
return not (pd.isna(new_val) or (isinstance(new_val, str) and new_val.strip() == ""))
def safe_iloc(row, idx, default=None):
"""安全按索引取行值,列数不足时返回 default,避免 IndexError"""
try:
if idx < len(row):
return row.iloc[idx]
except (IndexError, KeyError):
pass
return default
# 获取门店id
org_id = get_operate_org_id(data)
logger.info("获取门店ID...")
try:
org_id = get_operate_org_id(cookies)
logger.info(f"门店ID获取成功: {org_id}")
except Exception as e:
logger.error(f"获取门店ID失败: {str(e)}, 堆栈信息: {traceback.format_exc()}")
raise
# 获取服务分类(用于映射业务分类名称 → pkId)
logger.info("获取服务分类列表...")
try:
init_add_resp = requests.post(
'https://ids-goods.f6car.cn/f6-ids-goods/service/initAdd',
cookies=cookies,
data={'idOwnOrg': org_id}
)
time.sleep(1)
init_add_resp.raise_for_status()
service_category_list = init_add_resp.json().get("data", {}).get("serviceCategory", [])
category_name_to_pk = {item["name"]: item["pkId"] for item in service_category_list}
logger.info(f"服务分类列表获取成功,共{len(category_name_to_pk)}个分类")
except requests.exceptions.RequestException as e:
logger.error(f"获取服务分类列表失败: {str(e)}, 堆栈信息: {traceback.format_exc()}")
raise
# 第一步:获取所有项目列表(分页)
logger.info("获取项目列表...")
json_data = {
'param': '',
'name': '',
@@ -372,6 +593,7 @@ def batch_modify_projects(data: Dict[str, Any], cookies: Dict[str, str], df: pd.
'idOwnOrg': org_id,
}
try:
response = requests.post(
'https://ids-goods.f6car.cn/f6-ids-goods/service/getServiceList',
cookies=cookies,
@@ -379,57 +601,116 @@ def batch_modify_projects(data: Dict[str, Any], cookies: Dict[str, str], df: pd.
)
response.raise_for_status()
total_pages = response.json().get("data", {}).get("totalPages", 0)
logger.info(f"项目列表总页数: {total_pages}")
all_project_list = []
total_pages = int(total_pages)
for page in tqdm(range(1, total_pages + 1), desc="获取项目列表"):
json_data['currentPage'] = page
try:
resp = requests.post(
'https://ids-goods.f6car.cn/f6-ids-goods/service/getServiceList',
cookies=cookies,
json=json_data,
)
time.sleep(1)
resp.raise_for_status()
records = resp.json().get("data", {}).get("records", [])
all_project_list.extend(records)
except requests.exceptions.RequestException as e:
logger.error(f"获取第{page}页项目列表失败: {str(e)}, 响应状态码: {getattr(e.response, 'status_code', 'N/A')}")
raise
logger.info(f"项目列表获取完成,总计: {len(all_project_list)}条项目")
except requests.exceptions.RequestException as e:
logger.error(f"获取项目列表失败: {str(e)}, 堆栈信息: {traceback.format_exc()}")
raise
# 构建系统已有项目名称集合(用于检查重复)
existing_project_names = set()
for project in all_project_list:
project_name = project.get("name")
if project_name:
existing_project_names.add(str(project_name).strip())
logger.info(f"系统已有项目名称数: {len(existing_project_names)}")
# 第二步:构建 update_map
logger.info("构建更新映射表...")
update_map = {}
for _, row in df.iterrows():
orig_code = row.iloc[0]
skipped_count = 0
category_not_found_count = 0
duplicate_name_in_df_count = 0 # DataFrame中重复的新项目名称数量
duplicate_name_in_system_count = 0 # 与系统已有项目名称重复的数量
seen_new_names = {} # 用于跟踪DataFrame中已出现的新项目名称,key为新名称,value为第一次出现的原项目编码
results = [] # 提前创建results,用于记录跳过的信息
for idx, row in df.iterrows():
orig_code = safe_iloc(row, 0)
if pd.isna(orig_code) or str(orig_code).strip() == "":
skipped_count += 1
results.append({'项目编码': '', '状态': '跳过: 项目编码为空'})
continue
orig_code = str(orig_code).strip()
# 计算工时费 = 单价 * 工时
price = safe_float(row.iloc[7])
work_hour = safe_float(row.iloc[8])
amount = None
if price is not None and work_hour is not None:
amount = round(price * work_hour, 2)
# 检查新项目名称(第2列,索引为2
new_name = safe_iloc(row, 2)
if should_update(new_name):
new_name_clean = safe_str(new_name)
# 检查DataFrame中是否有重复的新项目名称
if new_name_clean in seen_new_names:
duplicate_name_in_df_count += 1
logger.warning(f"DataFrame中项目名称重复,跳过: 原项目编码={orig_code}, 新项目名称={new_name_clean}, 首次出现在原项目编码={seen_new_names[new_name_clean]}")
results.append({'项目编码': orig_code, '状态': f'跳过: DataFrame中项目名称重复(首次出现在原项目编码={seen_new_names[new_name_clean]})'})
continue
# 检查新项目名称是否与系统已有项目名称重复
if new_name_clean in existing_project_names:
duplicate_name_in_system_count += 1
logger.warning(f"新项目名称与系统已有项目名称重复,跳过: 原项目编码={orig_code}, 新项目名称={new_name_clean}")
results.append({'项目编码': orig_code, '状态': f'跳过: 新项目名称与系统已有项目名称重复({new_name_clean})'})
continue
# 记录这个新名称
seen_new_names[new_name_clean] = orig_code
# 业务分类映射
category_name = row.iloc[3]
category_name = safe_iloc(row, 3)
category_pk = None
category_not_found = False
if should_update(category_name):
cat_name_clean = safe_str(category_name)
category_pk = category_name_to_pk.get(cat_name_clean)
if category_pk is None:
logger.warning(f"业务分类 '{cat_name_clean}' 未在系统中找到")
logger.warning(f"业务分类 '{cat_name_clean}' 未在系统中找到,项目编码: {orig_code}")
category_not_found = True
category_not_found_count += 1
# 列 6/7/8(车辆分类、工时单价、工时)不再传入,固定为 None,不更新这些字段
car_category_name = None
price = None
work_hour = None
amount = None
update_map[orig_code] = {
"new_customCode": row.iloc[1],
"new_name": row.iloc[2],
"new_customCode": safe_iloc(row, 1), # 新项目编码
"new_name": safe_iloc(row, 2), # 新项目名称
"new_serviceCategoryId": category_pk,
"new_taxRate": row.iloc[4],
"new_memo": row.iloc[5],
"new_carCategoryName": row.iloc[6],
"new_taxRate": safe_iloc(row, 4),
"new_memo": safe_iloc(row, 5),
"new_carCategoryName": car_category_name,
"new_price": price,
"new_workHour": work_hour,
"new_amount": amount,
}
logger.info(f"映射表完成: 待处理={len(update_map)}, 跳过空编码={skipped_count}, 分类未找到={category_not_found_count}, DF重复名={duplicate_name_in_df_count}, 系统重名={duplicate_name_in_system_count}")
# 第三步:遍历项目,按需更新
results = []
logger.info("开始处理项目更新...")
consecutive_failures = 0 # 连续失败计数器
MAX_CONSECUTIVE_FAILURES = 100 # 最大连续失败次数
success_count = 0
failure_count = 0
for item in tqdm(all_project_list, desc="处理项目更新"):
custom_code = item.get("customCode")
if not custom_code or str(custom_code) not in update_map:
@@ -437,7 +718,15 @@ def batch_modify_projects(data: Dict[str, Any], cookies: Dict[str, str], df: pd.
service_id = item.get("pkId") # 项目主键
if not service_id:
results.append({'项目编码': custom_code, '状态': '缺少 pkId'})
error_msg = '缺少 pkId'
results.append({'项目编码': custom_code, '状态': error_msg})
failure_count += 1
consecutive_failures += 1
logger.warning(f"项目编码 {custom_code} {error_msg}")
if consecutive_failures >= MAX_CONSECUTIVE_FAILURES:
results.append({'项目编码': '系统保护', '状态': '连续100次失败已终止'})
logger.warning(f"连续失败{MAX_CONSECUTIVE_FAILURES}次,已中止任务执行")
break
continue
try:
@@ -451,13 +740,30 @@ def batch_modify_projects(data: Dict[str, Any], cookies: Dict[str, str], df: pd.
params=params,
cookies=cookies,
)
time.sleep(1)
if detail_resp.status_code != 200:
results.append({'项目编码': custom_code, '状态': f'获取明细失败: {detail_resp.status_code}'})
error_msg = f'获取明细失败: {detail_resp.status_code}'
results.append({'项目编码': custom_code, '状态': error_msg})
failure_count += 1
consecutive_failures += 1
logger.error(f"项目编码 {custom_code} {error_msg}, 响应内容: {detail_resp.text[:200]}")
if consecutive_failures >= MAX_CONSECUTIVE_FAILURES:
results.append({'项目编码': '系统保护', '状态': '连续100次失败已终止'})
logger.warning(f"连续失败{MAX_CONSECUTIVE_FAILURES}次,已中止任务执行")
break
continue
detail = detail_resp.json().get("data")
if not detail:
results.append({'项目编码': custom_code, '状态': '明细为空'})
error_msg = '明细为空'
results.append({'项目编码': custom_code, '状态': error_msg})
failure_count += 1
consecutive_failures += 1
logger.error(f"项目编码 {custom_code} {error_msg}, 响应JSON: {detail_resp.json()}")
if consecutive_failures >= MAX_CONSECUTIVE_FAILURES:
results.append({'项目编码': '系统保护', '状态': '连续100次失败已终止'})
logger.warning(f"连续失败{MAX_CONSECUTIVE_FAILURES}次,已中止任务执行")
break
continue
updates = update_map[str(custom_code)]
@@ -467,6 +773,18 @@ def batch_modify_projects(data: Dict[str, Any], cookies: Dict[str, str], df: pd.
detail["customCode"] = safe_str(updates["new_customCode"])
if should_update(updates["new_name"]):
name_val = safe_str(updates["new_name"])
# 再次检查新项目名称是否与系统已有项目名称重复(防止在构建update_map后系统状态发生变化)
if name_val in existing_project_names:
error_msg = f'跳过: 新项目名称与系统已有项目名称重复({name_val})'
results.append({'项目编码': custom_code, '状态': error_msg})
failure_count += 1
consecutive_failures += 1
logger.warning(f"项目编码 {custom_code} {error_msg}")
if consecutive_failures >= MAX_CONSECUTIVE_FAILURES:
results.append({'项目编码': '系统保护', '状态': '连续100次失败已终止'})
logger.warning(f"连续失败{MAX_CONSECUTIVE_FAILURES}次,已中止任务执行")
break
continue
detail["name"] = name_val
if "showName" in detail:
detail["showName"] = name_val
@@ -496,30 +814,79 @@ def batch_modify_projects(data: Dict[str, Any], cookies: Dict[str, str], df: pd.
cookies=cookies,
json=detail
)
time.sleep(2)
if update_resp.status_code == 200 and update_resp.json().get("code") == 200:
results.append({'项目编码': custom_code, '状态': '修改成功'})
success_count += 1
consecutive_failures = 0 # 成功时重置计数器
else:
msg = update_resp.json().get("message", "未知错误")
results.append({'项目编码': custom_code, '状态': f'修改失败: {msg}'})
error_msg = f'修改失败: {msg}'
results.append({'项目编码': custom_code, '状态': error_msg})
failure_count += 1
consecutive_failures += 1
logger.error(f"项目编码 {custom_code} {error_msg}, 响应数据: {update_resp.json()}")
except requests.exceptions.RequestException as e:
error_msg = f"请求异常: {str(e)}"
logger.error(f"项目编码 {custom_code} {error_msg}, 堆栈信息: {traceback.format_exc()}")
results.append({'项目编码': custom_code, '状态': error_msg})
failure_count += 1
consecutive_failures += 1
except Exception as e:
error_msg = f"异常: {str(e)}"
logger.error(f"项目 {custom_code} 更新出错: {traceback.format_exc()}")
logger.error(f"项目编码 {custom_code} 更新出错: {traceback.format_exc()}")
results.append({'项目编码': custom_code, '状态': error_msg})
failure_count += 1
consecutive_failures += 1
# 检查连续失败次数
if consecutive_failures >= MAX_CONSECUTIVE_FAILURES:
results.append({'项目编码': '系统保护', '状态': '连续100次失败已终止'})
logger.warning(f"连续失败{MAX_CONSECUTIVE_FAILURES}次,已中止任务执行")
break
# 汇总插入到结果顶部,便于简道云直接看到统计
total_rows = len(df)
to_process = len(update_map)
skip_count = skipped_count + duplicate_name_in_df_count + duplicate_name_in_system_count
summary = {
"总行数": total_rows,
"待处理": to_process,
"成功": success_count,
"失败": failure_count,
"跳过": skip_count,
}
results.insert(0, {"汇总": summary})
logger.info(f"项目修改汇总: {summary}")
# 第四步:清理与回写
print({'msg': '已执行', 'msg_details': results})
logger.info(f"项目批量修改结果: {results}")
logger.info("项目批量修改完成,开始回写结果")
# 删除文件
logger.info(f"准备删除文件: {save_path}")
try:
os.remove(save_path)
logger.info(f"文件删除成功: {save_path}")
print(f'{save_path} 已删除')
except Exception as e:
logger.error(f"删除文件失败: {e}")
logger.error(f"删除文件失败: {save_path}, 错误信息: {str(e)}, 堆栈信息: {traceback.format_exc()}")
# 回写简道云
logger.info("开始回写简道云表单...")
try:
msg = update_jiandaoyun(data, str(results))
logger.info(f"简道云表单回写结果: {msg}")
if msg.get('msg'):
logger.info("开始自动提交工作流...")
approve_workflow(data)
logger.info("工作流提交成功")
print('表单已自动提交至下一步')
else:
logger.warning(f"简道云表单回写失败: {msg}")
except Exception as e:
logger.error(f"回写简道云表单失败: {str(e)}, 堆栈信息: {traceback.format_exc()}")
logger.info("项目批量修改任务执行完成")
+8 -5
View File
@@ -74,6 +74,12 @@ async def lifespan(app: FastAPI):
description='短信签名状态')
core_manager.register_action('modify_customer_info', f6_plugin_module.modify_customer_info, 'f6_plugin_module',
description='修改客户信息')
core_manager.register_action('disable_project', f6_plugin_module.disable_projects, 'f6_plugin_module',
description='项目信息批量启停')
core_manager.register_action('batch_modify_materials', f6_plugin_module.modify_material, 'f6_plugin_module',
description='材料信息批量修改')
core_manager.register_action('batch_modify_projects', f6_plugin_module.modify_project, 'f6_plugin_module',
description='项目信息批量修改')
core_manager.register_action('bi_task', f6_plugin_module.bi_task, 'f6_plugin_module',
description='BI任务')
@@ -86,7 +92,6 @@ async def lifespan(app: FastAPI):
app.state.app_tools.scheduler.shutdown(wait=False)
app.state.logger.info("应用关闭")
app = FastAPI(
title="简道云FastAPI服务",
description="简道云插件后端服务,提供数据同步和处理功能",
@@ -202,8 +207,6 @@ async def general_exception_handler(request: Request, exc: Exception):
error_code="INTERNAL_ERROR"
).model_dump(),
)
# 路由已移动到 app/api/routes.py
@@ -214,8 +217,8 @@ if __name__ == '__main__':
当直接运行此文件时,启动 uvicorn 服务器。
默认配置:
- 主机: 0.0.0.0 (监听所有网络接口)
- 端口: 5003
- 端口: 5000
- 热重载: 关闭 (生产环境建议关闭)
"""
import uvicorn
uvicorn.run(app, host="0.0.0.0", port=5003, reload=False)
uvicorn.run(app, host="0.0.0.0", port=5000, reload=False)
+1
View File
@@ -9,3 +9,4 @@ pytesseract==0.3.13
Requests==2.32.5
tqdm==4.67.1
uvicorn==0.40.0
openpyxl
+54
View File
@@ -0,0 +1,54 @@
@echo off
title 简道云监听服务 端口5000
:: 切换到应用所在目录(与原始脚本一致)
cd /d "C:\Users\Administrator\Desktop\简道云\简道云"
:: 激活Anaconda基础环境(或你指定的环境)
call C:\ProgramData\anaconda3\Scripts\activate.bat
if %errorlevel% neq 0 (
echo 激活Anaconda环境失败,请检查环境配置。
pause
exit /b 1
)
:: 验证Python是否可用
python --version >nul 2>&1
if %errorlevel% neq 0 (
echo 无法找到Python,请检查Anaconda环境是否正确安装。
pause
exit /b 1
)
:: 验证uvicorn是否已安装(可选但推荐)
python -c "import uvicorn" >nul 2>&1
if %errorlevel% neq 0 (
echo 错误:未检测到 uvicorn。请运行以下命令安装:
echo conda install -c conda-forge uvicorn fastapi
echo
echo pip install "uvicorn[standard]" fastapi
pause
exit /b 1
)
:: 设置默认参数(允许通过环境变量覆盖)
if "%HOST%"=="" set HOST=0.0.0.0
if "%PORT%"=="" set PORT=5000
if "%WORKERS%"=="" set WORKERS=1
if "%LOG_LEVEL%"=="" set LOG_LEVEL=info
set APP_MODULE=main:app
echo.
echo 启动简道云 FastAPI 服务...
echo 模块: %APP_MODULE%
echo 地址: http://%HOST%:%PORT%
echo 进程数: %WORKERS% Windows建议设为1
echo 日志级别: %LOG_LEVEL%
echo.
:: 使用 python -m uvicorn 确保调用当前环境中的 uvicorn
python -m uvicorn %APP_MODULE% --host %HOST% --port %PORT% --workers %WORKERS% --log-level %LOG_LEVEL%
:: 服务退出后暂停,便于查看日志
pause
Binary file not shown.
Binary file not shown.