V2.1客户信息修改、项目批量停用、项目批量修改、材料批量修改功能上线

This commit is contained in:
2026-01-28 15:26:48 +08:00
parent 98944ecbdc
commit 838453b88f
6 changed files with 674 additions and 228 deletions
+144 -77
View File
@@ -16,6 +16,8 @@ F6 后台执行模块
- pandas: Excel 文件处理
- threading: 后台任务处理
"""
import logging
import traceback
import requests
from urllib.parse import quote
import pandas as pd
@@ -37,6 +39,7 @@ from app.tasks.delete_tasks import (
from app.tasks.material_tasks import (
batch_disable_projects,
batch_modify_materials,
batch_modify_projects
)
from app.tasks.customer_tasks import modify_customer_info_background
from app.tasks.bi_tasks import bi_task_background
@@ -44,6 +47,8 @@ from app.tasks.bi_tasks import bi_task_background
# 简道云 API 实例,用于调用简道云 API
api_instance = API()
logger = logging.getLogger('app')
class F6PluginModule:
"""
@@ -171,8 +176,38 @@ class F6PluginModule:
else:
print("'msg':'文件上传格式错误'")
return {'msg': '文件上传格式错误'}
elif action == 'delete_cars':
pass
elif action == 'disable_project':
df1 = pd.read_excel(save_path, sheet_name=0)
if "项目编码" in df1.columns[0]: # 校验表头名字
print('文件校验成功')
return {'msg': f'{save_path}', 'check': ''}
else:
print("'msg':'文件上传格式错误'")
return {'msg': '文件上传格式错误'}
elif action == 'batch_modify_materials':
df2 = pd.read_excel(save_path, sheet_name=0)
required_columns = {'原材料编码', '新材料编码', '品牌', '名称', '规格'}
actual_columns = set(df2.columns)
if required_columns.issubset(actual_columns):
print('文件校验成功')
return {'msg': f'{save_path}', 'check': ''}
else:
missing = required_columns - actual_columns
print(f"文件上传格式错误:缺少列 {missing}")
return {'msg': '文件上传格式错误'}
elif action == 'batch_modify_projects':
df3 = pd.read_excel(save_path, sheet_name=0)
required_columns = {'原项目编码', '新项目编码', '项目名称', '业务分类', '销项税率', '项目说明',
}
actual_columns = set(df3.columns)
if required_columns.issubset(actual_columns):
print('文件校验成功')
return {'msg': f'{save_path}', 'check': ''}
else:
missing = required_columns - actual_columns
print(f"文件上传格式错误:缺少列 {missing}")
return {'msg': '文件上传格式错误'}
else:
pass
@@ -410,33 +445,45 @@ class F6PluginModule:
Returns:
Dict[str, str]: 包含执行状态的字典,{'msg': '正在执行', 'msg_details': '正在执行,请稍后看结果'}
"""
entry_data = api_instance.entry_data_get(data=data, replace=True)
print('执行 项目批量停用/启用')
username = entry_data['data']['账号']
password = entry_data['data']['密码']
company_name = entry_data['data']['公司名称']
save_path = entry_data['data']['文件保存地址']
option = entry_data['data']['项目材料批量操作']
login_response = F6Module.login_in(username, password, company_name)
if login_response is None:
return {'msg': '登录失败'}
try:
df = pd.read_excel(save_path, sheet_name=0, dtype='string')
entry_data = api_instance.entry_data_get(data=data, replace=True)
print('执行 项目批量停用/启用')
username = entry_data['data']['账号']
password = entry_data['data']['密码']
company_name = entry_data['data']['公司名称']
save_path = entry_data['data']['文件保存地址']
option = entry_data['data']['项目材料批量操作']
login_response = F6Module.login_in(username, password, company_name)
if login_response is None:
logger.error(f"F6系统登录失败,用户名: {username}")
return {'msg': '登录失败'}
try:
df = pd.read_excel(save_path, sheet_name=0, dtype='string')
except Exception as e:
logger.error(f"读取Excel文件失败: {save_path}, 错误: {str(e)}, 堆栈: {traceback.format_exc()}")
return {'msg': f'读取Excel文件失败: {str(e)},文件路径:{save_path}'}
cookies = requests.utils.dict_from_cookiejar(login_response.cookies)
logger.info("当前登录cookies:{}".format(cookies))
try:
thread = threading.Thread(target=batch_disable_projects,
args=(data, cookies, df, save_path, option))
thread.start()
except Exception as e:
logger.error(f"创建线程失败: {str(e)}, 堆栈: {traceback.format_exc()}")
print(f'创建线程失败: {str(e)}')
return {'msg': f'创建后台线程失败: {str(e)}'}
return {'msg': '正在执行', 'msg_details': '正在执行,请稍后看结果'}
except KeyError as e:
logger.error(f"数据字段缺失: {str(e)}, 堆栈: {traceback.format_exc()}")
return {'msg': f'数据字段缺失: {str(e)}'}
except Exception as e:
return {'msg': f'读取Excel文件失败: {str(e)},文件路径:{save_path}'}
cookies = requests.utils.dict_from_cookiejar(login_response.cookies)
try:
thread = threading.Thread(target=batch_disable_projects,
args=(data, cookies, df, save_path,option))
thread.start()
except Exception as e:
print(f'创建线程失败: {str(e)}')
return {'msg': '正在执行', 'msg_details': '正在执行,请稍后看结果'}
logger.error(f"项目批量启停任务执行失败: {str(e)}, 堆栈: {traceback.format_exc()}")
return {'msg': f'执行失败: {str(e)}'}
@staticmethod
def modify_material(data: Dict[str, Any]) -> Dict[str, str]:
@@ -452,33 +499,43 @@ class F6PluginModule:
Returns:
Dict[str, str]: 包含执行状态的字典,{'msg': '正在执行', 'msg_details': '正在执行,请稍后看结果'}
"""
entry_data = api_instance.entry_data_get(data=data, replace=True)
print('执行 材料信息批量修改')
username = entry_data['data']['账号']
password = entry_data['data']['密码']
company_name = entry_data['data']['公司名称']
save_path = entry_data['data']['文件保存地址']
option = entry_data['data']['材料信息批量修改']
login_response = F6Module.login_in(username, password, company_name)
if login_response is None:
return {'msg': '登录失败'}
try:
df = pd.read_excel(save_path, sheet_name=0, dtype='string')
entry_data = api_instance.entry_data_get(data=data, replace=True)
print('执行 材料信息批量修改')
username = entry_data['data']['账号']
password = entry_data['data']['密码']
company_name = entry_data['data']['公司名称']
save_path = entry_data['data']['文件保存地址']
login_response = F6Module.login_in(username, password, company_name)
if login_response is None:
logger.error(f"F6系统登录失败,用户名: {username}")
return {'msg': '登录失败'}
try:
df = pd.read_excel(save_path, sheet_name=0, dtype='string')
except Exception as e:
logger.error(f"读取Excel文件失败: {save_path}, 错误: {str(e)}, 堆栈: {traceback.format_exc()}")
return {'msg': f'读取Excel文件失败: {str(e)},文件路径:{save_path}'}
cookies = requests.utils.dict_from_cookiejar(login_response.cookies)
try:
thread = threading.Thread(target=batch_modify_materials,
args=(data, cookies, df, save_path))
thread.start()
except Exception as e:
logger.error(f"创建线程失败: {str(e)}, 堆栈: {traceback.format_exc()}")
print(f'创建线程失败: {str(e)}')
return {'msg': f'创建后台线程失败: {str(e)}'}
return {'msg': '正在执行', 'msg_details': '正在执行,请稍后看结果'}
except KeyError as e:
logger.error(f"数据字段缺失: {str(e)}, 堆栈: {traceback.format_exc()}")
return {'msg': f'数据字段缺失: {str(e)}'}
except Exception as e:
return {'msg': f'读取Excel文件失败: {str(e)},文件路径:{save_path}'}
cookies = requests.utils.dict_from_cookiejar(login_response.cookies)
try:
thread = threading.Thread(target=batch_modify_materials,
args=(data, cookies, df, save_path, option))
thread.start()
except Exception as e:
print(f'创建线程失败: {str(e)}')
return {'msg': '正在执行', 'msg_details': '正在执行,请稍后看结果'}
logger.error(f"材料批量修改任务执行失败: {str(e)}, 堆栈: {traceback.format_exc()}")
return {'msg': f'执行失败: {str(e)}'}
@staticmethod
def modify_project(data: Dict[str, Any]) -> Dict[str, str]:
@@ -494,33 +551,43 @@ class F6PluginModule:
Returns:
Dict[str, str]: 包含执行状态的字典,{'msg': '正在执行', 'msg_details': '正在执行,请稍后看结果'}
"""
entry_data = api_instance.entry_data_get(data=data, replace=True)
print('执行 项目信息批量修改')
username = entry_data['data']['账号']
password = entry_data['data']['密码']
company_name = entry_data['data']['公司名称']
save_path = entry_data['data']['文件保存地址']
option = entry_data['data']['项目信息批量修改']
login_response = F6Module.login_in(username, password, company_name)
if login_response is None:
return {'msg': '登录失败'}
try:
df = pd.read_excel(save_path, sheet_name=0, dtype='string')
entry_data = api_instance.entry_data_get(data=data, replace=True)
print('执行 项目信息批量修改')
username = entry_data['data']['账号']
password = entry_data['data']['密码']
company_name = entry_data['data']['公司名称']
save_path = entry_data['data']['文件保存地址']
login_response = F6Module.login_in(username, password, company_name)
if login_response is None:
logger.error(f"F6系统登录失败,用户名: {username}")
return {'msg': '登录失败'}
try:
df = pd.read_excel(save_path, sheet_name=0, dtype='string')
except Exception as e:
logger.error(f"读取Excel文件失败: {save_path}, 错误: {str(e)}, 堆栈: {traceback.format_exc()}")
return {'msg': f'读取Excel文件失败: {str(e)},文件路径:{save_path}'}
cookies = requests.utils.dict_from_cookiejar(login_response.cookies)
try:
thread = threading.Thread(target=batch_modify_projects,
args=(data, cookies, df, save_path))
thread.start()
except Exception as e:
logger.error(f"创建线程失败: {str(e)}, 堆栈: {traceback.format_exc()}")
print(f'创建线程失败: {str(e)}')
return {'msg': f'创建后台线程失败: {str(e)}'}
return {'msg': '正在执行', 'msg_details': '正在执行,请稍后看结果'}
except KeyError as e:
logger.error(f"数据字段缺失: {str(e)}, 堆栈: {traceback.format_exc()}")
return {'msg': f'数据字段缺失: {str(e)}'}
except Exception as e:
return {'msg': f'读取Excel文件失败: {str(e)},文件路径:{save_path}'}
cookies = requests.utils.dict_from_cookiejar(login_response.cookies)
try:
thread = threading.Thread(target=batch_modify_projects,
args=(data, cookies, df, save_path, option))
thread.start()
except Exception as e:
print(f'创建线程失败: {str(e)}')
return {'msg': '正在执行', 'msg_details': '正在执行,请稍后看结果'}
logger.error(f"项目批量修改任务执行失败: {str(e)}, 堆栈: {traceback.format_exc()}")
return {'msg': f'执行失败: {str(e)}'}
@staticmethod
def bi_task(data: Dict[str, Any]) -> Dict[str, str]:
+44 -35
View File
@@ -73,37 +73,37 @@ def approve_workflow(data: Dict[str, Any]):
"""
# 获取简道云当前流程列表
json = api_instance.workflow_instance_get(data)
# 检查返回数据是否有效
if not json:
logger.error("未获取到工作流实例信息")
return
# 安全地获取任务列表
tasks = json.get('tasks', [])
if not tasks:
logger.error("未找到待处理任务")
return
# 将JSON字符串转换为Python字典
username = ''
instance_id = ''
task_id = ''
for task in tasks:
if task.get('status') == 0:
assignee = task.get('assignee', {})
username = assignee.get('username', '')
instance_id = task.get('instance_id', '')
task_id = task.get('task_id', '')
if username and instance_id and task_id:
break
if not username or not instance_id or not task_id:
logger.error("未找到有效的待处理任务信息")
return
task_data = {
"username": username,
"instance_id": instance_id,
@@ -125,10 +125,10 @@ def execute_failure_handler(data: Dict[str, Any]):
"""
now = time.strftime("%Y-%m-%d %H:%M:%S", time.gmtime())
pay_load = {
"api_key":"6694d3c4fcb69ca9a111a6c4",
"entry_id":"6938e011b360a1132522a62a",
"api_key": "6694d3c4fcb69ca9a111a6c4",
"entry_id": "6938e011b360a1132522a62a",
"data": {
"_widget_1765335060501": {"value": now}, # 失败时间
"_widget_1765335060501": {"value": now}, # 失败时间
"_widget_1765335060502": {"value": data['failure_name']}, # 任务名称
"_widget_1765335060503": {"value": data['failure_details']} # 失败明细
}
@@ -137,37 +137,47 @@ def execute_failure_handler(data: Dict[str, Any]):
api_instance.data_batch_create(pay_load)
def get_operate_org_id(cookies: Dict[str, str]) -> Optional[str]:
def get_operate_org_id(cookies: Dict[str, str], data: Dict[str, str] = None) -> Optional[str]:
"""
获取操作门店ID
从F6系统获取第一个门店的组织ID,用于后续操作。
Args:
cookies: 用户登录 F6 系统的 cookies 信息
data:数据id等
Returns:
Optional[str]: 门店ID,如果获取失败返回 None
注意:
如果未获取到门店信息或门店ID为空,会记录错误日志并返回 None
"""
org_url = "https://yunxiu.f6car.cn/hive/org/getPageOrgGroupMembers?currentPage=1&pageSize=100&name="
try:
org_res = requests.get(url=org_url, cookies=cookies)
logger.info(org_res.json())
org_data = org_res.json().get("data", {})
org_list = org_data.get("list", [])
if not org_list or len(org_list) == 0:
logger.error("未获取到门店信息")
return None
operate_org_id = org_list[0].get("orgId")
if data:
entry_data = api_instance.entry_data_get(data=data, replace=True)
org_name = entry_data.get("data", {}).get("门店名称")
operate_org_id = [
item["orgId"]
for item in org_list
if item.get("abbrName") == org_name
]
else:
operate_org_id = org_list[0].get("orgId")
if not operate_org_id:
logger.error("门店ID为空")
return None
logger.info(f"获取门店ID成功: {operate_org_id}")
return operate_org_id
except Exception as e:
@@ -176,9 +186,9 @@ def get_operate_org_id(cookies: Dict[str, str]) -> Optional[str]:
def get_card_list(
cookies: Dict[str, str],
operate_org_id: str,
extract_func: Callable[[Dict], Optional[str]] = None
cookies: Dict[str, str],
operate_org_id: str,
extract_func: Callable[[Dict], Optional[str]] = None
) -> List[str]:
"""
获取会员卡列表
@@ -199,46 +209,45 @@ def get_card_list(
- 每页请求间隔0.2秒,避免请求过快
"""
card_list = []
try:
# 获取第一页,确定总页数
card_url = f"https://yunxiu.f6car.cn/marketing/card/paging?useStationIdOwnOrgList={operate_org_id}&pageSize=100&pageNo=1"
card_res = requests.get(url=card_url, cookies=cookies)
total_card = int(card_res.json().get("data", {}).get("total", 0))
if total_card == 0:
logger.info("未找到会员卡数据")
return card_list
total_page = total_card // 100 + (total_card % 100 > 0)
logger.info(f"会员卡总数: {total_card}, 总页数: {total_page}")
# 定义默认提取函数(提取客户ID
if extract_func is None:
def default_extract(card_item: Dict) -> Optional[str]:
return card_item.get("idCustomer")
extract_func = default_extract
# 分页获取所有会员卡数据
for page in tqdm(range(1, total_page + 1), desc="查询会员卡"):
card_url = (f"https://yunxiu.f6car.cn/marketing/card/paging?useStationIdOwnOrgList={operate_org_id}"
f"&pageSize=100&pageNo={page}")
f"&pageSize=100&pageNo={page}")
card_res = requests.get(url=card_url, cookies=cookies)
card_data_list = card_res.json().get("data", {}).get("data", [])
# 使用提取函数提取ID
for card_item in card_data_list:
extracted_id = extract_func(card_item)
if extracted_id is not None:
card_list.append(extracted_id)
time.sleep(0.2)
logger.info(f"获取会员卡列表成功,共 {len(card_list)}")
return card_list
except Exception as e:
logger.error(f"获取会员卡列表时发生错误: {e}")
return card_list
+485 -116
View File
@@ -11,11 +11,9 @@ import logging
import traceback
import requests
import time
from typing import Dict, Any, List, Optional
from datetime import datetime
from typing import Dict, Any
from tqdm import tqdm
from app.tasks.common import update_jiandaoyun, approve_workflow, get_operate_org_id, get_card_list, \
execute_failure_handler
from app.tasks.common import update_jiandaoyun, approve_workflow, get_operate_org_id
import pandas as pd
import os
@@ -45,53 +43,105 @@ def batch_disable_projects(data: Dict[str, Any], cookies: Dict[str, str], df: pd
- 执行完成后会自动删除上传的文件
- 执行结果会更新到简道云表单
"""
logger.info(f"开始执行项目批量启停任务,操作类型: {option}, 文件路径: {save_path}, 数据行数: {len(df)}")
if option == "批量启用":
type_ = 0 # 1 停用,0启用
ob_type = 1
else:
type_ = 1
ob_type = 0
logger.info(f"操作类型设置完成: type_={type_}, ob_type={ob_type}")
df = df.where(pd.notnull(df), None)
# 获取门店id
org_id = get_operate_org_id(data)
logger.info("正在获取门店ID...")
try:
org_id = get_operate_org_id(cookies)
logger.info(f"门店ID获取成功: {org_id}")
except Exception as e:
logger.error(f"获取门店ID失败: {str(e)}, 堆栈信息: {traceback.format_exc()}")
raise
# 获取项目信息
logger.info("开始获取项目列表...")
json_data = {
'param': '',
'name': '',
'customCode': '',
'currentPage': 1,
'pageSize': 100,
'isDel': -type_,
'isDel': ob_type,
'customInvoiceCategory': 0,
'idOwnOrg': org_id,
}
response = requests.post(
'https://ids-goods.f6car.cn/f6-ids-goods/service/getServiceList',
cookies=cookies,
json=json_data,
)
all_project_list = []
total_pages = response.json().get("data", {}).get("totalPages", "")
for page in tqdm(range(1, total_pages + 1)):
json_data['currentPage'] = str(page)
try:
response = requests.post(
'https://ids-goods.f6car.cn/f6-ids-goods/service/getServiceList',
cookies=cookies,
json=json_data,
)
project_list = response.json().get("data", {}).get("records", [])
all_project_list.extend(project_list)
time.sleep(3.5)
response.raise_for_status()
all_project_list = []
total_pages = response.json().get("data", {}).get("totalPages", 0)
logger.info(f"获取项目列表响应: {response.json()}")
try:
total_pages = int(total_pages)
except (ValueError, TypeError):
logger.error(f"无法解析总页数: {total_pages}, 类型: {type(total_pages)}")
total_pages = 0
logger.info(f"项目列表总页数: {total_pages}")
for page in tqdm(range(1, total_pages + 1)):
json_data['currentPage'] = page
try:
response = requests.post(
'https://ids-goods.f6car.cn/f6-ids-goods/service/getServiceList',
cookies=cookies,
json=json_data,
)
time.sleep(3.5)
response.raise_for_status()
project_list = response.json().get("data", {}).get("records", [])
all_project_list.extend(project_list)
logger.debug(f"{page}页获取到{len(project_list)}条项目")
except requests.exceptions.RequestException as e:
logger.error(f"获取第{page}页项目列表失败: {str(e)}, 响应状态码: {getattr(e.response, 'status_code', 'N/A')}")
raise
logger.info(f"项目列表获取完成,总计: {len(all_project_list)}条项目")
except requests.exceptions.RequestException as e:
logger.error(f"获取项目列表失败: {str(e)}, 堆栈信息: {traceback.format_exc()}")
raise
# 遍历获取到的项目信息停用文件中的项目
code_list = df.iloc[:, 0].dropna().astype(str).tolist()
res_data_list = []
logger.info(f"Excel文件中待处理的项目编码数量: {len(code_list)}")
results = []
consecutive_failures = 0 # 连续失败计数器
MAX_CONSECUTIVE_FAILURES = 100 # 最大连续失败次数
success_count = 0
failure_count = 0
logger.info("开始处理项目启停操作...")
for item in tqdm(all_project_list):
custom_code = item.get("customCode")
if not custom_code or str(custom_code) not in code_list or not code_list:
if not code_list or not custom_code or str(custom_code) not in code_list:
continue
logger.debug(f"正在处理项目编码: {custom_code}")
info_id = item.get("infoId")
pk_id = item.get("pkId")
if not info_id or not pk_id:
logger.warning(f"项目编码 {custom_code} 缺少必要字段: infoId={info_id}, pkId={pk_id}")
results.append({'项目编码': custom_code, '状态': '缺少必要字段(infoId或pkId)'})
failure_count += 1
consecutive_failures += 1
if consecutive_failures >= MAX_CONSECUTIVE_FAILURES:
results.append({'项目编码': '系统保护', '状态': '连续100次失败已终止'})
logger.warning(f"连续失败{MAX_CONSECUTIVE_FAILURES}次,已中止任务执行")
break
continue
json_data = {
"orgIdList": [
org_id,
@@ -103,28 +153,70 @@ def batch_disable_projects(data: Dict[str, Any], cookies: Dict[str, str], df: pd
"idOwnOrg": org_id
}
try:
logger.debug(f"发送启停请求,项目编码: {custom_code}, 操作类型: {type_}")
response = requests.post(
'https://ids-goods.f6car.cn/f6-ids-goods/service/editAttributeByType',
cookies=cookies,
json=json_data,
)
res_data_list.append(response.json())
time.sleep(3.5)
response.raise_for_status() # 抛出HTTP错误
results.append({'材料编码': custom_code, '状态': '停用/启用成功'})
# 检查业务响应码
resp_data = response.json()
if resp_data.get("code") == 200:
results.append({'项目编码': custom_code, '状态': '停用/启用成功'})
success_count += 1
consecutive_failures = 0 # 成功时重置计数器
logger.info(f"项目编码 {custom_code} 启停操作成功")
else:
msg = resp_data.get("message", "未知错误")
results.append({'项目编码': custom_code, '状态': f'停用/启用失败: {msg}'})
failure_count += 1
consecutive_failures += 1
logger.error(f"项目编码 {custom_code} 启停操作失败: {msg}, 响应数据: {resp_data}")
except requests.exceptions.RequestException as e:
results.append({'材料编码': custom_code, '状态': f'停用/启用失败: {str(e)}'})
pass
error_msg = str(e)
results.append({'项目编码': custom_code, '状态': f'停用/启用失败: {error_msg}'})
failure_count += 1
consecutive_failures += 1
logger.error(f"项目编码 {custom_code} 启停操作请求异常: {error_msg}, 堆栈信息: {traceback.format_exc()}")
# 检查连续失败次数
if consecutive_failures >= MAX_CONSECUTIVE_FAILURES:
results.append({'项目编码': '系统保护', '状态': '连续100次失败已终止'})
logger.warning(f"连续失败{MAX_CONSECUTIVE_FAILURES}次,已中止任务执行")
break
logger.info(f"项目启停处理完成,成功: {success_count}条, 失败: {failure_count}条, 总计: {len(results)}")
print({'msg': '已执行', 'msg_details': f'{results}'})
logger.info(f"停用/启用结果: {results}")
os.remove(save_path)
print(f'{save_path}已删除')
# 删除文件
logger.info(f"准备删除文件: {save_path}")
try:
os.remove(save_path)
logger.info(f"文件删除成功: {save_path}")
print(f'{save_path}已删除')
except Exception as e:
logger.error(f"删除文件失败: {save_path}, 错误信息: {str(e)}, 堆栈信息: {traceback.format_exc()}")
# 调用api回写改掉 执行明细与执行状态
msg = update_jiandaoyun(data, f'{results}')
if msg.get('msg'):
approve_workflow(data)
print('表单已自动提交至下一步')
logger.info("开始回写简道云表单...")
try:
msg = update_jiandaoyun(data, f'{results}')
logger.info(f"简道云表单回写结果: {msg}")
if msg.get('msg'):
logger.info("开始自动提交工作流...")
approve_workflow(data)
logger.info("工作流提交成功")
print('表单已自动提交至下一步')
else:
logger.warning(f"简道云表单回写失败: {msg}")
except Exception as e:
logger.error(f"回写简道云表单失败: {str(e)}, 堆栈信息: {traceback.format_exc()}")
logger.info(f"项目批量启停任务执行完成,操作类型: {option}")
def batch_modify_materials(data: Dict[str, Any], cookies: Dict[str, str], df: pd.DataFrame, save_path: str) -> None:
@@ -147,6 +239,7 @@ def batch_modify_materials(data: Dict[str, Any], cookies: Dict[str, str], df: pd
- 执行完成后会自动删除上传的文件
- 执行结果会更新到简道云表单
"""
logger.info(f"开始执行材料批量修改任务,文件: {save_path},行数: {len(df)}")
def safe_str(val):
"""将值转为字符串,NaN 返回空字符串"""
@@ -159,7 +252,13 @@ def batch_modify_materials(data: Dict[str, Any], cookies: Dict[str, str], df: pd
return not (pd.isna(new_val) or (isinstance(new_val, str) and new_val.strip() == ""))
# 获取门店id
org_id = get_operate_org_id(data)
logger.info("正在获取门店ID...")
try:
org_id = get_operate_org_id(cookies)
logger.info(f"门店ID获取成功: {org_id}")
except Exception as e:
logger.error(f"获取门店ID失败: {str(e)}, 堆栈信息: {traceback.format_exc()}")
raise
# 第一步:获取所有材料列表(分页)
json_data = {
@@ -182,31 +281,46 @@ def batch_modify_materials(data: Dict[str, Any], cookies: Dict[str, str], df: pd
'getThirdPlatformCode': 0,
}
response = requests.post(
'https://ids-goods.f6car.com/f6-ids-goods/part/getExactPartStockInfo',
cookies=cookies,
json=json_data,
)
response.raise_for_status()
total_pages = response.json().get("data", {}).get("totalPages", 0)
all_materials_list = []
for page in tqdm(range(1, total_pages + 1), desc="获取材料列表"):
json_data['currentPage'] = page
resp = requests.post(
try:
response = requests.post(
'https://ids-goods.f6car.com/f6-ids-goods/part/getExactPartStockInfo',
cookies=cookies,
json=json_data,
)
resp.raise_for_status()
records = resp.json().get("data", {}).get("records", [])
all_materials_list.extend(records)
time.sleep(3.5)
response.raise_for_status()
total_pages = response.json().get("data", {}).get("totalPages", 0)
logger.info(f"材料列表页数: {total_pages}")
all_materials_list = []
total_pages = int(total_pages)
for page in tqdm(range(1, total_pages + 1), desc="获取材料列表"):
json_data['currentPage'] = page
try:
resp = requests.post(
'https://ids-goods.f6car.com/f6-ids-goods/part/getExactPartStockInfo',
cookies=cookies,
json=json_data,
)
time.sleep(3.5)
resp.raise_for_status()
records = resp.json().get("data", {}).get("records", [])
all_materials_list.extend(records)
except requests.exceptions.RequestException as e:
logger.error(f"获取第{page}页材料列表失败: {str(e)}, 响应状态码: {getattr(e.response, 'status_code', 'N/A')}")
raise
logger.info(f"材料列表获取完成,总计: {len(all_materials_list)}")
except requests.exceptions.RequestException as e:
logger.error(f"获取材料列表失败: {str(e)}, 堆栈信息: {traceback.format_exc()}")
raise
# 第二步:构建 update_map(只存原始值,不预处理)
update_map = {}
skipped_count = 0
for _, row in df.iterrows():
orig_code = row.iloc[0] # 原材料编码
if pd.isna(orig_code) or str(orig_code).strip() == "":
skipped_count += 1
continue
orig_code = str(orig_code).strip()
update_map[orig_code] = {
@@ -215,9 +329,16 @@ def batch_modify_materials(data: Dict[str, Any], cookies: Dict[str, str], df: pd
"new_name": row.iloc[3],
"new_spec": row.iloc[4],
}
logger.info(f"待更新材料数: {len(update_map)},跳过空编码行: {skipped_count}")
# 第三步:遍历材料,按需更新
results = []
consecutive_failures = 0 # 连续失败计数器
MAX_CONSECUTIVE_FAILURES = 100 # 最大连续失败次数
success_count = 0
failure_count = 0
skip_count = 0
for item in tqdm(all_materials_list, desc="处理材料更新"):
custom_code = item.get("customCode")
if not custom_code or str(custom_code) not in update_map:
@@ -225,7 +346,15 @@ def batch_modify_materials(data: Dict[str, Any], cookies: Dict[str, str], df: pd
part_id = item.get("partId")
if not part_id:
results.append({'材料编码': custom_code, '状态': '缺少 partId'})
error_msg = '缺少 partId'
results.append({'材料编码': custom_code, '状态': error_msg})
failure_count += 1
consecutive_failures += 1
logger.warning(f"材料编码 {custom_code} 跳过/失败: {error_msg}")
if consecutive_failures >= MAX_CONSECUTIVE_FAILURES:
results.append({'材料编码': '系统保护', '状态': '连续100次失败已终止'})
logger.warning(f"连续失败{MAX_CONSECUTIVE_FAILURES}次,已中止任务执行")
break
continue
try:
@@ -240,13 +369,30 @@ def batch_modify_materials(data: Dict[str, Any], cookies: Dict[str, str], df: pd
params=params,
cookies=cookies,
)
time.sleep(3.5)
if materials_response.status_code != 200:
results.append({'材料编码': custom_code, '状态': f'获取明细失败: {materials_response.status_code}'})
error_msg = f'获取明细失败: {materials_response.status_code}'
results.append({'材料编码': custom_code, '状态': error_msg})
failure_count += 1
consecutive_failures += 1
logger.error(f"材料编码 {custom_code} {error_msg}, 响应内容: {materials_response.text[:200]}")
if consecutive_failures >= MAX_CONSECUTIVE_FAILURES:
results.append({'材料编码': '系统保护', '状态': '连续100次失败已终止'})
logger.warning(f"连续失败{MAX_CONSECUTIVE_FAILURES}次,已中止任务执行")
break
continue
detail = materials_response.json().get("data")
if not detail:
results.append({'材料编码': custom_code, '状态': '明细为空'})
error_msg = '明细为空'
results.append({'材料编码': custom_code, '状态': error_msg})
failure_count += 1
consecutive_failures += 1
logger.error(f"材料编码 {custom_code} {error_msg}, 响应JSON: {materials_response.json()}")
if consecutive_failures >= MAX_CONSECUTIVE_FAILURES:
results.append({'材料编码': '系统保护', '状态': '连续100次失败已终止'})
logger.warning(f"连续失败{MAX_CONSECUTIVE_FAILURES}次,已中止任务执行")
break
continue
updates = update_map[str(custom_code)]
@@ -277,33 +423,84 @@ def batch_modify_materials(data: Dict[str, Any], cookies: Dict[str, str], df: pd
cookies=cookies,
json=detail
)
time.sleep(3.5)
if update_resp.status_code == 200 and update_resp.json().get("code") == 200:
results.append({'材料编码': custom_code, '状态': '修改成功'})
success_count += 1
consecutive_failures = 0 # 成功时重置计数器
else:
msg = update_resp.json().get("message", "未知错误")
results.append({'材料编码': custom_code, '状态': f'修改失败: {msg}'})
error_msg = f'修改失败: {msg}'
results.append({'材料编码': custom_code, '状态': error_msg})
failure_count += 1
consecutive_failures += 1
logger.error(f"材料编码 {custom_code} {error_msg}, 响应数据: {update_resp.json()}")
except requests.exceptions.RequestException as e:
results.append({'材料编码': custom_code, '状态': f'请求异常: {str(e)}'})
error_msg = f'请求异常: {str(e)}'
results.append({'材料编码': custom_code, '状态': error_msg})
failure_count += 1
consecutive_failures += 1
logger.error(f"材料编码 {custom_code} {error_msg}, 堆栈信息: {traceback.format_exc()}")
except Exception as e:
results.append({'材料编码': custom_code, '状态': f'内部错误: {str(e)}'})
error_msg = f'内部错误: {str(e)}'
results.append({'材料编码': custom_code, '状态': error_msg})
failure_count += 1
consecutive_failures += 1
logger.error(f"材料编码 {custom_code} {error_msg}, 堆栈信息: {traceback.format_exc()}")
# 检查连续失败次数
if consecutive_failures >= MAX_CONSECUTIVE_FAILURES:
results.append({'材料编码': '系统保护', '状态': '连续100次失败已终止'})
logger.warning(f"连续失败{MAX_CONSECUTIVE_FAILURES}次,已中止任务执行")
break
# 统计汇总(总行数 / 待处理 / 成功 / 失败 / 跳过)
total_rows = len(df)
to_process = len(update_map)
# results 里可能包含 “缺少 partId” 等失败信息;这里跳过数按空编码行计数即可
skip_count = skipped_count
summary = {
"总行数": total_rows,
"待处理": to_process,
"成功": success_count,
"失败": failure_count,
"跳过": skip_count,
}
results.insert(0, {"汇总": summary})
logger.info(f"材料修改汇总: {summary}")
# 第四步:清理与回写
print({'msg': '已执行', 'msg_details': results})
logger.info(f"材料批量修改结果: {results}")
# 结果回写包含汇总 + 明细
logger.info("材料批量修改完成,开始回写结果")
# 删除文件
logger.info(f"准备删除文件: {save_path}")
try:
os.remove(save_path)
logger.info(f"文件删除成功: {save_path}")
print(f'{save_path} 已删除')
except Exception as e:
logger.error(f"删除文件失败: {e}")
logger.error(f"删除文件失败: {save_path}, 错误信息: {str(e)}, 堆栈信息: {traceback.format_exc()}")
# 回写简道云
msg = update_jiandaoyun(data, str(results))
if msg.get('msg'):
approve_workflow(data)
print('表单已自动提交至下一步')
logger.info("开始回写简道云表单...")
try:
msg = update_jiandaoyun(data, str(results))
logger.info(f"简道云表单回写结果: {msg}")
if msg.get('msg'):
logger.info("开始自动提交工作流...")
approve_workflow(data)
logger.info("工作流提交成功")
print('表单已自动提交至下一步')
else:
logger.warning(f"简道云表单回写失败: {msg}")
except Exception as e:
logger.error(f"回写简道云表单失败: {str(e)}, 堆栈信息: {traceback.format_exc()}")
logger.info("材料批量修改任务执行完成")
def batch_modify_projects(data: Dict[str, Any], cookies: Dict[str, str], df: pd.DataFrame, save_path: str) -> None:
@@ -317,16 +514,18 @@ def batch_modify_projects(data: Dict[str, Any], cookies: Dict[str, str], df: pd.
data: 包含表单ID(api_key)、表单ID(entry_id)、数据ID(data_id)的字典
cookies: 用户登录 F6 系统的 cookies 信息
df: Excel 文件读取的内容,DataFrame 格式,列顺序为:
[0:原项目编码, 1:新项目编码, 2:新项目名称, 3:业务分类,
4:销项税率, 5:项目说明, 6:车辆分类, 7:工时单价, 8:工时]
[0:原项目编码, 1:新项目编码, 2:新项目名称, 3:业务分类, 4:销项税率, 5:项目说明]
save_path: Excel 文件保存的地址,执行完成后会删除此文件
注意:
- 无效的项目编码(None、空字符串)会被跳过
- Excel 中某字段为空(NaN 或空字符串)时,保留原始项目中的对应字段
- 如果新项目名称与系统已有项目名称重复,则跳过处理
- 如果Excel中多个行的新项目名称重复,只执行第一条数据,后续重复的会被跳过
- 执行完成后会自动删除上传的文件
- 执行结果会更新到简道云表单
"""
logger.info(f"开始执行项目批量修改任务,文件: {save_path},行数: {len(df)}")
def safe_str(val):
"""将值转为字符串,NaN 返回空字符串"""
@@ -347,20 +546,43 @@ def batch_modify_projects(data: Dict[str, Any], cookies: Dict[str, str], df: pd.
"""判断是否应该用 new_val 更新:非 NaN 且非空字符串"""
return not (pd.isna(new_val) or (isinstance(new_val, str) and new_val.strip() == ""))
def safe_iloc(row, idx, default=None):
"""安全按索引取行值,列数不足时返回 default,避免 IndexError"""
try:
if idx < len(row):
return row.iloc[idx]
except (IndexError, KeyError):
pass
return default
# 获取门店id
org_id = get_operate_org_id(data)
logger.info("获取门店ID...")
try:
org_id = get_operate_org_id(cookies)
logger.info(f"门店ID获取成功: {org_id}")
except Exception as e:
logger.error(f"获取门店ID失败: {str(e)}, 堆栈信息: {traceback.format_exc()}")
raise
# 获取服务分类(用于映射业务分类名称 → pkId)
init_add_resp = requests.post(
'https://ids-goods.f6car.cn/f6-ids-goods/service/initAdd',
cookies=cookies,
data={'idOwnOrg': org_id}
)
init_add_resp.raise_for_status()
service_category_list = init_add_resp.json().get("data", {}).get("serviceCategory", [])
category_name_to_pk = {item["name"]: item["pkId"] for item in service_category_list}
logger.info("获取服务分类列表...")
try:
init_add_resp = requests.post(
'https://ids-goods.f6car.cn/f6-ids-goods/service/initAdd',
cookies=cookies,
data={'idOwnOrg': org_id}
)
time.sleep(3.5)
init_add_resp.raise_for_status()
service_category_list = init_add_resp.json().get("data", {}).get("serviceCategory", [])
category_name_to_pk = {item["name"]: item["pkId"] for item in service_category_list}
logger.info(f"服务分类列表获取成功,共{len(category_name_to_pk)}个分类")
except requests.exceptions.RequestException as e:
logger.error(f"获取服务分类列表失败: {str(e)}, 堆栈信息: {traceback.format_exc()}")
raise
# 第一步:获取所有项目列表(分页)
logger.info("获取项目列表...")
json_data = {
'param': '',
'name': '',
@@ -372,64 +594,125 @@ def batch_modify_projects(data: Dict[str, Any], cookies: Dict[str, str], df: pd.
'idOwnOrg': org_id,
}
response = requests.post(
'https://ids-goods.f6car.cn/f6-ids-goods/service/getServiceList',
cookies=cookies,
json=json_data,
)
response.raise_for_status()
total_pages = response.json().get("data", {}).get("totalPages", 0)
all_project_list = []
for page in tqdm(range(1, total_pages + 1), desc="获取项目列表"):
json_data['currentPage'] = page
resp = requests.post(
try:
response = requests.post(
'https://ids-goods.f6car.cn/f6-ids-goods/service/getServiceList',
cookies=cookies,
json=json_data,
)
resp.raise_for_status()
records = resp.json().get("data", {}).get("records", [])
all_project_list.extend(records)
time.sleep(3.5)
response.raise_for_status()
total_pages = response.json().get("data", {}).get("totalPages", 0)
logger.info(f"项目列表总页数: {total_pages}")
all_project_list = []
total_pages = int(total_pages)
for page in tqdm(range(1, total_pages + 1), desc="获取项目列表"):
json_data['currentPage'] = page
try:
resp = requests.post(
'https://ids-goods.f6car.cn/f6-ids-goods/service/getServiceList',
cookies=cookies,
json=json_data,
)
time.sleep(3.5)
resp.raise_for_status()
records = resp.json().get("data", {}).get("records", [])
all_project_list.extend(records)
except requests.exceptions.RequestException as e:
logger.error(f"获取第{page}页项目列表失败: {str(e)}, 响应状态码: {getattr(e.response, 'status_code', 'N/A')}")
raise
logger.info(f"项目列表获取完成,总计: {len(all_project_list)}条项目")
except requests.exceptions.RequestException as e:
logger.error(f"获取项目列表失败: {str(e)}, 堆栈信息: {traceback.format_exc()}")
raise
# 构建系统已有项目名称集合(用于检查重复)
existing_project_names = set()
for project in all_project_list:
project_name = project.get("name")
if project_name:
existing_project_names.add(str(project_name).strip())
logger.info(f"系统已有项目名称数: {len(existing_project_names)}")
# 第二步:构建 update_map
logger.info("构建更新映射表...")
update_map = {}
for _, row in df.iterrows():
orig_code = row.iloc[0]
skipped_count = 0
category_not_found_count = 0
duplicate_name_in_df_count = 0 # DataFrame中重复的新项目名称数量
duplicate_name_in_system_count = 0 # 与系统已有项目名称重复的数量
seen_new_names = {} # 用于跟踪DataFrame中已出现的新项目名称,key为新名称,value为第一次出现的原项目编码
results = [] # 提前创建results,用于记录跳过的信息
for idx, row in df.iterrows():
orig_code = safe_iloc(row, 0)
if pd.isna(orig_code) or str(orig_code).strip() == "":
skipped_count += 1
results.append({'项目编码': '', '状态': '跳过: 项目编码为空'})
continue
orig_code = str(orig_code).strip()
# 计算工时费 = 单价 * 工时
price = safe_float(row.iloc[7])
work_hour = safe_float(row.iloc[8])
amount = None
if price is not None and work_hour is not None:
amount = round(price * work_hour, 2)
# 检查新项目名称(第2列,索引为2
new_name = safe_iloc(row, 2)
if should_update(new_name):
new_name_clean = safe_str(new_name)
# 检查DataFrame中是否有重复的新项目名称
if new_name_clean in seen_new_names:
duplicate_name_in_df_count += 1
logger.warning(f"DataFrame中项目名称重复,跳过: 原项目编码={orig_code}, 新项目名称={new_name_clean}, 首次出现在原项目编码={seen_new_names[new_name_clean]}")
results.append({'项目编码': orig_code, '状态': f'跳过: DataFrame中项目名称重复(首次出现在原项目编码={seen_new_names[new_name_clean]})'})
continue
# 检查新项目名称是否与系统已有项目名称重复
if new_name_clean in existing_project_names:
duplicate_name_in_system_count += 1
logger.warning(f"新项目名称与系统已有项目名称重复,跳过: 原项目编码={orig_code}, 新项目名称={new_name_clean}")
results.append({'项目编码': orig_code, '状态': f'跳过: 新项目名称与系统已有项目名称重复({new_name_clean})'})
continue
# 记录这个新名称
seen_new_names[new_name_clean] = orig_code
# 业务分类映射
category_name = row.iloc[3]
category_name = safe_iloc(row, 3)
category_pk = None
category_not_found = False
if should_update(category_name):
cat_name_clean = safe_str(category_name)
category_pk = category_name_to_pk.get(cat_name_clean)
if category_pk is None:
logger.warning(f"业务分类 '{cat_name_clean}' 未在系统中找到")
logger.warning(f"业务分类 '{cat_name_clean}' 未在系统中找到,项目编码: {orig_code}")
category_not_found = True
category_not_found_count += 1
# 列 6/7/8(车辆分类、工时单价、工时)不再传入,固定为 None,不更新这些字段
car_category_name = None
price = None
work_hour = None
amount = None
update_map[orig_code] = {
"new_customCode": row.iloc[1], # 新的自定义编码,取自当前行的第2列(索引为1)
"new_name": row.iloc[2], # 新的名称,取自当前行的第3列(索引为2)
"new_serviceCategoryId": category_pk, # 新的服务分类ID,由变量 category_pk 提供(通常为主键)
"new_taxRate": row.iloc[4], # 新的税率,取自当前行的第5列(索引为4)
"new_memo": row.iloc[5], # 新的备注信息,取自当前行的第6列(索引为5)
"new_carCategoryName": row.iloc[6], # 新的车型分类名称,取自当前行的第7列(索引为6)
"new_price": price, # 新的价格,由变量 price 提供(可能经过处理或计算)
"new_workHour": work_hour, # 新的工时数,由变量 work_hour 提供
"new_amount": amount, # 新的金额(可能是价格 × 工时等计算结果),由变量 amount 提供
"new_customCode": safe_iloc(row, 1), # 新项目编码
"new_name": safe_iloc(row, 2), # 新项目名称
"new_serviceCategoryId": category_pk,
"new_taxRate": safe_iloc(row, 4),
"new_memo": safe_iloc(row, 5),
"new_carCategoryName": car_category_name,
"new_price": price,
"new_workHour": work_hour,
"new_amount": amount,
}
logger.info(f"映射表完成: 待处理={len(update_map)}, 跳过空编码={skipped_count}, 分类未找到={category_not_found_count}, DF重复名={duplicate_name_in_df_count}, 系统重名={duplicate_name_in_system_count}")
# 第三步:遍历项目,按需更新
results = []
logger.info("开始处理项目更新...")
consecutive_failures = 0 # 连续失败计数器
MAX_CONSECUTIVE_FAILURES = 100 # 最大连续失败次数
success_count = 0
failure_count = 0
for item in tqdm(all_project_list, desc="处理项目更新"):
custom_code = item.get("customCode")
if not custom_code or str(custom_code) not in update_map:
@@ -437,7 +720,15 @@ def batch_modify_projects(data: Dict[str, Any], cookies: Dict[str, str], df: pd.
service_id = item.get("pkId") # 项目主键
if not service_id:
results.append({'项目编码': custom_code, '状态': '缺少 pkId'})
error_msg = '缺少 pkId'
results.append({'项目编码': custom_code, '状态': error_msg})
failure_count += 1
consecutive_failures += 1
logger.warning(f"项目编码 {custom_code} {error_msg}")
if consecutive_failures >= MAX_CONSECUTIVE_FAILURES:
results.append({'项目编码': '系统保护', '状态': '连续100次失败已终止'})
logger.warning(f"连续失败{MAX_CONSECUTIVE_FAILURES}次,已中止任务执行")
break
continue
try:
@@ -451,13 +742,30 @@ def batch_modify_projects(data: Dict[str, Any], cookies: Dict[str, str], df: pd.
params=params,
cookies=cookies,
)
time.sleep(3.5)
if detail_resp.status_code != 200:
results.append({'项目编码': custom_code, '状态': f'获取明细失败: {detail_resp.status_code}'})
error_msg = f'获取明细失败: {detail_resp.status_code}'
results.append({'项目编码': custom_code, '状态': error_msg})
failure_count += 1
consecutive_failures += 1
logger.error(f"项目编码 {custom_code} {error_msg}, 响应内容: {detail_resp.text[:200]}")
if consecutive_failures >= MAX_CONSECUTIVE_FAILURES:
results.append({'项目编码': '系统保护', '状态': '连续100次失败已终止'})
logger.warning(f"连续失败{MAX_CONSECUTIVE_FAILURES}次,已中止任务执行")
break
continue
detail = detail_resp.json().get("data")
if not detail:
results.append({'项目编码': custom_code, '状态': '明细为空'})
error_msg = '明细为空'
results.append({'项目编码': custom_code, '状态': error_msg})
failure_count += 1
consecutive_failures += 1
logger.error(f"项目编码 {custom_code} {error_msg}, 响应JSON: {detail_resp.json()}")
if consecutive_failures >= MAX_CONSECUTIVE_FAILURES:
results.append({'项目编码': '系统保护', '状态': '连续100次失败已终止'})
logger.warning(f"连续失败{MAX_CONSECUTIVE_FAILURES}次,已中止任务执行")
break
continue
updates = update_map[str(custom_code)]
@@ -467,6 +775,18 @@ def batch_modify_projects(data: Dict[str, Any], cookies: Dict[str, str], df: pd.
detail["customCode"] = safe_str(updates["new_customCode"])
if should_update(updates["new_name"]):
name_val = safe_str(updates["new_name"])
# 再次检查新项目名称是否与系统已有项目名称重复(防止在构建update_map后系统状态发生变化)
if name_val in existing_project_names:
error_msg = f'跳过: 新项目名称与系统已有项目名称重复({name_val})'
results.append({'项目编码': custom_code, '状态': error_msg})
failure_count += 1
consecutive_failures += 1
logger.warning(f"项目编码 {custom_code} {error_msg}")
if consecutive_failures >= MAX_CONSECUTIVE_FAILURES:
results.append({'项目编码': '系统保护', '状态': '连续100次失败已终止'})
logger.warning(f"连续失败{MAX_CONSECUTIVE_FAILURES}次,已中止任务执行")
break
continue
detail["name"] = name_val
if "showName" in detail:
detail["showName"] = name_val
@@ -496,30 +816,79 @@ def batch_modify_projects(data: Dict[str, Any], cookies: Dict[str, str], df: pd.
cookies=cookies,
json=detail
)
time.sleep(3.5)
if update_resp.status_code == 200 and update_resp.json().get("code") == 200:
results.append({'项目编码': custom_code, '状态': '修改成功'})
success_count += 1
consecutive_failures = 0 # 成功时重置计数器
else:
msg = update_resp.json().get("message", "未知错误")
results.append({'项目编码': custom_code, '状态': f'修改失败: {msg}'})
error_msg = f'修改失败: {msg}'
results.append({'项目编码': custom_code, '状态': error_msg})
failure_count += 1
consecutive_failures += 1
logger.error(f"项目编码 {custom_code} {error_msg}, 响应数据: {update_resp.json()}")
except requests.exceptions.RequestException as e:
error_msg = f"请求异常: {str(e)}"
logger.error(f"项目编码 {custom_code} {error_msg}, 堆栈信息: {traceback.format_exc()}")
results.append({'项目编码': custom_code, '状态': error_msg})
failure_count += 1
consecutive_failures += 1
except Exception as e:
error_msg = f"异常: {str(e)}"
logger.error(f"项目 {custom_code} 更新出错: {traceback.format_exc()}")
logger.error(f"项目编码 {custom_code} 更新出错: {traceback.format_exc()}")
results.append({'项目编码': custom_code, '状态': error_msg})
failure_count += 1
consecutive_failures += 1
# 检查连续失败次数
if consecutive_failures >= MAX_CONSECUTIVE_FAILURES:
results.append({'项目编码': '系统保护', '状态': '连续100次失败已终止'})
logger.warning(f"连续失败{MAX_CONSECUTIVE_FAILURES}次,已中止任务执行")
break
# 汇总插入到结果顶部,便于简道云直接看到统计
total_rows = len(df)
to_process = len(update_map)
skip_count = skipped_count + duplicate_name_in_df_count + duplicate_name_in_system_count
summary = {
"总行数": total_rows,
"待处理": to_process,
"成功": success_count,
"失败": failure_count,
"跳过": skip_count,
}
results.insert(0, {"汇总": summary})
logger.info(f"项目修改汇总: {summary}")
# 第四步:清理与回写
print({'msg': '已执行', 'msg_details': results})
logger.info(f"项目批量修改结果: {results}")
logger.info("项目批量修改完成,开始回写结果")
# 删除文件
logger.info(f"准备删除文件: {save_path}")
try:
os.remove(save_path)
logger.info(f"文件删除成功: {save_path}")
print(f'{save_path} 已删除')
except Exception as e:
logger.error(f"删除文件失败: {e}")
logger.error(f"删除文件失败: {save_path}, 错误信息: {str(e)}, 堆栈信息: {traceback.format_exc()}")
# 回写简道云
msg = update_jiandaoyun(data, str(results))
if msg.get('msg'):
approve_workflow(data)
print('表单已自动提交至下一步')
logger.info("开始回写简道云表单...")
try:
msg = update_jiandaoyun(data, str(results))
logger.info(f"简道云表单回写结果: {msg}")
if msg.get('msg'):
logger.info("开始自动提交工作流...")
approve_workflow(data)
logger.info("工作流提交成功")
print('表单已自动提交至下一步')
else:
logger.warning(f"简道云表单回写失败: {msg}")
except Exception as e:
logger.error(f"回写简道云表单失败: {str(e)}, 堆栈信息: {traceback.format_exc()}")
logger.info("项目批量修改任务执行完成")
+1
View File
@@ -9,3 +9,4 @@ pytesseract==0.3.13
Requests==2.32.5
tqdm==4.67.1
uvicorn==0.40.0
openpyxl
Binary file not shown.
Binary file not shown.