""" 项目材料相关后台任务模块 本模块包含项目材料相关的后台任务,包括: - 项目信息批量停用 - 材料信息批量修改 这些任务在后台线程中执行,不会阻塞主请求。 """ import logging import traceback import requests import time from typing import Dict, Any from tqdm import tqdm from app.tasks.common import update_jiandaoyun, approve_workflow, get_operate_org_id import pandas as pd import os logger = logging.getLogger('app') def batch_disable_projects(data: Dict[str, Any], cookies: Dict[str, str], df: pd.DataFrame, save_path: str, option) -> None: """ 项目批量启停后台任务 在后台线程中批量停用项目,从 Excel 文件中读取项目编码。 执行完成后会更新简道云表单并自动提交工作流。 Args: data: 包含表单ID(api_key)、表单ID(entry_id)、数据ID(data_id)的字典 cookies: 用户登录 F6 系统的 cookies 信息 df: Excel 文件读取的内容,DataFrame 格式,第一列为项目编码 save_path: Excel 文件保存的地址,执行完成后会删除此文件 option: 批量启用、批量停用 Returns: None 注意: - 无效的项目(None、空字符串)会被跳过 - 执行完成后会自动删除上传的文件 - 执行结果会更新到简道云表单 """ logger.info(f"开始执行项目批量启停任务,操作类型: {option}, 文件路径: {save_path}, 数据行数: {len(df)}") if option == "批量启用": type_ = 0 # 1 停用,0启用 ob_type = 1 else: type_ = 1 ob_type = 0 logger.info(f"操作类型设置完成: type_={type_}, ob_type={ob_type}") df = df.where(pd.notnull(df), None) # 获取门店id logger.info("正在获取门店ID...") try: org_id = get_operate_org_id(cookies) logger.info(f"门店ID获取成功: {org_id}") except Exception as e: logger.error(f"获取门店ID失败: {str(e)}, 堆栈信息: {traceback.format_exc()}") raise # 获取项目信息 logger.info("开始获取项目列表...") json_data = { 'param': '', 'name': '', 'customCode': '', 'currentPage': 1, 'pageSize': 100, 'isDel': ob_type, 'customInvoiceCategory': 0, 'idOwnOrg': org_id, } try: response = requests.post( 'https://ids-goods.f6car.cn/f6-ids-goods/service/getServiceList', cookies=cookies, json=json_data, ) time.sleep(1) response.raise_for_status() all_project_list = [] total_pages = response.json().get("data", {}).get("totalPages", 0) logger.info(f"获取项目列表响应: {response.json()}") try: total_pages = int(total_pages) except (ValueError, TypeError): logger.error(f"无法解析总页数: {total_pages}, 类型: {type(total_pages)}") total_pages = 0 logger.info(f"项目列表总页数: {total_pages}") for page in tqdm(range(1, total_pages + 1)): json_data['currentPage'] = page try: response = requests.post( 'https://ids-goods.f6car.cn/f6-ids-goods/service/getServiceList', cookies=cookies, json=json_data, ) time.sleep(1) response.raise_for_status() project_list = response.json().get("data", {}).get("records", []) all_project_list.extend(project_list) logger.debug(f"第{page}页获取到{len(project_list)}条项目") except requests.exceptions.RequestException as e: logger.error(f"获取第{page}页项目列表失败: {str(e)}, 响应状态码: {getattr(e.response, 'status_code', 'N/A')}") raise logger.info(f"项目列表获取完成,总计: {len(all_project_list)}条项目") except requests.exceptions.RequestException as e: logger.error(f"获取项目列表失败: {str(e)}, 堆栈信息: {traceback.format_exc()}") raise # 遍历获取到的项目信息停用文件中的项目 code_list = df.iloc[:, 0].dropna().astype(str).tolist() logger.info(f"Excel文件中待处理的项目编码数量: {len(code_list)}") results = [] consecutive_failures = 0 # 连续失败计数器 MAX_CONSECUTIVE_FAILURES = 100 # 最大连续失败次数 success_count = 0 failure_count = 0 logger.info("开始处理项目启停操作...") for item in tqdm(all_project_list): custom_code = item.get("customCode") if not code_list or not custom_code or str(custom_code) not in code_list: continue logger.debug(f"正在处理项目编码: {custom_code}") info_id = item.get("infoId") pk_id = item.get("pkId") if not info_id or not pk_id: logger.warning(f"项目编码 {custom_code} 缺少必要字段: infoId={info_id}, pkId={pk_id}") results.append({'项目编码': custom_code, '状态': '缺少必要字段(infoId或pkId)'}) failure_count += 1 consecutive_failures += 1 if consecutive_failures >= MAX_CONSECUTIVE_FAILURES: results.append({'项目编码': '系统保护', '状态': '连续100次失败已终止'}) logger.warning(f"连续失败{MAX_CONSECUTIVE_FAILURES}次,已中止任务执行") break continue json_data = { "orgIdList": [ org_id, ], "isDel": type_, # 1 停用 0启用 "infoId": info_id, "pkId": pk_id, "type": 1, "idOwnOrg": org_id } try: logger.debug(f"发送启停请求,项目编码: {custom_code}, 操作类型: {type_}") response = requests.post( 'https://ids-goods.f6car.cn/f6-ids-goods/service/editAttributeByType', cookies=cookies, json=json_data, ) time.sleep(2) response.raise_for_status() # 抛出HTTP错误 # 检查业务响应码 resp_data = response.json() if resp_data.get("code") == 200: results.append({'项目编码': custom_code, '状态': '停用/启用成功'}) success_count += 1 consecutive_failures = 0 # 成功时重置计数器 logger.info(f"项目编码 {custom_code} 启停操作成功") else: msg = resp_data.get("message", "未知错误") results.append({'项目编码': custom_code, '状态': f'停用/启用失败: {msg}'}) failure_count += 1 consecutive_failures += 1 logger.error(f"项目编码 {custom_code} 启停操作失败: {msg}, 响应数据: {resp_data}") except requests.exceptions.RequestException as e: error_msg = str(e) results.append({'项目编码': custom_code, '状态': f'停用/启用失败: {error_msg}'}) failure_count += 1 consecutive_failures += 1 logger.error(f"项目编码 {custom_code} 启停操作请求异常: {error_msg}, 堆栈信息: {traceback.format_exc()}") # 检查连续失败次数 if consecutive_failures >= MAX_CONSECUTIVE_FAILURES: results.append({'项目编码': '系统保护', '状态': '连续100次失败已终止'}) logger.warning(f"连续失败{MAX_CONSECUTIVE_FAILURES}次,已中止任务执行") break logger.info(f"项目启停处理完成,成功: {success_count}条, 失败: {failure_count}条, 总计: {len(results)}条") print({'msg': '已执行', 'msg_details': f'{results}'}) logger.info(f"停用/启用结果: {results}") # 删除文件 logger.info(f"准备删除文件: {save_path}") try: os.remove(save_path) logger.info(f"文件删除成功: {save_path}") print(f'{save_path}已删除') except Exception as e: logger.error(f"删除文件失败: {save_path}, 错误信息: {str(e)}, 堆栈信息: {traceback.format_exc()}") # 调用api回写改掉 执行明细与执行状态 logger.info("开始回写简道云表单...") try: msg = update_jiandaoyun(data, f'{results}') logger.info(f"简道云表单回写结果: {msg}") if msg.get('msg'): logger.info("开始自动提交工作流...") approve_workflow(data) logger.info("工作流提交成功") print('表单已自动提交至下一步') else: logger.warning(f"简道云表单回写失败: {msg}") except Exception as e: logger.error(f"回写简道云表单失败: {str(e)}, 堆栈信息: {traceback.format_exc()}") logger.info(f"项目批量启停任务执行完成,操作类型: {option}") def batch_modify_materials(data: Dict[str, Any], cookies: Dict[str, str], df: pd.DataFrame, save_path: str) -> None: """ 材料批量修改后台任务 在后台线程中批量修改材料,从 Excel 文件中读取材料编码。 执行完成后会更新简道云表单并自动提交工作流。 Args: data: 包含表单ID(api_key)、表单ID(entry_id)、数据ID(data_id)的字典 cookies: 用户登录 F6 系统的 cookies 信息 df: Excel 文件读取的内容,DataFrame 格式,列顺序为: [原材料编码, 新材料编码, 品牌, 名称, 规格] save_path: Excel 文件保存的地址,执行完成后会删除此文件 注意: - 无效的材料编码(None、空字符串)会被跳过 - Excel 中某字段为空(NaN 或空字符串)时,保留原材料中的对应字段 - 执行完成后会自动删除上传的文件 - 执行结果会更新到简道云表单 """ logger.info(f"开始执行材料批量修改任务,文件: {save_path},行数: {len(df)}") def safe_str(val): """将值转为字符串,NaN 返回空字符串""" if pd.isna(val): return "" return str(val).strip() def should_update(new_val): """判断是否应该用 new_val 更新:非 NaN 且非空字符串""" return not (pd.isna(new_val) or (isinstance(new_val, str) and new_val.strip() == "")) # 获取门店id logger.info("正在获取门店ID...") try: org_id = get_operate_org_id(cookies) logger.info(f"门店ID获取成功: {org_id}") except Exception as e: logger.error(f"获取门店ID失败: {str(e)}, 堆栈信息: {traceback.format_exc()}") raise # 第一步:获取所有材料列表(分页) json_data = { 'keyWord': '', 'idOwnOrg': org_id, 'currentPage': 1, 'pageSize': 100, 'name': '', 'brand': '', 'supplierCode': '', 'customCode': '', 'categoryName': '', 'categoryId': '', 'labelId': '', 'labelName': '', 'spec': '', 'applyModel': '', 'sellPurchaseStatuses': [2, 3, 4, 5], 'customInvoiceCategory': 0, 'getThirdPlatformCode': 0, } try: response = requests.post( 'https://ids-goods.f6car.com/f6-ids-goods/part/getExactPartStockInfo', cookies=cookies, json=json_data, ) response.raise_for_status() total_pages = response.json().get("data", {}).get("totalPages", 0) logger.info(f"材料列表页数: {total_pages}") all_materials_list = [] total_pages = int(total_pages) for page in tqdm(range(1, total_pages + 1), desc="获取材料列表"): json_data['currentPage'] = page try: resp = requests.post( 'https://ids-goods.f6car.com/f6-ids-goods/part/getExactPartStockInfo', cookies=cookies, json=json_data, ) time.sleep(1) resp.raise_for_status() records = resp.json().get("data", {}).get("records", []) all_materials_list.extend(records) except requests.exceptions.RequestException as e: logger.error(f"获取第{page}页材料列表失败: {str(e)}, 响应状态码: {getattr(e.response, 'status_code', 'N/A')}") raise logger.info(f"材料列表获取完成,总计: {len(all_materials_list)}条") except requests.exceptions.RequestException as e: logger.error(f"获取材料列表失败: {str(e)}, 堆栈信息: {traceback.format_exc()}") raise # 第二步:构建 update_map(只存原始值,不预处理) update_map = {} skipped_count = 0 for _, row in df.iterrows(): orig_code = row.iloc[0] # 原材料编码 if pd.isna(orig_code) or str(orig_code).strip() == "": skipped_count += 1 continue orig_code = str(orig_code).strip() update_map[orig_code] = { "new_customCode": row.iloc[1], "new_brand": row.iloc[2], "new_name": row.iloc[3], "new_spec": row.iloc[4], } logger.info(f"待更新材料数: {len(update_map)},跳过空编码行: {skipped_count}") # 第三步:遍历材料,按需更新 results = [] consecutive_failures = 0 # 连续失败计数器 MAX_CONSECUTIVE_FAILURES = 100 # 最大连续失败次数 success_count = 0 failure_count = 0 skip_count = 0 for item in tqdm(all_materials_list, desc="处理材料更新"): custom_code = item.get("customCode") if not custom_code or str(custom_code) not in update_map: continue part_id = item.get("partId") if not part_id: error_msg = '缺少 partId' results.append({'材料编码': custom_code, '状态': error_msg}) failure_count += 1 consecutive_failures += 1 logger.warning(f"材料编码 {custom_code} 跳过/失败: {error_msg}") if consecutive_failures >= MAX_CONSECUTIVE_FAILURES: results.append({'材料编码': '系统保护', '状态': '连续100次失败已终止'}) logger.warning(f"连续失败{MAX_CONSECUTIVE_FAILURES}次,已中止任务执行") break continue try: # 获取材料明细 params = { 'partId': part_id, 'customInvoiceCategory': '0', 'getThirdPlatformCode': '0', } materials_response = requests.get( 'https://ids-goods.f6car.com/f6-ids-goods/part/getPartInfo', params=params, cookies=cookies, ) time.sleep(1) if materials_response.status_code != 200: error_msg = f'获取明细失败: {materials_response.status_code}' results.append({'材料编码': custom_code, '状态': error_msg}) failure_count += 1 consecutive_failures += 1 logger.error(f"材料编码 {custom_code} {error_msg}, 响应内容: {materials_response.text[:200]}") if consecutive_failures >= MAX_CONSECUTIVE_FAILURES: results.append({'材料编码': '系统保护', '状态': '连续100次失败已终止'}) logger.warning(f"连续失败{MAX_CONSECUTIVE_FAILURES}次,已中止任务执行") break continue detail = materials_response.json().get("data") if not detail: error_msg = '明细为空' results.append({'材料编码': custom_code, '状态': error_msg}) failure_count += 1 consecutive_failures += 1 logger.error(f"材料编码 {custom_code} {error_msg}, 响应JSON: {materials_response.json()}") if consecutive_failures >= MAX_CONSECUTIVE_FAILURES: results.append({'材料编码': '系统保护', '状态': '连续100次失败已终止'}) logger.warning(f"连续失败{MAX_CONSECUTIVE_FAILURES}次,已中止任务执行") break continue updates = update_map[str(custom_code)] # 安全更新字段:仅当 Excel 提供有效值时才覆盖 if should_update(updates["new_customCode"]): detail["customCode"] = safe_str(updates["new_customCode"]) if should_update(updates["new_brand"]): detail["brand"] = safe_str(updates["new_brand"]) if should_update(updates["new_name"]): name_val = safe_str(updates["new_name"]) detail["name"] = name_val detail["showName"] = name_val # 同步 showName if should_update(updates["new_spec"]): detail["spec"] = safe_str(updates["new_spec"]) # 修复价格格式和数组(必须!) for f in ["purchasePrice", "sellPrice"]: val = detail.get(f) if isinstance(val, (int, float)): detail[f] = f"{val:.2f}" if detail.get("partBarCodeVos") is None: detail["partBarCodeVos"] = [] # 提交更新 update_resp = requests.post( 'https://ids-goods.f6car.com/f6-ids-goods/part/updateAreaPartBasicInfo', cookies=cookies, json=detail ) time.sleep(2) if update_resp.status_code == 200 and update_resp.json().get("code") == 200: results.append({'材料编码': custom_code, '状态': '修改成功'}) success_count += 1 consecutive_failures = 0 # 成功时重置计数器 else: msg = update_resp.json().get("message", "未知错误") error_msg = f'修改失败: {msg}' results.append({'材料编码': custom_code, '状态': error_msg}) failure_count += 1 consecutive_failures += 1 logger.error(f"材料编码 {custom_code} {error_msg}, 响应数据: {update_resp.json()}") except requests.exceptions.RequestException as e: error_msg = f'请求异常: {str(e)}' results.append({'材料编码': custom_code, '状态': error_msg}) failure_count += 1 consecutive_failures += 1 logger.error(f"材料编码 {custom_code} {error_msg}, 堆栈信息: {traceback.format_exc()}") except Exception as e: error_msg = f'内部错误: {str(e)}' results.append({'材料编码': custom_code, '状态': error_msg}) failure_count += 1 consecutive_failures += 1 logger.error(f"材料编码 {custom_code} {error_msg}, 堆栈信息: {traceback.format_exc()}") # 检查连续失败次数 if consecutive_failures >= MAX_CONSECUTIVE_FAILURES: results.append({'材料编码': '系统保护', '状态': '连续100次失败已终止'}) logger.warning(f"连续失败{MAX_CONSECUTIVE_FAILURES}次,已中止任务执行") break # 统计汇总(总行数 / 待处理 / 成功 / 失败 / 跳过) total_rows = len(df) to_process = len(update_map) # results 里可能包含 “缺少 partId” 等失败信息;这里跳过数按空编码行计数即可 skip_count = skipped_count summary = { "总行数": total_rows, "待处理": to_process, "成功": success_count, "失败": failure_count, "跳过": skip_count, } results.insert(0, {"汇总": summary}) logger.info(f"材料修改汇总: {summary}") # 第四步:清理与回写 print({'msg': '已执行', 'msg_details': results}) # 结果回写包含汇总 + 明细 logger.info("材料批量修改完成,开始回写结果") # 删除文件 logger.info(f"准备删除文件: {save_path}") try: os.remove(save_path) logger.info(f"文件删除成功: {save_path}") print(f'{save_path} 已删除') except Exception as e: logger.error(f"删除文件失败: {save_path}, 错误信息: {str(e)}, 堆栈信息: {traceback.format_exc()}") # 回写简道云 logger.info("开始回写简道云表单...") try: msg = update_jiandaoyun(data, str(results)) logger.info(f"简道云表单回写结果: {msg}") if msg.get('msg'): logger.info("开始自动提交工作流...") approve_workflow(data) logger.info("工作流提交成功") print('表单已自动提交至下一步') else: logger.warning(f"简道云表单回写失败: {msg}") except Exception as e: logger.error(f"回写简道云表单失败: {str(e)}, 堆栈信息: {traceback.format_exc()}") logger.info("材料批量修改任务执行完成") def batch_modify_projects(data: Dict[str, Any], cookies: Dict[str, str], df: pd.DataFrame, save_path: str) -> None: """ 项目批量修改后台任务 在后台线程中批量修改项目,从 Excel 文件中读取项目编码。 执行完成后会更新简道云表单并自动提交工作流。 Args: data: 包含表单ID(api_key)、表单ID(entry_id)、数据ID(data_id)的字典 cookies: 用户登录 F6 系统的 cookies 信息 df: Excel 文件读取的内容,DataFrame 格式,列顺序为: [0:原项目编码, 1:新项目编码, 2:新项目名称, 3:业务分类, 4:销项税率, 5:项目说明] save_path: Excel 文件保存的地址,执行完成后会删除此文件 注意: - 无效的项目编码(None、空字符串)会被跳过 - Excel 中某字段为空(NaN 或空字符串)时,保留原始项目中的对应字段 - 如果新项目名称与系统已有项目名称重复,则跳过处理 - 如果Excel中多个行的新项目名称重复,只执行第一条数据,后续重复的会被跳过 - 执行完成后会自动删除上传的文件 - 执行结果会更新到简道云表单 """ logger.info(f"开始执行项目批量修改任务,文件: {save_path},行数: {len(df)}") def safe_str(val): """将值转为字符串,NaN 返回空字符串""" if pd.isna(val): return "" return str(val).strip() def safe_float(val): """安全转换为 float,失败返回 None""" if pd.isna(val): return None try: return float(val) except (ValueError, TypeError): return None def should_update(new_val): """判断是否应该用 new_val 更新:非 NaN 且非空字符串""" return not (pd.isna(new_val) or (isinstance(new_val, str) and new_val.strip() == "")) def safe_iloc(row, idx, default=None): """安全按索引取行值,列数不足时返回 default,避免 IndexError""" try: if idx < len(row): return row.iloc[idx] except (IndexError, KeyError): pass return default # 获取门店id logger.info("获取门店ID...") try: org_id = get_operate_org_id(cookies) logger.info(f"门店ID获取成功: {org_id}") except Exception as e: logger.error(f"获取门店ID失败: {str(e)}, 堆栈信息: {traceback.format_exc()}") raise # 获取服务分类(用于映射业务分类名称 → pkId) logger.info("获取服务分类列表...") try: init_add_resp = requests.post( 'https://ids-goods.f6car.cn/f6-ids-goods/service/initAdd', cookies=cookies, data={'idOwnOrg': org_id} ) time.sleep(1) init_add_resp.raise_for_status() service_category_list = init_add_resp.json().get("data", {}).get("serviceCategory", []) category_name_to_pk = {item["name"]: item["pkId"] for item in service_category_list} logger.info(f"服务分类列表获取成功,共{len(category_name_to_pk)}个分类") except requests.exceptions.RequestException as e: logger.error(f"获取服务分类列表失败: {str(e)}, 堆栈信息: {traceback.format_exc()}") raise # 第一步:获取所有项目列表(分页) logger.info("获取项目列表...") json_data = { 'param': '', 'name': '', 'customCode': '', 'currentPage': 1, 'pageSize': 100, 'isDel': 0, # 只查启用的 'customInvoiceCategory': 0, 'idOwnOrg': org_id, } try: response = requests.post( 'https://ids-goods.f6car.cn/f6-ids-goods/service/getServiceList', cookies=cookies, json=json_data, ) response.raise_for_status() total_pages = response.json().get("data", {}).get("totalPages", 0) logger.info(f"项目列表总页数: {total_pages}") all_project_list = [] total_pages = int(total_pages) for page in tqdm(range(1, total_pages + 1), desc="获取项目列表"): json_data['currentPage'] = page try: resp = requests.post( 'https://ids-goods.f6car.cn/f6-ids-goods/service/getServiceList', cookies=cookies, json=json_data, ) time.sleep(1) resp.raise_for_status() records = resp.json().get("data", {}).get("records", []) all_project_list.extend(records) except requests.exceptions.RequestException as e: logger.error(f"获取第{page}页项目列表失败: {str(e)}, 响应状态码: {getattr(e.response, 'status_code', 'N/A')}") raise logger.info(f"项目列表获取完成,总计: {len(all_project_list)}条项目") except requests.exceptions.RequestException as e: logger.error(f"获取项目列表失败: {str(e)}, 堆栈信息: {traceback.format_exc()}") raise # 构建系统已有项目名称集合(用于检查重复) existing_project_names = set() for project in all_project_list: project_name = project.get("name") if project_name: existing_project_names.add(str(project_name).strip()) logger.info(f"系统已有项目名称数: {len(existing_project_names)}") # 第二步:构建 update_map logger.info("构建更新映射表...") update_map = {} skipped_count = 0 category_not_found_count = 0 duplicate_name_in_df_count = 0 # DataFrame中重复的新项目名称数量 duplicate_name_in_system_count = 0 # 与系统已有项目名称重复的数量 seen_new_names = {} # 用于跟踪DataFrame中已出现的新项目名称,key为新名称,value为第一次出现的原项目编码 results = [] # 提前创建results,用于记录跳过的信息 for idx, row in df.iterrows(): orig_code = safe_iloc(row, 0) if pd.isna(orig_code) or str(orig_code).strip() == "": skipped_count += 1 results.append({'项目编码': '', '状态': '跳过: 项目编码为空'}) continue orig_code = str(orig_code).strip() # 检查新项目名称(第2列,索引为2) new_name = safe_iloc(row, 2) if should_update(new_name): new_name_clean = safe_str(new_name) # 检查DataFrame中是否有重复的新项目名称 if new_name_clean in seen_new_names: duplicate_name_in_df_count += 1 logger.warning(f"DataFrame中项目名称重复,跳过: 原项目编码={orig_code}, 新项目名称={new_name_clean}, 首次出现在原项目编码={seen_new_names[new_name_clean]}") results.append({'项目编码': orig_code, '状态': f'跳过: DataFrame中项目名称重复(首次出现在原项目编码={seen_new_names[new_name_clean]})'}) continue # 检查新项目名称是否与系统已有项目名称重复 if new_name_clean in existing_project_names: duplicate_name_in_system_count += 1 logger.warning(f"新项目名称与系统已有项目名称重复,跳过: 原项目编码={orig_code}, 新项目名称={new_name_clean}") results.append({'项目编码': orig_code, '状态': f'跳过: 新项目名称与系统已有项目名称重复({new_name_clean})'}) continue # 记录这个新名称 seen_new_names[new_name_clean] = orig_code # 业务分类映射 category_name = safe_iloc(row, 3) category_pk = None category_not_found = False if should_update(category_name): cat_name_clean = safe_str(category_name) category_pk = category_name_to_pk.get(cat_name_clean) if category_pk is None: logger.warning(f"业务分类 '{cat_name_clean}' 未在系统中找到,项目编码: {orig_code}") category_not_found = True category_not_found_count += 1 # 列 6/7/8(车辆分类、工时单价、工时)不再传入,固定为 None,不更新这些字段 car_category_name = None price = None work_hour = None amount = None update_map[orig_code] = { "new_customCode": safe_iloc(row, 1), # 新项目编码 "new_name": safe_iloc(row, 2), # 新项目名称 "new_serviceCategoryId": category_pk, "new_taxRate": safe_iloc(row, 4), "new_memo": safe_iloc(row, 5), "new_carCategoryName": car_category_name, "new_price": price, "new_workHour": work_hour, "new_amount": amount, } logger.info(f"映射表完成: 待处理={len(update_map)}, 跳过空编码={skipped_count}, 分类未找到={category_not_found_count}, DF重复名={duplicate_name_in_df_count}, 系统重名={duplicate_name_in_system_count}") # 第三步:遍历项目,按需更新 logger.info("开始处理项目更新...") consecutive_failures = 0 # 连续失败计数器 MAX_CONSECUTIVE_FAILURES = 100 # 最大连续失败次数 success_count = 0 failure_count = 0 for item in tqdm(all_project_list, desc="处理项目更新"): custom_code = item.get("customCode") if not custom_code or str(custom_code) not in update_map: continue service_id = item.get("pkId") # 项目主键 if not service_id: error_msg = '缺少 pkId' results.append({'项目编码': custom_code, '状态': error_msg}) failure_count += 1 consecutive_failures += 1 logger.warning(f"项目编码 {custom_code} {error_msg}") if consecutive_failures >= MAX_CONSECUTIVE_FAILURES: results.append({'项目编码': '系统保护', '状态': '连续100次失败已终止'}) logger.warning(f"连续失败{MAX_CONSECUTIVE_FAILURES}次,已中止任务执行") break continue try: # 获取项目明细 params = { 'serviceId': service_id, 'customInvoiceCategory': '0', } detail_resp = requests.get( 'https://ids-goods.f6car.cn/f6-ids-goods/service/viewService', params=params, cookies=cookies, ) time.sleep(1) if detail_resp.status_code != 200: error_msg = f'获取明细失败: {detail_resp.status_code}' results.append({'项目编码': custom_code, '状态': error_msg}) failure_count += 1 consecutive_failures += 1 logger.error(f"项目编码 {custom_code} {error_msg}, 响应内容: {detail_resp.text[:200]}") if consecutive_failures >= MAX_CONSECUTIVE_FAILURES: results.append({'项目编码': '系统保护', '状态': '连续100次失败已终止'}) logger.warning(f"连续失败{MAX_CONSECUTIVE_FAILURES}次,已中止任务执行") break continue detail = detail_resp.json().get("data") if not detail: error_msg = '明细为空' results.append({'项目编码': custom_code, '状态': error_msg}) failure_count += 1 consecutive_failures += 1 logger.error(f"项目编码 {custom_code} {error_msg}, 响应JSON: {detail_resp.json()}") if consecutive_failures >= MAX_CONSECUTIVE_FAILURES: results.append({'项目编码': '系统保护', '状态': '连续100次失败已终止'}) logger.warning(f"连续失败{MAX_CONSECUTIVE_FAILURES}次,已中止任务执行") break continue updates = update_map[str(custom_code)] # === 安全更新字段(仅非空时覆盖)=== if should_update(updates["new_customCode"]): detail["customCode"] = safe_str(updates["new_customCode"]) if should_update(updates["new_name"]): name_val = safe_str(updates["new_name"]) # 再次检查新项目名称是否与系统已有项目名称重复(防止在构建update_map后系统状态发生变化) if name_val in existing_project_names: error_msg = f'跳过: 新项目名称与系统已有项目名称重复({name_val})' results.append({'项目编码': custom_code, '状态': error_msg}) failure_count += 1 consecutive_failures += 1 logger.warning(f"项目编码 {custom_code} {error_msg}") if consecutive_failures >= MAX_CONSECUTIVE_FAILURES: results.append({'项目编码': '系统保护', '状态': '连续100次失败已终止'}) logger.warning(f"连续失败{MAX_CONSECUTIVE_FAILURES}次,已中止任务执行") break continue detail["name"] = name_val if "showName" in detail: detail["showName"] = name_val if updates["new_serviceCategoryId"] is not None: detail["serviceCategoryId"] = updates["new_serviceCategoryId"] if should_update(updates["new_taxRate"]): detail["taxRate"] = safe_str(updates["new_taxRate"]) if should_update(updates["new_memo"]): detail["memo"] = safe_str(updates["new_memo"]) if should_update(updates["new_carCategoryName"]): detail["carCategoryName"] = safe_str(updates["new_carCategoryName"]) if updates["new_price"] is not None: detail["price"] = f"{updates['new_price']:.2f}" if updates["new_workHour"] is not None: detail["workHour"] = f"{updates['new_workHour']:.2f}" if updates["new_amount"] is not None: detail["amount"] = f"{updates['new_amount']:.2f}" # === 提交更新 === update_resp = requests.post( 'https://ids-goods.f6car.cn/f6-ids-goods/service/editService', cookies=cookies, json=detail ) time.sleep(2) if update_resp.status_code == 200 and update_resp.json().get("code") == 200: results.append({'项目编码': custom_code, '状态': '修改成功'}) success_count += 1 consecutive_failures = 0 # 成功时重置计数器 else: msg = update_resp.json().get("message", "未知错误") error_msg = f'修改失败: {msg}' results.append({'项目编码': custom_code, '状态': error_msg}) failure_count += 1 consecutive_failures += 1 logger.error(f"项目编码 {custom_code} {error_msg}, 响应数据: {update_resp.json()}") except requests.exceptions.RequestException as e: error_msg = f"请求异常: {str(e)}" logger.error(f"项目编码 {custom_code} {error_msg}, 堆栈信息: {traceback.format_exc()}") results.append({'项目编码': custom_code, '状态': error_msg}) failure_count += 1 consecutive_failures += 1 except Exception as e: error_msg = f"异常: {str(e)}" logger.error(f"项目编码 {custom_code} 更新出错: {traceback.format_exc()}") results.append({'项目编码': custom_code, '状态': error_msg}) failure_count += 1 consecutive_failures += 1 # 检查连续失败次数 if consecutive_failures >= MAX_CONSECUTIVE_FAILURES: results.append({'项目编码': '系统保护', '状态': '连续100次失败已终止'}) logger.warning(f"连续失败{MAX_CONSECUTIVE_FAILURES}次,已中止任务执行") break # 汇总插入到结果顶部,便于简道云直接看到统计 total_rows = len(df) to_process = len(update_map) skip_count = skipped_count + duplicate_name_in_df_count + duplicate_name_in_system_count summary = { "总行数": total_rows, "待处理": to_process, "成功": success_count, "失败": failure_count, "跳过": skip_count, } results.insert(0, {"汇总": summary}) logger.info(f"项目修改汇总: {summary}") # 第四步:清理与回写 print({'msg': '已执行', 'msg_details': results}) logger.info("项目批量修改完成,开始回写结果") # 删除文件 logger.info(f"准备删除文件: {save_path}") try: os.remove(save_path) logger.info(f"文件删除成功: {save_path}") print(f'{save_path} 已删除') except Exception as e: logger.error(f"删除文件失败: {save_path}, 错误信息: {str(e)}, 堆栈信息: {traceback.format_exc()}") # 回写简道云 logger.info("开始回写简道云表单...") try: msg = update_jiandaoyun(data, str(results)) logger.info(f"简道云表单回写结果: {msg}") if msg.get('msg'): logger.info("开始自动提交工作流...") approve_workflow(data) logger.info("工作流提交成功") print('表单已自动提交至下一步') else: logger.warning(f"简道云表单回写失败: {msg}") except Exception as e: logger.error(f"回写简道云表单失败: {str(e)}, 堆栈信息: {traceback.format_exc()}") logger.info("项目批量修改任务执行完成")