续约代办历史记录迁移

This commit is contained in:
2026-03-09 09:24:10 +08:00
parent 69390fd080
commit ab0813c5ec
3 changed files with 2105 additions and 8269 deletions
File diff suppressed because it is too large Load Diff
@@ -1,59 +1,228 @@
import os
import sys
import pandas as pd
import json
import ast # 👈 新增导入
from datetime import datetime
# 获取上级目录并加入路径
nb_path = os.path.abspath('')
parent_dir = os.path.dirname(nb_path)
sys.path.append(parent_dir)
from back_ground_module import CommonModule
from log_config import configure_task_logger, configure_error_task_logger
from yd_api import YDAPI
from api import API
from tqdm.notebook import tqdm
logger = configure_task_logger()
error_task_logger = configure_error_task_logger()
api_instance = API()
yd_api_instance = YDAPI()
common_module = CommonModule()
output_dir = "output"
os.makedirs(output_dir, exist_ok=True)
api_instance = API()
yd_api_instance = YDAPI()
# 加载数据
df = pd.read_csv(r"D:\Idea Project\SaaS_V1.7\test\output\expanded_yd_data.csv").astype(str)
df2 = pd.read_excel(
r"D:\Idea Project\SaaS_V1.7\test\output\续约服务流程_历史维修记录迁移测试_20260116110136.xlsx"
).fillna('').astype(str)
# 从df中获取流程编码获取流程详细信息 # 测试注释
token = yd_api_instance.generateToken()
FORMID = "FORM-PE866MD1MJMU0WGLYRFLYEN5YN9L1I55Z7ZUK22"
appType = "APP_UYZ0KG6L0CCNV80GZ66O"
systemToken = "XA966F81JAJOFCVVVKO64E9MIIZV1EWE5SFMKJ2"
df = pd.read_csv(os.path.join(output_dir, "converted_yd_data.csv"))
all_instance_data = []
for index, row in tqdm(df.iterrows(), total=len(df)):
instance_id = row["processInstanceId"]
instance_info = yd_api_instance.processes_instancesInfos(token, instance_id, appType, systemToken)
all_instance_data.append(instance_info.get("data"))
# 检查 data 列的第一个非空值类型
sample = df['data'].dropna().iloc[0] if not df['data'].dropna().empty else ""
print("Sample of 'data' column:")
print(repr(sample)[:200]) # 打印前200字符,看是单引号还是双引号
ndf = pd.DataFrame(all_instance_data)
ndf.to_csv(r"D:\Idea Project\SaaS_V1.7\\test\output\yd_process_details.csv")
# 如果是字符串(且是 Python dict 格式,单引号),用 ast.literal_eval
if isinstance(sample, str):
print("Detected string format, parsing with ast.literal_eval...")
# 读取宜搭流程详情(已提前导出)
ndf = pd.read_csv(r"D:\Idea Project\SaaS_V1.7\test\output\yd_process_details.csv")
# 简道云字段中文名 → 字段ID 映射
jdy_map = {
"门店编码": "_widget_1764820541661",
"120天是否跟进": "_widget_1764820541628",
"120天处理人": "_widget_1764820541634",
"120天跟进时间": "_widget_1765352838631",
"60天是否跟进": "_widget_1764820541630",
"60天处理人": "_widget_1764820541635",
"60天跟进时间": "_widget_1765352838632",
"30天是否跟进": "_widget_1764820541632",
"30天处理人": "_widget_1764820541636",
"30天跟进时间": "_widget_1765352838633",
"是否联系上": "_widget_1764820541638",
"现阶段问题": "_widget_1764820541641",
"联系情况及问题说明": "_widget_1764820541653",
"潜在商机": "_widget_1764820541657",
"商机详情": "_widget_1764820541659",
"不续约原因": "_widget_1764820541700",
"产品问题": "_widget_1764820541707",
"服务问题": "_widget_1764820541709",
"门店问题": "_widget_1764820541711",
"价格问题": "_widget_1764820541713",
"不续约具体情况说明": "_widget_1764820541702",
}
# 安全地将字符串转为字典
def safe_literal_eval(x):
if pd.isna(x) or x == "":
return {}
try:
return ast.literal_eval(x)
except (ValueError, SyntaxError) as e:
print(f"Parse error on: {repr(x)[:100]}... Error: {e}")
return {}
# 宜搭字段ID → 简道云中文名 映射
yd_field_id_to_jdy_chinese = {
"textField_ksydghqw": "门店编码",
"radioField_kuntp6fm": "120天是否跟进",
"textField_livc8bjj": "120天处理人",
"dateField_lifr1fdv": "120天跟进时间",
"radioField_kurxyhvp": "60天是否跟进",
"textField_livc8bjl": "60天处理人",
"dateField_lifr1fdx": "60天跟进时间",
"radioField_kurxyhvq": "30天是否跟进",
"textField_livc8bjm": "30天处理人",
"dateField_lifr1fdy": "30天跟进时间",
"radioField_l85ppdia": "是否联系上",
"radioField_r3yeqvd": "现阶段问题",
"textAreaField_972lhkt": "联系情况及问题说明",
"radioField_ljqi5we3": "潜在商机",
"textareaField_liviovx0": "商机详情",
"selectField_l31clxfy": "不续约原因",
"selectField_l31clxfz": "产品问题",
"selectField_l31clxg0": "服务问题",
"selectField_l31clxg1": "门店问题",
"selectField_l31clxg2": "价格问题",
"textareaField_l31clxg4": "不续约具体情况说明",
"radioField_l85ppdie": "续约意愿",
}
# 值映射(用于标准化选项值)
value_mapping = {
"现阶段问题": {"暂时没有问题": "暂时无问题"},
"不续约原因": {"产品原因": "产品问题", "门店原因": "门店问题"},
"服务问题": {
"联系不上小六": "联系不上运营顾问",
"小六态度问题": "运营顾问态度问题",
"小六业务不专业": "运营顾问业务不专业",
"小六离职未能获取不续约原因": "运营顾问离职未能获取不续约原因"
},
"120天是否跟进": {"小六": "主动", "系统": "自动"},
"60天是否跟进": {"小六": "主动", "系统": "自动"},
"30天是否跟进": {"小六": "主动", "系统": "自动"},
}
df['data'] = df['data'].apply(safe_literal_eval)
# ========================
# 1. 获取员工姓名 → ID 映射
# ========================
payload_staff = {
"api_key": "6694d3c4fcb69ca9a111a6c4", # 注意:应为 app_id,不是 api_key(根据你实际接口调整)
"entry_id": "6769204a1902c9341340a1bc",
}
staff_resp = api_instance.entry_data_list(payload_staff)
staff_id_list = staff_resp.get("data", [])
# 展开 data 列
expanded = pd.json_normalize(df['data'])
# 构建映射字典:姓名 -> 员工ID
name_to_staff_id = {}
for item in staff_id_list:
name = item.get("_widget_1734942794144", "").strip()
staff_id = item.get("_widget_1734942794145", "").strip()
if name and staff_id:
name_to_staff_id[name] = staff_id
# 保留其他列(注意:原列是 'data',不是 'raw_data'
other_cols = df.drop(columns=['data'])
logger.info(f"加载 {len(name_to_staff_id)} 名员工信息")
# 合并
new_df = pd.concat([other_cols.reset_index(drop=True), expanded.reset_index(drop=True)], axis=1).astype(str)
# 过滤进行中
new_df = new_df[new_df["instanceStatus"] == "RUNNING"]
# 订单编码为空
col = "textField_kto3q3ev"
mask2 = (
new_df[col].isnull() |
(new_df[col].astype(str).str.strip() == "")
)
new_df = new_df[mask2]
# 保存结果
new_df.to_csv(os.path.join(output_dir, "expanded_yd_data.csv"), index=False, encoding='utf-8-sig')
print("✅ Expanded data saved to 'expanded_yd_data.csv'")
# ========================
# 2. 定义哪些字段是“人员字段”(需替换为ID)
# ========================
STAFF_COLUMNS_CHINESE = {
"120天处理人",
"60天处理人",
"30天处理人",
"运营顾问",
"运营专家",
"区域客服",
}
# 构建门店编码 → data_id 映射
df2["门店编码_clean"] = df2["门店编码"].astype(str).str.strip().replace('nan', '')
jdy_store_map = df2.set_index("门店编码_clean")["data_id"].to_dict()
date_fields_chinese = {"120天跟进时间", "60天跟进时间", "30天跟进时间"}
update_records = []
for idx, row in ndf.iterrows():
yd_store_code = str(row.get("textField_ksydghqw", "")).strip()
if not yd_store_code or yd_store_code == "nan":
continue
jdy_id = jdy_store_map.get(yd_store_code)
if not jdy_id:
continue
# 构造 data 字段:每个字段必须是 { "value": ... }
data_dict = {}
for yd_field_id, jdy_chinese in yd_field_id_to_jdy_chinese.items():
raw_val = row.get(yd_field_id, "")
if pd.isna(raw_val) or str(raw_val).strip().lower() in {"", "nan", "-", "", "null"}:
continue
# 处理日期字段
if jdy_chinese in date_fields_chinese:
try:
if isinstance(raw_val, (int, float)) or (
isinstance(raw_val, str) and raw_val.replace('.', '', 1).isdigit()):
ts = float(raw_val)
if ts < 1e12:
ts *= 1000
final_value = int(ts)
else:
dt = pd.to_datetime([str(raw_val)], errors='coerce')[0]
if pd.isna(dt):
raise ValueError("Invalid date")
if dt.tz is None:
dt = dt.tz_localize('Asia/Shanghai')
final_value = int(dt.tz_convert('UTC').timestamp() * 1000)
if not (1577836800000 <= final_value <= 1900000000000):
continue
except Exception as e:
logger.error(f"日期转换失败 [{jdy_chinese}]: {raw_val}, {e}")
continue
else:
str_val = str(raw_val)
final_value = value_mapping.get(jdy_chinese, {}).get(str_val, str_val)
# 如果是人员字段,尝试替换为员工ID
if jdy_chinese in STAFF_COLUMNS_CHINESE:
staff_id = name_to_staff_id.get(str_val)
if staff_id:
final_value = staff_id
else:
logger.warning(f"未找到员工ID,保留原姓名 [{jdy_chinese}]: {str_val}")
jdy_field_id = jdy_map.get(jdy_chinese)
if jdy_field_id:
data_dict[jdy_field_id] = {"value": final_value}
if data_dict:
update_records.append({
"data_id": jdy_id,
"data": data_dict
})
# 批量发送更新请求
logger.info(f"共构造 {len(update_records)} 条更新记录")
APP_ID = "675b900991ad2491c69389ca"
ENTRY_ID = "6965eec36b73376aa0b5bff8"
# ENTRY_ID = "6931063d64187eaf6b927557"
for record in tqdm(update_records):
payload = {
"api_key": APP_ID,
"entry_id": ENTRY_ID,
# "transaction_id": str(uuid.uuid4()), # 推荐:保证幂等
"data_id": record["data_id"],
"data": record["data"],
"is_start_trigger": False
}
res = api_instance.entry_data_update(payload)
# print(res)
File diff suppressed because one or more lines are too long