Files
saas/test/续约待办一致性-全量同步.py
panda 8e57195033 应续约日与过期日对调
更新续约代表数据一致性
2026-03-31 10:41:17 +08:00

250 lines
12 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
# -*- coding: utf-8 -*-
import os
import re
import sys
import pandas as pd
from tqdm import tqdm
# 让 test 脚本可以 import 到项目根目录的模块(api.py / yd_api.py / log_config.py
# 将项目根目录加入模块搜索路径,确保可以 import 根目录下的 api.py/yd_api.py
SCRIPT_DIR = os.path.dirname(os.path.abspath(__file__))
PROJECT_ROOT = os.path.dirname(SCRIPT_DIR)
if PROJECT_ROOT not in sys.path:
sys.path.insert(0, PROJECT_ROOT)
from api import API
from log_config import configure_error_task_logger, configure_task_logger
from yd_api import YDAPI
logger = configure_task_logger()
error_task_logger = configure_error_task_logger()
api_instance = API()
yd_api_instance = YDAPI()
# =========================
# 需要你关注/可能要改的配置
# =========================
# 简道云:续约待办表单(目标表单)
APP_ID = "675b900991ad2491c69389ca"
ENTRY_ID = "6931063d64187eaf6b927557"
# 宜搭:续约服务流程(来源流程)
APP_TYPE = "APP_UYZ0KG6L0CCNV80GZ66O"
SYSTEM_TOKEN = "XA966F81JAJOFCVVVKO64E9MIIZV1EWE5SFMKJ2"
# 简道云:员工表(用于把“区域经理姓名”转成“员工ID”,如果你简道云字段是人员控件,必须传ID)
STAFF_APP_ID = "6694d3c4fcb69ca9a111a6c4"
STAFF_ENTRY_ID = "6769204a1902c9341340a1bc"
STAFF_NAME_WIDGET = "_widget_1734942794144"
STAFF_ID_WIDGET = "_widget_1734942794145"
# 输入数据源必须包含这三列:
# - data_id:简道云待办数据的 data_id
# - 门店编码:用于简单校验(对不上会提示)
# - 宜搭实例ID:用于拉取宜搭实例详情
REQUIRED_INPUT_COLUMNS = ("data_id", "门店编码", "宜搭实例ID")
if __name__ == "__main__":
# 第 1 个命令行参数:你的数据源文件路径(csv/xlsx/xls
df = pd.read_excel(fr"C:\Users\hp_z66\Downloads\续约服务流程_20260326133517.xlsx",sheet_name="简道云-宜搭实例id", dtype=str).fillna("")
# 兼容列名:有的表叫“数据ID/实例ID”,统一成脚本内部使用的列名
df = df.rename(columns={"数据ID": "data_id", "dataId": "data_id", "DataID": "data_id", "实例ID": "宜搭实例ID"})
for c in REQUIRED_INPUT_COLUMNS:
if c not in df.columns:
raise SystemExit(f"缺少必需列: {c}")
for c in REQUIRED_INPUT_COLUMNS:
df[c] = df[c].astype(str).str.strip()
df = df[(df["data_id"] != "") & (df["宜搭实例ID"] != "")]
df = df.drop_duplicates(subset=["data_id"]).reset_index(drop=True)
# 读取员工表:构建 “姓名 -> 员工ID” 映射
staff_resp = api_instance.entry_data_list({"api_key": STAFF_APP_ID, "entry_id": STAFF_ENTRY_ID}) or {}
staff_list = staff_resp.get("data", []) or []
name_to_staff_id = {}
for item in staff_list:
n = str(item.get("_widget_1734942794144", "")).strip()
i = str(item.get("_widget_1734942794145", "")).strip()
if n and i:
name_to_staff_id[n] = i
logger.info(f"员工数: {len(name_to_staff_id)}")
# 读取简道云表单字段:label(中文名) -> name(_widget_xxx)
# 这样你只要改 labels 里的中文字段名,脚本会自动找到对应 widget_id
widget_list = api_instance.entry_widget_list({"api_key": APP_ID, "entry_id": ENTRY_ID}) or {}
widgets = widget_list.get("widgets", []) or []
label_to_name = {}
for w in widgets:
l = str(w.get("label", "")).strip()
n = str(w.get("name", "")).strip()
if l and n:
label_to_name[l] = n
# 需要同步到简道云的字段(中文 label)
# 如果你简道云字段名不是这些(比如“省份/城市/区县”),就在这里改成你表单里实际的 label
labels = ["120天是否联系上客户", "60天是否联系上客户", "30天是否联系上客户", "区域经理", "服务单号", "门店ID", "公司id", "", "",]
jdy_map = {l: label_to_name.get(l) for l in labels if label_to_name.get(l)}
miss = [l for l in labels if l not in jdy_map]
if miss:
logger.warning(f"简道云缺少字段: {miss}")
logger.warning(f"已匹配字段: {list(jdy_map.keys())} | 门店ID_widget={jdy_map.get('门店ID')} 公司id_widget={jdy_map.get('公司id')}")
# 获取宜搭 token(后续拉取实例详情使用)
token = yd_api_instance.generateToken()
ok = 0
fail = 0
skip = 0
for _, row in tqdm(df.iterrows(), total=len(df)):
data_id = str(row.get("data_id", "")).strip()
instance_id = str(row.get("宜搭实例ID", "")).strip()
store_code = str(row.get("门店编码", "")).strip()
if not data_id or not instance_id:
logger.warning(f"跳过:data_id/实例ID 为空 data_id={data_id} instance_id={instance_id} 行数据={row.to_dict()}")
skip += 1
continue
try:
# 拉取宜搭实例详情(里面的 formData 才是字段数据)
info = yd_api_instance.processes_instancesInfos(token, instance_id, APP_TYPE, SYSTEM_TOKEN) or {}
container = info.get("data") if isinstance(info, dict) else None
form = {}
if isinstance(container, dict):
form = container.get("formData") or container
if not isinstance(form, dict) or not form:
logger.warning(f"跳过:宜搭实例无表单数据 instance_id={instance_id} data_id={data_id}")
skip += 1
# 简单校验:输入的门店编码 vs 宜搭表单里的门店编码,不一致就打日志提醒
# 宜搭门店编码字段IDtextField_ksydghqw(来自你旧脚本/导出的 yd_process_details.csv
yd_store = str(form.get("textField_ksydghqw", "")).strip()
if yd_store and store_code and yd_store != store_code:
logger.warning(f"门店编码不一致: {store_code} vs {yd_store} 实例:{instance_id} data_id:{data_id}")
# 从 formData 中按字段ID取值:取到第一个非空值就返回
def pick(keys):
for k in keys:
v = form.get(k)
if v is None:
continue
s = str(v).strip()
if s and s.lower() not in {"nan", "null", "none"}:
return s
return ""
# 宜搭人员字段常见是:["张三(123)"] 这种结构,这里做一个“只拿姓名”的处理
def first_name(v):
s = str(v).strip()
if s.startswith("[") and s.endswith("]"):
inner = s[1:-1].split(",", 1)[0].strip().strip("'").strip('"')
s = inner
m = re.split(r"\(", s, maxsplit=1)
return m[0].strip() if m else s
# =========================
# 下面是“宜搭字段ID -> 业务字段”的取值逻辑(你最常修改的区域)
# =========================
# 说明:
# - 120/60/30 是否联系上客户:优先各节点字段,取不到就回退用统一字段 radioField_l85ppdia
# - 区域经理:employeeField_ksydghre
# - 服务单号:textField_kuntp6fl(你历史数据是 XYFWDxxxx
# - 省/市:textField_kuj8nx00 / textField_kuj8nx01
# - 区:优先 textField_kuhnydmk;取不到就从地址 textField_ksydghrm 里截取
v120 = pick(["radioField_ksydghrf",])
v60 = pick(["radioField_kuhnydmd", ])
v30 = pick(["radioField_kuhnydn0", ])
region_name = first_name(form.get("employeeField_ksydghre", ""))
# 如果简道云“区域经理”字段是人员控件:应传员工ID;否则传姓名也能写入(取决于你表单控件类型)
region_value = name_to_staff_id.get(region_name, region_name) if region_name else ""
service_no = pick(["textField_kuntp6fl"])
# 门店ID(强烈建议你把下面 keys 改成你宜搭里“门店ID/公司id”的真实字段ID)
# 示例:store_id = pick(["textField_orgid", "textField_id_own_org"])
store_id = pick(["textField_kuntp6fk"])
prov = pick(["textField_kuj8nx00"])
city = pick(["textField_kuj8nx01"])
# 拼装简道云更新 payload:每个字段必须是 {"value": 值}
data_dict = {}
if v120 and jdy_map.get("120天是否联系上客户"):
data_dict[jdy_map["120天是否联系上客户"]] = {"value": v120}
if v60 and jdy_map.get("60天是否联系上客户"):
data_dict[jdy_map["60天是否联系上客户"]] = {"value": v60}
if v30 and jdy_map.get("30天是否联系上客户"):
data_dict[jdy_map["30天是否联系上客户"]] = {"value": v30}
if region_value and jdy_map.get("区域经理"):
data_dict[jdy_map["区域经理"]] = {"value": region_value}
if service_no and jdy_map.get("服务单号"):
data_dict[jdy_map["服务单号"]] = {"value": service_no}
# 门店ID优先写入“门店ID”,如果表单没有此字段则回退写入“公司id”
if store_id:
if jdy_map.get("门店ID"):
data_dict[jdy_map["门店ID"]] = {"value": store_id}
elif jdy_map.get("公司id"):
data_dict[jdy_map["公司id"]] = {"value": store_id}
if prov and jdy_map.get(""):
data_dict[jdy_map[""]] = {"value": prov}
if city and jdy_map.get(""):
data_dict[jdy_map[""]] = {"value": city}
if not data_dict:
reasons = []
if not v120:
reasons.append("120天:值空")
elif not jdy_map.get("120天是否联系上客户"):
reasons.append("120天:未映射")
if not v60:
reasons.append("60天:值空")
elif not jdy_map.get("60天是否联系上客户"):
reasons.append("60天:未映射")
if not v30:
reasons.append("30天:值空")
elif not jdy_map.get("30天是否联系上客户"):
reasons.append("30天:未映射")
if not region_value:
reasons.append("区域经理:值空")
elif not jdy_map.get("区域经理"):
reasons.append("区域经理:未映射")
if not service_no:
reasons.append("服务单号:值空")
elif not jdy_map.get("服务单号"):
reasons.append("服务单号:未映射")
if not store_id:
reasons.append("门店ID:值空")
elif not (jdy_map.get("门店ID") or jdy_map.get("公司id")):
reasons.append("门店ID:表单无对应字段(门店ID/公司id)")
if not prov:
reasons.append("省:值空")
elif not jdy_map.get(""):
reasons.append("省:未映射")
if not city:
reasons.append("市:值空")
elif not jdy_map.get(""):
reasons.append("市:未映射")
logger.warning(
f"跳过 data_id:{data_id} 实例:{instance_id} | 原因: {', '.join(reasons)} | "
f"取值: v120={v120} v60={v60} v30={v30} region_name={region_name} region_value={region_value} "
f"service_no={service_no} store_id={store_id} prov={prov} city={city}"
)
skip += 1
continue
# 更新简道云数据(同你旧脚本)
payload = {
"api_key": APP_ID,
"entry_id": ENTRY_ID,
"data_id": data_id,
"data": data_dict,
"is_start_trigger": False,## 目前宜搭有通知
}
res = api_instance.entry_data_update(payload)
# 兼容两种返回格式:有的返回 {'status':'success',...},有的直接返回 {'data':{...}}
if isinstance(res, dict) and (res.get("status") == "success" or isinstance(res.get("data"), dict)):
ok += 1
else:
fail += 1
logger.warning(f"更新失败 data_id:{data_id} 实例:{instance_id} 返回:{res}")
except Exception as e:
fail += 1
error_task_logger.error(f"同步异常 data_id:{data_id} 实例:{instance_id} 错误:{e}", exc_info=True)
logger.info(f"完成 同步成功:{ok} 失败:{fail} 跳过:{skip}")