""" 项目材料相关后台任务模块 本模块包含项目材料相关的后台任务,包括: - 项目信息批量停用 - 材料信息批量修改 这些任务在后台线程中执行,不会阻塞主请求。 """ import logging import traceback import requests import time from typing import Dict, Any, List, Optional from datetime import datetime from tqdm import tqdm from app.tasks.common import update_jiandaoyun, approve_workflow, get_operate_org_id, get_card_list, \ execute_failure_handler import pandas as pd import os logger = logging.getLogger('app') def batch_disable_projects(data: Dict[str, Any], cookies: Dict[str, str], df: pd.DataFrame, save_path: str, option) -> None: """ 项目批量启停后台任务 在后台线程中批量停用项目,从 Excel 文件中读取项目编码。 执行完成后会更新简道云表单并自动提交工作流。 Args: data: 包含表单ID(api_key)、表单ID(entry_id)、数据ID(data_id)的字典 cookies: 用户登录 F6 系统的 cookies 信息 df: Excel 文件读取的内容,DataFrame 格式,第一列为项目编码 save_path: Excel 文件保存的地址,执行完成后会删除此文件 option: 批量启用、批量停用 Returns: None 注意: - 无效的项目(None、空字符串)会被跳过 - 执行完成后会自动删除上传的文件 - 执行结果会更新到简道云表单 """ if option == "批量启用": type_ = 0 # 1 停用,0启用 else: type_ = 1 df = df.where(pd.notnull(df), None) # 获取门店id org_id = get_operate_org_id(data) # 获取项目信息 json_data = { 'param': '', 'name': '', 'customCode': '', 'currentPage': 1, 'pageSize': 100, 'isDel': -type_, 'customInvoiceCategory': 0, 'idOwnOrg': org_id, } response = requests.post( 'https://ids-goods.f6car.cn/f6-ids-goods/service/getServiceList', cookies=cookies, json=json_data, ) all_project_list = [] total_pages = response.json().get("data", {}).get("totalPages", "") for page in tqdm(range(1, total_pages + 1)): json_data['currentPage'] = str(page) response = requests.post( 'https://ids-goods.f6car.cn/f6-ids-goods/service/getServiceList', cookies=cookies, json=json_data, ) project_list = response.json().get("data", {}).get("records", []) all_project_list.extend(project_list) # 遍历获取到的项目信息停用文件中的项目 code_list = df.iloc[:, 0].dropna().astype(str).tolist() res_data_list = [] results = [] for item in tqdm(all_project_list): custom_code = item.get("customCode") if not custom_code or str(custom_code) not in code_list or not code_list: continue info_id = item.get("infoId") pk_id = item.get("pkId") json_data = { "orgIdList": [ org_id, ], "isDel": type_, # 1 停用 0启用 "infoId": info_id, "pkId": pk_id, "type": 1, "idOwnOrg": org_id } try: response = requests.post( 'https://ids-goods.f6car.cn/f6-ids-goods/service/editAttributeByType', cookies=cookies, json=json_data, ) res_data_list.append(response.json()) response.raise_for_status() # 抛出HTTP错误 results.append({'材料编码': custom_code, '状态': '停用/启用成功'}) except requests.exceptions.RequestException as e: results.append({'材料编码': custom_code, '状态': f'停用/启用失败: {str(e)}'}) pass print({'msg': '已执行', 'msg_details': f'{results}'}) logger.info(f"停用/启用结果: {results}") os.remove(save_path) print(f'{save_path}已删除') # 调用api回写改掉 执行明细与执行状态 msg = update_jiandaoyun(data, f'{results}') if msg.get('msg'): approve_workflow(data) print('表单已自动提交至下一步') def batch_modify_materials(data: Dict[str, Any], cookies: Dict[str, str], df: pd.DataFrame, save_path: str) -> None: """ 材料批量修改后台任务 在后台线程中批量修改材料,从 Excel 文件中读取材料编码。 执行完成后会更新简道云表单并自动提交工作流。 Args: data: 包含表单ID(api_key)、表单ID(entry_id)、数据ID(data_id)的字典 cookies: 用户登录 F6 系统的 cookies 信息 df: Excel 文件读取的内容,DataFrame 格式,列顺序为: [原材料编码, 新材料编码, 品牌, 名称, 规格] save_path: Excel 文件保存的地址,执行完成后会删除此文件 注意: - 无效的材料编码(None、空字符串)会被跳过 - Excel 中某字段为空(NaN 或空字符串)时,保留原材料中的对应字段 - 执行完成后会自动删除上传的文件 - 执行结果会更新到简道云表单 """ def safe_str(val): """将值转为字符串,NaN 返回空字符串""" if pd.isna(val): return "" return str(val).strip() def should_update(new_val): """判断是否应该用 new_val 更新:非 NaN 且非空字符串""" return not (pd.isna(new_val) or (isinstance(new_val, str) and new_val.strip() == "")) # 获取门店id org_id = get_operate_org_id(data) # 第一步:获取所有材料列表(分页) json_data = { 'keyWord': '', 'idOwnOrg': org_id, 'currentPage': 1, 'pageSize': 100, 'name': '', 'brand': '', 'supplierCode': '', 'customCode': '', 'categoryName': '', 'categoryId': '', 'labelId': '', 'labelName': '', 'spec': '', 'applyModel': '', 'sellPurchaseStatuses': [2, 3, 4, 5], 'customInvoiceCategory': 0, 'getThirdPlatformCode': 0, } response = requests.post( 'https://ids-goods.f6car.com/f6-ids-goods/part/getExactPartStockInfo', cookies=cookies, json=json_data, ) response.raise_for_status() total_pages = response.json().get("data", {}).get("totalPages", 0) all_materials_list = [] for page in tqdm(range(1, total_pages + 1), desc="获取材料列表"): json_data['currentPage'] = page resp = requests.post( 'https://ids-goods.f6car.com/f6-ids-goods/part/getExactPartStockInfo', cookies=cookies, json=json_data, ) resp.raise_for_status() records = resp.json().get("data", {}).get("records", []) all_materials_list.extend(records) # 第二步:构建 update_map(只存原始值,不预处理) update_map = {} for _, row in df.iterrows(): orig_code = row.iloc[0] # 原材料编码 if pd.isna(orig_code) or str(orig_code).strip() == "": continue orig_code = str(orig_code).strip() update_map[orig_code] = { "new_customCode": row.iloc[1], "new_brand": row.iloc[2], "new_name": row.iloc[3], "new_spec": row.iloc[4], } # 第三步:遍历材料,按需更新 results = [] for item in tqdm(all_materials_list, desc="处理材料更新"): custom_code = item.get("customCode") if not custom_code or str(custom_code) not in update_map: continue part_id = item.get("partId") if not part_id: results.append({'材料编码': custom_code, '状态': '缺少 partId'}) continue try: # 获取材料明细 params = { 'partId': part_id, 'customInvoiceCategory': '0', 'getThirdPlatformCode': '0', } materials_response = requests.get( 'https://ids-goods.f6car.com/f6-ids-goods/part/getPartInfo', params=params, cookies=cookies, ) if materials_response.status_code != 200: results.append({'材料编码': custom_code, '状态': f'获取明细失败: {materials_response.status_code}'}) continue detail = materials_response.json().get("data") if not detail: results.append({'材料编码': custom_code, '状态': '明细为空'}) continue updates = update_map[str(custom_code)] # 安全更新字段:仅当 Excel 提供有效值时才覆盖 if should_update(updates["new_customCode"]): detail["customCode"] = safe_str(updates["new_customCode"]) if should_update(updates["new_brand"]): detail["brand"] = safe_str(updates["new_brand"]) if should_update(updates["new_name"]): name_val = safe_str(updates["new_name"]) detail["name"] = name_val detail["showName"] = name_val # 同步 showName if should_update(updates["new_spec"]): detail["spec"] = safe_str(updates["new_spec"]) # 修复价格格式和数组(必须!) for f in ["purchasePrice", "sellPrice"]: val = detail.get(f) if isinstance(val, (int, float)): detail[f] = f"{val:.2f}" if detail.get("partBarCodeVos") is None: detail["partBarCodeVos"] = [] # 提交更新 update_resp = requests.post( 'https://ids-goods.f6car.com/f6-ids-goods/part/updateAreaPartBasicInfo', cookies=cookies, json=detail ) if update_resp.status_code == 200 and update_resp.json().get("code") == 200: results.append({'材料编码': custom_code, '状态': '修改成功'}) else: msg = update_resp.json().get("message", "未知错误") results.append({'材料编码': custom_code, '状态': f'修改失败: {msg}'}) except requests.exceptions.RequestException as e: results.append({'材料编码': custom_code, '状态': f'请求异常: {str(e)}'}) except Exception as e: results.append({'材料编码': custom_code, '状态': f'内部错误: {str(e)}'}) # 第四步:清理与回写 print({'msg': '已执行', 'msg_details': results}) logger.info(f"材料批量修改结果: {results}") try: os.remove(save_path) print(f'{save_path} 已删除') except Exception as e: logger.error(f"删除文件失败: {e}") # 回写简道云 msg = update_jiandaoyun(data, str(results)) if msg.get('msg'): approve_workflow(data) print('表单已自动提交至下一步') def batch_modify_projects(data: Dict[str, Any], cookies: Dict[str, str], df: pd.DataFrame, save_path: str) -> None: """ 项目批量修改后台任务 在后台线程中批量修改项目,从 Excel 文件中读取项目编码。 执行完成后会更新简道云表单并自动提交工作流。 Args: data: 包含表单ID(api_key)、表单ID(entry_id)、数据ID(data_id)的字典 cookies: 用户登录 F6 系统的 cookies 信息 df: Excel 文件读取的内容,DataFrame 格式,列顺序为: [0:原项目编码, 1:新项目编码, 2:新项目名称, 3:业务分类, 4:销项税率, 5:项目说明, 6:车辆分类, 7:工时单价, 8:工时] save_path: Excel 文件保存的地址,执行完成后会删除此文件 注意: - 无效的项目编码(None、空字符串)会被跳过 - Excel 中某字段为空(NaN 或空字符串)时,保留原始项目中的对应字段 - 执行完成后会自动删除上传的文件 - 执行结果会更新到简道云表单 """ def safe_str(val): """将值转为字符串,NaN 返回空字符串""" if pd.isna(val): return "" return str(val).strip() def safe_float(val): """安全转换为 float,失败返回 None""" if pd.isna(val): return None try: return float(val) except (ValueError, TypeError): return None def should_update(new_val): """判断是否应该用 new_val 更新:非 NaN 且非空字符串""" return not (pd.isna(new_val) or (isinstance(new_val, str) and new_val.strip() == "")) # 获取门店id org_id = get_operate_org_id(data) # 获取服务分类(用于映射业务分类名称 → pkId) init_add_resp = requests.post( 'https://ids-goods.f6car.cn/f6-ids-goods/service/initAdd', cookies=cookies, data={'idOwnOrg': org_id} ) init_add_resp.raise_for_status() service_category_list = init_add_resp.json().get("data", {}).get("serviceCategory", []) category_name_to_pk = {item["name"]: item["pkId"] for item in service_category_list} # 第一步:获取所有项目列表(分页) json_data = { 'param': '', 'name': '', 'customCode': '', 'currentPage': 1, 'pageSize': 100, 'isDel': 0, # 只查启用的 'customInvoiceCategory': 0, 'idOwnOrg': org_id, } response = requests.post( 'https://ids-goods.f6car.cn/f6-ids-goods/service/getServiceList', cookies=cookies, json=json_data, ) response.raise_for_status() total_pages = response.json().get("data", {}).get("totalPages", 0) all_project_list = [] for page in tqdm(range(1, total_pages + 1), desc="获取项目列表"): json_data['currentPage'] = page resp = requests.post( 'https://ids-goods.f6car.cn/f6-ids-goods/service/getServiceList', cookies=cookies, json=json_data, ) resp.raise_for_status() records = resp.json().get("data", {}).get("records", []) all_project_list.extend(records) # 第二步:构建 update_map update_map = {} for _, row in df.iterrows(): orig_code = row.iloc[0] if pd.isna(orig_code) or str(orig_code).strip() == "": continue orig_code = str(orig_code).strip() # 计算工时费 = 单价 * 工时 price = safe_float(row.iloc[7]) work_hour = safe_float(row.iloc[8]) amount = None if price is not None and work_hour is not None: amount = round(price * work_hour, 2) # 业务分类映射 category_name = row.iloc[3] category_pk = None if should_update(category_name): cat_name_clean = safe_str(category_name) category_pk = category_name_to_pk.get(cat_name_clean) if category_pk is None: logger.warning(f"业务分类 '{cat_name_clean}' 未在系统中找到") update_map[orig_code] = { "new_customCode": row.iloc[1], # 新的自定义编码,取自当前行的第2列(索引为1) "new_name": row.iloc[2], # 新的名称,取自当前行的第3列(索引为2) "new_serviceCategoryId": category_pk, # 新的服务分类ID,由变量 category_pk 提供(通常为主键) "new_taxRate": row.iloc[4], # 新的税率,取自当前行的第5列(索引为4) "new_memo": row.iloc[5], # 新的备注信息,取自当前行的第6列(索引为5) "new_carCategoryName": row.iloc[6], # 新的车型分类名称,取自当前行的第7列(索引为6) "new_price": price, # 新的价格,由变量 price 提供(可能经过处理或计算) "new_workHour": work_hour, # 新的工时数,由变量 work_hour 提供 "new_amount": amount, # 新的金额(可能是价格 × 工时等计算结果),由变量 amount 提供 } # 第三步:遍历项目,按需更新 results = [] for item in tqdm(all_project_list, desc="处理项目更新"): custom_code = item.get("customCode") if not custom_code or str(custom_code) not in update_map: continue service_id = item.get("pkId") # 项目主键 if not service_id: results.append({'项目编码': custom_code, '状态': '缺少 pkId'}) continue try: # 获取项目明细 params = { 'serviceId': service_id, 'customInvoiceCategory': '0', } detail_resp = requests.get( 'https://ids-goods.f6car.cn/f6-ids-goods/service/viewService', params=params, cookies=cookies, ) if detail_resp.status_code != 200: results.append({'项目编码': custom_code, '状态': f'获取明细失败: {detail_resp.status_code}'}) continue detail = detail_resp.json().get("data") if not detail: results.append({'项目编码': custom_code, '状态': '明细为空'}) continue updates = update_map[str(custom_code)] # === 安全更新字段(仅非空时覆盖)=== if should_update(updates["new_customCode"]): detail["customCode"] = safe_str(updates["new_customCode"]) if should_update(updates["new_name"]): name_val = safe_str(updates["new_name"]) detail["name"] = name_val if "showName" in detail: detail["showName"] = name_val if updates["new_serviceCategoryId"] is not None: detail["serviceCategoryId"] = updates["new_serviceCategoryId"] if should_update(updates["new_taxRate"]): detail["taxRate"] = safe_str(updates["new_taxRate"]) if should_update(updates["new_memo"]): detail["memo"] = safe_str(updates["new_memo"]) if should_update(updates["new_carCategoryName"]): detail["carCategoryName"] = safe_str(updates["new_carCategoryName"]) if updates["new_price"] is not None: detail["price"] = f"{updates['new_price']:.2f}" if updates["new_workHour"] is not None: detail["workHour"] = f"{updates['new_workHour']:.2f}" if updates["new_amount"] is not None: detail["amount"] = f"{updates['new_amount']:.2f}" # === 提交更新 === update_resp = requests.post( 'https://ids-goods.f6car.cn/f6-ids-goods/service/editService', cookies=cookies, json=detail ) if update_resp.status_code == 200 and update_resp.json().get("code") == 200: results.append({'项目编码': custom_code, '状态': '修改成功'}) else: msg = update_resp.json().get("message", "未知错误") results.append({'项目编码': custom_code, '状态': f'修改失败: {msg}'}) except Exception as e: error_msg = f"异常: {str(e)}" logger.error(f"项目 {custom_code} 更新出错: {traceback.format_exc()}") results.append({'项目编码': custom_code, '状态': error_msg}) # 第四步:清理与回写 print({'msg': '已执行', 'msg_details': results}) logger.info(f"项目批量修改结果: {results}") try: os.remove(save_path) print(f'{save_path} 已删除') except Exception as e: logger.error(f"删除文件失败: {e}") # 回写简道云 msg = update_jiandaoyun(data, str(results)) if msg.get('msg'): approve_workflow(data) print('表单已自动提交至下一步')