From 2528a2778cf3e4f6059dee8896ca60bca3aba849 Mon Sep 17 00:00:00 2001 From: panda <1415243231@qq.com> Date: Sun, 4 Jan 2026 13:44:53 +0800 Subject: [PATCH] =?UTF-8?q?=E6=A0=A1=E9=AA=8C=E5=94=AF=E4=B8=80=E4=BB=BB?= =?UTF-8?q?=E5=8A=A1=E6=B7=BB=E5=8A=A0=E6=97=B6=E9=97=B4=20=E4=BC=98?= =?UTF-8?q?=E5=8C=96=E7=BB=AD=E7=BA=A6=E4=BB=A3=E5=8A=9E=E8=AF=B7=E6=B1=82?= =?UTF-8?q?=E6=AC=A1=E6=95=B0?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- ..._denominator_reporting_adjustment_to_bi.py | 2 - logs/error_task.log | 20 ++++++ logs/task.log | 48 ++++++++++++++ task_executor.py | 1 - tasks.csv | 1 + test/续约待办宜搭传给简道云.py | 42 ++++++++++-- utils.py | 66 +++++++++++-------- 7 files changed, 141 insertions(+), 39 deletions(-) diff --git a/back_ground_module/update_denominator_reporting_adjustment_to_bi.py b/back_ground_module/update_denominator_reporting_adjustment_to_bi.py index 58d5ada..e11fb31 100644 --- a/back_ground_module/update_denominator_reporting_adjustment_to_bi.py +++ b/back_ground_module/update_denominator_reporting_adjustment_to_bi.py @@ -205,11 +205,9 @@ class DenominatorReportingAdjustment: def main(self): task_start_time = datetime.now().strftime("%Y-%m-%d %H:%M:%S") try: - # step1:获取宜搭数据 self.get_yida_data() logger.info("✅ 获取宜搭数据成功") - df = pd.DataFrame(self.denominator_data_list) # step2:清空BI数据表 diff --git a/logs/error_task.log b/logs/error_task.log index 30f2e0e..ce96b88 100644 --- a/logs/error_task.log +++ b/logs/error_task.log @@ -2534,3 +2534,23 @@ 2025-12-31 10:05:48,088 - log_config.py - error_task_logger - ERROR - 任务 经销商新签服务单转BI 超过执行窗口5分钟以上,标记为过期。 2025-12-31 10:05:48,088 - log_config.py - error_task_logger - ERROR - 任务 高德匹配手机号 超过执行窗口5分钟以上,标记为过期。 2025-12-31 10:05:48,089 - log_config.py - error_task_logger - ERROR - 任务 省市区人员关系表转BI 超过执行窗口5分钟以上,标记为过期。 +2026-01-04 10:29:26,158 - log_config.py - error_task_logger - ERROR - 任务 NGV新增数据 超过执行窗口5分钟以上,标记为过期。 +2026-01-04 10:29:26,159 - log_config.py - error_task_logger - ERROR - 任务 新签客户回访 超过执行窗口5分钟以上,标记为过期。 +2026-01-04 10:29:26,160 - log_config.py - error_task_logger - ERROR - 任务 续约客户回访 超过执行窗口5分钟以上,标记为过期。 +2026-01-04 10:29:26,161 - log_config.py - error_task_logger - ERROR - 任务 接车宝日常派发 超过执行窗口5分钟以上,标记为过期。 +2026-01-04 10:29:26,161 - log_config.py - error_task_logger - ERROR - 任务 私域小程序数据支撑 超过执行窗口5分钟以上,标记为过期。 +2026-01-04 10:29:26,162 - log_config.py - error_task_logger - ERROR - 任务 小六提成数据支撑 超过执行窗口5分钟以上,标记为过期。 +2026-01-04 10:29:26,163 - log_config.py - error_task_logger - ERROR - 任务 异业合作数据支撑 超过执行窗口5分钟以上,标记为过期。 +2026-01-04 10:29:26,164 - log_config.py - error_task_logger - ERROR - 任务 短信数据支撑 超过执行窗口5分钟以上,标记为过期。 +2026-01-04 10:29:26,164 - log_config.py - error_task_logger - ERROR - 任务 海外邮件推送 超过执行窗口5分钟以上,标记为过期。 +2026-01-04 10:29:26,165 - log_config.py - error_task_logger - ERROR - 任务 异常服务待办派发 超过执行窗口5分钟以上,标记为过期。 +2026-01-04 10:29:26,166 - log_config.py - error_task_logger - ERROR - 任务 简道云海外项目CRM客户档案迁移BI 超过执行窗口5分钟以上,标记为过期。 +2026-01-04 10:29:26,167 - log_config.py - error_task_logger - ERROR - 任务 安装服务历史派发 超过执行窗口5分钟以上,标记为过期。 +2026-01-04 10:29:26,167 - log_config.py - error_task_logger - ERROR - 任务 分母报备调整 超过执行窗口5分钟以上,标记为过期。 +2026-01-04 10:29:26,168 - log_config.py - error_task_logger - ERROR - 任务 分子报备调整 超过执行窗口5分钟以上,标记为过期。 +2026-01-04 10:29:26,169 - log_config.py - error_task_logger - ERROR - 任务 履约表数据支撑 超过执行窗口5分钟以上,标记为过期。 +2026-01-04 10:29:26,169 - log_config.py - error_task_logger - ERROR - 任务 字段监控 超过执行窗口5分钟以上,标记为过期。 +2026-01-04 10:29:26,170 - log_config.py - error_task_logger - ERROR - 任务 经销商新签服务单转BI 超过执行窗口5分钟以上,标记为过期。 +2026-01-04 10:29:26,171 - log_config.py - error_task_logger - ERROR - 任务 高德匹配手机号 超过执行窗口5分钟以上,标记为过期。 +2026-01-04 10:29:26,171 - log_config.py - error_task_logger - ERROR - 任务 省市区人员关系表转BI 超过执行窗口5分钟以上,标记为过期。 +2026-01-04 10:29:26,172 - log_config.py - error_task_logger - ERROR - 任务 续约回访待办 超过执行窗口5分钟以上,标记为过期。 diff --git a/logs/task.log b/logs/task.log index 9e07a56..ea54cbf 100644 --- a/logs/task.log +++ b/logs/task.log @@ -187,3 +187,51 @@ 2025-12-31 10:05:48,090 - utils.py - task_logger - INFO - 任务 省市区人员关系表转BI 状态已更新为 过期。 2025-12-31 10:05:48,090 - utils.py - task_logger - INFO - 启动任务加载完成。 2025-12-31 10:05:48,090 - main.py - task_logger - INFO - 程序已启动... +2026-01-04 10:29:25,326 - utils.py - task_logger - INFO - 任务队列已从磁盘加载。 +2026-01-04 10:29:26,143 - api.py - task_logger - INFO - 获取了35条数据 +2026-01-04 10:29:26,157 - sample_cloud_modules.py - task_logger - INFO - 任务已从云端获取并保存到 tasks.csv 文件。 +2026-01-04 10:29:26,157 - main.py - task_logger - INFO - 任务列表已保存到 csv 文件中。 +2026-01-04 10:29:26,157 - utils.py - task_logger - INFO - 启动时加载并执行任务... +2026-01-04 10:29:26,158 - utils.py - task_logger - INFO - 任务已从磁盘加载到全局任务字典。 +2026-01-04 10:29:26,159 - utils.py - task_logger - INFO - 所有任务状态已保存到磁盘。 +2026-01-04 10:29:26,159 - utils.py - task_logger - INFO - 任务 NGV新增数据 状态已更新为 过期。 +2026-01-04 10:29:26,160 - utils.py - task_logger - INFO - 所有任务状态已保存到磁盘。 +2026-01-04 10:29:26,160 - utils.py - task_logger - INFO - 任务 新签客户回访 状态已更新为 过期。 +2026-01-04 10:29:26,160 - utils.py - task_logger - INFO - 所有任务状态已保存到磁盘。 +2026-01-04 10:29:26,161 - utils.py - task_logger - INFO - 任务 续约客户回访 状态已更新为 过期。 +2026-01-04 10:29:26,161 - utils.py - task_logger - INFO - 所有任务状态已保存到磁盘。 +2026-01-04 10:29:26,161 - utils.py - task_logger - INFO - 任务 接车宝日常派发 状态已更新为 过期。 +2026-01-04 10:29:26,162 - utils.py - task_logger - INFO - 所有任务状态已保存到磁盘。 +2026-01-04 10:29:26,162 - utils.py - task_logger - INFO - 任务 私域小程序数据支撑 状态已更新为 过期。 +2026-01-04 10:29:26,163 - utils.py - task_logger - INFO - 所有任务状态已保存到磁盘。 +2026-01-04 10:29:26,163 - utils.py - task_logger - INFO - 任务 小六提成数据支撑 状态已更新为 过期。 +2026-01-04 10:29:26,163 - utils.py - task_logger - INFO - 所有任务状态已保存到磁盘。 +2026-01-04 10:29:26,163 - utils.py - task_logger - INFO - 任务 异业合作数据支撑 状态已更新为 过期。 +2026-01-04 10:29:26,164 - utils.py - task_logger - INFO - 所有任务状态已保存到磁盘。 +2026-01-04 10:29:26,164 - utils.py - task_logger - INFO - 任务 短信数据支撑 状态已更新为 过期。 +2026-01-04 10:29:26,165 - utils.py - task_logger - INFO - 所有任务状态已保存到磁盘。 +2026-01-04 10:29:26,165 - utils.py - task_logger - INFO - 任务 海外邮件推送 状态已更新为 过期。 +2026-01-04 10:29:26,166 - utils.py - task_logger - INFO - 所有任务状态已保存到磁盘。 +2026-01-04 10:29:26,166 - utils.py - task_logger - INFO - 任务 异常服务待办派发 状态已更新为 过期。 +2026-01-04 10:29:26,166 - utils.py - task_logger - INFO - 所有任务状态已保存到磁盘。 +2026-01-04 10:29:26,166 - utils.py - task_logger - INFO - 任务 简道云海外项目CRM客户档案迁移BI 状态已更新为 过期。 +2026-01-04 10:29:26,167 - utils.py - task_logger - INFO - 所有任务状态已保存到磁盘。 +2026-01-04 10:29:26,167 - utils.py - task_logger - INFO - 任务 安装服务历史派发 状态已更新为 过期。 +2026-01-04 10:29:26,168 - utils.py - task_logger - INFO - 所有任务状态已保存到磁盘。 +2026-01-04 10:29:26,168 - utils.py - task_logger - INFO - 任务 分母报备调整 状态已更新为 过期。 +2026-01-04 10:29:26,169 - utils.py - task_logger - INFO - 所有任务状态已保存到磁盘。 +2026-01-04 10:29:26,169 - utils.py - task_logger - INFO - 任务 分子报备调整 状态已更新为 过期。 +2026-01-04 10:29:26,169 - utils.py - task_logger - INFO - 所有任务状态已保存到磁盘。 +2026-01-04 10:29:26,169 - utils.py - task_logger - INFO - 任务 履约表数据支撑 状态已更新为 过期。 +2026-01-04 10:29:26,170 - utils.py - task_logger - INFO - 所有任务状态已保存到磁盘。 +2026-01-04 10:29:26,170 - utils.py - task_logger - INFO - 任务 字段监控 状态已更新为 过期。 +2026-01-04 10:29:26,171 - utils.py - task_logger - INFO - 所有任务状态已保存到磁盘。 +2026-01-04 10:29:26,171 - utils.py - task_logger - INFO - 任务 经销商新签服务单转BI 状态已更新为 过期。 +2026-01-04 10:29:26,171 - utils.py - task_logger - INFO - 所有任务状态已保存到磁盘。 +2026-01-04 10:29:26,171 - utils.py - task_logger - INFO - 任务 高德匹配手机号 状态已更新为 过期。 +2026-01-04 10:29:26,172 - utils.py - task_logger - INFO - 所有任务状态已保存到磁盘。 +2026-01-04 10:29:26,172 - utils.py - task_logger - INFO - 任务 省市区人员关系表转BI 状态已更新为 过期。 +2026-01-04 10:29:26,173 - utils.py - task_logger - INFO - 所有任务状态已保存到磁盘。 +2026-01-04 10:29:26,173 - utils.py - task_logger - INFO - 任务 续约回访待办 状态已更新为 过期。 +2026-01-04 10:29:26,173 - utils.py - task_logger - INFO - 启动任务加载完成。 +2026-01-04 10:29:26,173 - main.py - task_logger - INFO - 程序已启动... diff --git a/task_executor.py b/task_executor.py index bbf5c10..759f1a6 100644 --- a/task_executor.py +++ b/task_executor.py @@ -37,7 +37,6 @@ def execute_task(task_id) -> bool: "分母报备调整": Module.update_molecule_reporting_adjustment_to_bi, "履约表数据支撑": Module.import_performance_data, "字段监控": Module.data_monitor, - "测试3": Module.text3, "经销商新签服务单转BI": Module.new_dealer_service_order_to_bi, "合伙人结算登记同步到BI": Module.partner_settlement_to_BI, "非标业绩提报转BI": Module.non_standar_performance_to_BI, diff --git a/tasks.csv b/tasks.csv index d22b120..6653766 100644 --- a/tasks.csv +++ b/tasks.csv @@ -29,3 +29,4 @@ NGV更新数据,12:30,True,待执行 合伙人结算登记同步到BI,17:02,True,待执行 高德匹配手机号,05:00,True,过期 省市区人员关系表转BI,08:00,True,过期 +续约回访待办,09:35,True,过期 diff --git a/test/续约待办宜搭传给简道云.py b/test/续约待办宜搭传给简道云.py index 8afbb93..cfbe10b 100644 --- a/test/续约待办宜搭传给简道云.py +++ b/test/续约待办宜搭传给简道云.py @@ -572,10 +572,35 @@ class YDToJDYRenewalToDo(object): print(f"获取流程实例数据时出错: {e}") continue - df = pd.DataFrame(all_process_list) - df.to_csv(f"{output_dir}/{start_time}_{end_time}_all_process_list.csv", index=False) + df_current = pd.DataFrame(all_process_list) + current_file = f"{output_dir}/{start_time}_{end_time}_all_process_list.csv" - return all_process_list + # === 新增:读取上次文件并计算差值 === + if os.path.exists(current_file): + try: + df_last = pd.read_csv(current_file) + except Exception as e: + print(f"读取历史文件失败: {e}") + df_last = pd.DataFrame() + else: + df_last = pd.DataFrame() + + # 假设唯一标识字段为 'instanceId',请根据实际字段名调整 + id_col = 'processInstanceId' + if not df_last.empty and not df_current.empty and id_col in df_current.columns and id_col in df_last.columns: + # 转为字符串确保可比 + last_ids = set(df_last[id_col].astype(str)) + current_ids = set(df_current[id_col].astype(str)) + new_ids = current_ids - last_ids + diff_records = df_current[df_current[id_col].astype(str).isin(new_ids)].to_dict('records') + else: + # 没有历史文件 或 无唯一ID → 返回空 list(按你要求) + diff_records = [] + + # 保存当前全量数据(覆盖) + df_current.to_csv(current_file, index=False) + + return diff_records def filter_renewal_data(self, all_process_list): update_data_list = [] @@ -680,10 +705,13 @@ class YDToJDYRenewalToDo(object): try: # step1 获取简道云与宜搭数据 jd_ydy_data = self.load_all_data() - # step2 过滤已经续约的单子 - update_data_list = self.filter_renewal_data(jd_ydy_data) - # step3 校验简道云是否有进行中的单子并关闭 - self.check_jd_ydy_data(update_data_list) + if jd_ydy_data: + # step2 过滤已经续约的单子 + update_data_list = self.filter_renewal_data(jd_ydy_data) + # step3 校验简道云是否有进行中的单子并关闭 + self.check_jd_ydy_data(update_data_list) + else: + print("本次执行无处理数据") except Exception as e: print(e) diff --git a/utils.py b/utils.py index 05d1e8d..e0a886f 100644 --- a/utils.py +++ b/utils.py @@ -15,7 +15,7 @@ logger = configure_task_logger() # 获取错误任务日志记录器 error_task_logger = configure_error_task_logger() -# 全局任务字典,使用 unique_id 作为键 +# 全局任务字典,使用 (unique_id, exec_time) 作为组合键,支持一个任务多个执行时间 all_tasks = {} @@ -86,14 +86,14 @@ class PersistentQueue: """将任务加入队列并保存到磁盘""" self.queue.put(task) self.save_to_disk() - logger.info(f"任务 {task['unique_id']} 已加入队列。") + logger.info(f"任务 {task['unique_id']} ({task['exec_time']}) 已加入队列。") def get(self): """从队列中获取任务并保存到磁盘""" if not self.queue.empty(): task = self.queue.get() self.save_to_disk() - logger.info(f"任务 {task['unique_id']} 已从队列中取出。") + logger.info(f"任务 {task['unique_id']} ({task['exec_time']}) 已从队列中取出。") return task else: logger.info("任务队列为空。") @@ -120,19 +120,20 @@ task_queue = PersistentQueue() def load_tasks_and_execute(): """ 从 tasks.csv 文件中加载任务到持久化队列中,仅包含待执行任务。 - 避免重复添加任务。 + 支持一个任务多个执行时间,避免重复添加任务。 """ try: load_tasks_from_csv() # 从 CSV 文件加载所有任务 now = datetime.now() - # 获取队列中已有任务ID(避免重复添加) - existing_task_ids = set() + # 获取队列中已有任务标识(使用 (unique_id, exec_time) 避免重复添加) + existing_task_keys = set() # 创建一个临时队列来遍历任务 temp_queue = Queue() while not task_queue.empty(): task = task_queue.get() - existing_task_ids.add(task['unique_id']) + task_key = (task['unique_id'], task['exec_time']) + existing_task_keys.add(task_key) temp_queue.put(task) # 将任务放回原队列 while not temp_queue.empty(): @@ -144,18 +145,21 @@ def load_tasks_and_execute(): if (now - exec_time_today) > timedelta(minutes=5): if task['status'] == '待执行': - error_task_logger.error(f"任务 {task['unique_id']} 超过执行窗口5分钟以上,标记为过期。") - update_task_status(task['unique_id'], '过期') + error_task_logger.error(f"任务 {task['unique_id']} ({task['exec_time']}) 超过执行窗口5分钟以上,标记为过期。") + update_task_status(task['unique_id'], task['exec_time'], '过期') continue # 不处理过期任务 + # 使用组合键判断任务是否已在队列中 + task_key = (task['unique_id'], task['exec_time']) + # 如果任务应该执行、状态是待执行且不在队列中 if (task['should_execute'](now) and task['status'] == '待执行' and - task['unique_id'] not in existing_task_ids): + task_key not in existing_task_keys): task_queue.put(task) - existing_task_ids.add(task['unique_id']) - logger.info(f"任务 {task['unique_id']} 加入队列。") + existing_task_keys.add(task_key) + logger.info(f"任务 {task['unique_id']} ({task['exec_time']}) 加入队列。") except Exception as e: error_task_logger.error(f"加载任务时出错: {e}") @@ -165,26 +169,26 @@ def load_tasks_and_execute(): def execute_single_task(): - """从队列中取出并执行单个任务""" + """从队列中取出并执行单个任务,支持一个任务多个执行时间""" if not task_queue.empty(): try: task = task_queue.get() if task: # 标记任务为正在执行 - update_task_status(task['unique_id'], '正在执行') + update_task_status(task['unique_id'], task['exec_time'], '正在执行') try: success = execute_task(task['unique_id']) if success: - update_task_status(task['unique_id'], '已完成') - logger.info(f"任务 {task['unique_id']} 执行成功。") + update_task_status(task['unique_id'], task['exec_time'], '已完成') + logger.info(f"任务 {task['unique_id']} ({task['exec_time']}) 执行成功。") else: - update_task_status(task['unique_id'], '失败') - error_task_logger.error(f"任务 {task['unique_id']} 执行失败。") + update_task_status(task['unique_id'], task['exec_time'], '失败') + error_task_logger.error(f"任务 {task['unique_id']} ({task['exec_time']}) 执行失败。") except Exception as e: - error_task_logger.error(f"任务 {task['unique_id']} 执行时发生异常: {e}") - update_task_status(task['unique_id'], '失败') + error_task_logger.error(f"任务 {task['unique_id']} ({task['exec_time']}) 执行时发生异常: {e}") + update_task_status(task['unique_id'], task['exec_time'], '失败') task_queue.task_done() @@ -193,7 +197,7 @@ def execute_single_task(): def load_tasks_from_csv(): - """从 tasks.csv 文件中加载任务到全局任务字典""" + """从 tasks.csv 文件中加载任务到全局任务字典,支持一个任务多个执行时间""" try: csv_file = Path('tasks.csv') if csv_file.is_file(): @@ -212,7 +216,9 @@ def load_tasks_from_csv(): exec_time = datetime.strptime(task['exec_time'], '%H:%M').time() task['should_execute'] = lambda now, et=exec_time: now.time() >= et - all_tasks[task['unique_id']] = task + # 使用 (unique_id, exec_time) 作为组合键,支持一个任务多个执行时间 + task_key = (task['unique_id'], task['exec_time']) + all_tasks[task_key] = task logger.info("任务已从磁盘加载到全局任务字典。") else: logger.warning("tasks.csv 文件未找到,初始化为空任务字典。") @@ -221,11 +227,12 @@ def load_tasks_from_csv(): def save_tasks_to_csv(): - """将所有任务的状态保存到 tasks.csv 文件""" + """将所有任务的状态保存到 tasks.csv 文件,支持一个任务多个执行时间""" try: with open('tasks.csv', 'w', newline='', encoding='utf-8') as f: writer = csv.DictWriter(f, fieldnames=['unique_id', 'exec_time', 'is_switch_on', 'status']) writer.writeheader() + # 遍历所有任务实例(包括相同unique_id的不同exec_time) for task in all_tasks.values(): task_to_save = { 'unique_id': task['unique_id'], @@ -240,14 +247,15 @@ def save_tasks_to_csv(): error_task_logger.error(f"保存任务状态时出错: {e}") -def update_task_status(task_id, new_status): - """更新指定任务的状态,并保存到磁盘""" - if task_id in all_tasks: - all_tasks[task_id]['status'] = new_status +def update_task_status(task_id, exec_time, new_status): + """更新指定任务的状态,并保存到磁盘,支持一个任务多个执行时间""" + task_key = (task_id, exec_time) + if task_key in all_tasks: + all_tasks[task_key]['status'] = new_status save_tasks_to_csv() - logger.info(f"任务 {task_id} 状态已更新为 {new_status}。") + logger.info(f"任务 {task_id} ({exec_time}) 状态已更新为 {new_status}。") else: - error_task_logger.error(f"尝试更新不存在的任务 {task_id} 的状态。") + error_task_logger.error(f"尝试更新不存在的任务 {task_id} ({exec_time}) 的状态。") def load_tasks_on_start():