Files
saas/test/续约待办宜搭传给简道云-1day.py
2026-04-09 09:53:47 +08:00

792 lines
31 KiB
Python

import os
from datetime import datetime, timedelta, timezone
import pandas as pd
from tqdm import tqdm
from datetime import datetime, timezone
import pandas as pd
import os
from typing import Dict
import requests
import json
import time
import numpy as np # 导入numpy库用于处理numpy数组
output_dir = "output" # 设置输出目录
os.makedirs(output_dir, exist_ok=True)
class Config:
JIANDAOYUN_API_TOKEN = 'Bearer qygHulymo1fekJk4CIZyNKjyQAzG8CFN' # token
class API:
def entry_data_list(self, data: dict, replace: bool = False, max_retries: int = 20) -> Dict: # 获取多条表单数据
"""
获取多条表单数据
:param max_retries: 最大重试次数
:param replace: 是否替换字段
:param data:
api_key: 应用id
entry_id: 表单id
:return:
"""
url = 'https://api.jiandaoyun.com/api/v5/app/entry/data/list'
headers = {
'Authorization': Config.JIANDAOYUN_API_TOKEN, # 曹伟应用api测试 app_key
'Content-Type': 'application/json'
}
all_data_batches = [] # 用于存储每次请求返回的数据批次
last_data_id = None
exit_flag = False
while True:
payload = json.dumps({
"app_id": data['api_key'], # 应用ID
"entry_id": data['entry_id'], # 表单ID
"limit": 90,
"data_id": last_data_id,
"filter": data.get('filter', None)
})
retries = 0
while retries <= max_retries:
try:
res = requests.post(url=url, data=payload, headers=headers, timeout=10)
res.raise_for_status() # 检查HTTP响应状态码,如果不等于200会抛出异常
data_get = res.json()
if data_get["data"]:
all_data_batches.extend(data_get['data'])
last_data_id = data_get['data'][-1].get('_id')
print(f"已获取 {len(all_data_batches)} 条数据")
break # 成功则跳出循环
else:
if 'data' not in data_get or len(data_get['data']) == 0:
exit_flag = True
break
# logger.warning(f"请求异常, 将重新请求")
retries += 1
time.sleep(0.1) # 在重试之间稍作停顿
except requests.exceptions.RequestException as e:
# logger.warning(f"请求异常: {e}, 将重新请求")
retries += 1
time.sleep(0.1) # 在重试之间稍作停顿
if retries > max_retries:
# error_task_logger.error(f"任务 {last_data_id}组 连续{max_retries}次请求失败,放弃此次请求。")
all_data_batches.append(None) # 或者可以选择记录失败的payload以便后续处理
if exit_flag:
break
# 构建最终返回的字典
final_data = {
'data': all_data_batches # 'data' 键对应的值是列表的列表
}
# logger.info(f"获取了{len(all_data_batches)}条数据")
if replace:
print("进行了替换")
return_data = self.field_replacement(data, final_data) # 字段替换,由id替换为标签名
return return_data
else:
return final_data
def field_replacement(self, data: dict, data_get: dict) -> dict:
"""
字段替换,将id替换为标签名,即唯一值替换为表单中显示字段的名字
:param data: 简道云插件发送过来的data,包含表单id、数据id、应用id
:param data_get: 简道云请求的数据,一般是根据数据id获取到表单的数据
:return: 将根据数据id获取到的表单数据,进行替换,返回替换后的数据
"""
# 获取表单对应字段标签名称
widget_list = self.entry_widget_list(data)
# 检查widget_list是否有效
if not widget_list or 'widgets' not in widget_list or not isinstance(widget_list['widgets'], list):
raise ValueError("映射表没有接受到数据")
# 创建一个映射表,将_widget_名称映射到label
name_to_label = {widget['name']: widget['label'] for widget in widget_list['widgets']}
def replace_keys(obj):
"""递归替换字典中的键名"""
if isinstance(obj, dict):
new_dict = {}
for key, value in obj.items():
new_key = name_to_label.get(key, key)
new_dict[new_key] = replace_keys(value)
return new_dict
elif isinstance(obj, list):
return [replace_keys(item) for item in obj]
else:
return obj
# 复制 data_get,避免修改原始数据
data_get_copy = json.loads(json.dumps(data_get)) # 深拷贝
if 'data' in data_get_copy:
data_get_copy['data'] = replace_keys(data_get_copy['data'])
return data_get_copy
@staticmethod
def workflow_instance_get(data: dict, max_retries: int = 20) -> dict:
"""
查询实例流程信息
:param max_retries:
:param data: 简道云插件发送过来的data,包含应用id
:return: 查询简道云流程实例信息返回的结果
"""
url = 'https://api.jiandaoyun.com/api/v6/workflow/instance/get'
headers = {
'Authorization': Config.JIANDAOYUN_API_TOKEN, # 曹伟应用api测试 appKey
'Content-Type': 'application/json'
}
payload = json.dumps({
"instance_id": data['data_id'],
"tasks_type": 1
}
)
print("payload:", payload)
data_get = None
retries = 0
while retries <= max_retries:
try:
res = requests.post(url=url, data=payload, headers=headers, timeout=10)
# res.raise_for_status() # 检查HTTP响应状态码,如果不等于200会抛出异常
data_get = res.json()
# print( "返回结果:", data_get)
if res.status_code == 200:
break # 成功则跳出循环
else:
# logger.warning(f"请求异常, 将重新请求")
retries += 1
time.sleep(3) # 在重试之间稍作停顿
except requests.exceptions.RequestException as e:
# logger.warning(f"请求异常: {e}, 将重新请求")
retries += 1
time.sleep(0.1) # 在重试之间稍作停顿
if retries > max_retries:
# error_task_logger.error(f"任务 {data['data_id']} 连续{max_retries}次请求失败,放弃此次请求。")
print("请求失败")
return data_get
@staticmethod
def entry_data_update(data: dict, max_retries: int = 20) -> dict: # 修改数据
"""
修改数据
:param max_retries: 最大重试次数,此处设置100次
:param data: 简道云插件发送过来的data,包含应用id、表单id、数据id等信息
:return: 修改数据后简道云返回的结果
"""
url = 'https://api.jiandaoyun.com/api/v5/app/entry/data/update'
headers = {
'Authorization': Config.JIANDAOYUN_API_TOKEN, # 曹伟应用api测试 appKey
'Content-Type': 'application/json'
}
payload = json.dumps({
"app_id": data['api_key'], # 应用ID
"entry_id": data['entry_id'], # 表单ID
"data_id": data['data_id'], # 数据ID
"data": data['data'],
"is_start_trigger": True
}
)
data_get = None
retries = 0
while retries <= max_retries:
try:
res: requests.Response = requests.post(url=url, data=payload, headers=headers, timeout=10)
res.raise_for_status() # 检查HTTP响应状态码,如果不等于200会抛出异常
data_get = res.json()
# print(data_get)
if res.status_code == 200:
break # 成功则跳出循环
else:
# logger.warning(f"请求异常, 将重新请求")
retries += 1
time.sleep(3) # 在重试之间稍作停顿
except requests.exceptions.RequestException as e:
# logger.warning(f"请求异常: {e}, 将重新请求")
retries += 1
time.sleep(10) # 在重试之间稍作停顿
if retries > max_retries:
# error_task_logger.error(f"任务 {data['data_id']} 连续{max_retries}次请求失败,放弃此次请求。")
continue
return data_get
@staticmethod
def workflow_instance_end(data: dict, max_retries: int = 20) -> dict:
"""
关闭流程
:param max_retries:
:param data: 简道云插件发送过来的data,包含应用id
:return: 查询简道云流程实例信息返回的结果
"""
url = 'https://api.jiandaoyun.com/api/v1/workflow/instance/close'
headers = {
'Authorization': Config.JIANDAOYUN_API_TOKEN, # 曹伟应用api测试 appKey
'Content-Type': 'application/json'
}
payload = json.dumps({
"instance_id": data['data_id'],
}
)
print("payload:", payload)
data_get = None
retries = 0
while retries <= max_retries:
try:
res = requests.post(url=url, data=payload, headers=headers, timeout=10)
# res.raise_for_status() # 检查HTTP响应状态码,如果不等于200会抛出异常
data_get = res.json()
print("返回结果:", data_get)
if res.status_code == 200:
break # 成功则跳出循环
else:
retries += 1
time.sleep(3) # 在重试之间稍作停顿
except requests.exceptions.RequestException as e:
retries += 1
time.sleep(0.1) # 在重试之间稍作停顿
return data_get
class NpEncoder(json.JSONEncoder):
def default(self, obj):
if isinstance(obj, np.integer):
return int(obj)
elif isinstance(obj, np.floating):
return float(obj)
elif isinstance(obj, np.ndarray):
return obj.tolist()
else:
return super(NpEncoder, self).default(obj)
class YDAPI:
appKey = "ding5kqocon5s9oph5uq"
appSecret = "HL1jgsIIfLAC0eTH0A1m4mwxUDqbgsiPeCCGGE3ocM6qJBTIW7Ivt9drxF_Z4Kb_"
@staticmethod
def get_ids_query(token, formUuid, appType, systemToken, formInstanceIdList=None, max_retries=10, delay=2):
"""
函数功能:读取表单的所有数据,并加入重试机制。
Args:
token (str): 登录验证token,用于API调用的身份验证。
formUuid (str): 表单唯一标识符,用于指定需要读取哪个表单的实例数据。
page (int): 分页参数,指定请求的数据页码。
n (int): 每页显示的数据条数。
appType (str): 应用类型标识符,默认为 "APP_UYZ0KG6L0CCNV80GZ66O"
systemToken (str): 系统token,默认为固定值
instanceStatus (str): 流程实例状态,默认为"RUNNING"
max_retries (int): 最大重试次数,默认为10次
delay (int): 每次重试之间的延迟秒数,默认为2秒
Returns:
dict: 返回从API获取的流程表单实例数据的JSON解析结果。
Raises:
Exception: 如果达到最大重试次数仍未成功,则抛出异常。
"""
attempt = 0
api = f'https://api.dingtalk.com/v1.0/yida/forms/instances/ids/query'
headers = {
"Content-Type": "application/json",
"x-acs-dingtalk-access-token": token
}
formData = {
"appType": appType,
"systemToken": systemToken,
"userId": "yida_pub_account", # 超级管理员账号
"language": "zh_CN",
"formUuid": formUuid,
"formInstanceIdList": formInstanceIdList,
}
# print(formData)
while True:
if attempt >= max_retries:
break
try:
res = requests.post(api, headers=headers, json=formData)
# print(res.json())
res.raise_for_status() # 如果返回状态码不是2xx,抛出异常
return res.json()
except requests.exceptions.RequestException as e:
time.sleep(delay)
attempt += 1
def generateToken(self) -> str:
"""
函数功能:生成访问令牌(token)
Returns:
str: 返回生成的访问令牌字符串。此token用于后续API调用的身份验证。
"""
token_api = 'https://api.dingtalk.com/v1.0/oauth2/accessToken'
data = {
"appKey": f"{self.appKey}",
"appSecret": f'{self.appSecret}'
}
res = requests.post(token_api, json=data)
token = res.json().get('accessToken')
return token
def read_processes_instances(self, token, formUuid, page, n, appType="APP_UYZ0KG6L0CCNV80GZ66O",
systemToken="XA966F81JAJOFCVVVKO64E9MIIZV1EWE5SFMKJ2", instanceStatus="RUNNING",
max_retries=10, delay=2, createFromTimeGMT=None, createToTimeGMT=None,
modifiedFromTimeGMT=None,
modifiedToTimeGMT=None, searchFieldJson={}):
"""
函数功能:读取流程表单的所有数据,并加入重试机制。
Args:
token (str): 登录验证token,用于API调用的身份验证。
formUuid (str): 表单唯一标识符,用于指定需要读取哪个表单的实例数据。
page (int): 分页参数,指定请求的数据页码。
n (int): 每页显示的数据条数。
appType (str): 应用类型标识符,默认为 "APP_UYZ0KG6L0CCNV80GZ66O"
systemToken (str): 系统token,默认为固定值
instanceStatus (str): 流程实例状态,默认为"RUNNING"
max_retries (int): 最大重试次数,默认为10次
delay (int): 每次重试之间的延迟秒数,默认为2秒
Returns:
dict: 返回从API获取的流程表单实例数据的JSON解析结果。
Raises:
Exception: 如果达到最大重试次数仍未成功,则抛出异常。
"""
attempt = 0
api = f'https://api.dingtalk.com/v1.0/yida/processes/instances?pageNumber={page}&pageSize={n}'
headers = {
"Content-Type": "application/json",
"x-acs-dingtalk-access-token": token
}
formData = {
"appType": appType,
"systemToken": systemToken,
"userId": "yida_pub_account", # 超级管理员账号
"language": "zh_CN",
"formUuid": formUuid,
"instanceStatus": instanceStatus, # 运行中
"createFromTimeGMT": createFromTimeGMT,
"createToTimeGMT": createToTimeGMT,
"modifiedFromTimeGMT": modifiedFromTimeGMT,
"modifiedToTimeGMT": modifiedToTimeGMT,
"searchFieldJson": json.dumps(
searchFieldJson
)
}
# print(formData)
while True:
if attempt >= max_retries:
# error_task_logger.error(f"请求失败,已达最大重试次数 {max_retries},无法获取流程实例数据,跳过本次请求。")
break
try:
res = requests.post(api, headers=headers, json=formData)
# print(res.json())
res.raise_for_status() # 如果返回状态码不是2xx,抛出异常
return res.json()
except requests.exceptions.RequestException as e:
# logger.warning(f"请求异常: {e},正在尝试第 {attempt + 1} 次重试...")
time.sleep(delay)
attempt += 1
def update_from(self, token, formInstanceId, data_new):
"""
函数功能:更新表单内容
Args:
token (str): 登录验证token,用于API调用的身份验证。
formInstanceId (str): 表单实例ID,读文件获取。
data_new (dict): 新的数据内容,用于替换现有表单实例中的数据。读文件获取。
Returns:
Response: 返回API请求的响应对象。
"""
api = f'https://api.dingtalk.com//v1.0/yida/forms/instances'
headers = {
"Content-Type": "application/json",
"x-acs-dingtalk-access-token": token
}
payload = {
"appType": "APP_UYZ0KG6L0CCNV80GZ66O",
"systemToken": "XA966F81JAJOFCVVVKO64E9MIIZV1EWE5SFMKJ2",
"userId": "yida_pub_account", # 曹伟 id
"language": "zh_CN",
"useLatestVersion": "false",
"formInstanceId": formInstanceId,
"updateFormDataJson": json.dumps(data_new, cls=NpEncoder),
}
res = requests.put(api, headers=headers, json=payload)
return res
def get_approval_records(self, token: str, processInstanceId: str, appType="APP_UYZ0KG6L0CCNV80GZ66O",
systemToken="XA966F81JAJOFCVVVKO64E9MIIZV1EWE5SFMKJ2", max_retries=10, delay=2):
"""
函数功能:获取流程表单的审批记录,适用于"F6客户服务"应用,并且包含重试机制。
Args:
token (str): 登录验证token,用于API调用的身份验证。
processInstanceId (str): 流程实例ID,用于标识需要获取审批记录的具体流程实例。
appType (str): 应用类型标识符,默认为 "APP_UYZ0KG6L0CCNV80GZ66O"
systemToken (str): 系统token,默认为固定值
max_retries (int): 最大重试次数,默认为10次
delay (int): 每次重试之间的延迟秒数,默认为2秒
Returns:
dict: 返回从API获取的审批记录的JSON解析结果。通常包括审批步骤、审批人、审批时间等信息。
"""
attempt = 0
userId = "yida_pub_account"
api = f'https://api.dingtalk.com/v1.0/yida/processes/operationRecords?appType={appType}&systemToken={systemToken}&userId={userId}&language=zh_CN&processInstanceId={processInstanceId}'
headers = {
"Content-Type": "application/json",
"x-acs-dingtalk-access-token": token
}
while True:
if attempt >= max_retries:
# error_task_logger.error(f"请求失败,已达最大重试次数 {max_retries},无法获取审批数据,跳过本次请求。")
break
try:
res = requests.get(api, headers=headers)
res.raise_for_status() # 如果响应状态码不是2xx,则抛出HTTPError
return res.json()
except (requests.exceptions.RequestException, Exception) as e:
# logger.warning(f"请求出现异常: {e}, 正在重试({attempt + 1}/{max_retries})...")
time.sleep(delay) # 等待指定的延迟时间后再次尝试
attempt += 1
def aggree_approval(self, token: str, taskId: str, processInstanceId: str, formData: dict, res_new):
"""_summary_
函数功能:同意审批节点 --F6客户服务 应用
Args:
token (str): 登录验证token
taskId (str): 获取到的审批节点ID
processInstanceId (str): 读取文件获得的实例ID
formData (dict): 数据样式
res_new (响应值): 从员工ID表里获取到员工名对应的员工ID
Returns:
响应值: 返回请求结果
"""
api = 'https://api.dingtalk.com/v1.0/yida/tasks/execute'
headers = {
"Content-Type": "application/json",
"x-acs-dingtalk-access-token": token
}
payload = {
"outResult": "AGREE",
"appType": "APP_UYZ0KG6L0CCNV80GZ66O",
"systemToken": "XA966F81JAJOFCVVVKO64E9MIIZV1EWE5SFMKJ2",
"remark": "同意(接口自动)",
"formDataJson": json.dumps(formData, cls=NpEncoder),
"processInstanceId": processInstanceId,
"userId": res_new,
"language": "zh_CN",
"taskId": int(taskId)
}
res = requests.post(api, headers=headers, json=payload)
return res
api_instance = API()
yd_api_instance = YDAPI()
class YDToJDYRenewalToDo(object):
def __init__(self):
self.FORMID = "FORM-PE866MD1MJMU0WGLYRFLYEN5YN9L1I55Z7ZUK22"
self.appType = "APP_UYZ0KG6L0CCNV80GZ66O"
self.systemToken = "XA966F81JAJOFCVVVKO64E9MIIZV1EWE5SFMKJ2"
self.token = yd_api_instance.generateToken()
def load_all_data(self):
end_dt = datetime.now() + timedelta(days=1)
start_dt = end_dt - timedelta(days=90)
start_time = start_dt.strftime("%Y-%m-%d")
end_time = end_dt.strftime("%Y-%m-%d")
yd_data = yd_api_instance.read_processes_instances(
token=self.token,
formUuid=self.FORMID,
page=1,
n=100,
appType=self.appType,
systemToken=self.systemToken,
instanceStatus="",
modifiedFromTimeGMT=start_time,
modifiedToTimeGMT=end_time,
)
all_process_list = []
PAGES_two = yd_data.get('totalCount') // 100 + 1
for a in tqdm(range(1, PAGES_two + 1)):
try:
yd_data = yd_api_instance.read_processes_instances(
token=self.token,
formUuid=self.FORMID,
page=a,
n=100,
appType=self.appType,
systemToken=self.systemToken,
instanceStatus="",
modifiedFromTimeGMT=start_time,
modifiedToTimeGMT=end_time,
)
all_process_list = all_process_list + yd_data.get("data")
except Exception as e:
print(f"获取流程实例数据时出错: {e}")
continue
df_current = pd.DataFrame(all_process_list)
current_file = f"{output_dir}/{start_time}_{end_time}_all_process_list.csv"
# === 新增:读取上次文件并计算差值 ===
if os.path.exists(current_file):
try:
df_last = pd.read_csv(current_file)
except Exception as e:
print(f"读取历史文件失败: {e}")
df_last = pd.DataFrame()
else:
df_last = None # 明确标记:无历史文件
id_col = 'processInstanceId'
if df_last is not None and not df_last.empty and not df_current.empty and id_col in df_current.columns and id_col in df_last.columns:
# 有历史文件,计算新增
last_ids = set(df_last[id_col].astype(str))
current_ids = set(df_current[id_col].astype(str))
new_ids = current_ids - last_ids
diff_records = df_current[df_current[id_col].astype(str).isin(new_ids)].to_dict('records')
else:
# 没有历史文件 或 无法比对 → 返回全部当前数据
diff_records = df_current.to_dict('records') # ← 关键修改点
# 保存当前全量数据(覆盖)
df_current.to_csv(current_file, index=False)
return diff_records
def filter_renewal_data(self, all_process_list):
update_data_list = []
for item in all_process_list:
if item.get("data").get("textField_kto3q3ev"):
org_id = item.get("data").get("textField_ksydghqw")
order_code = item.get("data").get("textField_kto3q3ev") # 订单编码
pay_time = item.get("data").get("dateField_kto3q3ex") # 订单支付日期
res = yd_api_instance.get_ids_query(
token=self.token,
formInstanceIdList=[item.get("processInstanceId")],
formUuid="FORM-PE866MD1MJMU0WGLYRFLYEN5YN9L1I55Z7ZUK22",
appType="APP_UYZ0KG6L0CCNV80GZ66O",
systemToken="XA966F81JAJOFCVVVKO64E9MIIZV1EWE5SFMKJ2",
)
payment_amount = None
bussiness_type = None
# print(res)
if res.get("result"):
form_data = res["result"][0]["formData"]
payment_amount = form_data.get("textField_kyjy1kkm") # "9987"
bussiness_type = form_data.get("textField_kyjy1kkn") # "续约"
# payment_amount = res.get("data").get("textField_kyjy1kkm") # 支付金额
# bussiness_type = res.get("data").get("textField_kyjy1kkn") # 业务类型
if pay_time:
# 确保是整数
timestamp_ms = int(pay_time)
# 转为 UTC datetime 对象
pay_datetime_utc = datetime.fromtimestamp(timestamp_ms / 1000, tz=timezone.utc)
print(pay_datetime_utc) # 例如: 2024-04-05 12:34:38.901000+00:00
else:
pay_datetime_utc = None
update_data_list.append({
"order_code": order_code,
"pay_time": pay_time,
"payment_amount": payment_amount,
"bussiness_type": bussiness_type,
"org_id": org_id
})
return update_data_list
def check_jd_ydy_data(self, update_data_list):
def extract_widget_value(record, widget_id):
value = record.get(widget_id)
if isinstance(value, dict) and "value" in value:
return value.get("value")
return value
def has_value(value):
if value is None:
return False
if isinstance(value, str):
return value.strip() != ""
if isinstance(value, (list, dict)):
return len(value) > 0
return True
def pick_best_record(records):
if not records:
return None
time_fields = [
"_updateTime",
"_updated_at",
"_createTime",
"_created_at",
"updateTime",
"updated_at",
"createTime",
"created_at",
]
def extract_sort_value(record):
for field in time_fields:
value = record.get(field)
if value is None:
continue
try:
return int(value)
except Exception:
try:
return int(float(value))
except Exception:
continue
return 0
return max(records, key=extract_sort_value)
def query_jdy_by_org_id(org_id, flow_state=None):
cond = [{
"field": "_widget_1764820541661",
"type": "text",
"method": "eq",
"value": [org_id]
}]
if flow_state is not None:
cond.append({
"field": "flowState",
"type": "flowstate",
"method": "eq",
"value": [flow_state]
})
payload = {
"api_key": "675b900991ad2491c69389ca",
"entry_id": "6931063d64187eaf6b927557",
"filter": {
"rel": "and",
"cond": cond
}
}
return api_instance.entry_data_list(payload).get("data", [])
for item in update_data_list:
data_list = query_jdy_by_org_id(item.get("org_id"), flow_state=0)
if not data_list:
data_list = query_jdy_by_org_id(item.get("org_id"), flow_state=None)
result = pick_best_record(data_list)
data_id = result.get("_id") if result else None
if not data_id:
# print(f"未找到订单 {item.get('order_code')} 的简道云数据,跳过。")
continue
# 查询实例状态
instance_status = api_instance.workflow_instance_get({"data_id": data_id})
task_status = instance_status.get("status", -1)
print(f"订单 {item.get('order_code')} 的简道云流程状态为 {task_status}")
if task_status == 0:
print("简道云流程正在进行中,执行流程关闭")
api_instance.workflow_instance_end({"data_id": data_id})
target_values = {
"_widget_1764820541674": item.get("order_code"),
"_widget_1764820541679": item.get("pay_time"),
"_widget_1764820541676": item.get("payment_amount"),
"_widget_1764820541680": item.get("bussiness_type"),
}
data_to_update = {}
for widget_id, new_value in target_values.items():
existing_value = extract_widget_value(result, widget_id)
if has_value(existing_value):
continue
if not has_value(new_value):
continue
data_to_update[widget_id] = {"value": new_value}
if not data_to_update:
print(f"订单 {item.get('order_code')} 简道云字段已有值或无可写入内容,跳过同步")
continue
print("开始同步数据")
update_payload = {
"api_key": "675b900991ad2491c69389ca",
"entry_id": "6931063d64187eaf6b927557",
"data_id": data_id,
"data": data_to_update
}
print(update_payload)
api_instance.entry_data_update(update_payload)
print("数据同步完成")
def main(self):
task_start_time = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
try:
# step1 获取简道云与宜搭数据
jd_ydy_data = self.load_all_data()
if jd_ydy_data:
# step2 过滤已经续约的单子
update_data_list = self.filter_renewal_data(jd_ydy_data)
# step3 校验简道云是否有进行中的单子并关闭
self.check_jd_ydy_data(update_data_list)
else:
print("本次执行无处理数据")
except Exception as e:
print(e)
if __name__ == '__main__':
jd_ydy_renewal_to_do = YDToJDYRenewalToDo()
jd_ydy_renewal_to_do.main()