非标业绩提报、合伙人结算登记字段修改

续约回访 宜搭同步简道云辅助脚本 简道云同步宜搭辅助脚本
This commit is contained in:
z66
2025-12-25 14:56:45 +08:00
parent a3541ab5e1
commit ab434f6c4c
15 changed files with 2327 additions and 38 deletions
+20
View File
@@ -0,0 +1,20 @@
from yd_api import YDAPI
yd_api_instance = YDAPI()
token = yd_api_instance.generateToken()
update_json = {
"textField_kto3q3ev": "594561",
"dateField_kto3q3ex": "1766557929000",
"textField_kyjy1kkm": "9987",
"textField_kyjy1kkn": "续约",
}
res = yd_api_instance.update_from(
token=token,
formInstanceId="ef7aabe7-4931-4271-823f-f9a43bc516b2",
data_new=update_json,
)
print(res.json())
+71
View File
@@ -0,0 +1,71 @@
{
"cells": [
{
"cell_type": "code",
"id": "initial_id",
"metadata": {
"collapsed": true,
"ExecuteTime": {
"end_time": "2025-12-24T06:28:49.052568Z",
"start_time": "2025-12-24T06:28:48.336385200Z"
}
},
"source": [
"\n",
"from yd_api import YDAPI\n",
"\n",
"yd_api_instance = YDAPI()\n",
"token = yd_api_instance.generateToken()\n",
"\n",
"update_json = {\n",
" \"data\": {\n",
" \"textField_kto3q3ev\": \"594561\",\n",
" \"dateField_kto3q3ex\": \"1766557690\",\n",
" \"textField_kyjy1kkm\": \"9987\",\n",
" \"textField_kyjy1kkn\": \"续约\",\n",
" }\n",
"}\n",
"yd_api_instance.update_from(\n",
" token=token,\n",
" formInstanceId=\"ef7aabe7-4931-4271-823f-f9a43bc516b2\",\n",
" data_new=update_json,\n",
" )"
],
"outputs": [
{
"ename": "ModuleNotFoundError",
"evalue": "No module named 'yd_api'",
"output_type": "error",
"traceback": [
"\u001B[31m---------------------------------------------------------------------------\u001B[39m",
"\u001B[31mModuleNotFoundError\u001B[39m Traceback (most recent call last)",
"\u001B[36mCell\u001B[39m\u001B[36m \u001B[39m\u001B[32mIn[1]\u001B[39m\u001B[32m, line 1\u001B[39m\n\u001B[32m----> \u001B[39m\u001B[32m1\u001B[39m \u001B[38;5;28;01mfrom\u001B[39;00m\u001B[38;5;250m \u001B[39m\u001B[34;01myd_api\u001B[39;00m\u001B[38;5;250m \u001B[39m\u001B[38;5;28;01mimport\u001B[39;00m YDAPI\n\u001B[32m 3\u001B[39m yd_api_instance = YDAPI()\n\u001B[32m 4\u001B[39m token = yd_api_instance.generateToken()\n",
"\u001B[31mModuleNotFoundError\u001B[39m: No module named 'yd_api'"
]
}
],
"execution_count": 1
}
],
"metadata": {
"kernelspec": {
"display_name": "Python 3",
"language": "python",
"name": "python3"
},
"language_info": {
"codemirror_mode": {
"name": "ipython",
"version": 2
},
"file_extension": ".py",
"mimetype": "text/x-python",
"name": "python",
"nbconvert_exporter": "python",
"pygments_lexer": "ipython2",
"version": "2.7.6"
}
},
"nbformat": 4,
"nbformat_minor": 5
}
+16
View File
@@ -0,0 +1,16 @@
from yd_api import YDAPI
yd_api_instance = YDAPI()
token = yd_api_instance.generateToken()
{'appType': 'APP_UYZ0KG6L0CCNV80GZ66O', 'systemToken': 'XA966F81JAJOFCVVVKO64E9MIIZV1EWE5SFMKJ2',
'userId': 'yida_pub_account', 'language': 'zh_CN', 'formUuid': 'FORM-PE866MD1MJMU0WGLYRFLYEN5YN9L1I55Z7ZUK22',
'formInstanceIdList': ['CHS444555666']}
res = yd_api_instance.get_ids_query(
token=token,
formInstanceIdList=["ef7aabe7-4931-4271-823f-f9a43bc516b2"],
formUuid="FORM-PE866MD1MJMU0WGLYRFLYEN5YN9L1I55Z7ZUK22",
appType="APP_UYZ0KG6L0CCNV80GZ66O",
systemToken="XA966F81JAJOFCVVVKO64E9MIIZV1EWE5SFMKJ2",
)
print(res)
+693
View File
@@ -0,0 +1,693 @@
import os
from datetime import datetime, timedelta, timezone
import pandas as pd
from tqdm import tqdm
from datetime import datetime, timezone
import pandas as pd
import os
from typing import Dict
import requests
import json
import time
import numpy as np # 导入numpy库用于处理numpy数组
output_dir = "output" # 设置输出目录
os.makedirs(output_dir, exist_ok=True)
class Config:
JIANDAOYUN_API_TOKEN = 'Bearer qygHulymo1fekJk4CIZyNKjyQAzG8CFN' # token
class API:
def entry_data_list(self, data: dict, replace: bool = False, max_retries: int = 20) -> Dict: # 获取多条表单数据
"""
获取多条表单数据
:param max_retries: 最大重试次数
:param replace: 是否替换字段
:param data:
api_key: 应用id
entry_id: 表单id
:return:
"""
url = 'https://api.jiandaoyun.com/api/v5/app/entry/data/list'
headers = {
'Authorization': Config.JIANDAOYUN_API_TOKEN, # 曹伟应用api测试 app_key
'Content-Type': 'application/json'
}
all_data_batches = [] # 用于存储每次请求返回的数据批次
last_data_id = None
exit_flag = False
while True:
payload = json.dumps({
"app_id": data['api_key'], # 应用ID
"entry_id": data['entry_id'], # 表单ID
"limit": 90,
"data_id": last_data_id,
"filter": data.get('filter', None)
})
retries = 0
while retries <= max_retries:
try:
res = requests.post(url=url, data=payload, headers=headers, timeout=10)
res.raise_for_status() # 检查HTTP响应状态码,如果不等于200会抛出异常
data_get = res.json()
if data_get["data"]:
all_data_batches.extend(data_get['data'])
last_data_id = data_get['data'][-1].get('_id')
print(f"已获取 {len(all_data_batches)} 条数据")
break # 成功则跳出循环
else:
if 'data' not in data_get or len(data_get['data']) == 0:
exit_flag = True
break
# logger.warning(f"请求异常, 将重新请求")
retries += 1
time.sleep(0.1) # 在重试之间稍作停顿
except requests.exceptions.RequestException as e:
# logger.warning(f"请求异常: {e}, 将重新请求")
retries += 1
time.sleep(0.1) # 在重试之间稍作停顿
if retries > max_retries:
# error_task_logger.error(f"任务 {last_data_id}组 连续{max_retries}次请求失败,放弃此次请求。")
all_data_batches.append(None) # 或者可以选择记录失败的payload以便后续处理
if exit_flag:
break
# 构建最终返回的字典
final_data = {
'data': all_data_batches # 'data' 键对应的值是列表的列表
}
# logger.info(f"获取了{len(all_data_batches)}条数据")
if replace:
print("进行了替换")
return_data = self.field_replacement(data, final_data) # 字段替换,由id替换为标签名
return return_data
else:
return final_data
def field_replacement(self, data: dict, data_get: dict) -> dict:
"""
字段替换,将id替换为标签名,即唯一值替换为表单中显示字段的名字
:param data: 简道云插件发送过来的data,包含表单id、数据id、应用id
:param data_get: 简道云请求的数据,一般是根据数据id获取到表单的数据
:return: 将根据数据id获取到的表单数据,进行替换,返回替换后的数据
"""
# 获取表单对应字段标签名称
widget_list = self.entry_widget_list(data)
# 检查widget_list是否有效
if not widget_list or 'widgets' not in widget_list or not isinstance(widget_list['widgets'], list):
raise ValueError("映射表没有接受到数据")
# 创建一个映射表,将_widget_名称映射到label
name_to_label = {widget['name']: widget['label'] for widget in widget_list['widgets']}
def replace_keys(obj):
"""递归替换字典中的键名"""
if isinstance(obj, dict):
new_dict = {}
for key, value in obj.items():
new_key = name_to_label.get(key, key)
new_dict[new_key] = replace_keys(value)
return new_dict
elif isinstance(obj, list):
return [replace_keys(item) for item in obj]
else:
return obj
# 复制 data_get,避免修改原始数据
data_get_copy = json.loads(json.dumps(data_get)) # 深拷贝
if 'data' in data_get_copy:
data_get_copy['data'] = replace_keys(data_get_copy['data'])
return data_get_copy
@staticmethod
def workflow_instance_get(data: dict, max_retries: int = 20) -> dict:
"""
查询实例流程信息
:param max_retries:
:param data: 简道云插件发送过来的data,包含应用id
:return: 查询简道云流程实例信息返回的结果
"""
url = 'https://api.jiandaoyun.com/api/v6/workflow/instance/get'
headers = {
'Authorization': Config.JIANDAOYUN_API_TOKEN, # 曹伟应用api测试 appKey
'Content-Type': 'application/json'
}
payload = json.dumps({
"instance_id": data['data_id'],
"tasks_type": 1
}
)
print("payload:", payload)
data_get = None
retries = 0
while retries <= max_retries:
try:
res = requests.post(url=url, data=payload, headers=headers, timeout=10)
# res.raise_for_status() # 检查HTTP响应状态码,如果不等于200会抛出异常
data_get = res.json()
# print( "返回结果:", data_get)
if res.status_code == 200:
break # 成功则跳出循环
else:
# logger.warning(f"请求异常, 将重新请求")
retries += 1
time.sleep(3) # 在重试之间稍作停顿
except requests.exceptions.RequestException as e:
# logger.warning(f"请求异常: {e}, 将重新请求")
retries += 1
time.sleep(0.1) # 在重试之间稍作停顿
if retries > max_retries:
# error_task_logger.error(f"任务 {data['data_id']} 连续{max_retries}次请求失败,放弃此次请求。")
print("请求失败")
return data_get
@staticmethod
def entry_data_update(data: dict, max_retries: int = 20) -> dict: # 修改数据
"""
修改数据
:param max_retries: 最大重试次数,此处设置100次
:param data: 简道云插件发送过来的data,包含应用id、表单id、数据id等信息
:return: 修改数据后简道云返回的结果
"""
url = 'https://api.jiandaoyun.com/api/v5/app/entry/data/update'
headers = {
'Authorization': Config.JIANDAOYUN_API_TOKEN, # 曹伟应用api测试 appKey
'Content-Type': 'application/json'
}
payload = json.dumps({
"app_id": data['api_key'], # 应用ID
"entry_id": data['entry_id'], # 表单ID
"data_id": data['data_id'], # 数据ID
"data": data['data']
}
)
data_get = None
retries = 0
while retries <= max_retries:
try:
res: requests.Response = requests.post(url=url, data=payload, headers=headers, timeout=10)
res.raise_for_status() # 检查HTTP响应状态码,如果不等于200会抛出异常
data_get = res.json()
# print(data_get)
if res.status_code == 200:
break # 成功则跳出循环
else:
# logger.warning(f"请求异常, 将重新请求")
retries += 1
time.sleep(3) # 在重试之间稍作停顿
except requests.exceptions.RequestException as e:
# logger.warning(f"请求异常: {e}, 将重新请求")
retries += 1
time.sleep(10) # 在重试之间稍作停顿
if retries > max_retries:
# error_task_logger.error(f"任务 {data['data_id']} 连续{max_retries}次请求失败,放弃此次请求。")
continue
return data_get
@staticmethod
def workflow_instance_end(data: dict, max_retries: int = 20) -> dict:
"""
关闭流程
:param max_retries:
:param data: 简道云插件发送过来的data,包含应用id
:return: 查询简道云流程实例信息返回的结果
"""
url = 'https://api.jiandaoyun.com/api/v1/workflow/instance/close'
headers = {
'Authorization': Config.JIANDAOYUN_API_TOKEN, # 曹伟应用api测试 appKey
'Content-Type': 'application/json'
}
payload = json.dumps({
"instance_id": data['data_id'],
}
)
print("payload:", payload)
data_get = None
retries = 0
while retries <= max_retries:
try:
res = requests.post(url=url, data=payload, headers=headers, timeout=10)
# res.raise_for_status() # 检查HTTP响应状态码,如果不等于200会抛出异常
data_get = res.json()
print("返回结果:", data_get)
if res.status_code == 200:
break # 成功则跳出循环
else:
retries += 1
time.sleep(3) # 在重试之间稍作停顿
except requests.exceptions.RequestException as e:
retries += 1
time.sleep(0.1) # 在重试之间稍作停顿
return data_get
class NpEncoder(json.JSONEncoder):
def default(self, obj):
if isinstance(obj, np.integer):
return int(obj)
elif isinstance(obj, np.floating):
return float(obj)
elif isinstance(obj, np.ndarray):
return obj.tolist()
else:
return super(NpEncoder, self).default(obj)
class YDAPI:
appKey = "ding5kqocon5s9oph5uq"
appSecret = "HL1jgsIIfLAC0eTH0A1m4mwxUDqbgsiPeCCGGE3ocM6qJBTIW7Ivt9drxF_Z4Kb_"
@staticmethod
def get_ids_query(token, formUuid, appType, systemToken, formInstanceIdList=None, max_retries=10, delay=2):
"""
函数功能:读取表单的所有数据,并加入重试机制。
Args:
token (str): 登录验证token,用于API调用的身份验证。
formUuid (str): 表单唯一标识符,用于指定需要读取哪个表单的实例数据。
page (int): 分页参数,指定请求的数据页码。
n (int): 每页显示的数据条数。
appType (str): 应用类型标识符,默认为 "APP_UYZ0KG6L0CCNV80GZ66O"
systemToken (str): 系统token,默认为固定值
instanceStatus (str): 流程实例状态,默认为"RUNNING"
max_retries (int): 最大重试次数,默认为10次
delay (int): 每次重试之间的延迟秒数,默认为2秒
Returns:
dict: 返回从API获取的流程表单实例数据的JSON解析结果。
Raises:
Exception: 如果达到最大重试次数仍未成功,则抛出异常。
"""
attempt = 0
api = f'https://api.dingtalk.com/v1.0/yida/forms/instances/ids/query'
headers = {
"Content-Type": "application/json",
"x-acs-dingtalk-access-token": token
}
formData = {
"appType": appType,
"systemToken": systemToken,
"userId": "yida_pub_account", # 超级管理员账号
"language": "zh_CN",
"formUuid": formUuid,
"formInstanceIdList": formInstanceIdList,
}
# print(formData)
while True:
if attempt >= max_retries:
break
try:
res = requests.post(api, headers=headers, json=formData)
# print(res.json())
res.raise_for_status() # 如果返回状态码不是2xx,抛出异常
return res.json()
except requests.exceptions.RequestException as e:
time.sleep(delay)
attempt += 1
def generateToken(self) -> str:
"""
函数功能:生成访问令牌(token)
Returns:
str: 返回生成的访问令牌字符串。此token用于后续API调用的身份验证。
"""
token_api = 'https://api.dingtalk.com/v1.0/oauth2/accessToken'
data = {
"appKey": f"{self.appKey}",
"appSecret": f'{self.appSecret}'
}
res = requests.post(token_api, json=data)
token = res.json().get('accessToken')
return token
def read_processes_instances(self, token, formUuid, page, n, appType="APP_UYZ0KG6L0CCNV80GZ66O",
systemToken="XA966F81JAJOFCVVVKO64E9MIIZV1EWE5SFMKJ2", instanceStatus="RUNNING",
max_retries=10, delay=2, createFromTimeGMT=None, createToTimeGMT=None,
modifiedFromTimeGMT=None,
modifiedToTimeGMT=None, searchFieldJson={}):
"""
函数功能:读取流程表单的所有数据,并加入重试机制。
Args:
token (str): 登录验证token,用于API调用的身份验证。
formUuid (str): 表单唯一标识符,用于指定需要读取哪个表单的实例数据。
page (int): 分页参数,指定请求的数据页码。
n (int): 每页显示的数据条数。
appType (str): 应用类型标识符,默认为 "APP_UYZ0KG6L0CCNV80GZ66O"
systemToken (str): 系统token,默认为固定值
instanceStatus (str): 流程实例状态,默认为"RUNNING"
max_retries (int): 最大重试次数,默认为10次
delay (int): 每次重试之间的延迟秒数,默认为2秒
Returns:
dict: 返回从API获取的流程表单实例数据的JSON解析结果。
Raises:
Exception: 如果达到最大重试次数仍未成功,则抛出异常。
"""
attempt = 0
api = f'https://api.dingtalk.com/v1.0/yida/processes/instances?pageNumber={page}&pageSize={n}'
headers = {
"Content-Type": "application/json",
"x-acs-dingtalk-access-token": token
}
formData = {
"appType": appType,
"systemToken": systemToken,
"userId": "yida_pub_account", # 超级管理员账号
"language": "zh_CN",
"formUuid": formUuid,
"instanceStatus": instanceStatus, # 运行中
"createFromTimeGMT": createFromTimeGMT,
"createToTimeGMT": createToTimeGMT,
"modifiedFromTimeGMT": modifiedFromTimeGMT,
"modifiedToTimeGMT": modifiedToTimeGMT,
"searchFieldJson": json.dumps(
searchFieldJson
)
}
# print(formData)
while True:
if attempt >= max_retries:
# error_task_logger.error(f"请求失败,已达最大重试次数 {max_retries},无法获取流程实例数据,跳过本次请求。")
break
try:
res = requests.post(api, headers=headers, json=formData)
# print(res.json())
res.raise_for_status() # 如果返回状态码不是2xx,抛出异常
return res.json()
except requests.exceptions.RequestException as e:
# logger.warning(f"请求异常: {e},正在尝试第 {attempt + 1} 次重试...")
time.sleep(delay)
attempt += 1
def update_from(self, token, formInstanceId, data_new):
"""
函数功能:更新表单内容
Args:
token (str): 登录验证token,用于API调用的身份验证。
formInstanceId (str): 表单实例ID,读文件获取。
data_new (dict): 新的数据内容,用于替换现有表单实例中的数据。读文件获取。
Returns:
Response: 返回API请求的响应对象。
"""
api = f'https://api.dingtalk.com//v1.0/yida/forms/instances'
headers = {
"Content-Type": "application/json",
"x-acs-dingtalk-access-token": token
}
payload = {
"appType": "APP_UYZ0KG6L0CCNV80GZ66O",
"systemToken": "XA966F81JAJOFCVVVKO64E9MIIZV1EWE5SFMKJ2",
"userId": "yida_pub_account", # 曹伟 id
"language": "zh_CN",
"useLatestVersion": "false",
"formInstanceId": formInstanceId,
"updateFormDataJson": json.dumps(data_new, cls=NpEncoder),
}
res = requests.put(api, headers=headers, json=payload)
return res
def get_approval_records(self, token: str, processInstanceId: str, appType="APP_UYZ0KG6L0CCNV80GZ66O",
systemToken="XA966F81JAJOFCVVVKO64E9MIIZV1EWE5SFMKJ2", max_retries=10, delay=2):
"""
函数功能:获取流程表单的审批记录,适用于"F6客户服务"应用,并且包含重试机制。
Args:
token (str): 登录验证token,用于API调用的身份验证。
processInstanceId (str): 流程实例ID,用于标识需要获取审批记录的具体流程实例。
appType (str): 应用类型标识符,默认为 "APP_UYZ0KG6L0CCNV80GZ66O"
systemToken (str): 系统token,默认为固定值
max_retries (int): 最大重试次数,默认为10次
delay (int): 每次重试之间的延迟秒数,默认为2秒
Returns:
dict: 返回从API获取的审批记录的JSON解析结果。通常包括审批步骤、审批人、审批时间等信息。
"""
attempt = 0
userId = "yida_pub_account"
api = f'https://api.dingtalk.com/v1.0/yida/processes/operationRecords?appType={appType}&systemToken={systemToken}&userId={userId}&language=zh_CN&processInstanceId={processInstanceId}'
headers = {
"Content-Type": "application/json",
"x-acs-dingtalk-access-token": token
}
while True:
if attempt >= max_retries:
# error_task_logger.error(f"请求失败,已达最大重试次数 {max_retries},无法获取审批数据,跳过本次请求。")
break
try:
res = requests.get(api, headers=headers)
res.raise_for_status() # 如果响应状态码不是2xx,则抛出HTTPError
return res.json()
except (requests.exceptions.RequestException, Exception) as e:
# logger.warning(f"请求出现异常: {e}, 正在重试({attempt + 1}/{max_retries})...")
time.sleep(delay) # 等待指定的延迟时间后再次尝试
attempt += 1
def aggree_approval(self, token: str, taskId: str, processInstanceId: str, formData: dict, res_new):
"""_summary_
函数功能:同意审批节点 --F6客户服务 应用
Args:
token (str): 登录验证token
taskId (str): 获取到的审批节点ID
processInstanceId (str): 读取文件获得的实例ID
formData (dict): 数据样式
res_new (响应值): 从员工ID表里获取到员工名对应的员工ID
Returns:
响应值: 返回请求结果
"""
api = 'https://api.dingtalk.com/v1.0/yida/tasks/execute'
headers = {
"Content-Type": "application/json",
"x-acs-dingtalk-access-token": token
}
payload = {
"outResult": "AGREE",
"appType": "APP_UYZ0KG6L0CCNV80GZ66O",
"systemToken": "XA966F81JAJOFCVVVKO64E9MIIZV1EWE5SFMKJ2",
"remark": "同意(接口自动)",
"formDataJson": json.dumps(formData, cls=NpEncoder),
"processInstanceId": processInstanceId,
"userId": res_new,
"language": "zh_CN",
"taskId": int(taskId)
}
res = requests.post(api, headers=headers, json=payload)
return res
api_instance = API()
yd_api_instance = YDAPI()
class YDToJDYRenewalToDo(object):
def __init__(self):
self.FORMID = "FORM-PE866MD1MJMU0WGLYRFLYEN5YN9L1I55Z7ZUK22"
self.appType = "APP_UYZ0KG6L0CCNV80GZ66O"
self.systemToken = "XA966F81JAJOFCVVVKO64E9MIIZV1EWE5SFMKJ2"
self.token = yd_api_instance.generateToken()
def load_all_data(self):
today_midnight = datetime.now() + timedelta(days=1)
yesterday_midnight = today_midnight - timedelta(days=2)
start_time = yesterday_midnight.strftime("%Y-%m-%d") # 昨天
end_time = today_midnight.strftime("%Y-%m-%d") # 今天
yd_data = yd_api_instance.read_processes_instances(
token=self.token,
formUuid=self.FORMID,
page=1,
n=100,
appType=self.appType,
systemToken=self.systemToken,
instanceStatus="",
modifiedFromTimeGMT=start_time,
modifiedToTimeGMT=end_time,
)
all_process_list = []
PAGES_two = yd_data.get('totalCount') // 100 + 1
for a in tqdm(range(1, PAGES_two + 1)):
try:
yd_data = yd_api_instance.read_processes_instances(
token=self.token,
formUuid=self.FORMID,
page=a,
n=100,
appType=self.appType,
systemToken=self.systemToken,
instanceStatus="",
modifiedFromTimeGMT=start_time,
modifiedToTimeGMT=end_time,
)
all_process_list = all_process_list + yd_data.get("data")
except Exception as e:
print(f"获取流程实例数据时出错: {e}")
continue
df = pd.DataFrame(all_process_list)
df.to_csv(f"{output_dir}/{start_time}_{end_time}_all_process_list.csv", index=False)
return all_process_list
def filter_renewal_data(self, all_process_list):
update_data_list = []
for item in all_process_list:
if item.get("data").get("textField_kto3q3ev"):
org_id = item.get("data").get("textField_ksydghqw")
order_code = item.get("data").get("textField_kto3q3ev") # 订单编码
pay_time = item.get("data").get("dateField_kto3q3ex") # 订单支付日期
res = yd_api_instance.get_ids_query(
token=self.token,
formInstanceIdList=[item.get("processInstanceId")],
formUuid="FORM-PE866MD1MJMU0WGLYRFLYEN5YN9L1I55Z7ZUK22",
appType="APP_UYZ0KG6L0CCNV80GZ66O",
systemToken="XA966F81JAJOFCVVVKO64E9MIIZV1EWE5SFMKJ2",
)
payment_amount = None
bussiness_type = None
# print(res)
if res.get("result"):
form_data = res["result"][0]["formData"]
payment_amount = form_data.get("textField_kyjy1kkm") # "9987"
bussiness_type = form_data.get("textField_kyjy1kkn") # "续约"
# payment_amount = res.get("data").get("textField_kyjy1kkm") # 支付金额
# bussiness_type = res.get("data").get("textField_kyjy1kkn") # 业务类型
if pay_time:
# 确保是整数
timestamp_ms = int(pay_time)
# 转为 UTC datetime 对象
pay_datetime_utc = datetime.fromtimestamp(timestamp_ms / 1000, tz=timezone.utc)
print(pay_datetime_utc) # 例如: 2024-04-05 12:34:38.901000+00:00
else:
pay_datetime_utc = None
update_data_list.append({
"order_code": order_code,
"pay_time": pay_time,
"payment_amount": payment_amount,
"bussiness_type": bussiness_type,
"org_id": org_id
})
return update_data_list
def check_jd_ydy_data(self, update_data_list):
for item in update_data_list:
# 简道云搜索门店编码
payload = {
"api_key": "675b900991ad2491c69389ca",
"entry_id": "6931063d64187eaf6b927557",
"filter": {
"rel": "and",
"cond": [{
"field": "_widget_1764820541661",
"type": "text",
"method": "eq",
"value": [item.get("org_id")]
}, {"field": "flowState",
"type": "flowstate",
"method": "eq",
"value": [0]}]
}
}
# print(payload)
data_list = api_instance.entry_data_list(payload).get("data", [])
result = data_list[0] if data_list else None
data_id = result.get("_id") if result else None
if not data_id:
# print(f"未找到订单 {item.get('order_code')} 的简道云数据,跳过。")
continue
# 查询实例状态
instance_status = api_instance.workflow_instance_get({"data_id": data_id})
task_status = instance_status.get("status", -1)
print(f"订单 {item.get('order_code')} 的简道云流程状态为 {task_status}")
if task_status == 0:
print("简道云流程正在进行中,执行流程关闭")
api_instance.workflow_instance_end({"data_id": data_id})
# 同步宜搭四个字段
print("开始同步数据")
update_payload = {
"api_key": "675b900991ad2491c69389ca",
"entry_id": "6931063d64187eaf6b927557",
"data_id": data_id,
"data": {
"_widget_1764820541674": {"value": item.get("order_code")}, # 订单编码
"_widget_1764820541679": {"value": item.get("pay_time")}, # 订单支付日期
"_widget_1764820541676": {"value": item.get("payment_amount")}, # 支付金额
"_widget_1764820541680": {"value": item.get("bussiness_type")}, # 业务类型
}
}
print(update_payload)
api_instance.entry_data_update(update_payload)
print("数据同步完成")
def main(self):
task_start_time = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
try:
# step1 获取简道云与宜搭数据
jd_ydy_data = self.load_all_data()
# step2 过滤已经续约的单子
update_data_list = self.filter_renewal_data(jd_ydy_data)
# step3 校验简道云是否有进行中的单子并关闭
self.check_jd_ydy_data(update_data_list)
except Exception as e:
print(e)
if __name__ == '__main__':
jd_ydy_renewal_to_do = YDToJDYRenewalToDo()
jd_ydy_renewal_to_do.main()
+7 -6
View File
@@ -83,6 +83,8 @@ class RenewalToDo:
"当前所处节点": "_widget_1765352838609",
"流程状态": "_widget_1765352838610",
"经营模式": "_widget_1765964381952",
"公司等级": "_widget_1766130435561",
"公司id": "_widget_1766631811839",
"提交人": "creator",
"提交时间": "createTime",
"更新时间": "updateTime"
@@ -97,8 +99,10 @@ class RenewalToDo:
"contacts": "联系人",
"contact_mobile": "联系手机号",
"service_impl_principal": "专属运营顾问",
"group_grade": "公司等级",
"technician": "运营专家",
"manage_model": "经营模式",
"id_own_group": "公司id",
}
self.subform_field_map = {
"商品名称": "_widget_1764820541719",
@@ -153,8 +157,8 @@ class RenewalToDo:
self.province_staff_id_list = []
# 获取已派发续约待办(进行中)
payload = {"api_key": "6694d3c4fcb69ca9a111a6c4",
"entry_id": "6769204a1902c9341340a1bc",
payload = {"api_key": "675b900991ad2491c69389ca",
"entry_id": "6931063d64187eaf6b927557",
"filter": {"rel": "and",
"cond": [{"field": "flowState", "type": "flowstate", "method": "eq", "value": [0]}]},
}
@@ -360,7 +364,7 @@ class RenewalToDo:
pd.to_numeric(df_lp['价格'], errors='coerce')
.round().fillna(0).astype(int).astype(str)
)
df_lp['类型_价格'] = df_lp['类型'] + df_lp['价格']
df_lp['类型_价格'] = df_lp['类型'] + ':' + df_lp['价格']
# 2. 按门店聚合,分号连接
agg_df = df_lp.groupby('门店编码', as_index=False)['类型_价格'].apply(';'.join)
@@ -499,9 +503,6 @@ class RenewalToDo:
data_NGV = self.process_data()
# step3:数据派发
self.dispatch_task(data_NGV)
# step4:过期日发生变化更新已有表单
# step5:自动同意原表单
common_module.send_task_status(task_start_time, "续约回访待办")
except Exception as e:
+516
View File
@@ -0,0 +1,516 @@
import os
from datetime import datetime
import pandas as pd
from api import API
from back_ground_module import CommonModule
from log_config import configure_task_logger, configure_error_task_logger
from collections import defaultdict
logger = configure_task_logger()
error_task_logger = configure_error_task_logger()
api_instance = API()
common_module = CommonModule()
output_dir = "output" # 设置输出目录
os.makedirs(output_dir, exist_ok=True)
class RenewalToDo:
def __init__(self):
self.renewal_data_list = None
self.cyclic_increasing = None
self.franchisee = None
self.last_price = None
self.province_staff_id_list = None
self.json_list = None
self.data_NGV = None
self.staff_id_list = None
self.NGV_data_list = None
self.field_map = {
"关联数据": "_widget_1764820541663",
"公司名称": "_widget_1764820541616",
"门店名称": "_widget_1764820541617",
"门店编码": "_widget_1764820541661",
"加盟商": "_widget_1764820541618",
"过期日": "_widget_1764820541672",
"Saas版本": "_widget_1764820541623",
"上次购买价格": "_widget_1764820541624",
"联系人": "_widget_1764820541621",
"联系手机号": "_widget_1764820541622",
"专属运营顾问": "_widget_1764820541625",
"区域客服": "_widget_1764820541715",
"运营专家": "_widget_1764820541678",
"120天是否跟进": "_widget_1764820541628",
"120天处理人": "_widget_1764820541634",
"120天跟进时间": "_widget_1765352838631",
"60天是否跟进": "_widget_1764820541630",
"60天处理人": "_widget_1764820541635",
"60天跟进时间": "_widget_1765352838632",
"30天是否跟进": "_widget_1764820541632",
"30天处理人": "_widget_1764820541636",
"30天跟进时间": "_widget_1765352838633",
"是否联系上客户": "_widget_1764820541638",
"客户现阶段问题分类": "_widget_1764820541641",
"未联系上原因字段": "_widget_1765330820509",
"联系情况及问题说明": "_widget_1764820541653",
"潜在商机": "_widget_1764820541657",
"商机详情": "_widget_1764820541659",
"门店续约意愿": "_widget_1764820541654",
"不续约原因": "_widget_1764820541700",
"产品原因": "_widget_1764820541707",
"服务问题": "_widget_1764820541709",
"门店原因": "_widget_1764820541711",
"价格原因": "_widget_1764820541713",
"不续约具体情况说明": "_widget_1764820541702",
"回访完成方式": "_widget_1764820541697",
"周期性增购": "_widget_1764820541717",
"周期性增购.商品名称": "_widget_1764820541717._widget_1764820541719",
"周期性增购.分母金额": "_widget_1764820541717._widget_1764820541720",
"周期性增购.应续约日": "_widget_1764820541717._widget_1764820541721",
"周期性增购.上次购买数量": "_widget_1764820541717._widget_1764820541722",
"周期性增购.不续约原因": "_widget_1764820541717._widget_1764820541723",
"周期性增购.是否愿意续约": "_widget_1764820541717._widget_1764820541724",
"周期性增购.续约后订单编码": "_widget_1764820541717._widget_1764820541725",
"订单编码": "_widget_1764820541674",
"订单支付日期": "_widget_1764820541679",
"本次-实付金额(元)": "_widget_1764820541676",
"业务类型(续约、升级)": "_widget_1764820541680",
"连锁门店待办同步处理": "_widget_1764820541681",
"选择需要同步的门店名称": "_widget_1765330820391",
"120天自动流转时间": "_widget_1764820541865",
"60天自动流转时间": "_widget_1765964381895",
"30天自动流转时间": "_widget_1765964381896",
"0天自动流转时间": "_widget_1765964381897",
"当前所处节点": "_widget_1765352838609",
"流程状态": "_widget_1765352838610",
"经营模式": "_widget_1765964381952",
"公司等级": "_widget_1766130435561",
"提交人": "creator",
"提交时间": "createTime",
"更新时间": "updateTime"
}
self.cn_field_map = {
"related_data": "关联数据",
"group_name": "公司名称",
"org_name": "门店名称",
"org_code": "门店编码",
"expiry_time": "过期日",
"saas_edition_fmt": "Saas版本",
"contacts": "联系人",
"contact_mobile": "联系手机号",
"service_impl_principal": "专属运营顾问",
"group_grade": "公司等级",
"technician": "运营专家",
"manage_model": "经营模式",
}
self.subform_field_map = {
"商品名称": "_widget_1764820541719",
"分母金额": "_widget_1764820541720",
"应续约日": "_widget_1764820541721",
"上次购买数量": "_widget_1764820541722",
"不续约原因": "_widget_1764820541723",
"是否愿意续约": "_widget_1764820541724",
"续约后订单编码": "_widget_1764820541725",
# 根据实际需要添加更多字段
}
self.renewal_list_map ={
}
def load_all_data(self):
"""
从各类来源加载数据上加载数据
:return:
"""
# 数据库获取续约回访数据
self.data_NGV = pd.read_csv(os.path.join(output_dir, "data_NGV.csv"), encoding="gbk")
# 获取加盟商信息
self.franchisee = common_module.get_renewal_franchisee_details()
self.franchisee.to_csv(os.path.join(output_dir, "franchisee.csv"))
# 获取上次购买价格
self.last_price = common_module.get_renewal_last_price_details()
self.last_price.to_csv(os.path.join(output_dir, "last_price.csv"))
# 周期性增购
self.cyclic_increasing = common_module.get_cyclic_increasing_renewal_details()
self.cyclic_increasing.to_csv(os.path.join(output_dir, "cyclic_increasing.csv"))
# 获取NGV数据
payload = {"api_key": "675b900991ad2491c69389ca", "entry_id": "675bb02bd2d53c2034c665e4"}
self.NGV_data_list = api_instance.entry_data_list(payload).get("data")
# 获取简道云员工id
payload = {"api_key": "6694d3c4fcb69ca9a111a6c4",
"entry_id": "6769204a1902c9341340a1bc",
}
staff_id = api_instance.entry_data_list(payload)
self.staff_id_list = staff_id.get("data") # api请求格式,将数据封装在data字典里
# 省市区人员关系表
payload = {"api_key": "675b900991ad2491c69389ca", "entry_id": "676512ac3e54dc3159460c0a"}
json_dict = api_instance.entry_data_list(payload)
if json_dict and "data" in json_dict:
self.province_staff_id_list = json_dict.get("data")
else:
print("加载省市区人员关系表失败")
self.province_staff_id_list = []
# 获取已派发续约待办(进行中)
payload = {"api_key": "675b900991ad2491c69389ca",
"entry_id": "6931063d64187eaf6b927557",
"filter": {"rel": "and",
"cond": [{"field": "flowState", "type": "flowstate", "method": "eq", "value": [0]}]},
}
renewal = api_instance.entry_data_list(payload)
self.renewal_data_list = renewal.get("data")
@staticmethod
def replace_names_with_staff_ids(df, name_columns, staff_id_list):
"""
将 DataFrame 中多个姓名列替换为对应的员工ID。
:param staff_id_list: 简道云获取到员工id
:param df: pandas.DataFrame
:param name_columns: list[str],需要替换的姓名列名列表,例如 ["col1", "col2"]
:return: 修改后的 DataFrame(原列被替换)
"""
# 1. 构建姓名 -> 员工ID 的映射字典(只做一次)
name_to_id = {}
for item in staff_id_list or []:
name = item.get("_widget_1734942794144")
staff_id = item.get("_widget_1734942794145")
if name and staff_id:
name_to_id[str(name).strip()] = str(staff_id)
# 2. 对每个指定的列进行替换
df = df.copy() # 避免修改原始数据
for col in name_columns:
if col not in df.columns:
continue # 跳过不存在的列
# 替换:姓名 → ID,找不到的保留原值(可改为 fillna(None)
df[col] = (
df[col]
.astype(str)
.str.strip()
.map(name_to_id)
.fillna(df[col])
)
return df
@staticmethod
def row_to_dict(row, field_mapping):
"""将一行数据转换为指定格式的字典"""
result = {}
for col_name, widget_id in field_mapping.items():
if col_name not in row:
continue
value = row[col_name]
# 处理:如果 value 是容器类型(list, dict, tuple, np.ndarray),不进行 pd.isna 判断
if isinstance(value, (list, dict, tuple)) or (hasattr(value, '__len__') and not isinstance(value, str)):
clean_value = value
else:
# 标量类型:安全使用 pd.isna
if pd.isna(value):
clean_value = None
elif isinstance(value, pd.Timestamp):
clean_value = value.strftime('%Y-%m-%dT%H:%M:%SZ')
else:
clean_value = value
# 所有字段统一包 {"value": ...},包括子表单
result[widget_id] = {"value": clean_value}
return result
@staticmethod
def en_row_to_cn_row(en_row, en_to_cn_map):
"""
将英文字段的行数据转换为中文字段的行数据
:param en_row: dict 或 pandas.Serieskey 为英文字段
:param en_to_cn_map: dict, 英文字段名 -> 中文字段名
:return: dictkey 为中文字段名
"""
cn_row = {}
for en_key, value in en_row.items():
if en_key in en_to_cn_map:
cn_key = en_to_cn_map[en_key]
cn_row[cn_key] = value
# 可选:忽略无法映射的字段,或记录警告
return cn_row
@staticmethod
def get_customer_service_by_location(province_name, city_name, area_name, staff_id_list):
"""
直接遍历 self.staff_id_list,根据省市区匹配续约回访客服。
:return: 客服用户名(str),未找到则返回提示信息
"""
if not all([province_name, city_name, area_name]):
return "数据缺失: 省市区不完整"
for item in staff_id_list or []:
try:
prov = item.get('_widget_1734677164861', '').strip()
city = item.get('_widget_1734677164862', '').strip()
area = item.get('_widget_1734677164863', '').strip()
if (prov == province_name.strip() and
city == city_name.strip() and
area == area_name.strip()):
# 提取客服用户名
staff_info = item.get('_widget_1734677164869', {}) # 续约回访客服
username = staff_info.get('username')
return username if username else "数据缺失: 客服用户名为空"
except Exception:
continue # 跳过格式异常的记录
return "数据缺失: 未找到对应的续约回访客服"
def build_subform_records(
self,
df: pd.DataFrame,
group_by_col: str,
field_mapping: dict,
) -> dict:
"""
通用子表单预处理函数:将子表单 DataFrame 转换为 {group_key: [subform_record1, subform_record2, ...]} 的字典。
:param df: 子表单数据 DataFrame,列名为中文(如 "商品名称", "分母金额"
:param group_by_col: 用于分组的列名(如 "门店编码"
:param field_mapping: 字段映射字典,{中文字段名: widget_id},例如 {"商品名称": "_widget_xxx"}
:return: dictkey 为 group_by_col 的值,value 为该组对应的子表单记录列表,
每条记录是 {widget_id: {"value": clean_value}} 的 dict
"""
if df.empty:
return defaultdict(list)
result = defaultdict(list)
target_fields = set(field_mapping.keys())
for _, row in df.iterrows():
row_dict = row.to_dict()
group_key = row_dict.get(group_by_col)
if not group_key or (isinstance(group_key, str) and group_key.strip() == ""):
warning_msg = f"子表单行缺少分组字段 '{group_by_col}',跳过: {row_dict}"
# 构建单条子表单记录
sub_record = {}
for field_cn, widget_id in field_mapping.items():
val = row_dict.get(field_cn)
# 清理值
if pd.isna(val):
clean_val = None
elif hasattr(val, 'to_eng_string'): # Decimal
try:
clean_val = float(val)
except (ValueError, TypeError):
clean_val = str(val)
elif isinstance(val, pd.Timestamp):
clean_val = val.strftime('%Y-%m-%d %H:%M:%S')
else:
clean_val = val
sub_record[widget_id] = {"value": clean_val}
result[group_key].append(sub_record)
return result
def process_data(self):
"""
数据处理加工
:return: 处理后的 DataFrame,列名为中文
"""
data_NGV = self.data_NGV.copy() # 避免修改原始数据
# === 将英文字段名替换为中文字段名 ===
# 但只重命名存在的列
rename_map = {en: cn for en, cn in self.cn_field_map.items() if en in data_NGV.columns}
data_NGV.rename(columns=rename_map, inplace=True)
# 日期字段处理(使用中文列名)
time_columns = ['过期日']
data_NGV[time_columns] = data_NGV[time_columns].apply(
lambda col: pd.to_datetime(col, errors='coerce')
.dt.tz_localize('Asia/Shanghai')
.dt.tz_convert('UTC')
)
# 新增4列:辅助时间字段
data_NGV['120天自动流转时间'] = data_NGV['过期日'] - pd.Timedelta(days=60)
data_NGV['60天自动流转时间'] = data_NGV['过期日'] - pd.Timedelta(days=30)
data_NGV['30天自动流转时间'] = data_NGV['过期日'] - pd.Timedelta(days=0)
data_NGV['0天自动流转时间'] = data_NGV['过期日'] + pd.Timedelta(days=90)
# 格式化为字符串(去掉时区)
for col in ['过期日', '120天自动流转时间', '60天自动流转时间', '30天自动流转时间', '0天自动流转时间']:
data_NGV[col] = data_NGV[col].dt.strftime('%Y-%m-%d %H:%M:%S')
# 新增加盟商列
data_NGV = data_NGV.merge(
self.franchisee[['门店编码', '加盟商']],
on='门店编码',
how='left'
)
# 新增上次购买价格列
# 1. 清洗并拼接类型+价格
df_lp = self.last_price[['门店编码', '类型', '价格']].copy()
df_lp['类型'] = df_lp['类型'].fillna('').astype(str)
df_lp['价格'] = (
pd.to_numeric(df_lp['价格'], errors='coerce')
.round().fillna(0).astype(int).astype(str)
)
df_lp['类型_价格'] = df_lp['类型'] + df_lp['价格']
# 2. 按门店聚合,分号连接
agg_df = df_lp.groupby('门店编码', as_index=False)['类型_价格'].apply(';'.join)
# 3. 合并回主表
data_NGV = data_NGV.merge(agg_df, on='门店编码', how='left').fillna({'类型_价格': ''})
data_NGV.rename(columns={'类型_价格': '上次购买价格'}, inplace=True)
# 4. 处理没有匹配记录的门店(填空或默认值)
data_NGV['上次购买价格'] = data_NGV['上次购买价格'].fillna('')
# 成员字段替换(现在列名是中文)
staff_name_cols = [
"专属运营顾问",
"运营专家",
]
data_NGV = self.replace_names_with_staff_ids(data_NGV, staff_name_cols, self.staff_id_list)
return data_NGV
def dispatch_task(self, data_NGV):
"""
拆分为三个独立动作(输入 data_NGV 列名为中文):
1. 获取关联数据(NGV_data_id
2. 获取区域客服(regional_customer_service
3. 字段映射与格式化(中文 → widget),正确处理子表单
"""
records = []
no_customer_service_data = []
# === 使用通用函数预处理周期性增购子表单 ===
cyclic_subforms = self.build_subform_records(
df=self.cyclic_increasing,
group_by_col="门店编码",
field_mapping=self.subform_field_map,
)
# === Step 1: 构建 门店编码 → NGV 数据ID 映射 ===
org_code_to_ngv_id = {}
for ngv_item in self.NGV_data_list or []:
org_code = ngv_item.get("_widget_1734062123071")
ngv_id = ngv_item.get("_id")
if org_code and ngv_id:
org_code_to_ngv_id[org_code] = ngv_id
# === Step 2: 定义获取区域客服的函数 ===
def get_regional_customer_service(row):
province = row.get("省份") or row.get("province_name")
city = row.get("城市") or row.get("city_name")
area = row.get("区县") or row.get("district_name") or row.get("area_name")
org_code = row.get("门店编码")
# 若省市区缺失,尝试从 NGV 补全
if not all([province, city, area]) or any(
v in [None, '', 'None', 'NA'] for v in [province, city, area]
):
ngv_record = next(
(item for item in self.NGV_data_list
if item.get("_widget_1734062123071") == org_code),
None
)
if ngv_record:
province = ngv_record.get("_widget_1734062123090")
city = ngv_record.get("_widget_1734062123092")
area = ngv_record.get("_widget_1734062123094")
logger.info(f"【从NGV补全省市区】门店 {org_code}: {province}, {city}, {area}")
if not all([province, city, area]) or any(
v in [None, '', 'None', 'NA'] for v in [province, city, area]
):
logger.warning(f"【省市区信息缺失】门店 {org_code} 省市区不完整,客服设为空")
return None
customer_service = self.get_customer_service_by_location(
str(province).strip(),
str(city).strip(),
str(area).strip(),
self.province_staff_id_list
)
if customer_service and "数据缺失" not in str(customer_service):
logger.info(f"【派发客服】门店 {org_code} 派发给客服: {customer_service}")
return customer_service
else:
logger.warning(f"未找到区域客服,请检查门店编码: {org_code}")
return None
# === Step 3: 遍历主表每一行,构建最终提交记录 ===
for _, row in data_NGV.iterrows():
row_dict = row.to_dict()
# 3.1 关联数据(NGV ID
org_code = row_dict.get("门店编码")
ngv_id = org_code_to_ngv_id.get(org_code)
row_dict["关联数据"] = ngv_id if ngv_id else None
if not ngv_id:
logger.warning(f"未找到关联数据,请检查门店编码: {org_code}")
# 3.2 区域客服
customer_service = get_regional_customer_service(row_dict)
row_dict["区域客服"] = customer_service
if not customer_service:
no_customer_service_data.append(row_dict)
# 3.3 注入周期性增购子表单
row_dict["周期性增购"] = cyclic_subforms.get(org_code, [])
# 3.4 转换为 widget 格式
widget_record = self.row_to_dict(row_dict, self.field_map)
records.append(widget_record)
# === Step 4: 批量提交 ===
if not records:
logger.info("无数据需要派发")
return
payload = {
"api_key": "675b900991ad2491c69389ca",
"entry_id": "6931063d64187eaf6b927557",
"data_list": records
}
print(payload)
api_instance.entry_data_batch_create(payload)
logger.info(f"已提交 {len(records)} 条数据进行派发")
def main(self):
task_start_time = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
try:
logger.info("任务开始")
# step1: 获取数据
self.load_all_data()
logger.info("加载数据完成")
# step2:数据处理
data_NGV = self.process_data()
# step3:数据派发
self.dispatch_task(data_NGV)
# step4:过期日发生变化更新已有表单
# step5:自动同意原表单
common_module.send_task_status(task_start_time, "续约回访待办")
except Exception as e:
error_task_logger.error(f"续约回访待办发生错误{e}")
# common_module.send_task_error(task_start_time, "续约回访待办", str(e))
if __name__ == '__main__':
RenewalToDo().main()
+833
View File
@@ -0,0 +1,833 @@
from datetime import datetime, timezone
import pandas as pd
import os
import pytz
from typing import Dict
import requests
import json
import time
import numpy as np # 导入numpy库用于处理numpy数组
output_dir = "output" # 设置输出目录
os.makedirs(output_dir, exist_ok=True)
class Config:
JIANDAOYUN_API_TOKEN = 'Bearer qygHulymo1fekJk4CIZyNKjyQAzG8CFN' # token
class API:
def entry_data_list(self, data: dict, replace: bool = False, max_retries: int = 20) -> Dict: # 获取多条表单数据
"""
获取多条表单数据
:param max_retries: 最大重试次数
:param replace: 是否替换字段
:param data:
api_key: 应用id
entry_id: 表单id
:return:
"""
url = 'https://api.jiandaoyun.com/api/v5/app/entry/data/list'
headers = {
'Authorization': Config.JIANDAOYUN_API_TOKEN, # 曹伟应用api测试 app_key
'Content-Type': 'application/json'
}
all_data_batches = [] # 用于存储每次请求返回的数据批次
last_data_id = None
exit_flag = False
while True:
payload = json.dumps({
"app_id": data['api_key'], # 应用ID
"entry_id": data['entry_id'], # 表单ID
"limit": 90,
"data_id": last_data_id,
"filter": data.get('filter', None)
})
retries = 0
while retries <= max_retries:
try:
res = requests.post(url=url, data=payload, headers=headers, timeout=10)
res.raise_for_status() # 检查HTTP响应状态码,如果不等于200会抛出异常
data_get = res.json()
if data_get["data"]:
all_data_batches.extend(data_get['data'])
last_data_id = data_get['data'][-1].get('_id')
print(f"已获取 {len(all_data_batches)} 条数据")
break # 成功则跳出循环
else:
if 'data' not in data_get or len(data_get['data']) == 0:
exit_flag = True
break
# logger.warning(f"请求异常, 将重新请求")
retries += 1
time.sleep(0.1) # 在重试之间稍作停顿
except requests.exceptions.RequestException as e:
# logger.warning(f"请求异常: {e}, 将重新请求")
retries += 1
time.sleep(0.1) # 在重试之间稍作停顿
if retries > max_retries:
# error_task_logger.error(f"任务 {last_data_id}组 连续{max_retries}次请求失败,放弃此次请求。")
all_data_batches.append(None) # 或者可以选择记录失败的payload以便后续处理
if exit_flag:
break
# 构建最终返回的字典
final_data = {
'data': all_data_batches # 'data' 键对应的值是列表的列表
}
# logger.info(f"获取了{len(all_data_batches)}条数据")
if replace:
print("进行了替换")
return_data = self.field_replacement(data, final_data) # 字段替换,由id替换为标签名
return return_data
else:
return final_data
def field_replacement(self, data: dict, data_get: dict) -> dict:
"""
字段替换,将id替换为标签名,即唯一值替换为表单中显示字段的名字
:param data: 简道云插件发送过来的data,包含表单id、数据id、应用id
:param data_get: 简道云请求的数据,一般是根据数据id获取到表单的数据
:return: 将根据数据id获取到的表单数据,进行替换,返回替换后的数据
"""
# 获取表单对应字段标签名称
widget_list = self.entry_widget_list(data)
# 检查widget_list是否有效
if not widget_list or 'widgets' not in widget_list or not isinstance(widget_list['widgets'], list):
raise ValueError("映射表没有接受到数据")
# 创建一个映射表,将_widget_名称映射到label
name_to_label = {widget['name']: widget['label'] for widget in widget_list['widgets']}
def replace_keys(obj):
"""递归替换字典中的键名"""
if isinstance(obj, dict):
new_dict = {}
for key, value in obj.items():
new_key = name_to_label.get(key, key)
new_dict[new_key] = replace_keys(value)
return new_dict
elif isinstance(obj, list):
return [replace_keys(item) for item in obj]
else:
return obj
# 复制 data_get,避免修改原始数据
data_get_copy = json.loads(json.dumps(data_get)) # 深拷贝
if 'data' in data_get_copy:
data_get_copy['data'] = replace_keys(data_get_copy['data'])
return data_get_copy
@staticmethod
def workflow_instance_get(data: dict, max_retries: int = 20) -> dict:
"""
查询实例流程信息
:param max_retries:
:param data: 简道云插件发送过来的data,包含应用id
:return: 查询简道云流程实例信息返回的结果
"""
url = 'https://api.jiandaoyun.com/api/v6/workflow/instance/get'
headers = {
'Authorization': Config.JIANDAOYUN_API_TOKEN, # 曹伟应用api测试 appKey
'Content-Type': 'application/json'
}
payload = json.dumps({
"instance_id": data['data_id'],
"tasks_type": 1
}
)
print("payload:", payload)
data_get = None
retries = 0
while retries <= max_retries:
try:
res = requests.post(url=url, data=payload, headers=headers, timeout=10)
# res.raise_for_status() # 检查HTTP响应状态码,如果不等于200会抛出异常
data_get = res.json()
# print( "返回结果:", data_get)
if res.status_code == 200:
break # 成功则跳出循环
else:
# logger.warning(f"请求异常, 将重新请求")
retries += 1
time.sleep(3) # 在重试之间稍作停顿
except requests.exceptions.RequestException as e:
# logger.warning(f"请求异常: {e}, 将重新请求")
retries += 1
time.sleep(0.1) # 在重试之间稍作停顿
if retries > max_retries:
# error_task_logger.error(f"任务 {data['data_id']} 连续{max_retries}次请求失败,放弃此次请求。")
print("请求失败")
return data_get
@staticmethod
def entry_data_update(data: dict, max_retries: int = 20) -> dict: # 修改数据
"""
修改数据
:param max_retries: 最大重试次数,此处设置100次
:param data: 简道云插件发送过来的data,包含应用id、表单id、数据id等信息
:return: 修改数据后简道云返回的结果
"""
url = 'https://api.jiandaoyun.com/api/v5/app/entry/data/update'
headers = {
'Authorization': Config.JIANDAOYUN_API_TOKEN, # 曹伟应用api测试 appKey
'Content-Type': 'application/json'
}
payload = json.dumps({
"app_id": data['api_key'], # 应用ID
"entry_id": data['entry_id'], # 表单ID
"data_id": data['data_id'], # 数据ID
"data": data['data']
}
)
data_get = None
retries = 0
while retries <= max_retries:
try:
res: requests.Response = requests.post(url=url, data=payload, headers=headers, timeout=10)
res.raise_for_status() # 检查HTTP响应状态码,如果不等于200会抛出异常
data_get = res.json()
# print(data_get)
if res.status_code == 200:
break # 成功则跳出循环
else:
# logger.warning(f"请求异常, 将重新请求")
retries += 1
time.sleep(3) # 在重试之间稍作停顿
except requests.exceptions.RequestException as e:
# logger.warning(f"请求异常: {e}, 将重新请求")
retries += 1
time.sleep(10) # 在重试之间稍作停顿
if retries > max_retries:
# error_task_logger.error(f"任务 {data['data_id']} 连续{max_retries}次请求失败,放弃此次请求。")
continue
return data_get
class NpEncoder(json.JSONEncoder):
def default(self, obj):
if isinstance(obj, np.integer):
return int(obj)
elif isinstance(obj, np.floating):
return float(obj)
elif isinstance(obj, np.ndarray):
return obj.tolist()
else:
return super(NpEncoder, self).default(obj)
class YDAPI:
appKey = "ding5kqocon5s9oph5uq"
appSecret = "HL1jgsIIfLAC0eTH0A1m4mwxUDqbgsiPeCCGGE3ocM6qJBTIW7Ivt9drxF_Z4Kb_"
def generateToken(self) -> str:
"""
函数功能:生成访问令牌(token)
Returns:
str: 返回生成的访问令牌字符串。此token用于后续API调用的身份验证。
"""
token_api = 'https://api.dingtalk.com/v1.0/oauth2/accessToken'
data = {
"appKey": f"{self.appKey}",
"appSecret": f'{self.appSecret}'
}
res = requests.post(token_api, json=data)
token = res.json().get('accessToken')
return token
def read_processes_instances(self, token, formUuid, page, n, appType="APP_UYZ0KG6L0CCNV80GZ66O",
systemToken="XA966F81JAJOFCVVVKO64E9MIIZV1EWE5SFMKJ2", instanceStatus="RUNNING",
max_retries=10, delay=2, createFromTimeGMT=None, createToTimeGMT=None,
modifiedFromTimeGMT=None,
modifiedToTimeGMT=None, searchFieldJson={}):
"""
函数功能:读取流程表单的所有数据,并加入重试机制。
Args:
token (str): 登录验证token,用于API调用的身份验证。
formUuid (str): 表单唯一标识符,用于指定需要读取哪个表单的实例数据。
page (int): 分页参数,指定请求的数据页码。
n (int): 每页显示的数据条数。
appType (str): 应用类型标识符,默认为 "APP_UYZ0KG6L0CCNV80GZ66O"
systemToken (str): 系统token,默认为固定值
instanceStatus (str): 流程实例状态,默认为"RUNNING"
max_retries (int): 最大重试次数,默认为10次
delay (int): 每次重试之间的延迟秒数,默认为2秒
Returns:
dict: 返回从API获取的流程表单实例数据的JSON解析结果。
Raises:
Exception: 如果达到最大重试次数仍未成功,则抛出异常。
"""
attempt = 0
api = f'https://api.dingtalk.com/v1.0/yida/processes/instances?pageNumber={page}&pageSize={n}'
headers = {
"Content-Type": "application/json",
"x-acs-dingtalk-access-token": token
}
formData = {
"appType": appType,
"systemToken": systemToken,
"userId": "yida_pub_account", # 超级管理员账号
"language": "zh_CN",
"formUuid": formUuid,
"instanceStatus": instanceStatus, # 运行中
"createFromTimeGMT": createFromTimeGMT,
"createToTimeGMT": createToTimeGMT,
"modifiedFromTimeGMT": modifiedFromTimeGMT,
"modifiedToTimeGMT": modifiedToTimeGMT,
"searchFieldJson": json.dumps(
searchFieldJson
)
}
# print(formData)
while True:
if attempt >= max_retries:
# error_task_logger.error(f"请求失败,已达最大重试次数 {max_retries},无法获取流程实例数据,跳过本次请求。")
break
try:
res = requests.post(api, headers=headers, json=formData)
# print(res.json())
res.raise_for_status() # 如果返回状态码不是2xx,抛出异常
return res.json()
except requests.exceptions.RequestException as e:
# logger.warning(f"请求异常: {e},正在尝试第 {attempt + 1} 次重试...")
time.sleep(delay)
attempt += 1
def update_from(self, token, formInstanceId, data_new):
"""
函数功能:更新表单内容
Args:
token (str): 登录验证token,用于API调用的身份验证。
formInstanceId (str): 表单实例ID,读文件获取。
data_new (dict): 新的数据内容,用于替换现有表单实例中的数据。读文件获取。
Returns:
Response: 返回API请求的响应对象。
"""
api = f'https://api.dingtalk.com//v1.0/yida/forms/instances'
headers = {
"Content-Type": "application/json",
"x-acs-dingtalk-access-token": token
}
payload = {
"appType": "APP_UYZ0KG6L0CCNV80GZ66O",
"systemToken": "XA966F81JAJOFCVVVKO64E9MIIZV1EWE5SFMKJ2",
"userId": "yida_pub_account", # 曹伟 id
"language": "zh_CN",
"useLatestVersion": "false",
"formInstanceId": formInstanceId,
"updateFormDataJson": json.dumps(data_new, cls=NpEncoder),
}
res = requests.put(api, headers=headers, json=payload)
return res
def get_approval_records(self, token: str, processInstanceId: str, appType="APP_UYZ0KG6L0CCNV80GZ66O",
systemToken="XA966F81JAJOFCVVVKO64E9MIIZV1EWE5SFMKJ2", max_retries=10, delay=2):
"""
函数功能:获取流程表单的审批记录,适用于"F6客户服务"应用,并且包含重试机制。
Args:
token (str): 登录验证token,用于API调用的身份验证。
processInstanceId (str): 流程实例ID,用于标识需要获取审批记录的具体流程实例。
appType (str): 应用类型标识符,默认为 "APP_UYZ0KG6L0CCNV80GZ66O"
systemToken (str): 系统token,默认为固定值
max_retries (int): 最大重试次数,默认为10次
delay (int): 每次重试之间的延迟秒数,默认为2秒
Returns:
dict: 返回从API获取的审批记录的JSON解析结果。通常包括审批步骤、审批人、审批时间等信息。
"""
attempt = 0
userId = "yida_pub_account"
api = f'https://api.dingtalk.com/v1.0/yida/processes/operationRecords?appType={appType}&systemToken={systemToken}&userId={userId}&language=zh_CN&processInstanceId={processInstanceId}'
headers = {
"Content-Type": "application/json",
"x-acs-dingtalk-access-token": token
}
while True:
if attempt >= max_retries:
# error_task_logger.error(f"请求失败,已达最大重试次数 {max_retries},无法获取审批数据,跳过本次请求。")
break
try:
res = requests.get(api, headers=headers)
res.raise_for_status() # 如果响应状态码不是2xx,则抛出HTTPError
return res.json()
except (requests.exceptions.RequestException, Exception) as e:
# logger.warning(f"请求出现异常: {e}, 正在重试({attempt + 1}/{max_retries})...")
time.sleep(delay) # 等待指定的延迟时间后再次尝试
attempt += 1
def aggree_approval(self, token: str, taskId: str, processInstanceId: str, formData: dict, res_new):
"""_summary_
函数功能:同意审批节点 --F6客户服务 应用
Args:
token (str): 登录验证token
taskId (str): 获取到的审批节点ID
processInstanceId (str): 读取文件获得的实例ID
formData (dict): 数据样式
res_new (响应值): 从员工ID表里获取到员工名对应的员工ID
Returns:
响应值: 返回请求结果
"""
api = 'https://api.dingtalk.com/v1.0/yida/tasks/execute'
headers = {
"Content-Type": "application/json",
"x-acs-dingtalk-access-token": token
}
payload = {
"outResult": "AGREE",
"appType": "APP_UYZ0KG6L0CCNV80GZ66O",
"systemToken": "XA966F81JAJOFCVVVKO64E9MIIZV1EWE5SFMKJ2",
"remark": "同意(接口自动)",
"formDataJson": json.dumps(formData, cls=NpEncoder),
"processInstanceId": processInstanceId,
"userId": res_new,
"language": "zh_CN",
"taskId": int(taskId)
}
res = requests.post(api, headers=headers, json=payload)
return res
api_instance = API()
yd_api_instance = YDAPI()
class JDYToYDRenewalToDo(object):
def __init__(self):
self.yd_renewal_data_df = None
self.renewal_data_df = None
self.yd_renewal_data_list = None
self.renewal_data_list = None
self.FORMID = "FORM-PE866MD1MJMU0WGLYRFLYEN5YN9L1I55Z7ZUK22"
self.appType = "APP_UYZ0KG6L0CCNV80GZ66O"
self.systemToken = "XA966F81JAJOFCVVVKO64E9MIIZV1EWE5SFMKJ2"
self.token = yd_api_instance.generateToken()
# 简道云/宜搭节点顺序(越靠近 0 天数值越大,表示流程更靠后)
# 120 → 60 → 30 → 0
self.stage_order = {"120": 0, "60": 1, "30": 2, "0": 3}
self.follow_up_fields = {
"120天是否跟进": "_widget_1764820541628",
"120天处理人": "_widget_1764820541634",
"120天跟进时间": "_widget_1765352838631",
"60天是否跟进": "_widget_1764820541630",
"60天处理人": "_widget_1764820541635",
"60天跟进时间": "_widget_1765352838632",
"30天是否跟进": "_widget_1764820541632",
"30天处理人": "_widget_1764820541636",
"30天跟进时间": "_widget_1765352838633",
"数据ID": "_id"
}
def load_all_data(self):
# 获取简道云已派发续约待办,若无数据直接返回
today_utc = datetime.now(timezone.utc).strftime("%Y-%m-%d")
payload = {"api_key": "675b900991ad2491c69389ca",
"entry_id": "6931063d64187eaf6b927557",
"filter": {"rel": "and",
"cond": [{"field": "updateTime", "type": "range", "method": "eq", "value": [today_utc]},
{"field": "_widget_1766469131897", "type": "range", "method": "eq",
"value": [""]}]},
}
renewal = api_instance.entry_data_list(payload)
self.renewal_data_list = renewal.get("data") or []
if not self.renewal_data_list:
self.renewal_data_df = pd.DataFrame()
return
self.renewal_data_df = pd.DataFrame(self.renewal_data_list)
self.renewal_data_df.to_csv(os.path.join(output_dir, "renewal_data_list.csv"))
def search_yd_renewal_data(self):
# Step 1: 根据简道云获取宜搭数据信息
if self.renewal_data_df is None or self.renewal_data_df.empty:
print("简道云无待办数据")
return []
all_data = []
for _, row in self.renewal_data_df.iterrows():
yd_data = yd_api_instance.read_processes_instances(
token=self.token,
formUuid=self.FORMID,
page=1,
n=100,
appType=self.appType,
systemToken=self.systemToken,
instanceStatus="",
searchFieldJson={"textField_ksydghqw": row["_widget_1764820541661"]},
).get("data", [])
for record in yd_data:
enriched = {**record}
enriched.update({k: row.get(v, "") for k, v in self.follow_up_fields.items()})
all_data.append(enriched)
if not all_data:
print("未获取到任何宜搭数据")
return []
df = pd.DataFrame(all_data)
df.to_csv(os.path.join(output_dir, "yd_renewal_data.csv"), index=False)
# Step 2: 过滤进行中的待办
df = df[df["instanceStatus"] == "RUNNING"].copy()
if df.empty:
print("没有 RUNNING 状态的流程实例")
return []
# Step 3: 提取过期日期和 org_code
df['expire_date'] = pd.to_datetime(
df['data'].apply(lambda x: x.get('dateField_ksirro5l')),
unit='ms',
errors='coerce'
)
df['org_code'] = df['data'].apply(lambda x: x.get('textField_ksydghqw'))
# 按 org_code 分组,每组取 expire_date 最晚的一条
yd_update_list = [] # 用于存储最终要更新的数据
for org_code, group in df.groupby('org_code'):
group_valid = group.dropna(subset=['expire_date'])
if group_valid.empty:
continue
latest_row = group_valid.loc[group_valid['expire_date'].idxmax()]
fields_to_map = {"120天是否跟进", "60天是否跟进", "30天是否跟进"}
# 组装要修改的信息:包含 processInstanceId + 所有跟进字段
update_info = {
"processInstanceId": latest_row["processInstanceId"],
"org_code": org_code,
}
# 添加跟进字段(这些已经在 enriched_record 中)
# 添加跟进字段,并对特定字段做映射
for field_name in self.follow_up_fields.keys():
value = latest_row.get(field_name)
# 处理 NaN 或 None
if pd.isna(value):
final_value = ""
else:
str_val = str(value).strip()
# 如果是需要映射的字段,进行转换
if field_name in fields_to_map:
if str_val == "主动":
final_value = "小六"
elif str_val == "自动":
final_value = "系统"
else:
final_value = str_val # 保留原始值(如“是”、“否”等)
else:
final_value = str_val
update_info[field_name] = final_value
yd_update_list.append(update_info)
# 可选:保存待更新清单
if yd_update_list:
update_df = pd.DataFrame(yd_update_list)
update_df.to_csv(os.path.join(output_dir, "yd_to_update.csv"), index=False)
return yd_update_list # 返回结构化数据供后续调用更新接口
def yd_process_data(self, yd_update_list):
TIME_FIELDS = ["120天跟进时间", "60天跟进时间", "30天跟进时间"]
# === 4. 直接遍历 yd_update_list 中的 item(字段在顶层)===
for item in yd_update_list:
# 转换UTC时间字符串为时间戳
for field in TIME_FIELDS:
if field in item and item[field]:
utc_str = str(item[field]).strip()
try:
if utc_str.endswith("Z"):
utc_str = utc_str[:-1] + "+00:00"
dt = datetime.fromisoformat(utc_str)
if dt.tzinfo is None:
dt = dt.replace(tzinfo=pytz.UTC)
else:
dt = dt.astimezone(pytz.UTC)
item[field] = int(dt.timestamp() * 1000) # ⏰ 转为秒级时间戳
except Exception as e:
print(f"[时间解析错误] 字段: {field}, 值: {utc_str}, 错误: {e}")
return yd_update_list
def update_yd_renewal_data(self, yd_update_list):
print(yd_update_list)
import ast
def extract_user_id(user_obj):
# 如果是字符串,尝试解析为 dict
if isinstance(user_obj, str):
if user_obj.strip().startswith("{"):
try:
user_obj = ast.literal_eval(user_obj)
except (ValueError, SyntaxError):
return "" # 无法解析,返回空
if isinstance(user_obj, dict):
return user_obj.get("name")
elif isinstance(user_obj, str):
return user_obj # 可能已经是
else:
return ""
yd_update_list = self.yd_process_data(yd_update_list)
for item in yd_update_list:
field_mapping = {
"120天是否跟进": "radioField_kuntp6fm",
"120天处理人": "textField_livc8bjj",
"120天跟进时间": "dateField_lifr1fdv",
"60天是否跟进": "radioField_kurxyhvp",
"60天处理人": "textField_livc8bjl",
"60天跟进时间": "dateField_lifr1fdx",
"30天是否跟进": "radioField_kurxyhvq",
"30天处理人": "textField_livc8bjm",
"30天跟进时间": "dateField_lifr1fdy",
}
item["120天处理人"] = extract_user_id(item["120天处理人"])
item["60天处理人"] = extract_user_id(item["60天处理人"])
item["30天处理人"] = extract_user_id(item["30天处理人"])
update_json = {v: item.get(k, "") for k, v in field_mapping.items()}
print(f"更新数据:{update_json}")
yd_api_instance.update_from(
token=self.token,
formInstanceId=item["processInstanceId"],
data_new=update_json,
)
def update_jd_ydy_renewal_to_do_status(self, yd_update_list):
"""
简道云进度为准:流程节点分 120/60/30/0。若宜搭落后则在宜搭侧自动同意推进到与简道云一致;否则不操作。
"""
def parse_dt(val):
try:
if val is None or val == "":
return datetime.min.replace(tzinfo=timezone.utc)
# 兼容带 Z 的 UTC 写法
dt = datetime.fromisoformat(str(val).replace("Z", "+00:00"))
if dt.tzinfo is None:
dt = dt.replace(tzinfo=timezone.utc)
return dt.astimezone(timezone.utc)
except Exception:
return datetime.min.replace(tzinfo=timezone.utc)
def extract_stage_from_text(text: str):
"""
根据节点名称中包含的数字段(120/60/30/0)识别阶段,兼容“联系情况/处理情况”等后缀。
"""
text = (text or "").strip()
# 区间节点映射:120-60 视为已进入 60 阶段;60-30 视为 3030-0 视为 0
if "120-60" in text:
return "60"
if "60-30" in text:
return "30"
if "30-0" in text:
return "0"
for key in self.stage_order.keys():
if key in text:
return key
if f"{key}" in text:
return key
if f"{key}天联系" in text:
return key
if f"{key}天处理" in text:
return key
return None
def fallback_stage_from_obj(obj):
"""
在未知结构的返回中深度搜索含阶段数字的字符串,返回“最靠后的阶段”(stage_order 值最大)。
防止找到第一个 120 就返回,导致明明已进入 60 却误判为 120。
"""
found = []
def _iter(o):
if isinstance(o, dict):
for v in o.values():
yield from _iter(v)
elif isinstance(o, list):
for v in o:
yield from _iter(v)
elif isinstance(o, str):
yield o
else:
yield str(o)
for text in _iter(obj):
stage = extract_stage_from_text(text)
if stage is not None:
found.append(stage)
if not found:
return None
# 取 stage_order 中“值最大”的阶段,表示流程更靠后
return max(found, key=lambda s: self.stage_order.get(s, -1))
for item in yd_update_list:
# 1) 简道云流程日志(权威)
data = {"data_id": item["数据ID"]}
jdy_workflow_data = api_instance.workflow_instance_get(data) or {}
print("简道云流程日志:", jdy_workflow_data)
jdy_logs = jdy_workflow_data.get("logs", [])
jdy_logs = sorted(
jdy_logs,
key=lambda x: parse_dt(x.get("finish_time") or x.get("create_time")),
reverse=True,
)
tasks = jdy_workflow_data.get("tasks", []) or []
pending = [t for t in tasks if t.get("status") == 0]
task_candidate = (pending[0] if pending else None) or (sorted(
tasks,
key=lambda x: parse_dt(x.get("create_time")),
reverse=True
)[0] if tasks else {})
jdy_result_records = jdy_workflow_data.get("result", []) or []
jdy_stage_candidates = [
extract_stage_from_text(str(jdy_logs[0].get("flow_name", ""))) if jdy_logs else None,
extract_stage_from_text(
str(task_candidate.get("flow_name", "")) + str(task_candidate.get("title", ""))
) if task_candidate else None,
]
jdy_stage_candidates.extend(
extract_stage_from_text(str(rec.get("showName", "")) + str(rec.get("activityId", "")))
for rec in jdy_result_records
)
jdy_stage_candidates.append(fallback_stage_from_obj(jdy_workflow_data))
jdy_stage = next((s for s in jdy_stage_candidates if s), None)
# 2) 宜搭流程日志
yd_workflow_data = yd_api_instance.get_approval_records(
token=self.token,
processInstanceId=item["processInstanceId"],
appType=self.appType,
systemToken=self.systemToken
) or {}
print("宜搭流程日志:", yd_workflow_data)
yd_results = yd_workflow_data.get("result", []) or []
# 展开宜搭 domainList 以获取所有动作
yd_records = [
child
for rec in yd_results
for child in ([rec] + (rec.get("domainList", []) or []))
]
yd_records = sorted(
yd_records,
key=lambda x: parse_dt(x.get("operateTimeGMT") or x.get("activeTimeGMT")),
reverse=True,
)
# 优先使用当前待办(type == TODO),否则用最新一条
yd_todo = next((r for r in yd_records if str(r.get("type")).upper() == "TODO"), None)
yd_latest = yd_todo or (yd_records[0] if yd_records else {})
yd_stage = extract_stage_from_text(
str(yd_latest.get("showName", "")) + str(yd_latest.get("activityId", ""))
)
# 若无法识别阶段,跳过防止误操作
if jdy_stage is None or yd_stage is None:
print(f"无法识别阶段,跳过同步 data_id={item.get('数据ID')}, jdy_stage={jdy_stage}, yd_stage={yd_stage}")
continue
# 3) 比较阶段:仅在简道云领先时才跳转;否则直接跳过
jdy_idx = self.stage_order.get(jdy_stage)
yd_idx = self.stage_order.get(yd_stage)
if jdy_idx is None or yd_idx is None:
print(f"阶段映射缺失,跳过 data_id={item.get('数据ID')}, jdy_stage={jdy_stage}, yd_stage={yd_stage}")
continue
if jdy_idx == yd_idx:
print(f"阶段一致,无需跳转 data_id={item.get('数据ID')}, stage={jdy_stage}")
continue
if jdy_idx < yd_idx:
print(f"简道云未领先或已落后,跳过 data_id={item.get('数据ID')}, jdy={jdy_stage}, yd={yd_stage}")
continue
# jdy_idx > yd_idx,宜搭落后,执行自动同意
task_id = yd_latest.get("taskId")
operator_user_id = yd_latest.get("operatorUserId") or yd_latest.get("operator")
if not task_id or not operator_user_id:
print(f"缺少 taskId 或 operatorUserId,无法自动同意 processInstanceId={item['processInstanceId']}")
continue
try:
yd_api_instance.aggree_approval(
token=self.token,
taskId=task_id,
processInstanceId=item["processInstanceId"],
formData={}, # 无需修改表单时传空字典
res_new=operator_user_id
)
print(f"宜搭已自动同意至 下一个 节点,processInstanceId={item['processInstanceId']}")
except Exception as e:
print(f"宜搭自动同意失败 processInstanceId={item['processInstanceId']} err={e}")
def retrun_jdy(self, yd_update_list):
for item in yd_update_list:
data = {
"api_key": "675b900991ad2491c69389ca",
"entry_id": "6931063d64187eaf6b927557",
"data_id": item.get('数据ID'),
"data":
{
"_widget_1766469131897": {"value": ""},
}
}
api_instance.entry_data_update(data)
print(f"已返回 data_id={item.get('数据ID')}")
def main(self):
task_start_time = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
try:
# step1 获取简道云与宜搭数据
self.load_all_data()
# step2 根据简道云门店编码精确搜索宜搭数据
yd_update_list = self.search_yd_renewal_data()
# step3 api修改宜搭数据
self.update_yd_renewal_data(yd_update_list)
# step4 简道云与宜搭流程节点保持一致
self.update_jd_ydy_renewal_to_do_status(yd_update_list)
# step5 简道云标记已处理
self.retrun_jdy(yd_update_list)
except Exception as e:
print(e)
if __name__ == '__main__':
jd_ydy_renewal_to_do = JDYToYDRenewalToDo()
jd_ydy_renewal_to_do.main()