import logging
import time
from datetime import datetime, timedelta

import pandas as pd
from tqdm import tqdm

from yd_api import YDAPI
from api import API
from config import Config
from back_ground_module import CommonModule
from log_config import configure_task_logger, configure_error_task_logger

# Pre-configured logger for routine task messages.
logger = configure_task_logger()
# Pre-configured logger for task errors.
error_task_logger = configure_error_task_logger()

# API clients and the source-side token, created once at import time.
api_instanceyd = YDAPI()
api_instance = API()
common_module = CommonModule()
TOKEN = api_instanceyd.generateToken()

# Source form configuration.
FORMID = "FORM-JD8668C1O2MV388PX5YIQGKMBEPS1EXV0UCWK0"  # FPO requirement submission
appType = "APP_FE5IWP670JPRC5ZA6HK0"  # FPO product operations
systemToken = "HP666C71ZLASJ0MPWC5ZOUA4AGDP17QU7TPRK92"  # FPO product operations
BASE_URL = "https://f6car.aliwork.com"  # Base URL used to build instance links

# Target form configuration.
TARGET_API_KEY = "675b900991ad2491c69389ca"
TARGET_ENTRY_ID = "683fa187aeaf99ebb29faa63"

# Number of process instances requested per page.
PAGE_SIZE = 100

# Task name reported to the task status / error notifications.
TASK_NAME = "宜搭FPO实例同步简道云"

# Name of the column holding the link to the source form instance.
_URL_COLUMN = "超链接"

# (output column, source formData key) pairs; the order defines the column
# order of the DataFrame after the link column.
_SOURCE_FIELDS = (
    ("门店名称", "selectField_kwcxm069"),
    ("需求编号", "textField_krydg6af"),
    ("需求类型", "selectField_kwd5rp7s"),
    ("场景描述", "textareaField_krrhmeuc"),
    ("需求描述", "textareaField_krrhmeud"),
    ("需求解决状态", "selectField_krrhmevo"),
    ("需求评审意见", "textareaField_krrhmevm"),
    ("需求提出日期", "dateField_krrhmeue"),
    ("计划解决日期", "dateField_krrhmex6"),
    ("实际解决日期", "dateField_krrhmex9"),
)

# (target widget id, DataFrame column) pairs used to build one target entry.
_TARGET_WIDGETS = (
    ("_widget_1749000583446", "门店名称"),
    ("_widget_1749000583447", "超链接"),
    ("_widget_1749006049130", "需求编号"),
    ("_widget_1749006049131", "需求类型"),
    ("_widget_1749006049132", "场景描述"),
    ("_widget_1749006049133", "需求描述"),
    ("_widget_1749006049134", "需求解决状态"),
    ("_widget_1749006049135", "id_own_org"),
    ("_widget_1749006049136", "id_own_group"),
    ("_widget_1749019791181", "需求评审意见"),
    ("_widget_1749019791182", "需求提出日期"),
    ("_widget_1749019791183", "计划解决日期"),
    ("_widget_1749019791184", "实际解决日期"),
)


