Files
saas/test/续约待办简道云回传宜搭.py
T
2026-03-25 09:34:48 +08:00

835 lines
35 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
from datetime import datetime, timezone
import pandas as pd
import os
import pytz
from typing import Dict
import requests
import json
import time
import numpy as np # 导入numpy库用于处理numpy数组
output_dir = "output" # 设置输出目录
os.makedirs(output_dir, exist_ok=True)
class Config:
JIANDAOYUN_API_TOKEN = 'Bearer qygHulymo1fekJk4CIZyNKjyQAzG8CFN' # token
class API:
def entry_data_list(self, data: dict, replace: bool = False, max_retries: int = 20) -> Dict: # 获取多条表单数据
"""
获取多条表单数据
:param max_retries: 最大重试次数
:param replace: 是否替换字段
:param data:
api_key: 应用id
entry_id: 表单id
:return:
"""
url = 'https://api.jiandaoyun.com/api/v5/app/entry/data/list'
headers = {
'Authorization': Config.JIANDAOYUN_API_TOKEN, # 曹伟应用api测试 app_key
'Content-Type': 'application/json'
}
all_data_batches = [] # 用于存储每次请求返回的数据批次
last_data_id = None
exit_flag = False
while True:
payload = json.dumps({
"app_id": data['api_key'], # 应用ID
"entry_id": data['entry_id'], # 表单ID
"limit": 90,
"data_id": last_data_id,
"filter": data.get('filter', None)
})
retries = 0
while retries <= max_retries:
try:
res = requests.post(url=url, data=payload, headers=headers, timeout=10)
res.raise_for_status() # 检查HTTP响应状态码,如果不等于200会抛出异常
data_get = res.json()
if data_get["data"]:
all_data_batches.extend(data_get['data'])
last_data_id = data_get['data'][-1].get('_id')
print(f"已获取 {len(all_data_batches)} 条数据")
break # 成功则跳出循环
else:
if 'data' not in data_get or len(data_get['data']) == 0:
exit_flag = True
break
# logger.warning(f"请求异常, 将重新请求")
retries += 1
time.sleep(0.1) # 在重试之间稍作停顿
except requests.exceptions.RequestException as e:
# logger.warning(f"请求异常: {e}, 将重新请求")
retries += 1
time.sleep(0.1) # 在重试之间稍作停顿
if retries > max_retries:
# error_task_logger.error(f"任务 {last_data_id}组 连续{max_retries}次请求失败,放弃此次请求。")
all_data_batches.append(None) # 或者可以选择记录失败的payload以便后续处理
if exit_flag:
break
# 构建最终返回的字典
final_data = {
'data': all_data_batches # 'data' 键对应的值是列表的列表
}
# logger.info(f"获取了{len(all_data_batches)}条数据")
if replace:
print("进行了替换")
return_data = self.field_replacement(data, final_data) # 字段替换,由id替换为标签名
return return_data
else:
return final_data
def field_replacement(self, data: dict, data_get: dict) -> dict:
"""
字段替换,将id替换为标签名,即唯一值替换为表单中显示字段的名字
:param data: 简道云插件发送过来的data,包含表单id、数据id、应用id
:param data_get: 简道云请求的数据,一般是根据数据id获取到表单的数据
:return: 将根据数据id获取到的表单数据,进行替换,返回替换后的数据
"""
# 获取表单对应字段标签名称
widget_list = self.entry_widget_list(data)
# 检查widget_list是否有效
if not widget_list or 'widgets' not in widget_list or not isinstance(widget_list['widgets'], list):
raise ValueError("映射表没有接受到数据")
# 创建一个映射表,将_widget_名称映射到label
name_to_label = {widget['name']: widget['label'] for widget in widget_list['widgets']}
def replace_keys(obj):
"""递归替换字典中的键名"""
if isinstance(obj, dict):
new_dict = {}
for key, value in obj.items():
new_key = name_to_label.get(key, key)
new_dict[new_key] = replace_keys(value)
return new_dict
elif isinstance(obj, list):
return [replace_keys(item) for item in obj]
else:
return obj
# 复制 data_get,避免修改原始数据
data_get_copy = json.loads(json.dumps(data_get)) # 深拷贝
if 'data' in data_get_copy:
data_get_copy['data'] = replace_keys(data_get_copy['data'])
return data_get_copy
@staticmethod
def workflow_instance_get(data: dict, max_retries: int = 20) -> dict:
"""
查询实例流程信息
:param max_retries:
:param data: 简道云插件发送过来的data,包含应用id
:return: 查询简道云流程实例信息返回的结果
"""
url = 'https://api.jiandaoyun.com/api/v6/workflow/instance/get'
headers = {
'Authorization': Config.JIANDAOYUN_API_TOKEN, # 曹伟应用api测试 appKey
'Content-Type': 'application/json'
}
payload = json.dumps({
"instance_id": data['data_id'],
"tasks_type": 1
}
)
print("payload:", payload)
data_get = None
retries = 0
while retries <= max_retries:
try:
res = requests.post(url=url, data=payload, headers=headers, timeout=10)
# res.raise_for_status() # 检查HTTP响应状态码,如果不等于200会抛出异常
data_get = res.json()
# print( "返回结果:", data_get)
if res.status_code == 200:
break # 成功则跳出循环
else:
# logger.warning(f"请求异常, 将重新请求")
retries += 1
time.sleep(3) # 在重试之间稍作停顿
except requests.exceptions.RequestException as e:
# logger.warning(f"请求异常: {e}, 将重新请求")
retries += 1
time.sleep(0.1) # 在重试之间稍作停顿
if retries > max_retries:
# error_task_logger.error(f"任务 {data['data_id']} 连续{max_retries}次请求失败,放弃此次请求。")
print("请求失败")
return data_get
@staticmethod
def entry_data_update(data: dict, max_retries: int = 20) -> dict: # 修改数据
"""
修改数据
:param max_retries: 最大重试次数,此处设置100次
:param data: 简道云插件发送过来的data,包含应用id、表单id、数据id等信息
:return: 修改数据后简道云返回的结果
"""
url = 'https://api.jiandaoyun.com/api/v5/app/entry/data/update'
headers = {
'Authorization': Config.JIANDAOYUN_API_TOKEN, # 曹伟应用api测试 appKey
'Content-Type': 'application/json'
}
payload = json.dumps({
"app_id": data['api_key'], # 应用ID
"entry_id": data['entry_id'], # 表单ID
"data_id": data['data_id'], # 数据ID
"data": data['data']
}
)
data_get = None
retries = 0
while retries <= max_retries:
try:
res: requests.Response = requests.post(url=url, data=payload, headers=headers, timeout=10)
res.raise_for_status() # 检查HTTP响应状态码,如果不等于200会抛出异常
data_get = res.json()
# print(data_get)
if res.status_code == 200:
break # 成功则跳出循环
else:
# logger.warning(f"请求异常, 将重新请求")
retries += 1
time.sleep(3) # 在重试之间稍作停顿
except requests.exceptions.RequestException as e:
# logger.warning(f"请求异常: {e}, 将重新请求")
retries += 1
time.sleep(10) # 在重试之间稍作停顿
if retries > max_retries:
# error_task_logger.error(f"任务 {data['data_id']} 连续{max_retries}次请求失败,放弃此次请求。")
continue
return data_get
class NpEncoder(json.JSONEncoder):
def default(self, obj):
if isinstance(obj, np.integer):
return int(obj)
elif isinstance(obj, np.floating):
return float(obj)
elif isinstance(obj, np.ndarray):
return obj.tolist()
else:
return super(NpEncoder, self).default(obj)
class YDAPI:
appKey = "ding5kqocon5s9oph5uq"
appSecret = "HL1jgsIIfLAC0eTH0A1m4mwxUDqbgsiPeCCGGE3ocM6qJBTIW7Ivt9drxF_Z4Kb_"
def generateToken(self) -> str:
"""
函数功能:生成访问令牌(token)
Returns:
str: 返回生成的访问令牌字符串。此token用于后续API调用的身份验证。
"""
token_api = 'https://api.dingtalk.com/v1.0/oauth2/accessToken'
data = {
"appKey": f"{self.appKey}",
"appSecret": f'{self.appSecret}'
}
res = requests.post(token_api, json=data)
token = res.json().get('accessToken')
return token
def read_processes_instances(self, token, formUuid, page, n, appType="APP_UYZ0KG6L0CCNV80GZ66O",
systemToken="XA966F81JAJOFCVVVKO64E9MIIZV1EWE5SFMKJ2", instanceStatus="RUNNING",
max_retries=10, delay=2, createFromTimeGMT=None, createToTimeGMT=None,
modifiedFromTimeGMT=None,
modifiedToTimeGMT=None, searchFieldJson={}):
"""
函数功能:读取流程表单的所有数据,并加入重试机制。
Args:
token (str): 登录验证token,用于API调用的身份验证。
formUuid (str): 表单唯一标识符,用于指定需要读取哪个表单的实例数据。
page (int): 分页参数,指定请求的数据页码。
n (int): 每页显示的数据条数。
appType (str): 应用类型标识符,默认为 "APP_UYZ0KG6L0CCNV80GZ66O"
systemToken (str): 系统token,默认为固定值
instanceStatus (str): 流程实例状态,默认为"RUNNING"
max_retries (int): 最大重试次数,默认为10次
delay (int): 每次重试之间的延迟秒数,默认为2秒
Returns:
dict: 返回从API获取的流程表单实例数据的JSON解析结果。
Raises:
Exception: 如果达到最大重试次数仍未成功,则抛出异常。
"""
attempt = 0
api = f'https://api.dingtalk.com/v1.0/yida/processes/instances?pageNumber={page}&pageSize={n}'
headers = {
"Content-Type": "application/json",
"x-acs-dingtalk-access-token": token
}
formData = {
"appType": appType,
"systemToken": systemToken,
"userId": "yida_pub_account", # 超级管理员账号
"language": "zh_CN",
"formUuid": formUuid,
"instanceStatus": instanceStatus, # 运行中
"createFromTimeGMT": createFromTimeGMT,
"createToTimeGMT": createToTimeGMT,
"modifiedFromTimeGMT": modifiedFromTimeGMT,
"modifiedToTimeGMT": modifiedToTimeGMT,
"searchFieldJson": json.dumps(
searchFieldJson
)
}
# print(formData)
while True:
if attempt >= max_retries:
# error_task_logger.error(f"请求失败,已达最大重试次数 {max_retries},无法获取流程实例数据,跳过本次请求。")
break
try:
res = requests.post(api, headers=headers, json=formData)
# print(res.json())
res.raise_for_status() # 如果返回状态码不是2xx,抛出异常
return res.json()
except requests.exceptions.RequestException as e:
# logger.warning(f"请求异常: {e},正在尝试第 {attempt + 1} 次重试...")
time.sleep(delay)
attempt += 1
def update_from(self, token, formInstanceId, data_new):
"""
函数功能:更新表单内容
Args:
token (str): 登录验证token,用于API调用的身份验证。
formInstanceId (str): 表单实例ID,读文件获取。
data_new (dict): 新的数据内容,用于替换现有表单实例中的数据。读文件获取。
Returns:
Response: 返回API请求的响应对象。
"""
api = f'https://api.dingtalk.com//v1.0/yida/forms/instances'
headers = {
"Content-Type": "application/json",
"x-acs-dingtalk-access-token": token
}
payload = {
"appType": "APP_UYZ0KG6L0CCNV80GZ66O",
"systemToken": "XA966F81JAJOFCVVVKO64E9MIIZV1EWE5SFMKJ2",
"userId": "yida_pub_account", # 曹伟 id
"language": "zh_CN",
"useLatestVersion": "false",
"formInstanceId": formInstanceId,
"updateFormDataJson": json.dumps(data_new, cls=NpEncoder),
}
res = requests.put(api, headers=headers, json=payload)
return res
def get_approval_records(self, token: str, processInstanceId: str, appType="APP_UYZ0KG6L0CCNV80GZ66O",
systemToken="XA966F81JAJOFCVVVKO64E9MIIZV1EWE5SFMKJ2", max_retries=10, delay=2):
"""
函数功能:获取流程表单的审批记录,适用于"F6客户服务"应用,并且包含重试机制。
Args:
token (str): 登录验证token,用于API调用的身份验证。
processInstanceId (str): 流程实例ID,用于标识需要获取审批记录的具体流程实例。
appType (str): 应用类型标识符,默认为 "APP_UYZ0KG6L0CCNV80GZ66O"
systemToken (str): 系统token,默认为固定值
max_retries (int): 最大重试次数,默认为10次
delay (int): 每次重试之间的延迟秒数,默认为2秒
Returns:
dict: 返回从API获取的审批记录的JSON解析结果。通常包括审批步骤、审批人、审批时间等信息。
"""
attempt = 0
userId = "yida_pub_account"
api = f'https://api.dingtalk.com/v1.0/yida/processes/operationRecords?appType={appType}&systemToken={systemToken}&userId={userId}&language=zh_CN&processInstanceId={processInstanceId}'
headers = {
"Content-Type": "application/json",
"x-acs-dingtalk-access-token": token
}
while True:
if attempt >= max_retries:
# error_task_logger.error(f"请求失败,已达最大重试次数 {max_retries},无法获取审批数据,跳过本次请求。")
break
try:
res = requests.get(api, headers=headers)
res.raise_for_status() # 如果响应状态码不是2xx,则抛出HTTPError
return res.json()
except (requests.exceptions.RequestException, Exception) as e:
# logger.warning(f"请求出现异常: {e}, 正在重试({attempt + 1}/{max_retries})...")
time.sleep(delay) # 等待指定的延迟时间后再次尝试
attempt += 1
def aggree_approval(self, token: str, taskId: str, processInstanceId: str, formData: dict, res_new):
"""_summary_
函数功能:同意审批节点 --F6客户服务 应用
Args:
token (str): 登录验证token
taskId (str): 获取到的审批节点ID
processInstanceId (str): 读取文件获得的实例ID
formData (dict): 数据样式
res_new (响应值): 从员工ID表里获取到员工名对应的员工ID
Returns:
响应值: 返回请求结果
"""
api = 'https://api.dingtalk.com/v1.0/yida/tasks/execute'
headers = {
"Content-Type": "application/json",
"x-acs-dingtalk-access-token": token
}
payload = {
"outResult": "AGREE",
"appType": "APP_UYZ0KG6L0CCNV80GZ66O",
"systemToken": "XA966F81JAJOFCVVVKO64E9MIIZV1EWE5SFMKJ2",
"remark": "同意(接口自动)",
"formDataJson": json.dumps(formData, cls=NpEncoder),
"processInstanceId": processInstanceId,
"userId": res_new,
"language": "zh_CN",
"taskId": int(taskId)
}
res = requests.post(api, headers=headers, json=payload)
return res
api_instance = API()
yd_api_instance = YDAPI()
class JDYToYDRenewalToDo(object):
def __init__(self):
self.yd_renewal_data_df = None
self.renewal_data_df = None
self.yd_renewal_data_list = None
self.renewal_data_list = None
self.FORMID = "FORM-PE866MD1MJMU0WGLYRFLYEN5YN9L1I55Z7ZUK22"
self.appType = "APP_UYZ0KG6L0CCNV80GZ66O"
self.systemToken = "XA966F81JAJOFCVVVKO64E9MIIZV1EWE5SFMKJ2"
self.token = yd_api_instance.generateToken()
# 简道云/宜搭节点顺序(越靠近 0 天数值越大,表示流程更靠后)
# 120 → 60 → 30 → 0
self.stage_order = {"120": 0, "60": 1, "30": 2, "0": 3}
self.follow_up_fields = {
"120天是否跟进": "_widget_1764820541628",
"120天处理人": "_widget_1764820541634",
"120天跟进时间": "_widget_1765352838631",
"60天是否跟进": "_widget_1764820541630",
"60天处理人": "_widget_1764820541635",
"60天跟进时间": "_widget_1765352838632",
"30天是否跟进": "_widget_1764820541632",
"30天处理人": "_widget_1764820541636",
"30天跟进时间": "_widget_1765352838633",
"是否联系上":"_widget_1764820541638",
"数据ID": "_id"
}
def load_all_data(self):
# 获取简道云已派发续约待办,若无数据直接返回
today_utc = datetime.now(timezone.utc).strftime("%Y-%m-%d")
payload = {"api_key": "675b900991ad2491c69389ca",
"entry_id": "6931063d64187eaf6b927557",
"filter": {"rel": "and",
"cond": [{"field": "updateTime", "type": "range", "method": "eq", "value": [today_utc]},
{"field": "_widget_1766469131897", "type": "range", "method": "eq",
"value": [""]}]},
}
renewal = api_instance.entry_data_list(payload)
self.renewal_data_list = renewal.get("data") or []
if not self.renewal_data_list:
self.renewal_data_df = pd.DataFrame()
return
self.renewal_data_df = pd.DataFrame(self.renewal_data_list)
self.renewal_data_df.to_csv(os.path.join(output_dir, "renewal_data_list.csv"))
def search_yd_renewal_data(self):
# Step 1: 根据简道云获取宜搭数据信息
if self.renewal_data_df is None or self.renewal_data_df.empty:
print("简道云无待办数据")
return []
all_data = []
for _, row in self.renewal_data_df.iterrows():
yd_data = yd_api_instance.read_processes_instances(
token=self.token,
formUuid=self.FORMID,
page=1,
n=100,
appType=self.appType,
systemToken=self.systemToken,
instanceStatus="",
searchFieldJson={"textField_ksydghqw": row["_widget_1764820541661"]},
).get("data", [])
for record in yd_data:
enriched = {**record}
enriched.update({k: row.get(v, "") for k, v in self.follow_up_fields.items()})
all_data.append(enriched)
if not all_data:
print("未获取到任何宜搭数据")
return []
df = pd.DataFrame(all_data)
df.to_csv(os.path.join(output_dir, "yd_renewal_data.csv"), index=False)
# Step 2: 过滤进行中的待办
df = df[df["instanceStatus"] == "RUNNING"].copy()
if df.empty:
print("没有 RUNNING 状态的流程实例")
return []
# Step 3: 提取过期日期和 org_code
df['expire_date'] = pd.to_datetime(
df['data'].apply(lambda x: x.get('dateField_ksirro5l')),
unit='ms',
errors='coerce'
)
df['org_code'] = df['data'].apply(lambda x: x.get('textField_ksydghqw'))
# 按 org_code 分组,每组取 expire_date 最晚的一条
yd_update_list = [] # 用于存储最终要更新的数据
for org_code, group in df.groupby('org_code'):
group_valid = group.dropna(subset=['expire_date'])
if group_valid.empty:
continue
latest_row = group_valid.loc[group_valid['expire_date'].idxmax()]
fields_to_map = {"120天是否跟进", "60天是否跟进", "30天是否跟进"}
# 组装要修改的信息:包含 processInstanceId + 所有跟进字段
update_info = {
"processInstanceId": latest_row["processInstanceId"],
"org_code": org_code,
}
# 添加跟进字段(这些已经在 enriched_record 中)
# 添加跟进字段,并对特定字段做映射
for field_name in self.follow_up_fields.keys():
value = latest_row.get(field_name)
# 处理 NaN 或 None
if pd.isna(value):
final_value = ""
else:
str_val = str(value).strip()
# 如果是需要映射的字段,进行转换
if field_name in fields_to_map:
if str_val == "主动":
final_value = "小六"
elif str_val == "自动":
final_value = "系统"
else:
final_value = str_val # 保留原始值(如“是”、“否”等)
else:
final_value = str_val
update_info[field_name] = final_value
yd_update_list.append(update_info)
# 可选:保存待更新清单
if yd_update_list:
update_df = pd.DataFrame(yd_update_list)
update_df.to_csv(os.path.join(output_dir, "yd_to_update.csv"), index=False)
return yd_update_list # 返回结构化数据供后续调用更新接口
def yd_process_data(self, yd_update_list):
TIME_FIELDS = ["120天跟进时间", "60天跟进时间", "30天跟进时间"]
# === 4. 直接遍历 yd_update_list 中的 item(字段在顶层)===
for item in yd_update_list:
# 转换UTC时间字符串为时间戳
for field in TIME_FIELDS:
if field in item and item[field]:
utc_str = str(item[field]).strip()
try:
if utc_str.endswith("Z"):
utc_str = utc_str[:-1] + "+00:00"
dt = datetime.fromisoformat(utc_str)
if dt.tzinfo is None:
dt = dt.replace(tzinfo=pytz.UTC)
else:
dt = dt.astimezone(pytz.UTC)
item[field] = int(dt.timestamp() * 1000) # ⏰ 转为秒级时间戳
except Exception as e:
print(f"[时间解析错误] 字段: {field}, 值: {utc_str}, 错误: {e}")
return yd_update_list
def update_yd_renewal_data(self, yd_update_list):
print(yd_update_list)
import ast
def extract_user_id(user_obj):
# 如果是字符串,尝试解析为 dict
if isinstance(user_obj, str):
if user_obj.strip().startswith("{"):
try:
user_obj = ast.literal_eval(user_obj)
except (ValueError, SyntaxError):
return "" # 无法解析,返回空
if isinstance(user_obj, dict):
return user_obj.get("name")
elif isinstance(user_obj, str):
return user_obj # 可能已经是
else:
return ""
yd_update_list = self.yd_process_data(yd_update_list)
for item in yd_update_list:
field_mapping = {
"120天是否跟进": "radioField_kuntp6fm",
"120天处理人": "textField_livc8bjj",
"120天跟进时间": "dateField_lifr1fdv",
"60天是否跟进": "radioField_kurxyhvp",
"60天处理人": "textField_livc8bjl",
"60天跟进时间": "dateField_lifr1fdx",
"30天是否跟进": "radioField_kurxyhvq",
"30天处理人": "textField_livc8bjm",
"30天跟进时间": "dateField_lifr1fdy",
}
item["120天处理人"] = extract_user_id(item["120天处理人"])
item["60天处理人"] = extract_user_id(item["60天处理人"])
item["30天处理人"] = extract_user_id(item["30天处理人"])
update_json = {v: item.get(k, "") for k, v in field_mapping.items()}
print(f"更新数据:{update_json}")
yd_api_instance.update_from(
token=self.token,
formInstanceId=item["processInstanceId"],
data_new=update_json,
)
def update_jd_ydy_renewal_to_do_status(self, yd_update_list):
"""
简道云进度为准:流程节点分 120/60/30/0。若宜搭落后则在宜搭侧自动同意推进到与简道云一致;否则不操作。
"""
def parse_dt(val):
try:
if val is None or val == "":
return datetime.min.replace(tzinfo=timezone.utc)
# 兼容带 Z 的 UTC 写法
dt = datetime.fromisoformat(str(val).replace("Z", "+00:00"))
if dt.tzinfo is None:
dt = dt.replace(tzinfo=timezone.utc)
return dt.astimezone(timezone.utc)
except Exception:
return datetime.min.replace(tzinfo=timezone.utc)
def extract_stage_from_text(text: str):
"""
根据节点名称中包含的数字段(120/60/30/0)识别阶段,兼容“联系情况/处理情况”等后缀。
"""
text = (text or "").strip()
# 区间节点映射:120-60 视为已进入 60 阶段;60-30 视为 3030-0 视为 0
if "120-60" in text:
return "60"
if "60-30" in text:
return "30"
if "30-0" in text:
return "0"
for key in self.stage_order.keys():
if key in text:
return key
if f"{key}" in text:
return key
if f"{key}天联系" in text:
return key
if f"{key}天处理" in text:
return key
return None
def fallback_stage_from_obj(obj):
"""
在未知结构的返回中深度搜索含阶段数字的字符串,返回“最靠后的阶段”(stage_order 值最大)。
防止找到第一个 120 就返回,导致明明已进入 60 却误判为 120。
"""
found = []
def _iter(o):
if isinstance(o, dict):
for v in o.values():
yield from _iter(v)
elif isinstance(o, list):
for v in o:
yield from _iter(v)
elif isinstance(o, str):
yield o
else:
yield str(o)
for text in _iter(obj):
stage = extract_stage_from_text(text)
if stage is not None:
found.append(stage)
if not found:
return None
# 取 stage_order 中“值最大”的阶段,表示流程更靠后
return max(found, key=lambda s: self.stage_order.get(s, -1))
for item in yd_update_list:
# 1) 简道云流程日志(权威)
data = {"data_id": item["数据ID"]}
jdy_workflow_data = api_instance.workflow_instance_get(data) or {}
print("简道云流程日志:", jdy_workflow_data)
jdy_logs = jdy_workflow_data.get("logs", [])
jdy_logs = sorted(
jdy_logs,
key=lambda x: parse_dt(x.get("finish_time") or x.get("create_time")),
reverse=True,
)
tasks = jdy_workflow_data.get("tasks", []) or []
pending = [t for t in tasks if t.get("status") == 0]
task_candidate = (pending[0] if pending else None) or (sorted(
tasks,
key=lambda x: parse_dt(x.get("create_time")),
reverse=True
)[0] if tasks else {})
jdy_result_records = jdy_workflow_data.get("result", []) or []
jdy_stage_candidates = [
extract_stage_from_text(str(jdy_logs[0].get("flow_name", ""))) if jdy_logs else None,
extract_stage_from_text(
str(task_candidate.get("flow_name", "")) + str(task_candidate.get("title", ""))
) if task_candidate else None,
]
jdy_stage_candidates.extend(
extract_stage_from_text(str(rec.get("showName", "")) + str(rec.get("activityId", "")))
for rec in jdy_result_records
)
jdy_stage_candidates.append(fallback_stage_from_obj(jdy_workflow_data))
jdy_stage = next((s for s in jdy_stage_candidates if s), None)
# 2) 宜搭流程日志
yd_workflow_data = yd_api_instance.get_approval_records(
token=self.token,
processInstanceId=item["processInstanceId"],
appType=self.appType,
systemToken=self.systemToken
) or {}
print("宜搭流程日志:", yd_workflow_data)
yd_results = yd_workflow_data.get("result", []) or []
# 展开宜搭 domainList 以获取所有动作
yd_records = [
child
for rec in yd_results
for child in ([rec] + (rec.get("domainList", []) or []))
]
yd_records = sorted(
yd_records,
key=lambda x: parse_dt(x.get("operateTimeGMT") or x.get("activeTimeGMT")),
reverse=True,
)
# 优先使用当前待办(type == "TODO"),否则用最新一条
yd_todo = next((r for r in yd_records if str(r.get("type")).upper() == "TODO"), None)
yd_latest = yd_todo or (yd_records[0] if yd_records else {})
yd_stage = extract_stage_from_text(
str(yd_latest.get("showName", "")) + str(yd_latest.get("activityId", ""))
)
# 若无法识别阶段,跳过防止误操作
if jdy_stage is None or yd_stage is None:
print(f"无法识别阶段,跳过同步 data_id={item.get('数据ID')}, jdy_stage={jdy_stage}, yd_stage={yd_stage}")
continue
# 3) 比较阶段:仅在简道云领先时才跳转;否则直接跳过
jdy_idx = self.stage_order.get(jdy_stage)
yd_idx = self.stage_order.get(yd_stage)
if jdy_idx is None or yd_idx is None:
print(f"阶段映射缺失,跳过 data_id={item.get('数据ID')}, jdy_stage={jdy_stage}, yd_stage={yd_stage}")
continue
if jdy_idx == yd_idx:
print(f"阶段一致,无需跳转 data_id={item.get('数据ID')}, stage={jdy_stage}")
continue
if jdy_idx < yd_idx:
print(f"简道云未领先或已落后,跳过 data_id={item.get('数据ID')}, jdy={jdy_stage}, yd={yd_stage}")
continue
# jdy_idx > yd_idx,宜搭落后,执行自动同意
task_id = yd_latest.get("taskId")
operator_user_id = yd_latest.get("operatorUserId") or yd_latest.get("operator")
if not task_id or not operator_user_id:
print(f"缺少 taskId 或 operatorUserId,无法自动同意 processInstanceId={item['processInstanceId']}")
continue
try:
yd_api_instance.aggree_approval(
token=self.token,
taskId=task_id,
processInstanceId=item["processInstanceId"],
formData={}, # 无需修改表单时传空字典
res_new=operator_user_id
)
print(f"宜搭已自动同意至 下一个 节点,processInstanceId={item['processInstanceId']}")
except Exception as e:
print(f"宜搭自动同意失败 processInstanceId={item['processInstanceId']} err={e}")
def retrun_jdy(self, yd_update_list):
for item in yd_update_list:
data = {
"api_key": "675b900991ad2491c69389ca",
"entry_id": "6931063d64187eaf6b927557",
"data_id": item.get('数据ID'),
"data":
{
"_widget_1766469131897": {"value": ""},
}
}
api_instance.entry_data_update(data)
print(f"已返回 data_id={item.get('数据ID')}")
def main(self):
task_start_time = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
try:
# step1 获取简道云与宜搭数据
self.load_all_data()
# step2 根据简道云门店编码精确搜索宜搭数据
yd_update_list = self.search_yd_renewal_data()
# step3 api修改宜搭数据
self.update_yd_renewal_data(yd_update_list)
# step4 简道云与宜搭流程节点保持一致
self.update_jd_ydy_renewal_to_do_status(yd_update_list)
# step5 简道云标记已处理
self.retrun_jdy(yd_update_list)
except Exception as e:
print(e)
if __name__ == '__main__':
jd_ydy_renewal_to_do = JDYToYDRenewalToDo()
jd_ydy_renewal_to_do.main()