from module import Module from log_config import configure_task_logger, configure_error_task_logger logger = configure_task_logger() error_task_logger = configure_error_task_logger() def execute_task(task_id) -> bool: """ 执行指定任务ID对应的任务函数。 :param task_id: 任务的唯一标识符 :return: 返回布尔值,表示任务是否成功执行 """ # 定义任务ID到函数的映射 task_functions = { 'NGV新增数据': Module.update_ngv_data, 'NGV更新数据': Module.daily_update_ngv_data, '新签客户回访': Module.new_services_revisit, '续约客户回访': Module.renew_services_revisit, '大客户回访': Module.key_services_revisit, '接车宝日常派发': Module.jcb_efficient_car_pickup_data, '接车宝异常派发': Module.jcb_abnormal_revisit_data, '私域小程序数据支撑': Module.data_Support_Private_Mini_Program, '小六提成数据支撑': Module.data_Support_Commission, '异业合作数据支撑': Module.data_Support_DifferentIndustries, '短信数据支撑': Module.data_Support_GroupNotification, '海外邮件推送': Module.data_Update_Email, '异常服务待办派发': Module.data_Exception_Task, '手动添加日常回访': Module.revisit_all_information, "宜搭FPO实例同步简道云": Module.yida_Fpo_Jandaoyun, "宜搭流程耗时写入BI": Module.get_process_time, "简道云海外项目CRM客户档案迁移BI": Module.CRMDataProcessor, "简道云员工ID表更新": Module.update_ID_form, "安装服务历史派发": Module.install_event_dispatcher, "新签客户回访测试": Module.test, "分子报备调整": Module.update_denominator_reporting_adjustment, "分母报备调整": Module.update_molecule_reporting_adjustment_to_bi, "履约表数据支撑": Module.import_performance_data, "字段监控": Module.data_monitor, "测试3": Module.text3, "经销商新签服务单转BI": Module.new_dealer_service_order_to_bi, "合伙人结算登记同步到BI": Module.partner_settlement_to_BI, "非标业绩提报转BI": Module.non_standar_performance_to_BI, "高德匹配手机号": Module.GD_match_phone_number, # 添加更多任务函数映射... } # 尝试找到对应的任务函数并执行它 task_function = task_functions.get(task_id) if task_function: try: result = task_function() if result: logger.info(f"任务 {task_id} 执行成功: {result}") return True else: return False except Exception as e: error_task_logger.error(f"任务 {task_id} 执行失败: {e}") return False else: logger.warning(f"未找到任务 {task_id} 对应的函数。") return False