import os import sys import pandas as pd from datetime import datetime # 获取上级目录并加入路径 nb_path = os.path.abspath('') parent_dir = os.path.dirname(nb_path) sys.path.append(parent_dir) from back_ground_module import CommonModule from log_config import configure_task_logger, configure_error_task_logger from yd_api import YDAPI from api import API from tqdm import tqdm logger = configure_task_logger() error_task_logger = configure_error_task_logger() api_instance = API() yd_api_instance = YDAPI() common_module = CommonModule() output_dir = "output" os.makedirs(output_dir, exist_ok=True) # 加载数据 # df = pd.read_csv(r"D:\Idea Project\SaaS_V1.7\test\output\expanded_yd_data.csv",encoding="gbk").astype(str) df = pd.read_excel(r"C:\Users\hp_z66\OneDrive\Desktop\门店分析新.xlsx",sheet_name="新建").astype(str) df2 = pd.read_excel( r"D:\Idea Project\SaaS_V1.7\test\output\续约服务流程_20260324165743.xlsx" ).fillna('').astype(str) # 从df中获取流程编码获取流程详细信息 # 测试注释 token = yd_api_instance.generateToken() FORMID = "FORM-PE866MD1MJMU0WGLYRFLYEN5YN9L1I55Z7ZUK22" appType = "APP_UYZ0KG6L0CCNV80GZ66O" systemToken = "XA966F81JAJOFCVVVKO64E9MIIZV1EWE5SFMKJ2" all_instance_data = [] for index, row in tqdm(df.iterrows(), total=len(df)): instance_id = row["实例ID"] instance_info = yd_api_instance.processes_instancesInfos(token, instance_id, appType, systemToken) data = instance_info.get("data") if data: # 提取 formData 中的字段并合并到外层 form_data = data.get("formData", {}) if isinstance(form_data, dict): data.update(form_data) # 手动注入实例 ID,确保映射能找到 data["实例ID"] = instance_id all_instance_data.append(data) ndf = pd.DataFrame(all_instance_data) ndf.to_csv(r"D:\Idea Project\SaaS_V1.7\\test\output\yd_process_details.csv", index=False) # 读取宜搭流程详情(已提前导出) ndf = pd.read_csv(r"D:\Idea Project\SaaS_V1.7\test\output\yd_process_details.csv") # 简道云字段中文名 → 字段ID 映射 jdy_map = { "门店编码": "_widget_1764820541661", "120天是否跟进": "_widget_1764820541628", "120天处理人": "_widget_1764820541634", "120天跟进时间": "_widget_1765352838631", "60天是否跟进": "_widget_1764820541630", "60天处理人": "_widget_1764820541635", "60天跟进时间": "_widget_1765352838632", "30天是否跟进": "_widget_1764820541632", "30天处理人": "_widget_1764820541636", "30天跟进时间": "_widget_1765352838633", "是否联系上": "_widget_1764820541638", "现阶段问题": "_widget_1764820541641", "联系情况及问题说明": "_widget_1764820541653", "潜在商机": "_widget_1764820541657", "商机详情": "_widget_1764820541659", "不续约原因": "_widget_1764820541700", "产品问题": "_widget_1764820541707", "服务问题": "_widget_1764820541709", "门店问题": "_widget_1764820541711", "价格问题": "_widget_1764820541713", "不续约具体情况说明": "_widget_1764820541702", "宜搭实例ID": "_widget_1774339442956", } # 宜搭字段ID → 简道云中文名 映射 yd_field_id_to_jdy_chinese = { "textField_ksydghqw": "门店编码", "radioField_kuntp6fm": "120天是否跟进", "textField_livc8bjj": "120天处理人", "dateField_lifr1fdv": "120天跟进时间", "radioField_kurxyhvp": "60天是否跟进", "textField_livc8bjl": "60天处理人", "dateField_lifr1fdx": "60天跟进时间", "radioField_kurxyhvq": "30天是否跟进", "textField_livc8bjm": "30天处理人", "dateField_lifr1fdy": "30天跟进时间", "radioField_l85ppdia": "是否联系上", "radioField_r3yeqvd": "现阶段问题", "textAreaField_972lhkt": "联系情况及问题说明", "radioField_ljqi5we3": "潜在商机", "textareaField_liviovx0": "商机详情", "selectField_l31clxfy": "不续约原因", "selectField_l31clxfz": "产品问题", "selectField_l31clxg0": "服务问题", "selectField_l31clxg1": "门店问题", "selectField_l31clxg2": "价格问题", "textareaField_l31clxg4": "不续约具体情况说明", "radioField_l85ppdie": "续约意愿", "实例ID":"宜搭实例ID" } # 值映射(用于标准化选项值) value_mapping = { "现阶段问题": {"暂时没有问题": "暂时无问题"}, "不续约原因": {"产品原因": "产品问题", "门店原因": "门店问题"}, "服务问题": { "联系不上小六": "联系不上运营顾问", "小六态度问题": "运营顾问态度问题", "小六业务不专业": "运营顾问业务不专业", "小六离职未能获取不续约原因": "运营顾问离职未能获取不续约原因" }, "120天是否跟进": {"小六": "主动", "系统": "自动"}, "60天是否跟进": {"小六": "主动", "系统": "自动"}, "30天是否跟进": {"小六": "主动", "系统": "自动"}, } # ======================== # 1. 获取员工姓名 → ID 映射 # ======================== payload_staff = { "api_key": "6694d3c4fcb69ca9a111a6c4", # 注意:应为 app_id,不是 api_key(根据你实际接口调整) "entry_id": "6769204a1902c9341340a1bc", } staff_resp = api_instance.entry_data_list(payload_staff) staff_id_list = staff_resp.get("data", []) # 构建映射字典:姓名 -> 员工ID name_to_staff_id = {} for item in staff_id_list: name = item.get("_widget_1734942794144", "").strip() staff_id = item.get("_widget_1734942794145", "").strip() if name and staff_id: name_to_staff_id[name] = staff_id logger.info(f"加载 {len(name_to_staff_id)} 名员工信息") # ======================== # 2. 定义哪些字段是“人员字段”(需替换为ID) # ======================== STAFF_COLUMNS_CHINESE = { "120天处理人", "60天处理人", "30天处理人", "运营顾问", "运营专家", "区域客服", } # 构建门店编码 → data_id 映射 df2["门店编码_clean"] = df2["门店编码"].astype(str).str.strip().replace('nan', '') jdy_store_map = df2.set_index("门店编码_clean")["data_id"].to_dict() date_fields_chinese = {"120天跟进时间", "60天跟进时间", "30天跟进时间"} update_records = [] for idx, row in ndf.iterrows(): yd_store_code = str(row.get("textField_ksydghqw", "")).strip() if not yd_store_code or yd_store_code == "nan": continue jdy_id = jdy_store_map.get(yd_store_code) if not jdy_id: continue # 构造 data 字段:每个字段必须是 { "value": ... } data_dict = {} for yd_field_id, jdy_chinese in yd_field_id_to_jdy_chinese.items(): raw_val = row.get(yd_field_id, "") if pd.isna(raw_val) or str(raw_val).strip().lower() in {"", "nan", "-", "无", "null"}: continue # 处理日期字段 if jdy_chinese in date_fields_chinese: try: if isinstance(raw_val, (int, float)) or ( isinstance(raw_val, str) and raw_val.replace('.', '', 1).isdigit()): ts = float(raw_val) if ts < 1e12: ts *= 1000 final_value = int(ts) else: dt = pd.to_datetime([str(raw_val)], errors='coerce')[0] if pd.isna(dt): raise ValueError("Invalid date") if dt.tz is None: dt = dt.tz_localize('Asia/Shanghai') final_value = int(dt.tz_convert('UTC').timestamp() * 1000) if not (1577836800000 <= final_value <= 1900000000000): continue except Exception as e: logger.error(f"日期转换失败 [{jdy_chinese}]: {raw_val}, {e}") continue else: str_val = str(raw_val) final_value = value_mapping.get(jdy_chinese, {}).get(str_val, str_val) # 如果是人员字段,尝试替换为员工ID if jdy_chinese in STAFF_COLUMNS_CHINESE: staff_id = name_to_staff_id.get(str_val) if staff_id: final_value = staff_id else: logger.warning(f"未找到员工ID,保留原姓名 [{jdy_chinese}]: {str_val}") jdy_field_id = jdy_map.get(jdy_chinese) if jdy_field_id: data_dict[jdy_field_id] = {"value": final_value} if data_dict: update_records.append({ "data_id": jdy_id, "data": data_dict }) # 批量发送更新请求 logger.info(f"共构造 {len(update_records)} 条更新记录") APP_ID = "675b900991ad2491c69389ca" # ENTRY_ID = "6965eec36b73376aa0b5bff8" ENTRY_ID = "6931063d64187eaf6b927557" for record in tqdm(update_records): payload = { "api_key": APP_ID, "entry_id": ENTRY_ID, # "transaction_id": str(uuid.uuid4()), # 推荐:保证幂等 "data_id": record["data_id"], "data": record["data"], "is_start_trigger": False } res = api_instance.entry_data_update(payload) # print(res)