From ab434f6c4c2364b6dd6a58f558468f1d40274e7d Mon Sep 17 00:00:00 2001 From: z66 <1415243231@qq.com> Date: Thu, 25 Dec 2025 14:56:45 +0800 Subject: [PATCH] =?UTF-8?q?=E9=9D=9E=E6=A0=87=E4=B8=9A=E7=BB=A9=E6=8F=90?= =?UTF-8?q?=E6=8A=A5=E3=80=81=E5=90=88=E4=BC=99=E4=BA=BA=E7=BB=93=E7=AE=97?= =?UTF-8?q?=E7=99=BB=E8=AE=B0=E5=AD=97=E6=AE=B5=E4=BF=AE=E6=94=B9=20?= =?UTF-8?q?=E7=BB=AD=E7=BA=A6=E5=9B=9E=E8=AE=BF=20=E5=AE=9C=E6=90=AD?= =?UTF-8?q?=E5=90=8C=E6=AD=A5=E7=AE=80=E9=81=93=E4=BA=91=E8=BE=85=E5=8A=A9?= =?UTF-8?q?=E8=84=9A=E6=9C=AC=20=E7=AE=80=E9=81=93=E4=BA=91=E5=90=8C?= =?UTF-8?q?=E6=AD=A5=E5=AE=9C=E6=90=AD=E8=BE=85=E5=8A=A9=E8=84=9A=E6=9C=AC?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .idea/csv-editor.xml | 14 + .idea/inspectionProfiles/Project_Default.xml | 1 + api.py | 42 + .../non_standar_performance_to_BI.py | 13 +- .../partner_settlement_to_BI.py | 23 +- test/宜搭修改数据.py | 20 + test/宜搭修改数据测试.ipynb | 71 ++ test/宜搭请求数据.py | 16 + test/续约待办宜搭传给简道云.py | 693 +++++++++++++++ test/续约待办派发.py | 13 +- test/续约待办派发测试数据.py | 516 +++++++++++ test/续约待办简道云回传宜搭.py | 833 ++++++++++++++++++ tools/BI.ipynb | 12 +- tools/保存common_module数据库链接内容.py | 5 +- yd_api.py | 93 +- 15 files changed, 2327 insertions(+), 38 deletions(-) create mode 100644 test/宜搭修改数据.py create mode 100644 test/宜搭修改数据测试.ipynb create mode 100644 test/宜搭请求数据.py create mode 100644 test/续约待办宜搭传给简道云.py create mode 100644 test/续约待办简道云回传宜搭.py diff --git a/.idea/csv-editor.xml b/.idea/csv-editor.xml index dc000ef..c9450ab 100644 --- a/.idea/csv-editor.xml +++ b/.idea/csv-editor.xml @@ -24,6 +24,13 @@ + + + + + + @@ -31,6 +38,13 @@ + + + + + + diff --git a/.idea/inspectionProfiles/Project_Default.xml b/.idea/inspectionProfiles/Project_Default.xml index b1fb88e..e64c2d6 100644 --- a/.idea/inspectionProfiles/Project_Default.xml +++ b/.idea/inspectionProfiles/Project_Default.xml @@ -22,6 +22,7 @@ diff --git a/api.py b/api.py index ffd2423..986650b 100644 --- a/api.py +++ b/api.py @@ -587,6 +587,48 @@ class API: return data_get + @staticmethod + def workflow_instance_end(data: dict, max_retries: int = 20) -> dict: + """ + 关闭流程 + :param max_retries: + :param data: 简道云插件发送过来的data,包含应用id + :return: 查询简道云流程实例信息返回的结果 + """ + url = 'https://api.jiandaoyun.com/api/v1/workflow/instance/close' + + headers = { + 'Authorization': Config.JIANDAOYUN_API_TOKEN, # 曹伟应用api测试 appKey + 'Content-Type': 'application/json' + } + + payload = json.dumps({ + "instance_id": data['data_id'], + } + ) + print("payload:", payload) + data_get = None + retries = 0 + while retries <= max_retries: + try: + res = requests.post(url=url, data=payload, headers=headers, timeout=10) + # res.raise_for_status() # 检查HTTP响应状态码,如果不等于200会抛出异常 + data_get = res.json() + print( "返回结果:", data_get) + if res.status_code == 200: + break # 成功则跳出循环 + else: + logger.warning(f"请求异常, 将重新请求") + retries += 1 + time.sleep(3) # 在重试之间稍作停顿 + except requests.exceptions.RequestException as e: + logger.warning(f"请求异常: {e}, 将重新请求") + retries += 1 + time.sleep(0.1) # 在重试之间稍作停顿 + if retries > max_retries: + error_task_logger.error(f"任务 {data['data_id']} 连续{max_retries}次请求失败,放弃此次请求。") + + return data_get @staticmethod diff --git a/back_ground_module/non_standar_performance_to_BI.py b/back_ground_module/non_standar_performance_to_BI.py index 7200f24..afbffff 100644 --- a/back_ground_module/non_standar_performance_to_BI.py +++ b/back_ground_module/non_standar_performance_to_BI.py @@ -29,7 +29,6 @@ class NonStandardPerformanceToBI: self.field_mapping = { "报备类型": "_widget_1753770875899", "协作内容": "_widget_1753770875915", - "订单类型": "_widget_1753770875966", "情况说明": "_widget_1753770875944", "订单编号": "_widget_1753770875887", "实付金额": "_widget_1753770875889", @@ -68,6 +67,14 @@ class NonStandardPerformanceToBI: "业绩类型-聚合":"_widget_1758706882564", "业绩分组":"_widget_1762417447169", "商品名称":"_widget_1762219744898", + "履约金额":"_widget_1762220516367", + "业绩归属日期":"_widget_1762417447127", + "公司名称":"_widget_1762420723743", + "公司ID":"_widget_1762420723744", + "报备业绩金额-区域提交":"_widget_1766375035236", + "业绩归属小六-区域提交":"_widget_1766461143813", + "业绩归属月":"_widget_1766375035265", + "是否同步衡石":"_widget_1766484337844", "提交人": "creator", "提交时间": "createTime", "更新时间": "updateTime" @@ -130,13 +137,13 @@ class NonStandardPerformanceToBI: df = df[df["流程是否结束"] == "是"] # 2.成员字段取值 - user_columns = ["报备业绩归属小六", "报备业绩归属区域经理", "原业绩归属人", "原业绩归属区域经理", "运营专家"] + user_columns = ["报备业绩归属小六", "报备业绩归属区域经理", "原业绩归属人", "原业绩归属区域经理", "运营专家","业绩归属小六-区域提交"] for col in user_columns: df[col] = df[col].map(lambda x: x.get("name", "") if isinstance(x, dict) else "") # 3.日期字段转为北京时间 - time_columns = ["支付日期", "开户/处理日期","提交时间","更新时间"] + time_columns = ["支付日期", "开户/处理日期","提交时间","更新时间","业绩归属月","业绩归属日期"] df[time_columns] = df[time_columns].apply( lambda col: pd.to_datetime(col, errors='coerce') diff --git a/back_ground_module/partner_settlement_to_BI.py b/back_ground_module/partner_settlement_to_BI.py index 0d332f5..a09a671 100644 --- a/back_ground_module/partner_settlement_to_BI.py +++ b/back_ground_module/partner_settlement_to_BI.py @@ -53,6 +53,17 @@ class PartnerSettlementToBI: "特殊情况备注": "_widget_1712805391035", "合伙人介绍证明(微信聊天截图等)": "_widget_1712815331256", "合伙人类型": "_widget_1753957844818", + "小程序签约状态": "_widget_1756087218860", + "订单登记表.订单支付时间": "_widget_1712803222905._widget_1762918516630", + "小程序签约状态-核实": "_widget_1756084913318", + "签约状态-手机号匹配": "_widget_1756195470603", + "签约状态-姓名匹配": "_widget_1756195470602", + "是否重名": "_widget_1756195470601", + "结算月份": "_widget_1756704906867", + "订单支付时间-核实": "_widget_1756804675274", + "结算状态": "_widget_1756804412410", + "提成动作": "_widget_1758529175921", + "是否同步": "_widget_1762855878035", "提交时间": "createTime", "更新时间": "updateTime" } @@ -68,6 +79,7 @@ class PartnerSettlementToBI: "_widget_1753952737266": "佣金", "_widget_1753952737267": "理论佣金", "_widget_1712807001396": "佣金比例", + "_widget_1762918516630": "订单支付时间", }, # 可以在这里添加其他列表字段的配置 # "另一个列表字段": { @@ -127,7 +139,7 @@ class PartnerSettlementToBI: df[col] = df[col].map(lambda x: x.get("name", "") if isinstance(x, dict) else "") # 3.日期字段转为北京时间 - time_columns = ["提交时间", "更新时间"] + time_columns = ["提交时间", "更新时间","订单支付时间-核实","结算月份"] df[time_columns] = df[time_columns].apply( lambda col: pd.to_datetime(col, errors='coerce') @@ -154,6 +166,15 @@ class PartnerSettlementToBI: lambda x: x.get(field) if isinstance(x, dict) else None ) + time_columns_nested = ["订单支付时间"] # 来自订单登记表等嵌套结构 + + # 时间字段标准化:处理订单支付时间 + for col in time_columns_nested: + if col in df_exploded.columns: + df_exploded[col] = pd.to_datetime(df_exploded[col], errors='coerce') \ + .dt.tz_localize(None) \ + .dt.strftime('%Y-%m-%d %H:%M:%S') + # 删除原始的订单登记表列 df_exploded = df_exploded.drop(columns=["订单登记表"]) diff --git a/test/宜搭修改数据.py b/test/宜搭修改数据.py new file mode 100644 index 0000000..2ea5996 --- /dev/null +++ b/test/宜搭修改数据.py @@ -0,0 +1,20 @@ + +from yd_api import YDAPI + +yd_api_instance = YDAPI() +token = yd_api_instance.generateToken() + +update_json = { + + "textField_kto3q3ev": "594561", + "dateField_kto3q3ex": "1766557929000", + "textField_kyjy1kkm": "9987", + "textField_kyjy1kkn": "续约", + +} +res = yd_api_instance.update_from( + token=token, + formInstanceId="ef7aabe7-4931-4271-823f-f9a43bc516b2", + data_new=update_json, + ) +print(res.json()) \ No newline at end of file diff --git a/test/宜搭修改数据测试.ipynb b/test/宜搭修改数据测试.ipynb new file mode 100644 index 0000000..7c8738f --- /dev/null +++ b/test/宜搭修改数据测试.ipynb @@ -0,0 +1,71 @@ +{ + "cells": [ + { + "cell_type": "code", + "id": "initial_id", + "metadata": { + "collapsed": true, + "ExecuteTime": { + "end_time": "2025-12-24T06:28:49.052568Z", + "start_time": "2025-12-24T06:28:48.336385200Z" + } + }, + "source": [ + "\n", + "from yd_api import YDAPI\n", + "\n", + "yd_api_instance = YDAPI()\n", + "token = yd_api_instance.generateToken()\n", + "\n", + "update_json = {\n", + " \"data\": {\n", + " \"textField_kto3q3ev\": \"594561\",\n", + " \"dateField_kto3q3ex\": \"1766557690\",\n", + " \"textField_kyjy1kkm\": \"9987\",\n", + " \"textField_kyjy1kkn\": \"续约\",\n", + " }\n", + "}\n", + "yd_api_instance.update_from(\n", + " token=token,\n", + " formInstanceId=\"ef7aabe7-4931-4271-823f-f9a43bc516b2\",\n", + " data_new=update_json,\n", + " )" + ], + "outputs": [ + { + "ename": "ModuleNotFoundError", + "evalue": "No module named 'yd_api'", + "output_type": "error", + "traceback": [ + "\u001B[31m---------------------------------------------------------------------------\u001B[39m", + "\u001B[31mModuleNotFoundError\u001B[39m Traceback (most recent call last)", + "\u001B[36mCell\u001B[39m\u001B[36m \u001B[39m\u001B[32mIn[1]\u001B[39m\u001B[32m, line 1\u001B[39m\n\u001B[32m----> \u001B[39m\u001B[32m1\u001B[39m \u001B[38;5;28;01mfrom\u001B[39;00m\u001B[38;5;250m \u001B[39m\u001B[34;01myd_api\u001B[39;00m\u001B[38;5;250m \u001B[39m\u001B[38;5;28;01mimport\u001B[39;00m YDAPI\n\u001B[32m 3\u001B[39m yd_api_instance = YDAPI()\n\u001B[32m 4\u001B[39m token = yd_api_instance.generateToken()\n", + "\u001B[31mModuleNotFoundError\u001B[39m: No module named 'yd_api'" + ] + } + ], + "execution_count": 1 + } + ], + "metadata": { + "kernelspec": { + "display_name": "Python 3", + "language": "python", + "name": "python3" + }, + "language_info": { + "codemirror_mode": { + "name": "ipython", + "version": 2 + }, + "file_extension": ".py", + "mimetype": "text/x-python", + "name": "python", + "nbconvert_exporter": "python", + "pygments_lexer": "ipython2", + "version": "2.7.6" + } + }, + "nbformat": 4, + "nbformat_minor": 5 +} diff --git a/test/宜搭请求数据.py b/test/宜搭请求数据.py new file mode 100644 index 0000000..e07ecbc --- /dev/null +++ b/test/宜搭请求数据.py @@ -0,0 +1,16 @@ +from yd_api import YDAPI + +yd_api_instance = YDAPI() +token = yd_api_instance.generateToken() + +{'appType': 'APP_UYZ0KG6L0CCNV80GZ66O', 'systemToken': 'XA966F81JAJOFCVVVKO64E9MIIZV1EWE5SFMKJ2', + 'userId': 'yida_pub_account', 'language': 'zh_CN', 'formUuid': 'FORM-PE866MD1MJMU0WGLYRFLYEN5YN9L1I55Z7ZUK22', + 'formInstanceIdList': ['CHS444555666']} +res = yd_api_instance.get_ids_query( + token=token, + formInstanceIdList=["ef7aabe7-4931-4271-823f-f9a43bc516b2"], + formUuid="FORM-PE866MD1MJMU0WGLYRFLYEN5YN9L1I55Z7ZUK22", + appType="APP_UYZ0KG6L0CCNV80GZ66O", + systemToken="XA966F81JAJOFCVVVKO64E9MIIZV1EWE5SFMKJ2", +) +print(res) diff --git a/test/续约待办宜搭传给简道云.py b/test/续约待办宜搭传给简道云.py new file mode 100644 index 0000000..8afbb93 --- /dev/null +++ b/test/续约待办宜搭传给简道云.py @@ -0,0 +1,693 @@ +import os +from datetime import datetime, timedelta, timezone +import pandas as pd +from tqdm import tqdm +from datetime import datetime, timezone +import pandas as pd +import os +from typing import Dict +import requests +import json +import time +import numpy as np # 导入numpy库用于处理numpy数组 + +output_dir = "output" # 设置输出目录 +os.makedirs(output_dir, exist_ok=True) + + +class Config: + JIANDAOYUN_API_TOKEN = 'Bearer qygHulymo1fekJk4CIZyNKjyQAzG8CFN' # token + + +class API: + def entry_data_list(self, data: dict, replace: bool = False, max_retries: int = 20) -> Dict: # 获取多条表单数据 + """ + 获取多条表单数据 + :param max_retries: 最大重试次数 + :param replace: 是否替换字段 + :param data: + api_key: 应用id + entry_id: 表单id + :return: + """ + + url = 'https://api.jiandaoyun.com/api/v5/app/entry/data/list' + + headers = { + 'Authorization': Config.JIANDAOYUN_API_TOKEN, # 曹伟应用api测试 app_key + 'Content-Type': 'application/json' + } + all_data_batches = [] # 用于存储每次请求返回的数据批次 + last_data_id = None + exit_flag = False + while True: + payload = json.dumps({ + "app_id": data['api_key'], # 应用ID + "entry_id": data['entry_id'], # 表单ID + "limit": 90, + "data_id": last_data_id, + "filter": data.get('filter', None) + }) + retries = 0 + while retries <= max_retries: + try: + res = requests.post(url=url, data=payload, headers=headers, timeout=10) + res.raise_for_status() # 检查HTTP响应状态码,如果不等于200会抛出异常 + data_get = res.json() + if data_get["data"]: + all_data_batches.extend(data_get['data']) + last_data_id = data_get['data'][-1].get('_id') + print(f"已获取 {len(all_data_batches)} 条数据") + break # 成功则跳出循环 + else: + if 'data' not in data_get or len(data_get['data']) == 0: + exit_flag = True + break + # logger.warning(f"请求异常, 将重新请求") + retries += 1 + time.sleep(0.1) # 在重试之间稍作停顿 + except requests.exceptions.RequestException as e: + # logger.warning(f"请求异常: {e}, 将重新请求") + retries += 1 + time.sleep(0.1) # 在重试之间稍作停顿 + if retries > max_retries: + # error_task_logger.error(f"任务 {last_data_id}组 连续{max_retries}次请求失败,放弃此次请求。") + all_data_batches.append(None) # 或者可以选择记录失败的payload以便后续处理 + + if exit_flag: + break + + # 构建最终返回的字典 + final_data = { + 'data': all_data_batches # 'data' 键对应的值是列表的列表 + } + # logger.info(f"获取了{len(all_data_batches)}条数据") + if replace: + print("进行了替换") + return_data = self.field_replacement(data, final_data) # 字段替换,由id替换为标签名 + + return return_data + else: + return final_data + + def field_replacement(self, data: dict, data_get: dict) -> dict: + """ + 字段替换,将id替换为标签名,即唯一值替换为表单中显示字段的名字 + :param data: 简道云插件发送过来的data,包含表单id、数据id、应用id + :param data_get: 简道云请求的数据,一般是根据数据id获取到表单的数据 + :return: 将根据数据id获取到的表单数据,进行替换,返回替换后的数据 + """ + + # 获取表单对应字段标签名称 + widget_list = self.entry_widget_list(data) + + # 检查widget_list是否有效 + if not widget_list or 'widgets' not in widget_list or not isinstance(widget_list['widgets'], list): + raise ValueError("映射表没有接受到数据") + + # 创建一个映射表,将_widget_名称映射到label + name_to_label = {widget['name']: widget['label'] for widget in widget_list['widgets']} + + def replace_keys(obj): + """递归替换字典中的键名""" + if isinstance(obj, dict): + new_dict = {} + for key, value in obj.items(): + new_key = name_to_label.get(key, key) + new_dict[new_key] = replace_keys(value) + return new_dict + elif isinstance(obj, list): + return [replace_keys(item) for item in obj] + else: + return obj + + # 复制 data_get,避免修改原始数据 + data_get_copy = json.loads(json.dumps(data_get)) # 深拷贝 + + if 'data' in data_get_copy: + data_get_copy['data'] = replace_keys(data_get_copy['data']) + + return data_get_copy + + @staticmethod + def workflow_instance_get(data: dict, max_retries: int = 20) -> dict: + """ + 查询实例流程信息 + :param max_retries: + :param data: 简道云插件发送过来的data,包含应用id + :return: 查询简道云流程实例信息返回的结果 + """ + url = 'https://api.jiandaoyun.com/api/v6/workflow/instance/get' + + headers = { + 'Authorization': Config.JIANDAOYUN_API_TOKEN, # 曹伟应用api测试 appKey + 'Content-Type': 'application/json' + } + + payload = json.dumps({ + "instance_id": data['data_id'], + "tasks_type": 1 + } + ) + print("payload:", payload) + data_get = None + retries = 0 + while retries <= max_retries: + try: + res = requests.post(url=url, data=payload, headers=headers, timeout=10) + # res.raise_for_status() # 检查HTTP响应状态码,如果不等于200会抛出异常 + data_get = res.json() + # print( "返回结果:", data_get) + if res.status_code == 200: + break # 成功则跳出循环 + else: + # logger.warning(f"请求异常, 将重新请求") + retries += 1 + time.sleep(3) # 在重试之间稍作停顿 + except requests.exceptions.RequestException as e: + # logger.warning(f"请求异常: {e}, 将重新请求") + retries += 1 + time.sleep(0.1) # 在重试之间稍作停顿 + if retries > max_retries: + # error_task_logger.error(f"任务 {data['data_id']} 连续{max_retries}次请求失败,放弃此次请求。") + print("请求失败") + + return data_get + + @staticmethod + def entry_data_update(data: dict, max_retries: int = 20) -> dict: # 修改数据 + """ + 修改数据 + :param max_retries: 最大重试次数,此处设置100次 + :param data: 简道云插件发送过来的data,包含应用id、表单id、数据id等信息 + :return: 修改数据后简道云返回的结果 + """ + url = 'https://api.jiandaoyun.com/api/v5/app/entry/data/update' + + headers = { + 'Authorization': Config.JIANDAOYUN_API_TOKEN, # 曹伟应用api测试 appKey + 'Content-Type': 'application/json' + } + + payload = json.dumps({ + "app_id": data['api_key'], # 应用ID + "entry_id": data['entry_id'], # 表单ID + "data_id": data['data_id'], # 数据ID + "data": data['data'] + } + ) + + data_get = None + retries = 0 + while retries <= max_retries: + try: + res: requests.Response = requests.post(url=url, data=payload, headers=headers, timeout=10) + res.raise_for_status() # 检查HTTP响应状态码,如果不等于200会抛出异常 + data_get = res.json() + # print(data_get) + if res.status_code == 200: + break # 成功则跳出循环 + else: + # logger.warning(f"请求异常, 将重新请求") + retries += 1 + time.sleep(3) # 在重试之间稍作停顿 + except requests.exceptions.RequestException as e: + # logger.warning(f"请求异常: {e}, 将重新请求") + retries += 1 + time.sleep(10) # 在重试之间稍作停顿 + if retries > max_retries: + # error_task_logger.error(f"任务 {data['data_id']} 连续{max_retries}次请求失败,放弃此次请求。") + continue + return data_get + + @staticmethod + def workflow_instance_end(data: dict, max_retries: int = 20) -> dict: + """ + 关闭流程 + :param max_retries: + :param data: 简道云插件发送过来的data,包含应用id + :return: 查询简道云流程实例信息返回的结果 + """ + url = 'https://api.jiandaoyun.com/api/v1/workflow/instance/close' + + headers = { + 'Authorization': Config.JIANDAOYUN_API_TOKEN, # 曹伟应用api测试 appKey + 'Content-Type': 'application/json' + } + + payload = json.dumps({ + "instance_id": data['data_id'], + } + ) + print("payload:", payload) + data_get = None + retries = 0 + while retries <= max_retries: + try: + res = requests.post(url=url, data=payload, headers=headers, timeout=10) + # res.raise_for_status() # 检查HTTP响应状态码,如果不等于200会抛出异常 + data_get = res.json() + print("返回结果:", data_get) + if res.status_code == 200: + break # 成功则跳出循环 + else: + + retries += 1 + time.sleep(3) # 在重试之间稍作停顿 + except requests.exceptions.RequestException as e: + + retries += 1 + time.sleep(0.1) # 在重试之间稍作停顿 + + return data_get + + +class NpEncoder(json.JSONEncoder): + def default(self, obj): + if isinstance(obj, np.integer): + return int(obj) + elif isinstance(obj, np.floating): + return float(obj) + elif isinstance(obj, np.ndarray): + return obj.tolist() + else: + return super(NpEncoder, self).default(obj) + + +class YDAPI: + appKey = "ding5kqocon5s9oph5uq" + appSecret = "HL1jgsIIfLAC0eTH0A1m4mwxUDqbgsiPeCCGGE3ocM6qJBTIW7Ivt9drxF_Z4Kb_" + + @staticmethod + def get_ids_query(token, formUuid, appType, systemToken, formInstanceIdList=None, max_retries=10, delay=2): + """ + 函数功能:读取表单的所有数据,并加入重试机制。 + + Args: + token (str): 登录验证token,用于API调用的身份验证。 + formUuid (str): 表单唯一标识符,用于指定需要读取哪个表单的实例数据。 + page (int): 分页参数,指定请求的数据页码。 + n (int): 每页显示的数据条数。 + appType (str): 应用类型标识符,默认为 "APP_UYZ0KG6L0CCNV80GZ66O" + systemToken (str): 系统token,默认为固定值 + instanceStatus (str): 流程实例状态,默认为"RUNNING" + max_retries (int): 最大重试次数,默认为10次 + delay (int): 每次重试之间的延迟秒数,默认为2秒 + + Returns: + dict: 返回从API获取的流程表单实例数据的JSON解析结果。 + + Raises: + Exception: 如果达到最大重试次数仍未成功,则抛出异常。 + """ + + attempt = 0 + api = f'https://api.dingtalk.com/v1.0/yida/forms/instances/ids/query' + headers = { + "Content-Type": "application/json", + "x-acs-dingtalk-access-token": token + } + formData = { + "appType": appType, + "systemToken": systemToken, + "userId": "yida_pub_account", # 超级管理员账号 + "language": "zh_CN", + "formUuid": formUuid, + "formInstanceIdList": formInstanceIdList, + } + # print(formData) + + while True: + if attempt >= max_retries: + break + + try: + res = requests.post(api, headers=headers, json=formData) + # print(res.json()) + res.raise_for_status() # 如果返回状态码不是2xx,抛出异常 + return res.json() + + except requests.exceptions.RequestException as e: + + time.sleep(delay) + attempt += 1 + + def generateToken(self) -> str: + """ + 函数功能:生成访问令牌(token) + + Returns: + str: 返回生成的访问令牌字符串。此token用于后续API调用的身份验证。 + """ + token_api = 'https://api.dingtalk.com/v1.0/oauth2/accessToken' + data = { + "appKey": f"{self.appKey}", + "appSecret": f'{self.appSecret}' + } + res = requests.post(token_api, json=data) + token = res.json().get('accessToken') + return token + + def read_processes_instances(self, token, formUuid, page, n, appType="APP_UYZ0KG6L0CCNV80GZ66O", + systemToken="XA966F81JAJOFCVVVKO64E9MIIZV1EWE5SFMKJ2", instanceStatus="RUNNING", + max_retries=10, delay=2, createFromTimeGMT=None, createToTimeGMT=None, + modifiedFromTimeGMT=None, + modifiedToTimeGMT=None, searchFieldJson={}): + """ + 函数功能:读取流程表单的所有数据,并加入重试机制。 + + Args: + token (str): 登录验证token,用于API调用的身份验证。 + formUuid (str): 表单唯一标识符,用于指定需要读取哪个表单的实例数据。 + page (int): 分页参数,指定请求的数据页码。 + n (int): 每页显示的数据条数。 + appType (str): 应用类型标识符,默认为 "APP_UYZ0KG6L0CCNV80GZ66O" + systemToken (str): 系统token,默认为固定值 + instanceStatus (str): 流程实例状态,默认为"RUNNING" + max_retries (int): 最大重试次数,默认为10次 + delay (int): 每次重试之间的延迟秒数,默认为2秒 + + Returns: + dict: 返回从API获取的流程表单实例数据的JSON解析结果。 + + Raises: + Exception: 如果达到最大重试次数仍未成功,则抛出异常。 + """ + + attempt = 0 + api = f'https://api.dingtalk.com/v1.0/yida/processes/instances?pageNumber={page}&pageSize={n}' + headers = { + "Content-Type": "application/json", + "x-acs-dingtalk-access-token": token + } + formData = { + "appType": appType, + "systemToken": systemToken, + "userId": "yida_pub_account", # 超级管理员账号 + "language": "zh_CN", + "formUuid": formUuid, + "instanceStatus": instanceStatus, # 运行中 + "createFromTimeGMT": createFromTimeGMT, + "createToTimeGMT": createToTimeGMT, + "modifiedFromTimeGMT": modifiedFromTimeGMT, + "modifiedToTimeGMT": modifiedToTimeGMT, + "searchFieldJson": json.dumps( + searchFieldJson + ) + } + # print(formData) + + while True: + if attempt >= max_retries: + # error_task_logger.error(f"请求失败,已达最大重试次数 {max_retries},无法获取流程实例数据,跳过本次请求。") + break + + try: + res = requests.post(api, headers=headers, json=formData) + # print(res.json()) + res.raise_for_status() # 如果返回状态码不是2xx,抛出异常 + return res.json() + + except requests.exceptions.RequestException as e: + # logger.warning(f"请求异常: {e},正在尝试第 {attempt + 1} 次重试...") + time.sleep(delay) + attempt += 1 + + def update_from(self, token, formInstanceId, data_new): + """ + 函数功能:更新表单内容 + + Args: + token (str): 登录验证token,用于API调用的身份验证。 + formInstanceId (str): 表单实例ID,读文件获取。 + data_new (dict): 新的数据内容,用于替换现有表单实例中的数据。读文件获取。 + + Returns: + Response: 返回API请求的响应对象。 + """ + + api = f'https://api.dingtalk.com//v1.0/yida/forms/instances' + + headers = { + "Content-Type": "application/json", + "x-acs-dingtalk-access-token": token + } + + payload = { + "appType": "APP_UYZ0KG6L0CCNV80GZ66O", + "systemToken": "XA966F81JAJOFCVVVKO64E9MIIZV1EWE5SFMKJ2", + "userId": "yida_pub_account", # 曹伟 id + "language": "zh_CN", + "useLatestVersion": "false", + "formInstanceId": formInstanceId, + "updateFormDataJson": json.dumps(data_new, cls=NpEncoder), + + } + + res = requests.put(api, headers=headers, json=payload) + return res + + def get_approval_records(self, token: str, processInstanceId: str, appType="APP_UYZ0KG6L0CCNV80GZ66O", + systemToken="XA966F81JAJOFCVVVKO64E9MIIZV1EWE5SFMKJ2", max_retries=10, delay=2): + """ + 函数功能:获取流程表单的审批记录,适用于"F6客户服务"应用,并且包含重试机制。 + + Args: + token (str): 登录验证token,用于API调用的身份验证。 + processInstanceId (str): 流程实例ID,用于标识需要获取审批记录的具体流程实例。 + appType (str): 应用类型标识符,默认为 "APP_UYZ0KG6L0CCNV80GZ66O" + systemToken (str): 系统token,默认为固定值 + max_retries (int): 最大重试次数,默认为10次 + delay (int): 每次重试之间的延迟秒数,默认为2秒 + + Returns: + dict: 返回从API获取的审批记录的JSON解析结果。通常包括审批步骤、审批人、审批时间等信息。 + """ + attempt = 0 + userId = "yida_pub_account" + api = f'https://api.dingtalk.com/v1.0/yida/processes/operationRecords?appType={appType}&systemToken={systemToken}&userId={userId}&language=zh_CN&processInstanceId={processInstanceId}' + headers = { + "Content-Type": "application/json", + "x-acs-dingtalk-access-token": token + } + + while True: + if attempt >= max_retries: + # error_task_logger.error(f"请求失败,已达最大重试次数 {max_retries},无法获取审批数据,跳过本次请求。") + break + + try: + res = requests.get(api, headers=headers) + res.raise_for_status() # 如果响应状态码不是2xx,则抛出HTTPError + return res.json() + except (requests.exceptions.RequestException, Exception) as e: + # logger.warning(f"请求出现异常: {e}, 正在重试({attempt + 1}/{max_retries})...") + time.sleep(delay) # 等待指定的延迟时间后再次尝试 + attempt += 1 + + def aggree_approval(self, token: str, taskId: str, processInstanceId: str, formData: dict, res_new): + """_summary_ + + 函数功能:同意审批节点 --F6客户服务 应用 + + Args: + token (str): 登录验证token + taskId (str): 获取到的审批节点ID + processInstanceId (str): 读取文件获得的实例ID + formData (dict): 数据样式 + res_new (响应值): 从员工ID表里获取到员工名对应的员工ID + + Returns: + 响应值: 返回请求结果 + """ + api = 'https://api.dingtalk.com/v1.0/yida/tasks/execute' + headers = { + "Content-Type": "application/json", + "x-acs-dingtalk-access-token": token + } + payload = { + "outResult": "AGREE", + "appType": "APP_UYZ0KG6L0CCNV80GZ66O", + "systemToken": "XA966F81JAJOFCVVVKO64E9MIIZV1EWE5SFMKJ2", + "remark": "同意(接口自动)", + "formDataJson": json.dumps(formData, cls=NpEncoder), + "processInstanceId": processInstanceId, + "userId": res_new, + "language": "zh_CN", + "taskId": int(taskId) + } + + res = requests.post(api, headers=headers, json=payload) + return res + + +api_instance = API() +yd_api_instance = YDAPI() + + +class YDToJDYRenewalToDo(object): + def __init__(self): + self.FORMID = "FORM-PE866MD1MJMU0WGLYRFLYEN5YN9L1I55Z7ZUK22" + self.appType = "APP_UYZ0KG6L0CCNV80GZ66O" + self.systemToken = "XA966F81JAJOFCVVVKO64E9MIIZV1EWE5SFMKJ2" + self.token = yd_api_instance.generateToken() + + def load_all_data(self): + today_midnight = datetime.now() + timedelta(days=1) + yesterday_midnight = today_midnight - timedelta(days=2) + start_time = yesterday_midnight.strftime("%Y-%m-%d") # 昨天 + end_time = today_midnight.strftime("%Y-%m-%d") # 今天 + + yd_data = yd_api_instance.read_processes_instances( + token=self.token, + formUuid=self.FORMID, + page=1, + n=100, + appType=self.appType, + systemToken=self.systemToken, + instanceStatus="", + modifiedFromTimeGMT=start_time, + modifiedToTimeGMT=end_time, + ) + + all_process_list = [] + + PAGES_two = yd_data.get('totalCount') // 100 + 1 + + for a in tqdm(range(1, PAGES_two + 1)): + try: + yd_data = yd_api_instance.read_processes_instances( + token=self.token, + formUuid=self.FORMID, + page=a, + n=100, + appType=self.appType, + systemToken=self.systemToken, + instanceStatus="", + modifiedFromTimeGMT=start_time, + modifiedToTimeGMT=end_time, + ) + all_process_list = all_process_list + yd_data.get("data") + except Exception as e: + print(f"获取流程实例数据时出错: {e}") + continue + + df = pd.DataFrame(all_process_list) + df.to_csv(f"{output_dir}/{start_time}_{end_time}_all_process_list.csv", index=False) + + return all_process_list + + def filter_renewal_data(self, all_process_list): + update_data_list = [] + for item in all_process_list: + if item.get("data").get("textField_kto3q3ev"): + org_id = item.get("data").get("textField_ksydghqw") + order_code = item.get("data").get("textField_kto3q3ev") # 订单编码 + pay_time = item.get("data").get("dateField_kto3q3ex") # 订单支付日期 + + res = yd_api_instance.get_ids_query( + token=self.token, + formInstanceIdList=[item.get("processInstanceId")], + formUuid="FORM-PE866MD1MJMU0WGLYRFLYEN5YN9L1I55Z7ZUK22", + appType="APP_UYZ0KG6L0CCNV80GZ66O", + systemToken="XA966F81JAJOFCVVVKO64E9MIIZV1EWE5SFMKJ2", + ) + + payment_amount = None + bussiness_type = None + # print(res) + if res.get("result"): + form_data = res["result"][0]["formData"] + payment_amount = form_data.get("textField_kyjy1kkm") # "9987" + bussiness_type = form_data.get("textField_kyjy1kkn") # "续约" + # payment_amount = res.get("data").get("textField_kyjy1kkm") # 支付金额 + # bussiness_type = res.get("data").get("textField_kyjy1kkn") # 业务类型 + + if pay_time: + # 确保是整数 + timestamp_ms = int(pay_time) + # 转为 UTC datetime 对象 + pay_datetime_utc = datetime.fromtimestamp(timestamp_ms / 1000, tz=timezone.utc) + print(pay_datetime_utc) # 例如: 2024-04-05 12:34:38.901000+00:00 + else: + pay_datetime_utc = None + + update_data_list.append({ + "order_code": order_code, + "pay_time": pay_time, + "payment_amount": payment_amount, + "bussiness_type": bussiness_type, + "org_id": org_id + }) + return update_data_list + + def check_jd_ydy_data(self, update_data_list): + for item in update_data_list: + # 简道云搜索门店编码 + payload = { + "api_key": "675b900991ad2491c69389ca", + "entry_id": "6931063d64187eaf6b927557", + "filter": { + "rel": "and", + "cond": [{ + "field": "_widget_1764820541661", + "type": "text", + "method": "eq", + "value": [item.get("org_id")] + }, {"field": "flowState", + "type": "flowstate", + "method": "eq", + "value": [0]}] + } + } + # print(payload) + data_list = api_instance.entry_data_list(payload).get("data", []) + result = data_list[0] if data_list else None + data_id = result.get("_id") if result else None + + if not data_id: + # print(f"未找到订单 {item.get('order_code')} 的简道云数据,跳过。") + continue + + # 查询实例状态 + instance_status = api_instance.workflow_instance_get({"data_id": data_id}) + task_status = instance_status.get("status", -1) + print(f"订单 {item.get('order_code')} 的简道云流程状态为 {task_status}") + + if task_status == 0: + print("简道云流程正在进行中,执行流程关闭") + api_instance.workflow_instance_end({"data_id": data_id}) + + # 同步宜搭四个字段 + print("开始同步数据") + update_payload = { + "api_key": "675b900991ad2491c69389ca", + "entry_id": "6931063d64187eaf6b927557", + "data_id": data_id, + "data": { + "_widget_1764820541674": {"value": item.get("order_code")}, # 订单编码 + "_widget_1764820541679": {"value": item.get("pay_time")}, # 订单支付日期 + "_widget_1764820541676": {"value": item.get("payment_amount")}, # 支付金额 + "_widget_1764820541680": {"value": item.get("bussiness_type")}, # 业务类型 + } + } + print(update_payload) + api_instance.entry_data_update(update_payload) + print("数据同步完成") + + def main(self): + task_start_time = datetime.now().strftime("%Y-%m-%d %H:%M:%S") + try: + # step1 获取简道云与宜搭数据 + jd_ydy_data = self.load_all_data() + # step2 过滤已经续约的单子 + update_data_list = self.filter_renewal_data(jd_ydy_data) + # step3 校验简道云是否有进行中的单子并关闭 + self.check_jd_ydy_data(update_data_list) + except Exception as e: + print(e) + + +if __name__ == '__main__': + jd_ydy_renewal_to_do = YDToJDYRenewalToDo() + jd_ydy_renewal_to_do.main() diff --git a/test/续约待办派发.py b/test/续约待办派发.py index c42e1e3..93247ef 100644 --- a/test/续约待办派发.py +++ b/test/续约待办派发.py @@ -83,6 +83,8 @@ class RenewalToDo: "当前所处节点": "_widget_1765352838609", "流程状态": "_widget_1765352838610", "经营模式": "_widget_1765964381952", + "公司等级": "_widget_1766130435561", + "公司id": "_widget_1766631811839", "提交人": "creator", "提交时间": "createTime", "更新时间": "updateTime" @@ -97,8 +99,10 @@ class RenewalToDo: "contacts": "联系人", "contact_mobile": "联系手机号", "service_impl_principal": "专属运营顾问", + "group_grade": "公司等级", "technician": "运营专家", "manage_model": "经营模式", + "id_own_group": "公司id", } self.subform_field_map = { "商品名称": "_widget_1764820541719", @@ -153,8 +157,8 @@ class RenewalToDo: self.province_staff_id_list = [] # 获取已派发续约待办(进行中) - payload = {"api_key": "6694d3c4fcb69ca9a111a6c4", - "entry_id": "6769204a1902c9341340a1bc", + payload = {"api_key": "675b900991ad2491c69389ca", + "entry_id": "6931063d64187eaf6b927557", "filter": {"rel": "and", "cond": [{"field": "flowState", "type": "flowstate", "method": "eq", "value": [0]}]}, } @@ -360,7 +364,7 @@ class RenewalToDo: pd.to_numeric(df_lp['价格'], errors='coerce') .round().fillna(0).astype(int).astype(str) ) - df_lp['类型_价格'] = df_lp['类型'] + df_lp['价格'] + df_lp['类型_价格'] = df_lp['类型'] + ':' + df_lp['价格'] # 2. 按门店聚合,分号连接 agg_df = df_lp.groupby('门店编码', as_index=False)['类型_价格'].apply(';'.join) @@ -499,9 +503,6 @@ class RenewalToDo: data_NGV = self.process_data() # step3:数据派发 self.dispatch_task(data_NGV) - # step4:过期日发生变化更新已有表单 - - # step5:自动同意原表单 common_module.send_task_status(task_start_time, "续约回访待办") except Exception as e: diff --git a/test/续约待办派发测试数据.py b/test/续约待办派发测试数据.py index e69de29..9b28bf1 100644 --- a/test/续约待办派发测试数据.py +++ b/test/续约待办派发测试数据.py @@ -0,0 +1,516 @@ +import os +from datetime import datetime +import pandas as pd +from api import API +from back_ground_module import CommonModule +from log_config import configure_task_logger, configure_error_task_logger +from collections import defaultdict + +logger = configure_task_logger() +error_task_logger = configure_error_task_logger() +api_instance = API() +common_module = CommonModule() +output_dir = "output" # 设置输出目录 +os.makedirs(output_dir, exist_ok=True) + + +class RenewalToDo: + def __init__(self): + self.renewal_data_list = None + self.cyclic_increasing = None + self.franchisee = None + self.last_price = None + self.province_staff_id_list = None + self.json_list = None + self.data_NGV = None + self.staff_id_list = None + self.NGV_data_list = None + self.field_map = { + "关联数据": "_widget_1764820541663", + "公司名称": "_widget_1764820541616", + "门店名称": "_widget_1764820541617", + "门店编码": "_widget_1764820541661", + "加盟商": "_widget_1764820541618", + "过期日": "_widget_1764820541672", + "Saas版本": "_widget_1764820541623", + "上次购买价格": "_widget_1764820541624", + "联系人": "_widget_1764820541621", + "联系手机号": "_widget_1764820541622", + "专属运营顾问": "_widget_1764820541625", + "区域客服": "_widget_1764820541715", + "运营专家": "_widget_1764820541678", + "120天是否跟进": "_widget_1764820541628", + "120天处理人": "_widget_1764820541634", + "120天跟进时间": "_widget_1765352838631", + "60天是否跟进": "_widget_1764820541630", + "60天处理人": "_widget_1764820541635", + "60天跟进时间": "_widget_1765352838632", + "30天是否跟进": "_widget_1764820541632", + "30天处理人": "_widget_1764820541636", + "30天跟进时间": "_widget_1765352838633", + "是否联系上客户": "_widget_1764820541638", + "客户现阶段问题分类": "_widget_1764820541641", + "未联系上原因字段": "_widget_1765330820509", + "联系情况及问题说明": "_widget_1764820541653", + "潜在商机": "_widget_1764820541657", + "商机详情": "_widget_1764820541659", + "门店续约意愿": "_widget_1764820541654", + "不续约原因": "_widget_1764820541700", + "产品原因": "_widget_1764820541707", + "服务问题": "_widget_1764820541709", + "门店原因": "_widget_1764820541711", + "价格原因": "_widget_1764820541713", + "不续约具体情况说明": "_widget_1764820541702", + "回访完成方式": "_widget_1764820541697", + "周期性增购": "_widget_1764820541717", + "周期性增购.商品名称": "_widget_1764820541717._widget_1764820541719", + "周期性增购.分母金额": "_widget_1764820541717._widget_1764820541720", + "周期性增购.应续约日": "_widget_1764820541717._widget_1764820541721", + "周期性增购.上次购买数量": "_widget_1764820541717._widget_1764820541722", + "周期性增购.不续约原因": "_widget_1764820541717._widget_1764820541723", + "周期性增购.是否愿意续约": "_widget_1764820541717._widget_1764820541724", + "周期性增购.续约后订单编码": "_widget_1764820541717._widget_1764820541725", + "订单编码": "_widget_1764820541674", + "订单支付日期": "_widget_1764820541679", + "本次-实付金额(元)": "_widget_1764820541676", + "业务类型(续约、升级)": "_widget_1764820541680", + "连锁门店待办同步处理": "_widget_1764820541681", + "选择需要同步的门店名称": "_widget_1765330820391", + "120天自动流转时间": "_widget_1764820541865", + "60天自动流转时间": "_widget_1765964381895", + "30天自动流转时间": "_widget_1765964381896", + "0天自动流转时间": "_widget_1765964381897", + "当前所处节点": "_widget_1765352838609", + "流程状态": "_widget_1765352838610", + "经营模式": "_widget_1765964381952", + "公司等级": "_widget_1766130435561", + "提交人": "creator", + "提交时间": "createTime", + "更新时间": "updateTime" + } + self.cn_field_map = { + "related_data": "关联数据", + "group_name": "公司名称", + "org_name": "门店名称", + "org_code": "门店编码", + "expiry_time": "过期日", + "saas_edition_fmt": "Saas版本", + "contacts": "联系人", + "contact_mobile": "联系手机号", + "service_impl_principal": "专属运营顾问", + "group_grade": "公司等级", + "technician": "运营专家", + "manage_model": "经营模式", + } + self.subform_field_map = { + "商品名称": "_widget_1764820541719", + "分母金额": "_widget_1764820541720", + "应续约日": "_widget_1764820541721", + "上次购买数量": "_widget_1764820541722", + "不续约原因": "_widget_1764820541723", + "是否愿意续约": "_widget_1764820541724", + "续约后订单编码": "_widget_1764820541725", + # 根据实际需要添加更多字段 + } + self.renewal_list_map ={ + + } + + def load_all_data(self): + """ + 从各类来源加载数据上加载数据 + :return: + """ + + # 数据库获取续约回访数据 + self.data_NGV = pd.read_csv(os.path.join(output_dir, "data_NGV.csv"), encoding="gbk") + + # 获取加盟商信息 + self.franchisee = common_module.get_renewal_franchisee_details() + self.franchisee.to_csv(os.path.join(output_dir, "franchisee.csv")) + # 获取上次购买价格 + self.last_price = common_module.get_renewal_last_price_details() + self.last_price.to_csv(os.path.join(output_dir, "last_price.csv")) + # 周期性增购 + self.cyclic_increasing = common_module.get_cyclic_increasing_renewal_details() + self.cyclic_increasing.to_csv(os.path.join(output_dir, "cyclic_increasing.csv")) + + # 获取NGV数据 + payload = {"api_key": "675b900991ad2491c69389ca", "entry_id": "675bb02bd2d53c2034c665e4"} + self.NGV_data_list = api_instance.entry_data_list(payload).get("data") + + # 获取简道云员工id + payload = {"api_key": "6694d3c4fcb69ca9a111a6c4", + "entry_id": "6769204a1902c9341340a1bc", + } + staff_id = api_instance.entry_data_list(payload) + self.staff_id_list = staff_id.get("data") # api请求格式,将数据封装在data字典里 + + # 省市区人员关系表 + payload = {"api_key": "675b900991ad2491c69389ca", "entry_id": "676512ac3e54dc3159460c0a"} + json_dict = api_instance.entry_data_list(payload) + if json_dict and "data" in json_dict: + self.province_staff_id_list = json_dict.get("data") + else: + print("加载省市区人员关系表失败") + self.province_staff_id_list = [] + + # 获取已派发续约待办(进行中) + payload = {"api_key": "675b900991ad2491c69389ca", + "entry_id": "6931063d64187eaf6b927557", + "filter": {"rel": "and", + "cond": [{"field": "flowState", "type": "flowstate", "method": "eq", "value": [0]}]}, + } + renewal = api_instance.entry_data_list(payload) + self.renewal_data_list = renewal.get("data") + + @staticmethod + def replace_names_with_staff_ids(df, name_columns, staff_id_list): + """ + 将 DataFrame 中多个姓名列替换为对应的员工ID。 + + :param staff_id_list: 简道云获取到员工id + :param df: pandas.DataFrame + :param name_columns: list[str],需要替换的姓名列名列表,例如 ["col1", "col2"] + :return: 修改后的 DataFrame(原列被替换) + """ + # 1. 构建姓名 -> 员工ID 的映射字典(只做一次) + name_to_id = {} + for item in staff_id_list or []: + name = item.get("_widget_1734942794144") + staff_id = item.get("_widget_1734942794145") + if name and staff_id: + name_to_id[str(name).strip()] = str(staff_id) + + # 2. 对每个指定的列进行替换 + df = df.copy() # 避免修改原始数据 + for col in name_columns: + if col not in df.columns: + continue # 跳过不存在的列 + # 替换:姓名 → ID,找不到的保留原值(可改为 fillna(None)) + df[col] = ( + df[col] + .astype(str) + .str.strip() + .map(name_to_id) + .fillna(df[col]) + ) + return df + + @staticmethod + def row_to_dict(row, field_mapping): + """将一行数据转换为指定格式的字典""" + result = {} + for col_name, widget_id in field_mapping.items(): + if col_name not in row: + continue + value = row[col_name] + + # 处理:如果 value 是容器类型(list, dict, tuple, np.ndarray),不进行 pd.isna 判断 + if isinstance(value, (list, dict, tuple)) or (hasattr(value, '__len__') and not isinstance(value, str)): + clean_value = value + else: + # 标量类型:安全使用 pd.isna + if pd.isna(value): + clean_value = None + elif isinstance(value, pd.Timestamp): + clean_value = value.strftime('%Y-%m-%dT%H:%M:%SZ') + else: + clean_value = value + + # 所有字段统一包 {"value": ...},包括子表单 + result[widget_id] = {"value": clean_value} + return result + + @staticmethod + def en_row_to_cn_row(en_row, en_to_cn_map): + """ + 将英文字段的行数据转换为中文字段的行数据 + + :param en_row: dict 或 pandas.Series,key 为英文字段 + :param en_to_cn_map: dict, 英文字段名 -> 中文字段名 + :return: dict,key 为中文字段名 + """ + cn_row = {} + for en_key, value in en_row.items(): + if en_key in en_to_cn_map: + cn_key = en_to_cn_map[en_key] + cn_row[cn_key] = value + # 可选:忽略无法映射的字段,或记录警告 + return cn_row + + @staticmethod + def get_customer_service_by_location(province_name, city_name, area_name, staff_id_list): + """ + 直接遍历 self.staff_id_list,根据省市区匹配续约回访客服。 + + :return: 客服用户名(str),未找到则返回提示信息 + """ + if not all([province_name, city_name, area_name]): + return "数据缺失: 省市区不完整" + + for item in staff_id_list or []: + try: + prov = item.get('_widget_1734677164861', '').strip() + city = item.get('_widget_1734677164862', '').strip() + area = item.get('_widget_1734677164863', '').strip() + + if (prov == province_name.strip() and + city == city_name.strip() and + area == area_name.strip()): + # 提取客服用户名 + staff_info = item.get('_widget_1734677164869', {}) # 续约回访客服 + username = staff_info.get('username') + return username if username else "数据缺失: 客服用户名为空" + except Exception: + continue # 跳过格式异常的记录 + + return "数据缺失: 未找到对应的续约回访客服" + + def build_subform_records( + self, + df: pd.DataFrame, + group_by_col: str, + field_mapping: dict, + ) -> dict: + """ + 通用子表单预处理函数:将子表单 DataFrame 转换为 {group_key: [subform_record1, subform_record2, ...]} 的字典。 + + :param df: 子表单数据 DataFrame,列名为中文(如 "商品名称", "分母金额") + :param group_by_col: 用于分组的列名(如 "门店编码") + :param field_mapping: 字段映射字典,{中文字段名: widget_id},例如 {"商品名称": "_widget_xxx"} + :return: dict,key 为 group_by_col 的值,value 为该组对应的子表单记录列表, + 每条记录是 {widget_id: {"value": clean_value}} 的 dict + """ + if df.empty: + return defaultdict(list) + + result = defaultdict(list) + target_fields = set(field_mapping.keys()) + + for _, row in df.iterrows(): + row_dict = row.to_dict() + group_key = row_dict.get(group_by_col) + + if not group_key or (isinstance(group_key, str) and group_key.strip() == ""): + warning_msg = f"子表单行缺少分组字段 '{group_by_col}',跳过: {row_dict}" + + # 构建单条子表单记录 + sub_record = {} + for field_cn, widget_id in field_mapping.items(): + val = row_dict.get(field_cn) + + # 清理值 + if pd.isna(val): + clean_val = None + elif hasattr(val, 'to_eng_string'): # Decimal + try: + clean_val = float(val) + except (ValueError, TypeError): + clean_val = str(val) + elif isinstance(val, pd.Timestamp): + clean_val = val.strftime('%Y-%m-%d %H:%M:%S') + else: + clean_val = val + + sub_record[widget_id] = {"value": clean_val} + + result[group_key].append(sub_record) + + return result + + def process_data(self): + """ + 数据处理加工 + :return: 处理后的 DataFrame,列名为中文 + """ + data_NGV = self.data_NGV.copy() # 避免修改原始数据 + + # === 将英文字段名替换为中文字段名 === + # 但只重命名存在的列 + rename_map = {en: cn for en, cn in self.cn_field_map.items() if en in data_NGV.columns} + data_NGV.rename(columns=rename_map, inplace=True) + + # 日期字段处理(使用中文列名) + time_columns = ['过期日'] + data_NGV[time_columns] = data_NGV[time_columns].apply( + lambda col: pd.to_datetime(col, errors='coerce') + .dt.tz_localize('Asia/Shanghai') + .dt.tz_convert('UTC') + ) + + # 新增4列:辅助时间字段 + data_NGV['120天自动流转时间'] = data_NGV['过期日'] - pd.Timedelta(days=60) + data_NGV['60天自动流转时间'] = data_NGV['过期日'] - pd.Timedelta(days=30) + data_NGV['30天自动流转时间'] = data_NGV['过期日'] - pd.Timedelta(days=0) + data_NGV['0天自动流转时间'] = data_NGV['过期日'] + pd.Timedelta(days=90) + # 格式化为字符串(去掉时区) + for col in ['过期日', '120天自动流转时间', '60天自动流转时间', '30天自动流转时间', '0天自动流转时间']: + data_NGV[col] = data_NGV[col].dt.strftime('%Y-%m-%d %H:%M:%S') + + # 新增加盟商列 + data_NGV = data_NGV.merge( + self.franchisee[['门店编码', '加盟商']], + on='门店编码', + how='left' + ) + + # 新增上次购买价格列 + # 1. 清洗并拼接类型+价格 + df_lp = self.last_price[['门店编码', '类型', '价格']].copy() + df_lp['类型'] = df_lp['类型'].fillna('').astype(str) + df_lp['价格'] = ( + pd.to_numeric(df_lp['价格'], errors='coerce') + .round().fillna(0).astype(int).astype(str) + ) + df_lp['类型_价格'] = df_lp['类型'] + df_lp['价格'] + + # 2. 按门店聚合,分号连接 + agg_df = df_lp.groupby('门店编码', as_index=False)['类型_价格'].apply(';'.join) + + # 3. 合并回主表 + data_NGV = data_NGV.merge(agg_df, on='门店编码', how='left').fillna({'类型_价格': ''}) + data_NGV.rename(columns={'类型_价格': '上次购买价格'}, inplace=True) + + # 4. 处理没有匹配记录的门店(填空或默认值) + data_NGV['上次购买价格'] = data_NGV['上次购买价格'].fillna('') + + # 成员字段替换(现在列名是中文) + staff_name_cols = [ + "专属运营顾问", + "运营专家", + ] + data_NGV = self.replace_names_with_staff_ids(data_NGV, staff_name_cols, self.staff_id_list) + + return data_NGV + + def dispatch_task(self, data_NGV): + """ + 拆分为三个独立动作(输入 data_NGV 列名为中文): + 1. 获取关联数据(NGV_data_id) + 2. 获取区域客服(regional_customer_service) + 3. 字段映射与格式化(中文 → widget),正确处理子表单 + """ + records = [] + no_customer_service_data = [] + + # === 使用通用函数预处理周期性增购子表单 === + cyclic_subforms = self.build_subform_records( + df=self.cyclic_increasing, + group_by_col="门店编码", + field_mapping=self.subform_field_map, + ) + + # === Step 1: 构建 门店编码 → NGV 数据ID 映射 === + org_code_to_ngv_id = {} + for ngv_item in self.NGV_data_list or []: + org_code = ngv_item.get("_widget_1734062123071") + ngv_id = ngv_item.get("_id") + if org_code and ngv_id: + org_code_to_ngv_id[org_code] = ngv_id + + # === Step 2: 定义获取区域客服的函数 === + def get_regional_customer_service(row): + province = row.get("省份") or row.get("province_name") + city = row.get("城市") or row.get("city_name") + area = row.get("区县") or row.get("district_name") or row.get("area_name") + org_code = row.get("门店编码") + + # 若省市区缺失,尝试从 NGV 补全 + if not all([province, city, area]) or any( + v in [None, '', 'None', 'NA'] for v in [province, city, area] + ): + ngv_record = next( + (item for item in self.NGV_data_list + if item.get("_widget_1734062123071") == org_code), + None + ) + if ngv_record: + province = ngv_record.get("_widget_1734062123090") + city = ngv_record.get("_widget_1734062123092") + area = ngv_record.get("_widget_1734062123094") + logger.info(f"【从NGV补全省市区】门店 {org_code}: {province}, {city}, {area}") + + if not all([province, city, area]) or any( + v in [None, '', 'None', 'NA'] for v in [province, city, area] + ): + logger.warning(f"【省市区信息缺失】门店 {org_code} 省市区不完整,客服设为空") + return None + + customer_service = self.get_customer_service_by_location( + str(province).strip(), + str(city).strip(), + str(area).strip(), + self.province_staff_id_list + ) + + if customer_service and "数据缺失" not in str(customer_service): + logger.info(f"【派发客服】门店 {org_code} 派发给客服: {customer_service}") + return customer_service + else: + logger.warning(f"未找到区域客服,请检查门店编码: {org_code}") + return None + + # === Step 3: 遍历主表每一行,构建最终提交记录 === + for _, row in data_NGV.iterrows(): + row_dict = row.to_dict() + + # 3.1 关联数据(NGV ID) + org_code = row_dict.get("门店编码") + ngv_id = org_code_to_ngv_id.get(org_code) + row_dict["关联数据"] = ngv_id if ngv_id else None + if not ngv_id: + logger.warning(f"未找到关联数据,请检查门店编码: {org_code}") + + # 3.2 区域客服 + customer_service = get_regional_customer_service(row_dict) + row_dict["区域客服"] = customer_service + if not customer_service: + no_customer_service_data.append(row_dict) + + # 3.3 注入周期性增购子表单 + row_dict["周期性增购"] = cyclic_subforms.get(org_code, []) + + # 3.4 转换为 widget 格式 + widget_record = self.row_to_dict(row_dict, self.field_map) + records.append(widget_record) + + # === Step 4: 批量提交 === + if not records: + logger.info("无数据需要派发") + return + + payload = { + "api_key": "675b900991ad2491c69389ca", + "entry_id": "6931063d64187eaf6b927557", + "data_list": records + } + print(payload) + + api_instance.entry_data_batch_create(payload) + logger.info(f"已提交 {len(records)} 条数据进行派发") + + def main(self): + + task_start_time = datetime.now().strftime("%Y-%m-%d %H:%M:%S") + try: + logger.info("任务开始") + # step1: 获取数据 + self.load_all_data() + logger.info("加载数据完成") + # step2:数据处理 + data_NGV = self.process_data() + # step3:数据派发 + self.dispatch_task(data_NGV) + # step4:过期日发生变化更新已有表单 + + # step5:自动同意原表单 + + common_module.send_task_status(task_start_time, "续约回访待办") + except Exception as e: + error_task_logger.error(f"续约回访待办发生错误{e}") + # common_module.send_task_error(task_start_time, "续约回访待办", str(e)) + + +if __name__ == '__main__': + RenewalToDo().main() diff --git a/test/续约待办简道云回传宜搭.py b/test/续约待办简道云回传宜搭.py new file mode 100644 index 0000000..5ae9e3b --- /dev/null +++ b/test/续约待办简道云回传宜搭.py @@ -0,0 +1,833 @@ +from datetime import datetime, timezone +import pandas as pd +import os +import pytz +from typing import Dict +import requests +import json +import time +import numpy as np # 导入numpy库用于处理numpy数组 + +output_dir = "output" # 设置输出目录 +os.makedirs(output_dir, exist_ok=True) + +class Config: + JIANDAOYUN_API_TOKEN = 'Bearer qygHulymo1fekJk4CIZyNKjyQAzG8CFN' # token + + +class API: + def entry_data_list(self, data: dict, replace: bool = False, max_retries: int = 20) -> Dict: # 获取多条表单数据 + """ + 获取多条表单数据 + :param max_retries: 最大重试次数 + :param replace: 是否替换字段 + :param data: + api_key: 应用id + entry_id: 表单id + :return: + """ + + url = 'https://api.jiandaoyun.com/api/v5/app/entry/data/list' + + headers = { + 'Authorization': Config.JIANDAOYUN_API_TOKEN, # 曹伟应用api测试 app_key + 'Content-Type': 'application/json' + } + all_data_batches = [] # 用于存储每次请求返回的数据批次 + last_data_id = None + exit_flag = False + while True: + payload = json.dumps({ + "app_id": data['api_key'], # 应用ID + "entry_id": data['entry_id'], # 表单ID + "limit": 90, + "data_id": last_data_id, + "filter": data.get('filter', None) + }) + retries = 0 + while retries <= max_retries: + try: + res = requests.post(url=url, data=payload, headers=headers, timeout=10) + res.raise_for_status() # 检查HTTP响应状态码,如果不等于200会抛出异常 + data_get = res.json() + if data_get["data"]: + all_data_batches.extend(data_get['data']) + last_data_id = data_get['data'][-1].get('_id') + print(f"已获取 {len(all_data_batches)} 条数据") + break # 成功则跳出循环 + else: + if 'data' not in data_get or len(data_get['data']) == 0: + exit_flag = True + break + # logger.warning(f"请求异常, 将重新请求") + retries += 1 + time.sleep(0.1) # 在重试之间稍作停顿 + except requests.exceptions.RequestException as e: + # logger.warning(f"请求异常: {e}, 将重新请求") + retries += 1 + time.sleep(0.1) # 在重试之间稍作停顿 + if retries > max_retries: + # error_task_logger.error(f"任务 {last_data_id}组 连续{max_retries}次请求失败,放弃此次请求。") + all_data_batches.append(None) # 或者可以选择记录失败的payload以便后续处理 + + if exit_flag: + break + + # 构建最终返回的字典 + final_data = { + 'data': all_data_batches # 'data' 键对应的值是列表的列表 + } + # logger.info(f"获取了{len(all_data_batches)}条数据") + if replace: + print("进行了替换") + return_data = self.field_replacement(data, final_data) # 字段替换,由id替换为标签名 + + return return_data + else: + return final_data + + def field_replacement(self, data: dict, data_get: dict) -> dict: + """ + 字段替换,将id替换为标签名,即唯一值替换为表单中显示字段的名字 + :param data: 简道云插件发送过来的data,包含表单id、数据id、应用id + :param data_get: 简道云请求的数据,一般是根据数据id获取到表单的数据 + :return: 将根据数据id获取到的表单数据,进行替换,返回替换后的数据 + """ + + # 获取表单对应字段标签名称 + widget_list = self.entry_widget_list(data) + + # 检查widget_list是否有效 + if not widget_list or 'widgets' not in widget_list or not isinstance(widget_list['widgets'], list): + raise ValueError("映射表没有接受到数据") + + # 创建一个映射表,将_widget_名称映射到label + name_to_label = {widget['name']: widget['label'] for widget in widget_list['widgets']} + + def replace_keys(obj): + """递归替换字典中的键名""" + if isinstance(obj, dict): + new_dict = {} + for key, value in obj.items(): + new_key = name_to_label.get(key, key) + new_dict[new_key] = replace_keys(value) + return new_dict + elif isinstance(obj, list): + return [replace_keys(item) for item in obj] + else: + return obj + + # 复制 data_get,避免修改原始数据 + data_get_copy = json.loads(json.dumps(data_get)) # 深拷贝 + + if 'data' in data_get_copy: + data_get_copy['data'] = replace_keys(data_get_copy['data']) + + return data_get_copy + + @staticmethod + def workflow_instance_get(data: dict, max_retries: int = 20) -> dict: + """ + 查询实例流程信息 + :param max_retries: + :param data: 简道云插件发送过来的data,包含应用id + :return: 查询简道云流程实例信息返回的结果 + """ + url = 'https://api.jiandaoyun.com/api/v6/workflow/instance/get' + + headers = { + 'Authorization': Config.JIANDAOYUN_API_TOKEN, # 曹伟应用api测试 appKey + 'Content-Type': 'application/json' + } + + payload = json.dumps({ + "instance_id": data['data_id'], + "tasks_type": 1 + } + ) + print("payload:", payload) + data_get = None + retries = 0 + while retries <= max_retries: + try: + res = requests.post(url=url, data=payload, headers=headers, timeout=10) + # res.raise_for_status() # 检查HTTP响应状态码,如果不等于200会抛出异常 + data_get = res.json() + # print( "返回结果:", data_get) + if res.status_code == 200: + break # 成功则跳出循环 + else: + # logger.warning(f"请求异常, 将重新请求") + retries += 1 + time.sleep(3) # 在重试之间稍作停顿 + except requests.exceptions.RequestException as e: + # logger.warning(f"请求异常: {e}, 将重新请求") + retries += 1 + time.sleep(0.1) # 在重试之间稍作停顿 + if retries > max_retries: + # error_task_logger.error(f"任务 {data['data_id']} 连续{max_retries}次请求失败,放弃此次请求。") + print("请求失败") + + return data_get + + @staticmethod + def entry_data_update(data: dict, max_retries: int = 20) -> dict: # 修改数据 + """ + 修改数据 + :param max_retries: 最大重试次数,此处设置100次 + :param data: 简道云插件发送过来的data,包含应用id、表单id、数据id等信息 + :return: 修改数据后简道云返回的结果 + """ + url = 'https://api.jiandaoyun.com/api/v5/app/entry/data/update' + + headers = { + 'Authorization': Config.JIANDAOYUN_API_TOKEN, # 曹伟应用api测试 appKey + 'Content-Type': 'application/json' + } + + payload = json.dumps({ + "app_id": data['api_key'], # 应用ID + "entry_id": data['entry_id'], # 表单ID + "data_id": data['data_id'], # 数据ID + "data": data['data'] + } + ) + + data_get = None + retries = 0 + while retries <= max_retries: + try: + res: requests.Response = requests.post(url=url, data=payload, headers=headers, timeout=10) + res.raise_for_status() # 检查HTTP响应状态码,如果不等于200会抛出异常 + data_get = res.json() + # print(data_get) + if res.status_code == 200: + break # 成功则跳出循环 + else: + # logger.warning(f"请求异常, 将重新请求") + retries += 1 + time.sleep(3) # 在重试之间稍作停顿 + except requests.exceptions.RequestException as e: + # logger.warning(f"请求异常: {e}, 将重新请求") + retries += 1 + time.sleep(10) # 在重试之间稍作停顿 + if retries > max_retries: + # error_task_logger.error(f"任务 {data['data_id']} 连续{max_retries}次请求失败,放弃此次请求。") + continue + return data_get + + +class NpEncoder(json.JSONEncoder): + def default(self, obj): + if isinstance(obj, np.integer): + return int(obj) + elif isinstance(obj, np.floating): + return float(obj) + elif isinstance(obj, np.ndarray): + return obj.tolist() + else: + return super(NpEncoder, self).default(obj) + + +class YDAPI: + appKey = "ding5kqocon5s9oph5uq" + appSecret = "HL1jgsIIfLAC0eTH0A1m4mwxUDqbgsiPeCCGGE3ocM6qJBTIW7Ivt9drxF_Z4Kb_" + + def generateToken(self) -> str: + """ + 函数功能:生成访问令牌(token) + + Returns: + str: 返回生成的访问令牌字符串。此token用于后续API调用的身份验证。 + """ + token_api = 'https://api.dingtalk.com/v1.0/oauth2/accessToken' + data = { + "appKey": f"{self.appKey}", + "appSecret": f'{self.appSecret}' + } + res = requests.post(token_api, json=data) + token = res.json().get('accessToken') + return token + + def read_processes_instances(self, token, formUuid, page, n, appType="APP_UYZ0KG6L0CCNV80GZ66O", + systemToken="XA966F81JAJOFCVVVKO64E9MIIZV1EWE5SFMKJ2", instanceStatus="RUNNING", + max_retries=10, delay=2, createFromTimeGMT=None, createToTimeGMT=None, + modifiedFromTimeGMT=None, + modifiedToTimeGMT=None, searchFieldJson={}): + """ + 函数功能:读取流程表单的所有数据,并加入重试机制。 + + Args: + token (str): 登录验证token,用于API调用的身份验证。 + formUuid (str): 表单唯一标识符,用于指定需要读取哪个表单的实例数据。 + page (int): 分页参数,指定请求的数据页码。 + n (int): 每页显示的数据条数。 + appType (str): 应用类型标识符,默认为 "APP_UYZ0KG6L0CCNV80GZ66O" + systemToken (str): 系统token,默认为固定值 + instanceStatus (str): 流程实例状态,默认为"RUNNING" + max_retries (int): 最大重试次数,默认为10次 + delay (int): 每次重试之间的延迟秒数,默认为2秒 + + Returns: + dict: 返回从API获取的流程表单实例数据的JSON解析结果。 + + Raises: + Exception: 如果达到最大重试次数仍未成功,则抛出异常。 + """ + + attempt = 0 + api = f'https://api.dingtalk.com/v1.0/yida/processes/instances?pageNumber={page}&pageSize={n}' + headers = { + "Content-Type": "application/json", + "x-acs-dingtalk-access-token": token + } + formData = { + "appType": appType, + "systemToken": systemToken, + "userId": "yida_pub_account", # 超级管理员账号 + "language": "zh_CN", + "formUuid": formUuid, + "instanceStatus": instanceStatus, # 运行中 + "createFromTimeGMT": createFromTimeGMT, + "createToTimeGMT": createToTimeGMT, + "modifiedFromTimeGMT": modifiedFromTimeGMT, + "modifiedToTimeGMT": modifiedToTimeGMT, + "searchFieldJson": json.dumps( + searchFieldJson + ) + } + # print(formData) + + while True: + if attempt >= max_retries: + # error_task_logger.error(f"请求失败,已达最大重试次数 {max_retries},无法获取流程实例数据,跳过本次请求。") + break + + try: + res = requests.post(api, headers=headers, json=formData) + # print(res.json()) + res.raise_for_status() # 如果返回状态码不是2xx,抛出异常 + return res.json() + + except requests.exceptions.RequestException as e: + # logger.warning(f"请求异常: {e},正在尝试第 {attempt + 1} 次重试...") + time.sleep(delay) + attempt += 1 + + def update_from(self, token, formInstanceId, data_new): + """ + 函数功能:更新表单内容 + + Args: + token (str): 登录验证token,用于API调用的身份验证。 + formInstanceId (str): 表单实例ID,读文件获取。 + data_new (dict): 新的数据内容,用于替换现有表单实例中的数据。读文件获取。 + + Returns: + Response: 返回API请求的响应对象。 + """ + + api = f'https://api.dingtalk.com//v1.0/yida/forms/instances' + + headers = { + "Content-Type": "application/json", + "x-acs-dingtalk-access-token": token + } + + payload = { + "appType": "APP_UYZ0KG6L0CCNV80GZ66O", + "systemToken": "XA966F81JAJOFCVVVKO64E9MIIZV1EWE5SFMKJ2", + "userId": "yida_pub_account", # 曹伟 id + "language": "zh_CN", + "useLatestVersion": "false", + "formInstanceId": formInstanceId, + "updateFormDataJson": json.dumps(data_new, cls=NpEncoder), + + } + + res = requests.put(api, headers=headers, json=payload) + return res + + def get_approval_records(self, token: str, processInstanceId: str, appType="APP_UYZ0KG6L0CCNV80GZ66O", + systemToken="XA966F81JAJOFCVVVKO64E9MIIZV1EWE5SFMKJ2", max_retries=10, delay=2): + """ + 函数功能:获取流程表单的审批记录,适用于"F6客户服务"应用,并且包含重试机制。 + + Args: + token (str): 登录验证token,用于API调用的身份验证。 + processInstanceId (str): 流程实例ID,用于标识需要获取审批记录的具体流程实例。 + appType (str): 应用类型标识符,默认为 "APP_UYZ0KG6L0CCNV80GZ66O" + systemToken (str): 系统token,默认为固定值 + max_retries (int): 最大重试次数,默认为10次 + delay (int): 每次重试之间的延迟秒数,默认为2秒 + + Returns: + dict: 返回从API获取的审批记录的JSON解析结果。通常包括审批步骤、审批人、审批时间等信息。 + """ + attempt = 0 + userId = "yida_pub_account" + api = f'https://api.dingtalk.com/v1.0/yida/processes/operationRecords?appType={appType}&systemToken={systemToken}&userId={userId}&language=zh_CN&processInstanceId={processInstanceId}' + headers = { + "Content-Type": "application/json", + "x-acs-dingtalk-access-token": token + } + + while True: + if attempt >= max_retries: + # error_task_logger.error(f"请求失败,已达最大重试次数 {max_retries},无法获取审批数据,跳过本次请求。") + break + + try: + res = requests.get(api, headers=headers) + res.raise_for_status() # 如果响应状态码不是2xx,则抛出HTTPError + return res.json() + except (requests.exceptions.RequestException, Exception) as e: + # logger.warning(f"请求出现异常: {e}, 正在重试({attempt + 1}/{max_retries})...") + time.sleep(delay) # 等待指定的延迟时间后再次尝试 + attempt += 1 + + def aggree_approval(self, token: str, taskId: str, processInstanceId: str, formData: dict, res_new): + """_summary_ + + 函数功能:同意审批节点 --F6客户服务 应用 + + Args: + token (str): 登录验证token + taskId (str): 获取到的审批节点ID + processInstanceId (str): 读取文件获得的实例ID + formData (dict): 数据样式 + res_new (响应值): 从员工ID表里获取到员工名对应的员工ID + + Returns: + 响应值: 返回请求结果 + """ + api = 'https://api.dingtalk.com/v1.0/yida/tasks/execute' + headers = { + "Content-Type": "application/json", + "x-acs-dingtalk-access-token": token + } + payload = { + "outResult": "AGREE", + "appType": "APP_UYZ0KG6L0CCNV80GZ66O", + "systemToken": "XA966F81JAJOFCVVVKO64E9MIIZV1EWE5SFMKJ2", + "remark": "同意(接口自动)", + "formDataJson": json.dumps(formData, cls=NpEncoder), + "processInstanceId": processInstanceId, + "userId": res_new, + "language": "zh_CN", + "taskId": int(taskId) + } + + res = requests.post(api, headers=headers, json=payload) + return res + + +api_instance = API() +yd_api_instance = YDAPI() + + +class JDYToYDRenewalToDo(object): + def __init__(self): + self.yd_renewal_data_df = None + self.renewal_data_df = None + self.yd_renewal_data_list = None + self.renewal_data_list = None + self.FORMID = "FORM-PE866MD1MJMU0WGLYRFLYEN5YN9L1I55Z7ZUK22" + self.appType = "APP_UYZ0KG6L0CCNV80GZ66O" + self.systemToken = "XA966F81JAJOFCVVVKO64E9MIIZV1EWE5SFMKJ2" + self.token = yd_api_instance.generateToken() + # 简道云/宜搭节点顺序(越靠近 0 天数值越大,表示流程更靠后) + # 120 → 60 → 30 → 0 + self.stage_order = {"120": 0, "60": 1, "30": 2, "0": 3} + self.follow_up_fields = { + "120天是否跟进": "_widget_1764820541628", + "120天处理人": "_widget_1764820541634", + "120天跟进时间": "_widget_1765352838631", + "60天是否跟进": "_widget_1764820541630", + "60天处理人": "_widget_1764820541635", + "60天跟进时间": "_widget_1765352838632", + "30天是否跟进": "_widget_1764820541632", + "30天处理人": "_widget_1764820541636", + "30天跟进时间": "_widget_1765352838633", + "数据ID": "_id" + } + + def load_all_data(self): + # 获取简道云已派发续约待办,若无数据直接返回 + today_utc = datetime.now(timezone.utc).strftime("%Y-%m-%d") + payload = {"api_key": "675b900991ad2491c69389ca", + "entry_id": "6931063d64187eaf6b927557", + "filter": {"rel": "and", + "cond": [{"field": "updateTime", "type": "range", "method": "eq", "value": [today_utc]}, + {"field": "_widget_1766469131897", "type": "range", "method": "eq", + "value": ["否"]}]}, + } + renewal = api_instance.entry_data_list(payload) + self.renewal_data_list = renewal.get("data") or [] + if not self.renewal_data_list: + self.renewal_data_df = pd.DataFrame() + return + self.renewal_data_df = pd.DataFrame(self.renewal_data_list) + self.renewal_data_df.to_csv(os.path.join(output_dir, "renewal_data_list.csv")) + + def search_yd_renewal_data(self): + # Step 1: 根据简道云获取宜搭数据信息 + if self.renewal_data_df is None or self.renewal_data_df.empty: + print("简道云无待办数据") + return [] + + all_data = [] + for _, row in self.renewal_data_df.iterrows(): + yd_data = yd_api_instance.read_processes_instances( + token=self.token, + formUuid=self.FORMID, + page=1, + n=100, + appType=self.appType, + systemToken=self.systemToken, + instanceStatus="", + searchFieldJson={"textField_ksydghqw": row["_widget_1764820541661"]}, + ).get("data", []) + + for record in yd_data: + enriched = {**record} + enriched.update({k: row.get(v, "") for k, v in self.follow_up_fields.items()}) + all_data.append(enriched) + + if not all_data: + print("未获取到任何宜搭数据") + return [] + + df = pd.DataFrame(all_data) + df.to_csv(os.path.join(output_dir, "yd_renewal_data.csv"), index=False) + + # Step 2: 过滤进行中的待办 + df = df[df["instanceStatus"] == "RUNNING"].copy() + + if df.empty: + print("没有 RUNNING 状态的流程实例") + return [] + + # Step 3: 提取过期日期和 org_code + df['expire_date'] = pd.to_datetime( + df['data'].apply(lambda x: x.get('dateField_ksirro5l')), + unit='ms', + errors='coerce' + ) + df['org_code'] = df['data'].apply(lambda x: x.get('textField_ksydghqw')) + + # 按 org_code 分组,每组取 expire_date 最晚的一条 + yd_update_list = [] # 用于存储最终要更新的数据 + + for org_code, group in df.groupby('org_code'): + group_valid = group.dropna(subset=['expire_date']) + if group_valid.empty: + continue + latest_row = group_valid.loc[group_valid['expire_date'].idxmax()] + + fields_to_map = {"120天是否跟进", "60天是否跟进", "30天是否跟进"} + + # 组装要修改的信息:包含 processInstanceId + 所有跟进字段 + update_info = { + "processInstanceId": latest_row["processInstanceId"], + "org_code": org_code, + } + # 添加跟进字段(这些已经在 enriched_record 中) + # 添加跟进字段,并对特定字段做映射 + for field_name in self.follow_up_fields.keys(): + value = latest_row.get(field_name) + + # 处理 NaN 或 None + if pd.isna(value): + final_value = "" + else: + str_val = str(value).strip() + # 如果是需要映射的字段,进行转换 + if field_name in fields_to_map: + if str_val == "主动": + final_value = "小六" + elif str_val == "自动": + final_value = "系统" + else: + final_value = str_val # 保留原始值(如“是”、“否”等) + else: + final_value = str_val + + update_info[field_name] = final_value + + yd_update_list.append(update_info) + + # 可选:保存待更新清单 + if yd_update_list: + update_df = pd.DataFrame(yd_update_list) + update_df.to_csv(os.path.join(output_dir, "yd_to_update.csv"), index=False) + + return yd_update_list # 返回结构化数据供后续调用更新接口 + + def yd_process_data(self, yd_update_list): + + TIME_FIELDS = ["120天跟进时间", "60天跟进时间", "30天跟进时间"] + # === 4. 直接遍历 yd_update_list 中的 item(字段在顶层)=== + for item in yd_update_list: + # 转换UTC时间字符串为时间戳 + for field in TIME_FIELDS: + if field in item and item[field]: + utc_str = str(item[field]).strip() + try: + if utc_str.endswith("Z"): + utc_str = utc_str[:-1] + "+00:00" + dt = datetime.fromisoformat(utc_str) + if dt.tzinfo is None: + dt = dt.replace(tzinfo=pytz.UTC) + else: + dt = dt.astimezone(pytz.UTC) + item[field] = int(dt.timestamp() * 1000) # ⏰ 转为秒级时间戳 + except Exception as e: + print(f"[时间解析错误] 字段: {field}, 值: {utc_str}, 错误: {e}") + + return yd_update_list + + def update_yd_renewal_data(self, yd_update_list): + print(yd_update_list) + import ast + + def extract_user_id(user_obj): + # 如果是字符串,尝试解析为 dict + if isinstance(user_obj, str): + if user_obj.strip().startswith("{"): + try: + user_obj = ast.literal_eval(user_obj) + except (ValueError, SyntaxError): + return "" # 无法解析,返回空 + + if isinstance(user_obj, dict): + return user_obj.get("name") + elif isinstance(user_obj, str): + return user_obj # 可能已经是 + else: + return "" + + yd_update_list = self.yd_process_data(yd_update_list) + for item in yd_update_list: + field_mapping = { + "120天是否跟进": "radioField_kuntp6fm", + "120天处理人": "textField_livc8bjj", + "120天跟进时间": "dateField_lifr1fdv", + "60天是否跟进": "radioField_kurxyhvp", + "60天处理人": "textField_livc8bjl", + "60天跟进时间": "dateField_lifr1fdx", + "30天是否跟进": "radioField_kurxyhvq", + "30天处理人": "textField_livc8bjm", + "30天跟进时间": "dateField_lifr1fdy", + } + item["120天处理人"] = extract_user_id(item["120天处理人"]) + item["60天处理人"] = extract_user_id(item["60天处理人"]) + item["30天处理人"] = extract_user_id(item["30天处理人"]) + update_json = {v: item.get(k, "") for k, v in field_mapping.items()} + print(f"更新数据:{update_json}") + yd_api_instance.update_from( + token=self.token, + formInstanceId=item["processInstanceId"], + data_new=update_json, + ) + + def update_jd_ydy_renewal_to_do_status(self, yd_update_list): + """ + 简道云进度为准:流程节点分 120/60/30/0。若宜搭落后则在宜搭侧自动同意推进到与简道云一致;否则不操作。 + """ + + def parse_dt(val): + try: + if val is None or val == "": + return datetime.min.replace(tzinfo=timezone.utc) + # 兼容带 Z 的 UTC 写法 + dt = datetime.fromisoformat(str(val).replace("Z", "+00:00")) + if dt.tzinfo is None: + dt = dt.replace(tzinfo=timezone.utc) + return dt.astimezone(timezone.utc) + except Exception: + return datetime.min.replace(tzinfo=timezone.utc) + + def extract_stage_from_text(text: str): + """ + 根据节点名称中包含的数字段(120/60/30/0)识别阶段,兼容“联系情况/处理情况”等后缀。 + """ + text = (text or "").strip() + # 区间节点映射:120-60 视为已进入 60 阶段;60-30 视为 30;30-0 视为 0 + if "120-60" in text: + return "60" + if "60-30" in text: + return "30" + if "30-0" in text: + return "0" + for key in self.stage_order.keys(): + if key in text: + return key + if f"{key}天" in text: + return key + if f"{key}天联系" in text: + return key + if f"{key}天处理" in text: + return key + return None + + def fallback_stage_from_obj(obj): + """ + 在未知结构的返回中深度搜索含阶段数字的字符串,返回“最靠后的阶段”(stage_order 值最大)。 + 防止找到第一个 120 就返回,导致明明已进入 60 却误判为 120。 + """ + found = [] + + def _iter(o): + if isinstance(o, dict): + for v in o.values(): + yield from _iter(v) + elif isinstance(o, list): + for v in o: + yield from _iter(v) + elif isinstance(o, str): + yield o + else: + yield str(o) + + for text in _iter(obj): + stage = extract_stage_from_text(text) + if stage is not None: + found.append(stage) + if not found: + return None + # 取 stage_order 中“值最大”的阶段,表示流程更靠后 + return max(found, key=lambda s: self.stage_order.get(s, -1)) + + for item in yd_update_list: + # 1) 简道云流程日志(权威) + data = {"data_id": item["数据ID"]} + jdy_workflow_data = api_instance.workflow_instance_get(data) or {} + print("简道云流程日志:", jdy_workflow_data) + jdy_logs = jdy_workflow_data.get("logs", []) + jdy_logs = sorted( + jdy_logs, + key=lambda x: parse_dt(x.get("finish_time") or x.get("create_time")), + reverse=True, + ) + tasks = jdy_workflow_data.get("tasks", []) or [] + pending = [t for t in tasks if t.get("status") == 0] + task_candidate = (pending[0] if pending else None) or (sorted( + tasks, + key=lambda x: parse_dt(x.get("create_time")), + reverse=True + )[0] if tasks else {}) + jdy_result_records = jdy_workflow_data.get("result", []) or [] + + jdy_stage_candidates = [ + extract_stage_from_text(str(jdy_logs[0].get("flow_name", ""))) if jdy_logs else None, + extract_stage_from_text( + str(task_candidate.get("flow_name", "")) + str(task_candidate.get("title", "")) + ) if task_candidate else None, + ] + jdy_stage_candidates.extend( + extract_stage_from_text(str(rec.get("showName", "")) + str(rec.get("activityId", ""))) + for rec in jdy_result_records + ) + jdy_stage_candidates.append(fallback_stage_from_obj(jdy_workflow_data)) + jdy_stage = next((s for s in jdy_stage_candidates if s), None) + + # 2) 宜搭流程日志 + yd_workflow_data = yd_api_instance.get_approval_records( + token=self.token, + processInstanceId=item["processInstanceId"], + appType=self.appType, + systemToken=self.systemToken + ) or {} + print("宜搭流程日志:", yd_workflow_data) + yd_results = yd_workflow_data.get("result", []) or [] + + # 展开宜搭 domainList 以获取所有动作 + yd_records = [ + child + for rec in yd_results + for child in ([rec] + (rec.get("domainList", []) or [])) + ] + yd_records = sorted( + yd_records, + key=lambda x: parse_dt(x.get("operateTimeGMT") or x.get("activeTimeGMT")), + reverse=True, + ) + # 优先使用当前待办(type == TODO),否则用最新一条 + yd_todo = next((r for r in yd_records if str(r.get("type")).upper() == "TODO"), None) + yd_latest = yd_todo or (yd_records[0] if yd_records else {}) + yd_stage = extract_stage_from_text( + str(yd_latest.get("showName", "")) + str(yd_latest.get("activityId", "")) + ) + + # 若无法识别阶段,跳过防止误操作 + if jdy_stage is None or yd_stage is None: + print(f"无法识别阶段,跳过同步 data_id={item.get('数据ID')}, jdy_stage={jdy_stage}, yd_stage={yd_stage}") + continue + + # 3) 比较阶段:仅在简道云领先时才跳转;否则直接跳过 + jdy_idx = self.stage_order.get(jdy_stage) + yd_idx = self.stage_order.get(yd_stage) + if jdy_idx is None or yd_idx is None: + print(f"阶段映射缺失,跳过 data_id={item.get('数据ID')}, jdy_stage={jdy_stage}, yd_stage={yd_stage}") + continue + if jdy_idx == yd_idx: + print(f"阶段一致,无需跳转 data_id={item.get('数据ID')}, stage={jdy_stage}") + continue + if jdy_idx < yd_idx: + print(f"简道云未领先或已落后,跳过 data_id={item.get('数据ID')}, jdy={jdy_stage}, yd={yd_stage}") + continue + + # jdy_idx > yd_idx,宜搭落后,执行自动同意 + task_id = yd_latest.get("taskId") + operator_user_id = yd_latest.get("operatorUserId") or yd_latest.get("operator") + if not task_id or not operator_user_id: + print(f"缺少 taskId 或 operatorUserId,无法自动同意 processInstanceId={item['processInstanceId']}") + continue + try: + yd_api_instance.aggree_approval( + token=self.token, + taskId=task_id, + processInstanceId=item["processInstanceId"], + formData={}, # 无需修改表单时传空字典 + res_new=operator_user_id + ) + print(f"宜搭已自动同意至 下一个 节点,processInstanceId={item['processInstanceId']}") + except Exception as e: + print(f"宜搭自动同意失败 processInstanceId={item['processInstanceId']} err={e}") + + def retrun_jdy(self, yd_update_list): + for item in yd_update_list: + data = { + "api_key": "675b900991ad2491c69389ca", + "entry_id": "6931063d64187eaf6b927557", + "data_id": item.get('数据ID'), + "data": + { + "_widget_1766469131897": {"value": "是"}, + } + } + api_instance.entry_data_update(data) + print(f"已返回 data_id={item.get('数据ID')}") + + def main(self): + task_start_time = datetime.now().strftime("%Y-%m-%d %H:%M:%S") + try: + # step1 获取简道云与宜搭数据 + self.load_all_data() + # step2 根据简道云门店编码精确搜索宜搭数据 + yd_update_list = self.search_yd_renewal_data() + # step3 api修改宜搭数据 + self.update_yd_renewal_data(yd_update_list) + # step4 简道云与宜搭流程节点保持一致 + self.update_jd_ydy_renewal_to_do_status(yd_update_list) + # step5 简道云标记已处理 + self.retrun_jdy(yd_update_list) + + except Exception as e: + print(e) + + +if __name__ == '__main__': + jd_ydy_renewal_to_do = JDYToYDRenewalToDo() + jd_ydy_renewal_to_do.main() diff --git a/tools/BI.ipynb b/tools/BI.ipynb index 6d3c80f..dd36879 100644 --- a/tools/BI.ipynb +++ b/tools/BI.ipynb @@ -257,8 +257,8 @@ { "metadata": { "ExecuteTime": { - "end_time": "2025-11-07T02:53:28.920026Z", - "start_time": "2025-11-07T02:53:28.652830Z" + "end_time": "2025-12-25T06:47:43.233398500Z", + "start_time": "2025-12-25T06:47:42.837753600Z" } }, "cell_type": "code", @@ -274,8 +274,8 @@ " } # 衡时数据库链接配置-mysql\n", "# table_name = \"new_dealer_service_order_to_bi\" # 替换为你的实际表名\n", "\n", - "table_name = \"non_standard_performance_to_BI\"\n", - "column_name = \"商品名称\"\n", + "table_name = \"partner_settlement_to_BI\"\n", + "column_name = \"是否同步\"\n", "new_column_type = \"VARCHAR(255)\" # 目标数据类型\n", "# new_column_type = \"DATETIME\" # 目标数据类型\n", "\n", @@ -336,12 +336,12 @@ "name": "stdout", "output_type": "stream", "text": [ - "✅ 成功添加字段: `商品名称`\n", + "✅ 成功添加字段: `是否同步`\n", "数据库连接已关闭\n" ] } ], - "execution_count": 3 + "execution_count": 24 }, { "metadata": {}, diff --git a/tools/保存common_module数据库链接内容.py b/tools/保存common_module数据库链接内容.py index 05a4c69..3ff1565 100644 --- a/tools/保存common_module数据库链接内容.py +++ b/tools/保存common_module数据库链接内容.py @@ -22,8 +22,9 @@ def save_data_yichang_to_csv(): print("开始获取异常明细数据...") # 获取前一天的异常数据 - data_yichang_S = common_module.get_yichang_details(days_back=1) - + # data_yichang_S = common_module.get_yichang_details(days_back=1) + data_yichang_S = common_module.get_ngv_details(days_back=1) # ngv + if data_yichang_S is None or data_yichang_S.empty: print("未获取到数据或数据为空") return diff --git a/yd_api.py b/yd_api.py index f6f4e6c..4b2e94a 100644 --- a/yd_api.py +++ b/yd_api.py @@ -32,7 +32,7 @@ class YDAPI: token = res.json().get('accessToken') return token - def update_from(self, token, formInstanceId, data_new,delay=2, max_retries=10): + def update_from(self, token, formInstanceId, data_new): """ 函数功能:更新表单内容 @@ -62,19 +62,9 @@ class YDAPI: "updateFormDataJson": json.dumps(data_new, cls=NpEncoder), } - attempt = 0 - while True: - if attempt >= max_retries: - error_task_logger.error(f"请求失败,已达最大重试次数 {max_retries},无法更新表单数据,跳过本次请求。") - break - try: - res = requests.get(api, headers=headers,timeout=10) - res.raise_for_status() # 如果响应状态码不是2xx,则抛出HTTPError - return res.json() - except (requests.exceptions.RequestException, Exception) as e: - print(f"请求出现异常: {e}, 正在重试({attempt + 1}/{max_retries})...") - time.sleep(delay) # 等待指定的延迟时间后再次尝试 - attempt += 1 + + res = requests.put(api, headers=headers, json=payload) + return res def processes_instancesInfos(self, token, id, appType="APP_UYZ0KG6L0CCNV80GZ66O", systemToken="XA966F81JAJOFCVVVKO64E9MIIZV1EWE5SFMKJ2", delay=2, max_retries=10): @@ -115,7 +105,7 @@ class YDAPI: attempt += 1 def read_processes(self, token, formUuid, page, n, appType="APP_UYZ0KG6L0CCNV80GZ66O", - systemToken="XA966F81JAJOFCVVVKO64E9MIIZV1EWE5SFMKJ2", max_retries=10,delay=2): + systemToken="XA966F81JAJOFCVVVKO64E9MIIZV1EWE5SFMKJ2", max_retries=10, delay=2): """ 函数功能:读取普通表单的所有数据 @@ -151,9 +141,9 @@ class YDAPI: try: res = requests.post(api, headers=headers, json=formData) - logger.info(f"HTTP Status Code: {res.status_code}") - logger.info(f"Response Headers: {dict(res.headers)}") - logger.info(f"Raw Response Text: {res.text}") # 注意:不要在线上环境打印敏感数据! + # logger.info(f"HTTP Status Code: {res.status_code}") + # logger.info(f"Response Headers: {dict(res.headers)}") + # logger.info(f"Raw Response Text: {res.text}") # 注意:不要在线上环境打印敏感数据! return res.json() except requests.exceptions.RequestException as e: @@ -163,7 +153,9 @@ class YDAPI: def read_processes_instances(self, token, formUuid, page, n, appType="APP_UYZ0KG6L0CCNV80GZ66O", systemToken="XA966F81JAJOFCVVVKO64E9MIIZV1EWE5SFMKJ2", instanceStatus="RUNNING", - max_retries=10, delay=2, createFromTimeGMT=None, createToTimeGMT=None): + max_retries=10, delay=2, createFromTimeGMT=None, createToTimeGMT=None, + modifiedFromTimeGMT=None, + modifiedToTimeGMT=None, searchFieldJson={}): """ 函数功能:读取流程表单的所有数据,并加入重试机制。 @@ -200,7 +192,13 @@ class YDAPI: "instanceStatus": instanceStatus, # 运行中 "createFromTimeGMT": createFromTimeGMT, "createToTimeGMT": createToTimeGMT, + "modifiedFromTimeGMT": modifiedFromTimeGMT, + "modifiedToTimeGMT": modifiedToTimeGMT, + "searchFieldJson": json.dumps( + searchFieldJson + ) } + # print(formData) while True: if attempt >= max_retries: @@ -209,6 +207,62 @@ class YDAPI: try: res = requests.post(api, headers=headers, json=formData) + # print(res.json()) + res.raise_for_status() # 如果返回状态码不是2xx,抛出异常 + return res.json() + + except requests.exceptions.RequestException as e: + logger.warning(f"请求异常: {e},正在尝试第 {attempt + 1} 次重试...") + time.sleep(delay) + attempt += 1 + + @staticmethod + def get_ids_query(token, formUuid, appType, systemToken, formInstanceIdList=None, max_retries=10, delay=2): + """ + 函数功能:读取表单的所有数据,并加入重试机制。 + + Args: + token (str): 登录验证token,用于API调用的身份验证。 + formUuid (str): 表单唯一标识符,用于指定需要读取哪个表单的实例数据。 + page (int): 分页参数,指定请求的数据页码。 + n (int): 每页显示的数据条数。 + appType (str): 应用类型标识符,默认为 "APP_UYZ0KG6L0CCNV80GZ66O" + systemToken (str): 系统token,默认为固定值 + instanceStatus (str): 流程实例状态,默认为"RUNNING" + max_retries (int): 最大重试次数,默认为10次 + delay (int): 每次重试之间的延迟秒数,默认为2秒 + + Returns: + dict: 返回从API获取的流程表单实例数据的JSON解析结果。 + + Raises: + Exception: 如果达到最大重试次数仍未成功,则抛出异常。 + """ + + attempt = 0 + api = f'https://api.dingtalk.com/v1.0/yida/forms/instances/ids/query' + headers = { + "Content-Type": "application/json", + "x-acs-dingtalk-access-token": token + } + formData = { + "appType": appType, + "systemToken": systemToken, + "userId": "yida_pub_account", # 超级管理员账号 + "language": "zh_CN", + "formUuid": formUuid, + "formInstanceIdList": formInstanceIdList, + } + # print(formData) + + while True: + if attempt >= max_retries: + error_task_logger.error(f"请求失败,已达最大重试次数 {max_retries},无法获取流程实例数据,跳过本次请求。") + break + + try: + res = requests.post(api, headers=headers, json=formData) + # print(res.json()) res.raise_for_status() # 如果返回状态码不是2xx,抛出异常 return res.json() @@ -277,7 +331,6 @@ class YDAPI: headers = { # "Content-Type": "application/json", "x-acs-dingtalk-access-token": token - } while True: