Files
jdy_fastapi/app/tasks/material_tasks.py
panda f0fcea03bb 1.新增gitignore文件
2.新启动脚本制作
2026-01-30 11:41:05 +08:00

893 lines
39 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""
项目材料相关后台任务模块
本模块包含项目材料相关的后台任务,包括:
- 项目信息批量停用
- 材料信息批量修改
这些任务在后台线程中执行,不会阻塞主请求。
"""
import logging
import traceback
import requests
import time
from typing import Dict, Any
from tqdm import tqdm
from app.tasks.common import update_jiandaoyun, approve_workflow, get_operate_org_id
import pandas as pd
import os
logger = logging.getLogger('app')
def batch_disable_projects(data: Dict[str, Any], cookies: Dict[str, str], df: pd.DataFrame, save_path: str,
option) -> None:
"""
项目批量启停后台任务
在后台线程中批量停用项目,从 Excel 文件中读取项目编码。
执行完成后会更新简道云表单并自动提交工作流。
Args:
data: 包含表单ID(api_key)、表单ID(entry_id)、数据ID(data_id)的字典
cookies: 用户登录 F6 系统的 cookies 信息
df: Excel 文件读取的内容,DataFrame 格式,第一列为项目编码
save_path: Excel 文件保存的地址,执行完成后会删除此文件
option: 批量启用、批量停用
Returns:
None
注意:
- 无效的项目(None、空字符串)会被跳过
- 执行完成后会自动删除上传的文件
- 执行结果会更新到简道云表单
"""
logger.info(f"开始执行项目批量启停任务,操作类型: {option}, 文件路径: {save_path}, 数据行数: {len(df)}")
if option == "批量启用":
type_ = 0 # 1 停用,0启用
ob_type = 1
else:
type_ = 1
ob_type = 0
logger.info(f"操作类型设置完成: type_={type_}, ob_type={ob_type}")
df = df.where(pd.notnull(df), None)
# 获取门店id
logger.info("正在获取门店ID...")
try:
org_id = get_operate_org_id(cookies)
logger.info(f"门店ID获取成功: {org_id}")
except Exception as e:
logger.error(f"获取门店ID失败: {str(e)}, 堆栈信息: {traceback.format_exc()}")
raise
# 获取项目信息
logger.info("开始获取项目列表...")
json_data = {
'param': '',
'name': '',
'customCode': '',
'currentPage': 1,
'pageSize': 100,
'isDel': ob_type,
'customInvoiceCategory': 0,
'idOwnOrg': org_id,
}
try:
response = requests.post(
'https://ids-goods.f6car.cn/f6-ids-goods/service/getServiceList',
cookies=cookies,
json=json_data,
)
time.sleep(1)
response.raise_for_status()
all_project_list = []
total_pages = response.json().get("data", {}).get("totalPages", 0)
logger.info(f"获取项目列表响应: {response.json()}")
try:
total_pages = int(total_pages)
except (ValueError, TypeError):
logger.error(f"无法解析总页数: {total_pages}, 类型: {type(total_pages)}")
total_pages = 0
logger.info(f"项目列表总页数: {total_pages}")
for page in tqdm(range(1, total_pages + 1)):
json_data['currentPage'] = page
try:
response = requests.post(
'https://ids-goods.f6car.cn/f6-ids-goods/service/getServiceList',
cookies=cookies,
json=json_data,
)
time.sleep(1)
response.raise_for_status()
project_list = response.json().get("data", {}).get("records", [])
all_project_list.extend(project_list)
logger.debug(f"{page}页获取到{len(project_list)}条项目")
except requests.exceptions.RequestException as e:
logger.error(f"获取第{page}页项目列表失败: {str(e)}, 响应状态码: {getattr(e.response, 'status_code', 'N/A')}")
raise
logger.info(f"项目列表获取完成,总计: {len(all_project_list)}条项目")
except requests.exceptions.RequestException as e:
logger.error(f"获取项目列表失败: {str(e)}, 堆栈信息: {traceback.format_exc()}")
raise
# 遍历获取到的项目信息停用文件中的项目
code_list = df.iloc[:, 0].dropna().astype(str).tolist()
logger.info(f"Excel文件中待处理的项目编码数量: {len(code_list)}")
results = []
consecutive_failures = 0 # 连续失败计数器
MAX_CONSECUTIVE_FAILURES = 100 # 最大连续失败次数
success_count = 0
failure_count = 0
logger.info("开始处理项目启停操作...")
for item in tqdm(all_project_list):
custom_code = item.get("customCode")
if not code_list or not custom_code or str(custom_code) not in code_list:
continue
logger.debug(f"正在处理项目编码: {custom_code}")
info_id = item.get("infoId")
pk_id = item.get("pkId")
if not info_id or not pk_id:
logger.warning(f"项目编码 {custom_code} 缺少必要字段: infoId={info_id}, pkId={pk_id}")
results.append({'项目编码': custom_code, '状态': '缺少必要字段(infoId或pkId)'})
failure_count += 1
consecutive_failures += 1
if consecutive_failures >= MAX_CONSECUTIVE_FAILURES:
results.append({'项目编码': '系统保护', '状态': '连续100次失败已终止'})
logger.warning(f"连续失败{MAX_CONSECUTIVE_FAILURES}次,已中止任务执行")
break
continue
json_data = {
"orgIdList": [
org_id,
],
"isDel": type_, # 1 停用 0启用
"infoId": info_id,
"pkId": pk_id,
"type": 1,
"idOwnOrg": org_id
}
try:
logger.debug(f"发送启停请求,项目编码: {custom_code}, 操作类型: {type_}")
response = requests.post(
'https://ids-goods.f6car.cn/f6-ids-goods/service/editAttributeByType',
cookies=cookies,
json=json_data,
)
time.sleep(2)
response.raise_for_status() # 抛出HTTP错误
# 检查业务响应码
resp_data = response.json()
if resp_data.get("code") == 200:
results.append({'项目编码': custom_code, '状态': '停用/启用成功'})
success_count += 1
consecutive_failures = 0 # 成功时重置计数器
logger.info(f"项目编码 {custom_code} 启停操作成功")
else:
msg = resp_data.get("message", "未知错误")
results.append({'项目编码': custom_code, '状态': f'停用/启用失败: {msg}'})
failure_count += 1
consecutive_failures += 1
logger.error(f"项目编码 {custom_code} 启停操作失败: {msg}, 响应数据: {resp_data}")
except requests.exceptions.RequestException as e:
error_msg = str(e)
results.append({'项目编码': custom_code, '状态': f'停用/启用失败: {error_msg}'})
failure_count += 1
consecutive_failures += 1
logger.error(f"项目编码 {custom_code} 启停操作请求异常: {error_msg}, 堆栈信息: {traceback.format_exc()}")
# 检查连续失败次数
if consecutive_failures >= MAX_CONSECUTIVE_FAILURES:
results.append({'项目编码': '系统保护', '状态': '连续100次失败已终止'})
logger.warning(f"连续失败{MAX_CONSECUTIVE_FAILURES}次,已中止任务执行")
break
logger.info(f"项目启停处理完成,成功: {success_count}条, 失败: {failure_count}条, 总计: {len(results)}")
print({'msg': '已执行', 'msg_details': f'{results}'})
logger.info(f"停用/启用结果: {results}")
# 删除文件
logger.info(f"准备删除文件: {save_path}")
try:
os.remove(save_path)
logger.info(f"文件删除成功: {save_path}")
print(f'{save_path}已删除')
except Exception as e:
logger.error(f"删除文件失败: {save_path}, 错误信息: {str(e)}, 堆栈信息: {traceback.format_exc()}")
# 调用api回写改掉 执行明细与执行状态
logger.info("开始回写简道云表单...")
try:
msg = update_jiandaoyun(data, f'{results}')
logger.info(f"简道云表单回写结果: {msg}")
if msg.get('msg'):
logger.info("开始自动提交工作流...")
approve_workflow(data)
logger.info("工作流提交成功")
print('表单已自动提交至下一步')
else:
logger.warning(f"简道云表单回写失败: {msg}")
except Exception as e:
logger.error(f"回写简道云表单失败: {str(e)}, 堆栈信息: {traceback.format_exc()}")
logger.info(f"项目批量启停任务执行完成,操作类型: {option}")
def batch_modify_materials(data: Dict[str, Any], cookies: Dict[str, str], df: pd.DataFrame, save_path: str) -> None:
"""
材料批量修改后台任务
在后台线程中批量修改材料,从 Excel 文件中读取材料编码。
执行完成后会更新简道云表单并自动提交工作流。
Args:
data: 包含表单ID(api_key)、表单ID(entry_id)、数据ID(data_id)的字典
cookies: 用户登录 F6 系统的 cookies 信息
df: Excel 文件读取的内容,DataFrame 格式,列顺序为:
[原材料编码, 新材料编码, 品牌, 名称, 规格]
save_path: Excel 文件保存的地址,执行完成后会删除此文件
注意:
- 无效的材料编码(None、空字符串)会被跳过
- Excel 中某字段为空(NaN 或空字符串)时,保留原材料中的对应字段
- 执行完成后会自动删除上传的文件
- 执行结果会更新到简道云表单
"""
logger.info(f"开始执行材料批量修改任务,文件: {save_path},行数: {len(df)}")
def safe_str(val):
"""将值转为字符串,NaN 返回空字符串"""
if pd.isna(val):
return ""
return str(val).strip()
def should_update(new_val):
"""判断是否应该用 new_val 更新:非 NaN 且非空字符串"""
return not (pd.isna(new_val) or (isinstance(new_val, str) and new_val.strip() == ""))
# 获取门店id
logger.info("正在获取门店ID...")
try:
org_id = get_operate_org_id(cookies)
logger.info(f"门店ID获取成功: {org_id}")
except Exception as e:
logger.error(f"获取门店ID失败: {str(e)}, 堆栈信息: {traceback.format_exc()}")
raise
# 第一步:获取所有材料列表(分页)
json_data = {
'keyWord': '',
'idOwnOrg': org_id,
'currentPage': 1,
'pageSize': 100,
'name': '',
'brand': '',
'supplierCode': '',
'customCode': '',
'categoryName': '',
'categoryId': '',
'labelId': '',
'labelName': '',
'spec': '',
'applyModel': '',
'sellPurchaseStatuses': [2, 3, 4, 5],
'customInvoiceCategory': 0,
'getThirdPlatformCode': 0,
}
try:
response = requests.post(
'https://ids-goods.f6car.com/f6-ids-goods/part/getExactPartStockInfo',
cookies=cookies,
json=json_data,
)
response.raise_for_status()
total_pages = response.json().get("data", {}).get("totalPages", 0)
logger.info(f"材料列表页数: {total_pages}")
all_materials_list = []
total_pages = int(total_pages)
for page in tqdm(range(1, total_pages + 1), desc="获取材料列表"):
json_data['currentPage'] = page
try:
resp = requests.post(
'https://ids-goods.f6car.com/f6-ids-goods/part/getExactPartStockInfo',
cookies=cookies,
json=json_data,
)
time.sleep(1)
resp.raise_for_status()
records = resp.json().get("data", {}).get("records", [])
all_materials_list.extend(records)
except requests.exceptions.RequestException as e:
logger.error(f"获取第{page}页材料列表失败: {str(e)}, 响应状态码: {getattr(e.response, 'status_code', 'N/A')}")
raise
logger.info(f"材料列表获取完成,总计: {len(all_materials_list)}")
except requests.exceptions.RequestException as e:
logger.error(f"获取材料列表失败: {str(e)}, 堆栈信息: {traceback.format_exc()}")
raise
# 第二步:构建 update_map(只存原始值,不预处理)
update_map = {}
skipped_count = 0
for _, row in df.iterrows():
orig_code = row.iloc[0] # 原材料编码
if pd.isna(orig_code) or str(orig_code).strip() == "":
skipped_count += 1
continue
orig_code = str(orig_code).strip()
update_map[orig_code] = {
"new_customCode": row.iloc[1],
"new_brand": row.iloc[2],
"new_name": row.iloc[3],
"new_spec": row.iloc[4],
}
logger.info(f"待更新材料数: {len(update_map)},跳过空编码行: {skipped_count}")
# 第三步:遍历材料,按需更新
results = []
consecutive_failures = 0 # 连续失败计数器
MAX_CONSECUTIVE_FAILURES = 100 # 最大连续失败次数
success_count = 0
failure_count = 0
skip_count = 0
for item in tqdm(all_materials_list, desc="处理材料更新"):
custom_code = item.get("customCode")
if not custom_code or str(custom_code) not in update_map:
continue
part_id = item.get("partId")
if not part_id:
error_msg = '缺少 partId'
results.append({'材料编码': custom_code, '状态': error_msg})
failure_count += 1
consecutive_failures += 1
logger.warning(f"材料编码 {custom_code} 跳过/失败: {error_msg}")
if consecutive_failures >= MAX_CONSECUTIVE_FAILURES:
results.append({'材料编码': '系统保护', '状态': '连续100次失败已终止'})
logger.warning(f"连续失败{MAX_CONSECUTIVE_FAILURES}次,已中止任务执行")
break
continue
try:
# 获取材料明细
params = {
'partId': part_id,
'customInvoiceCategory': '0',
'getThirdPlatformCode': '0',
}
materials_response = requests.get(
'https://ids-goods.f6car.com/f6-ids-goods/part/getPartInfo',
params=params,
cookies=cookies,
)
time.sleep(1)
if materials_response.status_code != 200:
error_msg = f'获取明细失败: {materials_response.status_code}'
results.append({'材料编码': custom_code, '状态': error_msg})
failure_count += 1
consecutive_failures += 1
logger.error(f"材料编码 {custom_code} {error_msg}, 响应内容: {materials_response.text[:200]}")
if consecutive_failures >= MAX_CONSECUTIVE_FAILURES:
results.append({'材料编码': '系统保护', '状态': '连续100次失败已终止'})
logger.warning(f"连续失败{MAX_CONSECUTIVE_FAILURES}次,已中止任务执行")
break
continue
detail = materials_response.json().get("data")
if not detail:
error_msg = '明细为空'
results.append({'材料编码': custom_code, '状态': error_msg})
failure_count += 1
consecutive_failures += 1
logger.error(f"材料编码 {custom_code} {error_msg}, 响应JSON: {materials_response.json()}")
if consecutive_failures >= MAX_CONSECUTIVE_FAILURES:
results.append({'材料编码': '系统保护', '状态': '连续100次失败已终止'})
logger.warning(f"连续失败{MAX_CONSECUTIVE_FAILURES}次,已中止任务执行")
break
continue
updates = update_map[str(custom_code)]
# 安全更新字段:仅当 Excel 提供有效值时才覆盖
if should_update(updates["new_customCode"]):
detail["customCode"] = safe_str(updates["new_customCode"])
if should_update(updates["new_brand"]):
detail["brand"] = safe_str(updates["new_brand"])
if should_update(updates["new_name"]):
name_val = safe_str(updates["new_name"])
detail["name"] = name_val
detail["showName"] = name_val # 同步 showName
if should_update(updates["new_spec"]):
detail["spec"] = safe_str(updates["new_spec"])
# 修复价格格式和数组(必须!)
for f in ["purchasePrice", "sellPrice"]:
val = detail.get(f)
if isinstance(val, (int, float)):
detail[f] = f"{val:.2f}"
if detail.get("partBarCodeVos") is None:
detail["partBarCodeVos"] = []
# 提交更新
update_resp = requests.post(
'https://ids-goods.f6car.com/f6-ids-goods/part/updateAreaPartBasicInfo',
cookies=cookies,
json=detail
)
time.sleep(2)
if update_resp.status_code == 200 and update_resp.json().get("code") == 200:
results.append({'材料编码': custom_code, '状态': '修改成功'})
success_count += 1
consecutive_failures = 0 # 成功时重置计数器
else:
msg = update_resp.json().get("message", "未知错误")
error_msg = f'修改失败: {msg}'
results.append({'材料编码': custom_code, '状态': error_msg})
failure_count += 1
consecutive_failures += 1
logger.error(f"材料编码 {custom_code} {error_msg}, 响应数据: {update_resp.json()}")
except requests.exceptions.RequestException as e:
error_msg = f'请求异常: {str(e)}'
results.append({'材料编码': custom_code, '状态': error_msg})
failure_count += 1
consecutive_failures += 1
logger.error(f"材料编码 {custom_code} {error_msg}, 堆栈信息: {traceback.format_exc()}")
except Exception as e:
error_msg = f'内部错误: {str(e)}'
results.append({'材料编码': custom_code, '状态': error_msg})
failure_count += 1
consecutive_failures += 1
logger.error(f"材料编码 {custom_code} {error_msg}, 堆栈信息: {traceback.format_exc()}")
# 检查连续失败次数
if consecutive_failures >= MAX_CONSECUTIVE_FAILURES:
results.append({'材料编码': '系统保护', '状态': '连续100次失败已终止'})
logger.warning(f"连续失败{MAX_CONSECUTIVE_FAILURES}次,已中止任务执行")
break
# 统计汇总(总行数 / 待处理 / 成功 / 失败 / 跳过)
total_rows = len(df)
to_process = len(update_map)
# results 里可能包含 “缺少 partId” 等失败信息;这里跳过数按空编码行计数即可
skip_count = skipped_count
summary = {
"总行数": total_rows,
"待处理": to_process,
"成功": success_count,
"失败": failure_count,
"跳过": skip_count,
}
results.insert(0, {"汇总": summary})
logger.info(f"材料修改汇总: {summary}")
# 第四步:清理与回写
print({'msg': '已执行', 'msg_details': results})
# 结果回写包含汇总 + 明细
logger.info("材料批量修改完成,开始回写结果")
# 删除文件
logger.info(f"准备删除文件: {save_path}")
try:
os.remove(save_path)
logger.info(f"文件删除成功: {save_path}")
print(f'{save_path} 已删除')
except Exception as e:
logger.error(f"删除文件失败: {save_path}, 错误信息: {str(e)}, 堆栈信息: {traceback.format_exc()}")
# 回写简道云
logger.info("开始回写简道云表单...")
try:
msg = update_jiandaoyun(data, str(results))
logger.info(f"简道云表单回写结果: {msg}")
if msg.get('msg'):
logger.info("开始自动提交工作流...")
approve_workflow(data)
logger.info("工作流提交成功")
print('表单已自动提交至下一步')
else:
logger.warning(f"简道云表单回写失败: {msg}")
except Exception as e:
logger.error(f"回写简道云表单失败: {str(e)}, 堆栈信息: {traceback.format_exc()}")
logger.info("材料批量修改任务执行完成")
def batch_modify_projects(data: Dict[str, Any], cookies: Dict[str, str], df: pd.DataFrame, save_path: str) -> None:
"""
项目批量修改后台任务
在后台线程中批量修改项目,从 Excel 文件中读取项目编码。
执行完成后会更新简道云表单并自动提交工作流。
Args:
data: 包含表单ID(api_key)、表单ID(entry_id)、数据ID(data_id)的字典
cookies: 用户登录 F6 系统的 cookies 信息
df: Excel 文件读取的内容,DataFrame 格式,列顺序为:
[0:原项目编码, 1:新项目编码, 2:新项目名称, 3:业务分类, 4:销项税率, 5:项目说明]
save_path: Excel 文件保存的地址,执行完成后会删除此文件
注意:
- 无效的项目编码(None、空字符串)会被跳过
- Excel 中某字段为空(NaN 或空字符串)时,保留原始项目中的对应字段
- 如果新项目名称与系统已有项目名称重复,则跳过处理
- 如果Excel中多个行的新项目名称重复,只执行第一条数据,后续重复的会被跳过
- 执行完成后会自动删除上传的文件
- 执行结果会更新到简道云表单
"""
logger.info(f"开始执行项目批量修改任务,文件: {save_path},行数: {len(df)}")
def safe_str(val):
"""将值转为字符串,NaN 返回空字符串"""
if pd.isna(val):
return ""
return str(val).strip()
def safe_float(val):
"""安全转换为 float,失败返回 None"""
if pd.isna(val):
return None
try:
return float(val)
except (ValueError, TypeError):
return None
def should_update(new_val):
"""判断是否应该用 new_val 更新:非 NaN 且非空字符串"""
return not (pd.isna(new_val) or (isinstance(new_val, str) and new_val.strip() == ""))
def safe_iloc(row, idx, default=None):
"""安全按索引取行值,列数不足时返回 default,避免 IndexError"""
try:
if idx < len(row):
return row.iloc[idx]
except (IndexError, KeyError):
pass
return default
# 获取门店id
logger.info("获取门店ID...")
try:
org_id = get_operate_org_id(cookies)
logger.info(f"门店ID获取成功: {org_id}")
except Exception as e:
logger.error(f"获取门店ID失败: {str(e)}, 堆栈信息: {traceback.format_exc()}")
raise
# 获取服务分类(用于映射业务分类名称 → pkId)
logger.info("获取服务分类列表...")
try:
init_add_resp = requests.post(
'https://ids-goods.f6car.cn/f6-ids-goods/service/initAdd',
cookies=cookies,
data={'idOwnOrg': org_id}
)
time.sleep(1)
init_add_resp.raise_for_status()
service_category_list = init_add_resp.json().get("data", {}).get("serviceCategory", [])
category_name_to_pk = {item["name"]: item["pkId"] for item in service_category_list}
logger.info(f"服务分类列表获取成功,共{len(category_name_to_pk)}个分类")
except requests.exceptions.RequestException as e:
logger.error(f"获取服务分类列表失败: {str(e)}, 堆栈信息: {traceback.format_exc()}")
raise
# 第一步:获取所有项目列表(分页)
logger.info("获取项目列表...")
json_data = {
'param': '',
'name': '',
'customCode': '',
'currentPage': 1,
'pageSize': 100,
'isDel': 0, # 只查启用的
'customInvoiceCategory': 0,
'idOwnOrg': org_id,
}
try:
response = requests.post(
'https://ids-goods.f6car.cn/f6-ids-goods/service/getServiceList',
cookies=cookies,
json=json_data,
)
response.raise_for_status()
total_pages = response.json().get("data", {}).get("totalPages", 0)
logger.info(f"项目列表总页数: {total_pages}")
all_project_list = []
total_pages = int(total_pages)
for page in tqdm(range(1, total_pages + 1), desc="获取项目列表"):
json_data['currentPage'] = page
try:
resp = requests.post(
'https://ids-goods.f6car.cn/f6-ids-goods/service/getServiceList',
cookies=cookies,
json=json_data,
)
time.sleep(1)
resp.raise_for_status()
records = resp.json().get("data", {}).get("records", [])
all_project_list.extend(records)
except requests.exceptions.RequestException as e:
logger.error(f"获取第{page}页项目列表失败: {str(e)}, 响应状态码: {getattr(e.response, 'status_code', 'N/A')}")
raise
logger.info(f"项目列表获取完成,总计: {len(all_project_list)}条项目")
except requests.exceptions.RequestException as e:
logger.error(f"获取项目列表失败: {str(e)}, 堆栈信息: {traceback.format_exc()}")
raise
# 构建系统已有项目名称集合(用于检查重复)
existing_project_names = set()
for project in all_project_list:
project_name = project.get("name")
if project_name:
existing_project_names.add(str(project_name).strip())
logger.info(f"系统已有项目名称数: {len(existing_project_names)}")
# 第二步:构建 update_map
logger.info("构建更新映射表...")
update_map = {}
skipped_count = 0
category_not_found_count = 0
duplicate_name_in_df_count = 0 # DataFrame中重复的新项目名称数量
duplicate_name_in_system_count = 0 # 与系统已有项目名称重复的数量
seen_new_names = {} # 用于跟踪DataFrame中已出现的新项目名称,key为新名称,value为第一次出现的原项目编码
results = [] # 提前创建results,用于记录跳过的信息
for idx, row in df.iterrows():
orig_code = safe_iloc(row, 0)
if pd.isna(orig_code) or str(orig_code).strip() == "":
skipped_count += 1
results.append({'项目编码': '', '状态': '跳过: 项目编码为空'})
continue
orig_code = str(orig_code).strip()
# 检查新项目名称(第2列,索引为2
new_name = safe_iloc(row, 2)
if should_update(new_name):
new_name_clean = safe_str(new_name)
# 检查DataFrame中是否有重复的新项目名称
if new_name_clean in seen_new_names:
duplicate_name_in_df_count += 1
logger.warning(f"DataFrame中项目名称重复,跳过: 原项目编码={orig_code}, 新项目名称={new_name_clean}, 首次出现在原项目编码={seen_new_names[new_name_clean]}")
results.append({'项目编码': orig_code, '状态': f'跳过: DataFrame中项目名称重复(首次出现在原项目编码={seen_new_names[new_name_clean]})'})
continue
# 检查新项目名称是否与系统已有项目名称重复
if new_name_clean in existing_project_names:
duplicate_name_in_system_count += 1
logger.warning(f"新项目名称与系统已有项目名称重复,跳过: 原项目编码={orig_code}, 新项目名称={new_name_clean}")
results.append({'项目编码': orig_code, '状态': f'跳过: 新项目名称与系统已有项目名称重复({new_name_clean})'})
continue
# 记录这个新名称
seen_new_names[new_name_clean] = orig_code
# 业务分类映射
category_name = safe_iloc(row, 3)
category_pk = None
category_not_found = False
if should_update(category_name):
cat_name_clean = safe_str(category_name)
category_pk = category_name_to_pk.get(cat_name_clean)
if category_pk is None:
logger.warning(f"业务分类 '{cat_name_clean}' 未在系统中找到,项目编码: {orig_code}")
category_not_found = True
category_not_found_count += 1
# 列 6/7/8(车辆分类、工时单价、工时)不再传入,固定为 None,不更新这些字段
car_category_name = None
price = None
work_hour = None
amount = None
update_map[orig_code] = {
"new_customCode": safe_iloc(row, 1), # 新项目编码
"new_name": safe_iloc(row, 2), # 新项目名称
"new_serviceCategoryId": category_pk,
"new_taxRate": safe_iloc(row, 4),
"new_memo": safe_iloc(row, 5),
"new_carCategoryName": car_category_name,
"new_price": price,
"new_workHour": work_hour,
"new_amount": amount,
}
logger.info(f"映射表完成: 待处理={len(update_map)}, 跳过空编码={skipped_count}, 分类未找到={category_not_found_count}, DF重复名={duplicate_name_in_df_count}, 系统重名={duplicate_name_in_system_count}")
# 第三步:遍历项目,按需更新
logger.info("开始处理项目更新...")
consecutive_failures = 0 # 连续失败计数器
MAX_CONSECUTIVE_FAILURES = 100 # 最大连续失败次数
success_count = 0
failure_count = 0
for item in tqdm(all_project_list, desc="处理项目更新"):
custom_code = item.get("customCode")
if not custom_code or str(custom_code) not in update_map:
continue
service_id = item.get("pkId") # 项目主键
if not service_id:
error_msg = '缺少 pkId'
results.append({'项目编码': custom_code, '状态': error_msg})
failure_count += 1
consecutive_failures += 1
logger.warning(f"项目编码 {custom_code} {error_msg}")
if consecutive_failures >= MAX_CONSECUTIVE_FAILURES:
results.append({'项目编码': '系统保护', '状态': '连续100次失败已终止'})
logger.warning(f"连续失败{MAX_CONSECUTIVE_FAILURES}次,已中止任务执行")
break
continue
try:
# 获取项目明细
params = {
'serviceId': service_id,
'customInvoiceCategory': '0',
}
detail_resp = requests.get(
'https://ids-goods.f6car.cn/f6-ids-goods/service/viewService',
params=params,
cookies=cookies,
)
time.sleep(1)
if detail_resp.status_code != 200:
error_msg = f'获取明细失败: {detail_resp.status_code}'
results.append({'项目编码': custom_code, '状态': error_msg})
failure_count += 1
consecutive_failures += 1
logger.error(f"项目编码 {custom_code} {error_msg}, 响应内容: {detail_resp.text[:200]}")
if consecutive_failures >= MAX_CONSECUTIVE_FAILURES:
results.append({'项目编码': '系统保护', '状态': '连续100次失败已终止'})
logger.warning(f"连续失败{MAX_CONSECUTIVE_FAILURES}次,已中止任务执行")
break
continue
detail = detail_resp.json().get("data")
if not detail:
error_msg = '明细为空'
results.append({'项目编码': custom_code, '状态': error_msg})
failure_count += 1
consecutive_failures += 1
logger.error(f"项目编码 {custom_code} {error_msg}, 响应JSON: {detail_resp.json()}")
if consecutive_failures >= MAX_CONSECUTIVE_FAILURES:
results.append({'项目编码': '系统保护', '状态': '连续100次失败已终止'})
logger.warning(f"连续失败{MAX_CONSECUTIVE_FAILURES}次,已中止任务执行")
break
continue
updates = update_map[str(custom_code)]
# === 安全更新字段(仅非空时覆盖)===
if should_update(updates["new_customCode"]):
detail["customCode"] = safe_str(updates["new_customCode"])
if should_update(updates["new_name"]):
name_val = safe_str(updates["new_name"])
# 再次检查新项目名称是否与系统已有项目名称重复(防止在构建update_map后系统状态发生变化)
if name_val in existing_project_names:
error_msg = f'跳过: 新项目名称与系统已有项目名称重复({name_val})'
results.append({'项目编码': custom_code, '状态': error_msg})
failure_count += 1
consecutive_failures += 1
logger.warning(f"项目编码 {custom_code} {error_msg}")
if consecutive_failures >= MAX_CONSECUTIVE_FAILURES:
results.append({'项目编码': '系统保护', '状态': '连续100次失败已终止'})
logger.warning(f"连续失败{MAX_CONSECUTIVE_FAILURES}次,已中止任务执行")
break
continue
detail["name"] = name_val
if "showName" in detail:
detail["showName"] = name_val
if updates["new_serviceCategoryId"] is not None:
detail["serviceCategoryId"] = updates["new_serviceCategoryId"]
if should_update(updates["new_taxRate"]):
detail["taxRate"] = safe_str(updates["new_taxRate"])
if should_update(updates["new_memo"]):
detail["memo"] = safe_str(updates["new_memo"])
if should_update(updates["new_carCategoryName"]):
detail["carCategoryName"] = safe_str(updates["new_carCategoryName"])
if updates["new_price"] is not None:
detail["price"] = f"{updates['new_price']:.2f}"
if updates["new_workHour"] is not None:
detail["workHour"] = f"{updates['new_workHour']:.2f}"
if updates["new_amount"] is not None:
detail["amount"] = f"{updates['new_amount']:.2f}"
# === 提交更新 ===
update_resp = requests.post(
'https://ids-goods.f6car.cn/f6-ids-goods/service/editService',
cookies=cookies,
json=detail
)
time.sleep(2)
if update_resp.status_code == 200 and update_resp.json().get("code") == 200:
results.append({'项目编码': custom_code, '状态': '修改成功'})
success_count += 1
consecutive_failures = 0 # 成功时重置计数器
else:
msg = update_resp.json().get("message", "未知错误")
error_msg = f'修改失败: {msg}'
results.append({'项目编码': custom_code, '状态': error_msg})
failure_count += 1
consecutive_failures += 1
logger.error(f"项目编码 {custom_code} {error_msg}, 响应数据: {update_resp.json()}")
except requests.exceptions.RequestException as e:
error_msg = f"请求异常: {str(e)}"
logger.error(f"项目编码 {custom_code} {error_msg}, 堆栈信息: {traceback.format_exc()}")
results.append({'项目编码': custom_code, '状态': error_msg})
failure_count += 1
consecutive_failures += 1
except Exception as e:
error_msg = f"异常: {str(e)}"
logger.error(f"项目编码 {custom_code} 更新出错: {traceback.format_exc()}")
results.append({'项目编码': custom_code, '状态': error_msg})
failure_count += 1
consecutive_failures += 1
# 检查连续失败次数
if consecutive_failures >= MAX_CONSECUTIVE_FAILURES:
results.append({'项目编码': '系统保护', '状态': '连续100次失败已终止'})
logger.warning(f"连续失败{MAX_CONSECUTIVE_FAILURES}次,已中止任务执行")
break
# 汇总插入到结果顶部,便于简道云直接看到统计
total_rows = len(df)
to_process = len(update_map)
skip_count = skipped_count + duplicate_name_in_df_count + duplicate_name_in_system_count
summary = {
"总行数": total_rows,
"待处理": to_process,
"成功": success_count,
"失败": failure_count,
"跳过": skip_count,
}
results.insert(0, {"汇总": summary})
logger.info(f"项目修改汇总: {summary}")
# 第四步:清理与回写
print({'msg': '已执行', 'msg_details': results})
logger.info("项目批量修改完成,开始回写结果")
# 删除文件
logger.info(f"准备删除文件: {save_path}")
try:
os.remove(save_path)
logger.info(f"文件删除成功: {save_path}")
print(f'{save_path} 已删除')
except Exception as e:
logger.error(f"删除文件失败: {save_path}, 错误信息: {str(e)}, 堆栈信息: {traceback.format_exc()}")
# 回写简道云
logger.info("开始回写简道云表单...")
try:
msg = update_jiandaoyun(data, str(results))
logger.info(f"简道云表单回写结果: {msg}")
if msg.get('msg'):
logger.info("开始自动提交工作流...")
approve_workflow(data)
logger.info("工作流提交成功")
print('表单已自动提交至下一步')
else:
logger.warning(f"简道云表单回写失败: {msg}")
except Exception as e:
logger.error(f"回写简道云表单失败: {str(e)}, 堆栈信息: {traceback.format_exc()}")
logger.info("项目批量修改任务执行完成")