class YDFpoJiandaoyun:
    """Migrate Yida FPO process instances to a Jiandaoyun form."""

    def __init__(self):
        self.all_dict = pd.DataFrame()  # Process-instance data
        self.data_NGV = pd.DataFrame()  # NGV data
        self.all_data = []  # Entry payloads waiting to be submitted
        self.success_count = 0
        self.fail_count = 0
        self.max_retries = 3  # Maximum attempts per API request
        self.retry_delay = 1  # Initial delay between attempts, in seconds

    def _fetch_page(self, page):
        """Request one page of process instances; return None on failure."""
        return self._api_request_with_retry(
            api_instanceyd.read_processes,
            token=TOKEN,
            formUuid=FORMID,
            page=page,
            n=PAGE_SIZE,
            appType=appType,
            systemToken=systemToken,
        )

    def fetch_process_data(self):
        """Fetch all process instances of the source form.

        Returns:
            A DataFrame with one row per instance (link column followed by
            the columns of ``_SOURCE_FIELDS``), or an empty DataFrame when
            the first page cannot be fetched or an unexpected error occurs.
            Pages that fail to load are logged and skipped.
        """
        try:
            today_midnight = datetime.now().replace(
                hour=0, minute=0, second=0, microsecond=0)
            yesterday_midnight = today_midnight - timedelta(days=1)
            start_time = yesterday_midnight.strftime("%Y-%m-%d")
            end_time = today_midnight.strftime("%Y-%m-%d")
            # NOTE(review): this window is only logged; it is never passed to
            # read_processes, so the request is not filtered by date here.
            # Confirm whether filtering was intended.
            logger.info(f"开始时间: {start_time}, 结束时间: {end_time}")

            columns = [_URL_COLUMN] + [name for name, _ in _SOURCE_FIELDS]
            all_dict = pd.DataFrame(columns=columns)

            # The first page also carries the total record count.
            first_page = self._fetch_page(1)
            if not first_page:
                error_task_logger.error("获取流程实例数据失败")
                return pd.DataFrame()

            total_count = first_page.get('totalCount', 0)
            total_pages = (total_count + PAGE_SIZE - 1) // PAGE_SIZE
            logger.info(f"总共有 {total_count} 条记录,分为 {total_pages} 页")

            for page in tqdm(range(1, total_pages + 1)):
                # Reuse the response already fetched instead of requesting
                # page 1 a second time.
                form_data = first_page if page == 1 else self._fetch_page(page)
                if not form_data:
                    error_task_logger.error(f"获取第 {page} 页数据失败")
                    continue
                self._process_page_data(form_data, all_dict)

            logger.info(f"成功获取 {len(all_dict)} 条流程实例数据")
            return all_dict
        except Exception as e:
            error_task_logger.error(f"获取流程实例数据时发生错误: {e}", exc_info=True)
            return pd.DataFrame()

    def _process_page_data(self, form_data, all_dict):
        """Append the records of one page response to ``all_dict`` in place.

        Values are assigned positionally to ``all_dict.columns``: the
        instance link first, then the fields of ``_SOURCE_FIELDS``. A record
        that raises is logged and skipped; the remaining records are still
        processed.
        """
        for record in form_data.get("data", []):
            try:
                url = (f"{BASE_URL}/{appType}/formDetail/{FORMID}"
                       f"?formInstId={record['formInstanceId']}")
                fields = record['formData']
                values = [url] + [fields.get(key, '') for _, key in _SOURCE_FIELDS]
                all_dict.loc[len(all_dict)] = pd.Series(values, index=all_dict.columns)
            except KeyError as e:
                error_task_logger.error(f"处理记录时缺少字段: {e}")
            except Exception as e:
                error_task_logger.error(f"处理记录时出错: {e}")

    def NGV_data(self):
        """Fetch NGV details via ``common_module.get_ngv_details(days_back=1)``.

        Returns:
            The fetched data, or an empty DataFrame if the call raises.
        """
        try:
            logger.info("开始获取NGV数据")
            data_NGV = common_module.get_ngv_details(days_back=1)
            logger.info(f"成功获取 {len(data_NGV)} 条NGV数据")
            return data_NGV
        except Exception as e:
            error_task_logger.error(f"获取NGV数据时发生错误: {e}", exc_info=True)
            return pd.DataFrame()

    def match_and_update(self):
        """Attach ``id_own_org`` / ``id_own_group`` to ``self.all_dict``.

        Rows are matched by comparing the store-name column of
        ``self.all_dict`` with ``org_name`` in ``self.data_NGV``. Unmatched
        rows keep ``None`` in both id columns.

        Returns:
            ``self.all_dict`` (also when nothing could be matched or an
            error occurred).
        """
        try:
            if self.all_dict.empty or self.data_NGV.empty:
                logger.warning("all_dict 或 data_NGV 为空,无法进行匹配")
                return self.all_dict

            # org_name -> (id_own_org, id_own_group); for duplicate names the
            # last occurrence wins.
            ngv = self.data_NGV
            mapping_dict = dict(zip(
                ngv['org_name'],
                zip(ngv['id_own_org'], ngv['id_own_group']),
            ))

            self.all_dict['id_own_org'] = None
            self.all_dict['id_own_group'] = None

            match_count = 0
            for idx, store_name in self.all_dict['门店名称'].items():
                if store_name in mapping_dict:
                    org_id, group_id = mapping_dict[store_name]
                    self.all_dict.at[idx, 'id_own_org'] = org_id
                    self.all_dict.at[idx, 'id_own_group'] = group_id
                    match_count += 1

            logger.info(f"匹配完成: 共 {len(self.all_dict)} 条记录,成功匹配 {match_count} 条")
            return self.all_dict
        except Exception as e:
            error_task_logger.error(f"匹配数据时发生错误: {e}", exc_info=True)
            return self.all_dict

    def write_in(self):
        """Build the entry payloads for every row of ``self.all_dict``.

        The payload list and both counters are reset on every call, so
        calling this method twice does not queue duplicate entries.

        Returns:
            ``self.all_data``, the list of payload dicts (empty when there
            is nothing to write).
        """
        self.all_data = []
        self.success_count = 0
        self.fail_count = 0

        if self.all_dict.empty:
            logger.warning("all_dict 为空,没有数据可写入")
            return self.all_data

        for index_num, row in tqdm(self.all_dict.iterrows(), total=len(self.all_dict)):
            try:
                self.all_data.append(self._build_payload_dict(row))
                self.success_count += 1
            except Exception as e:
                self.fail_count += 1
                error_task_logger.error(f"处理记录 #{index_num} 时发生错误: {e}", exc_info=True)
        return self.all_data

    def _build_payload_dict(self, row):
        """Map one DataFrame row to the target form's widget payload."""
        return {
            widget: {"value": row[column]}
            for widget, column in _TARGET_WIDGETS
        }

    def _build_request_payload(self, payload_dict):
        """Wrap one entry payload into a complete single-entry request."""
        transaction_id = time.strftime("%Y%m%d%H%M%S", time.localtime())
        return {
            "api_key": TARGET_API_KEY,
            "entry_id": TARGET_ENTRY_ID,
            "is_start_workflow": "true",
            "data": payload_dict,
            "transaction_id": transaction_id,
        }

    def _api_request_with_retry(self, api_func, *args, **kwargs):
        """Call ``api_func`` with retries and exponential backoff.

        Args:
            api_func: Callable to invoke.
            *args, **kwargs: Forwarded to ``api_func`` unchanged.

        Returns:
            The result of ``api_func``, or ``None`` once ``self.max_retries``
            attempts have all raised.
        """
        # Keep the backoff in a local so a failing request does not
        # permanently inflate self.retry_delay for later calls.
        delay = self.retry_delay
        for attempt in range(self.max_retries):
            try:
                return api_func(*args, **kwargs)
            except Exception as e:
                if attempt < self.max_retries - 1:
                    logger.warning(f"API请求失败,正在重试 ({attempt + 1}/{self.max_retries}): {e}")
                    time.sleep(delay)
                    delay *= 2  # Exponential backoff
                else:
                    error_task_logger.error(f"API请求失败,已达到最大重试次数: {e}", exc_info=True)
        return None

    def clear_existing_data(self):
        """Delete the entries currently listed for the target form.

        Failures are logged and never raised.
        """
        try:
            logger.info("开始清除现有数据")

            payload = {"api_key": TARGET_API_KEY, "entry_id": TARGET_ENTRY_ID}
            # NOTE(review): a single list request is made; if entry_data_list
            # is paginated, only the first page of entries is removed.
            # Verify against the API wrapper.
            get_ids = self._api_request_with_retry(
                api_instance.entry_data_list, payload
            )
            if not get_ids:
                logger.warning("获取现有数据ID失败")
                return

            data_list = get_ids.get("data", [])
            if not data_list:
                logger.info("目标表单中没有现有数据")
                return

            delete_ids = [item["_id"] for item in data_list]
            logger.info(f"将删除 {len(delete_ids)} 条现有记录")

            delete_payload = {
                "api_key": TARGET_API_KEY,
                "entry_id": TARGET_ENTRY_ID,
                "data_ids": delete_ids,
            }
            delete_result = self._api_request_with_retry(
                api_instance.entry_data_batch_delete, delete_payload
            )
            # Surface the outcome; a None result means every attempt raised.
            logger.info(f"删除结果:{delete_result}")
        except Exception as e:
            error_task_logger.error(f"清除现有数据时发生错误: {e}", exc_info=True)

    def batch_create_entries(self, task_start_time):
        """Submit all queued payloads in one batch-create request.

        Args:
            task_start_time: Start time string forwarded to the task status
                notification.

        Returns:
            True when the request and the status notification completed
            without raising; False when there is no data or an error
            occurred.
        """
        try:
            if not self.all_data:
                logger.warning("没有数据可创建条目")
                return False

            transaction_id = time.strftime("%Y%m%d%H%M%S", time.localtime())
            # NOTE(review): all payloads go out in a single request; confirm
            # the target API's per-request record limit.
            routine_follow_up_payload = {
                "api_key": TARGET_API_KEY,
                "entry_id": TARGET_ENTRY_ID,
                "is_start_workflow": "true",
                "data_list": self.all_data,
                "transaction_id": transaction_id,
            }
            res = api_instance.entry_data_batch_create(routine_follow_up_payload)
            logger.info(f"创建结果:{res}")

            # Report the task status.
            common_module.send_task_status(task_start_time, TASK_NAME)
            return True
        except Exception as e:
            error_task_logger.error(f"批量创建条目时发生错误: {e}", exc_info=True)
            return False

    def main(self):
        """Run the pipeline: fetch, match, clear target, build, create."""
        task_start_time = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
        logger.info("=" * 50)
        logger.info("开始执行时间消耗流程处理")
        logger.info("=" * 50)

        try:
            # Step 1: fetch process instances.
            logger.info("步骤1: 获取流程实例数据")
            self.all_dict = self.fetch_process_data()
            if self.all_dict.empty:
                logger.warning("未获取到流程实例数据,终止执行")
                return

            # Step 2: fetch NGV data.
            logger.info("步骤2: 获取NGV数据")
            self.data_NGV = self.NGV_data()
            if self.data_NGV.empty:
                logger.warning("未获取到NGV数据,终止执行")
                return

            # Step 3: match the two data sets.
            logger.info("步骤3: 匹配流程实例与NGV数据")
            self.all_dict = self.match_and_update()

            # Step 4: clear previously written data.
            # NOTE(review): the target is cleared before the new payloads are
            # built and submitted; a failure in steps 5-6 leaves it empty.
            logger.info("步骤4: 清除目标表单中的现有数据")
            self.clear_existing_data()

            # Step 5: build the entry payloads.
            logger.info("步骤5: 将处理后的数据写入表单")
            self.write_in()

            # Step 6: create the entries in one batch.
            logger.info("步骤6: 批量创建条目")
            success = self.batch_create_entries(task_start_time)

            logger.info("=" * 50)
            logger.info(
                f"处理完成: 共处理 {len(self.all_dict)} 条记录,成功 {self.success_count} 条,失败 {self.fail_count} 条")
            logger.info(f"批量创建结果: {'成功' if success else '失败'}")
            logger.info("=" * 50)

        except Exception as e:
            error_task_logger.error(f"执行过程中发生错误: {e}", exc_info=True)
            common_module.send_task_error(task_start_time, TASK_NAME, str(e))


if __name__ == '__main__':
    processor = YDFpoJiandaoyun()
    processor.main()