diff --git a/back_ground_module/__init__.py b/back_ground_module/__init__.py index 7e80102..df05fb0 100644 --- a/back_ground_module/__init__.py +++ b/back_ground_module/__init__.py @@ -28,3 +28,4 @@ from back_ground_module.non_standar_performance_to_BI import NonStandardPerforma from back_ground_module.partner_settlement_to_BI import PartnerSettlementToBI from back_ground_module.GD_match_phone_number import GDMatchPhoneNumber from back_ground_module.province_city_person_relation_to_bi import ProvinceCityPersonRelationToBI +from back_ground_module.renewal_to_do import RenewalToDo \ No newline at end of file diff --git a/back_ground_module/renewal_to_do.py b/back_ground_module/renewal_to_do.py new file mode 100644 index 0000000..f808829 --- /dev/null +++ b/back_ground_module/renewal_to_do.py @@ -0,0 +1,535 @@ +import os +from datetime import datetime +import pandas as pd +from api import API +from back_ground_module import CommonModule +from log_config import configure_task_logger, configure_error_task_logger +from collections import defaultdict + +logger = configure_task_logger() +error_task_logger = configure_error_task_logger() +api_instance = API() +common_module = CommonModule() +output_dir = "output" # 设置输出目录 +os.makedirs(output_dir, exist_ok=True) + + +class RenewalToDo: + """续约回访待办派发""" + def __init__(self): + self.renewal_data_list = None + self.cyclic_increasing = None + self.franchisee = None + self.last_price = None + self.province_staff_id_list = None + self.json_list = None + self.data_NGV = None + self.staff_id_list = None + self.NGV_data_list = None + self.field_map = { + "关联数据": "_widget_1764820541663", + "公司名称": "_widget_1764820541616", + "门店名称": "_widget_1764820541617", + "门店编码": "_widget_1764820541661", + "加盟商": "_widget_1764820541618", + "过期日": "_widget_1764820541672", + "Saas版本": "_widget_1764820541623", + "上次购买价格": "_widget_1764820541624", + "联系人": "_widget_1764820541621", + "联系手机号": "_widget_1764820541622", + "专属运营顾问": "_widget_1764820541625", + "区域客服": "_widget_1764820541715", + "运营专家": "_widget_1764820541678", + "120天是否跟进": "_widget_1764820541628", + "120天处理人": "_widget_1764820541634", + "120天跟进时间": "_widget_1765352838631", + "60天是否跟进": "_widget_1764820541630", + "60天处理人": "_widget_1764820541635", + "60天跟进时间": "_widget_1765352838632", + "30天是否跟进": "_widget_1764820541632", + "30天处理人": "_widget_1764820541636", + "30天跟进时间": "_widget_1765352838633", + "是否联系上客户": "_widget_1764820541638", + "客户现阶段问题分类": "_widget_1764820541641", + "未联系上原因字段": "_widget_1765330820509", + "联系情况及问题说明": "_widget_1764820541653", + "潜在商机": "_widget_1764820541657", + "商机详情": "_widget_1764820541659", + "门店续约意愿": "_widget_1764820541654", + "不续约原因": "_widget_1764820541700", + "产品原因": "_widget_1764820541707", + "服务问题": "_widget_1764820541709", + "门店原因": "_widget_1764820541711", + "价格原因": "_widget_1764820541713", + "不续约具体情况说明": "_widget_1764820541702", + "回访完成方式": "_widget_1764820541697", + "周期性增购": "_widget_1764820541717", + "周期性增购.商品名称": "_widget_1764820541717._widget_1764820541719", + "周期性增购.分母金额": "_widget_1764820541717._widget_1764820541720", + "周期性增购.应续约日": "_widget_1764820541717._widget_1764820541721", + "周期性增购.上次购买数量": "_widget_1764820541717._widget_1764820541722", + "周期性增购.不续约原因": "_widget_1764820541717._widget_1764820541723", + "周期性增购.是否愿意续约": "_widget_1764820541717._widget_1764820541724", + "周期性增购.续约后订单编码": "_widget_1764820541717._widget_1764820541725", + "订单编码": "_widget_1764820541674", + "订单支付日期": "_widget_1764820541679", + "本次-实付金额(元)": "_widget_1764820541676", + "业务类型(续约、升级)": "_widget_1764820541680", + "连锁门店待办同步处理": "_widget_1764820541681", + "选择需要同步的门店名称": "_widget_1765330820391", + "120天自动流转时间": "_widget_1764820541865", + "60天自动流转时间": "_widget_1765964381895", + "30天自动流转时间": "_widget_1765964381896", + "0天自动流转时间": "_widget_1765964381897", + "当前所处节点": "_widget_1765352838609", + "流程状态": "_widget_1765352838610", + "经营模式": "_widget_1765964381952", + "公司等级": "_widget_1766130435561", + "公司id": "_widget_1766631811839", + "订单商品名称":"_widget_1766730385209", + "提交人": "creator", + "提交时间": "createTime", + "更新时间": "updateTime" + } + self.cn_field_map = { + "related_data": "关联数据", + "group_name": "公司名称", + "org_name": "门店名称", + "org_code": "门店编码", + "expiry_time": "过期日", + "saas_edition_fmt": "Saas版本", + "contacts": "联系人", + "contact_mobile": "联系手机号", + "service_impl_principal": "专属运营顾问", + "group_grade": "公司等级", + "technician": "运营专家", + "manage_model": "经营模式", + "id_own_group": "公司id", + } + self.subform_field_map = { + "商品名称": "_widget_1764820541719", + "分母金额": "_widget_1764820541720", + "应续约日": "_widget_1764820541721", + "上次购买数量": "_widget_1764820541722", + "不续约原因": "_widget_1764820541723", + "是否愿意续约": "_widget_1764820541724", + "续约后订单编码": "_widget_1764820541725", + # 根据实际需要添加更多字段 + } + self.renewal_list_map ={ + + } + + def load_all_data(self): + """ + 从各类来源加载数据上加载数据 + :return: + """ + + # 数据库获取续约回访数据 + self.data_NGV = common_module.get_renewal_details() + # 获取加盟商信息 + self.franchisee = common_module.get_renewal_franchisee_details() + self.franchisee.to_csv(os.path.join(output_dir, "franchisee.csv")) + # 获取上次购买价格 + self.last_price = common_module.get_renewal_last_price_details() + self.last_price.to_csv(os.path.join(output_dir, "last_price.csv")) + # 周期性增购 + self.cyclic_increasing = common_module.get_cyclic_increasing_renewal_details() + self.cyclic_increasing.to_csv(os.path.join(output_dir, "cyclic_increasing.csv")) + + # 获取NGV数据 + payload = {"api_key": "675b900991ad2491c69389ca", "entry_id": "675bb02bd2d53c2034c665e4"} + self.NGV_data_list = api_instance.entry_data_list(payload).get("data") + + # 获取简道云员工id + payload = {"api_key": "6694d3c4fcb69ca9a111a6c4", + "entry_id": "6769204a1902c9341340a1bc", + } + staff_id = api_instance.entry_data_list(payload) + self.staff_id_list = staff_id.get("data") # api请求格式,将数据封装在data字典里 + + # 省市区人员关系表 + payload = {"api_key": "675b900991ad2491c69389ca", "entry_id": "676512ac3e54dc3159460c0a"} + json_dict = api_instance.entry_data_list(payload) + if json_dict and "data" in json_dict: + self.province_staff_id_list = json_dict.get("data") + else: + print("加载省市区人员关系表失败") + self.province_staff_id_list = [] + + # 获取已派发续约待办(进行中) + payload = {"api_key": "675b900991ad2491c69389ca", + "entry_id": "6931063d64187eaf6b927557", + "filter": {"rel": "and", + "cond": [{"field": "flowState", "type": "flowstate", "method": "eq", "value": [0]}]}, + } + renewal = api_instance.entry_data_list(payload) + self.renewal_data_list = renewal.get("data") + + @staticmethod + def replace_names_with_staff_ids(df, name_columns, staff_id_list): + """ + 将 DataFrame 中多个姓名列替换为对应的员工ID。 + + :param staff_id_list: 简道云获取到员工id + :param df: pandas.DataFrame + :param name_columns: list[str],需要替换的姓名列名列表,例如 ["col1", "col2"] + :return: 修改后的 DataFrame(原列被替换) + """ + # 1. 构建姓名 -> 员工ID 的映射字典(只做一次) + name_to_id = {} + for item in staff_id_list or []: + name = item.get("_widget_1734942794144") + staff_id = item.get("_widget_1734942794145") + if name and staff_id: + name_to_id[str(name).strip()] = str(staff_id) + + # 2. 对每个指定的列进行替换 + df = df.copy() # 避免修改原始数据 + for col in name_columns: + if col not in df.columns: + continue # 跳过不存在的列 + # 替换:姓名 → ID,找不到的保留原值(可改为 fillna(None)) + df[col] = ( + df[col] + .astype(str) + .str.strip() + .map(name_to_id) + .fillna(df[col]) + ) + return df + + @staticmethod + def row_to_dict(row, field_mapping): + """将一行数据转换为指定格式的字典""" + result = {} + for col_name, widget_id in field_mapping.items(): + if col_name not in row: + continue + value = row[col_name] + + # 处理:如果 value 是容器类型(list, dict, tuple, np.ndarray),不进行 pd.isna 判断 + if isinstance(value, (list, dict, tuple)) or (hasattr(value, '__len__') and not isinstance(value, str)): + clean_value = value + else: + # 标量类型:安全使用 pd.isna + if pd.isna(value): + clean_value = None + elif isinstance(value, pd.Timestamp): + clean_value = value.strftime('%Y-%m-%dT%H:%M:%SZ') + else: + clean_value = value + + # 所有字段统一包 {"value": ...},包括子表单 + result[widget_id] = {"value": clean_value} + return result + + @staticmethod + def en_row_to_cn_row(en_row, en_to_cn_map): + """ + 将英文字段的行数据转换为中文字段的行数据 + + :param en_row: dict 或 pandas.Series,key 为英文字段 + :param en_to_cn_map: dict, 英文字段名 -> 中文字段名 + :return: dict,key 为中文字段名 + """ + cn_row = {} + for en_key, value in en_row.items(): + if en_key in en_to_cn_map: + cn_key = en_to_cn_map[en_key] + cn_row[cn_key] = value + # 可选:忽略无法映射的字段,或记录警告 + return cn_row + + @staticmethod + def get_customer_service_by_location(province_name, city_name, area_name, staff_id_list): + """ + 直接遍历 self.staff_id_list,根据省市区匹配续约回访客服。 + + :return: 客服用户名(str),未找到则返回提示信息 + """ + if not all([province_name, city_name, area_name]): + return "数据缺失: 省市区不完整" + + for item in staff_id_list or []: + try: + prov = item.get('_widget_1734677164861', '').strip() + city = item.get('_widget_1734677164862', '').strip() + area = item.get('_widget_1734677164863', '').strip() + + if (prov == province_name.strip() and + city == city_name.strip() and + area == area_name.strip()): + # 提取客服用户名 + staff_info = item.get('_widget_1734677164869', {}) # 续约回访客服 + username = staff_info.get('username') + return username if username else "数据缺失: 客服用户名为空" + except Exception: + continue # 跳过格式异常的记录 + + return "数据缺失: 未找到对应的续约回访客服" + + def build_subform_records( + self, + df: pd.DataFrame, + group_by_col: str, + field_mapping: dict, + ) -> dict: + """ + 通用子表单预处理函数:将子表单 DataFrame 转换为 {group_key: [subform_record1, subform_record2, ...]} 的字典。 + + :param df: 子表单数据 DataFrame,列名为中文(如 "商品名称", "分母金额") + :param group_by_col: 用于分组的列名(如 "门店编码") + :param field_mapping: 字段映射字典,{中文字段名: widget_id},例如 {"商品名称": "_widget_xxx"} + :return: dict,key 为 group_by_col 的值,value 为该组对应的子表单记录列表, + 每条记录是 {widget_id: {"value": clean_value}} 的 dict + """ + if df.empty: + return defaultdict(list) + + result = defaultdict(list) + target_fields = set(field_mapping.keys()) + + for _, row in df.iterrows(): + row_dict = row.to_dict() + group_key = row_dict.get(group_by_col) + + if not group_key or (isinstance(group_key, str) and group_key.strip() == ""): + warning_msg = f"子表单行缺少分组字段 '{group_by_col}',跳过: {row_dict}" + + # 构建单条子表单记录 + sub_record = {} + for field_cn, widget_id in field_mapping.items(): + val = row_dict.get(field_cn) + + # 清理值 + if pd.isna(val): + clean_val = None + elif hasattr(val, 'to_eng_string'): # Decimal + try: + clean_val = float(val) + except (ValueError, TypeError): + clean_val = str(val) + elif isinstance(val, pd.Timestamp): + clean_val = val.strftime('%Y-%m-%d %H:%M:%S') + else: + clean_val = val + + sub_record[widget_id] = {"value": clean_val} + + result[group_key].append(sub_record) + + return result + + def process_data(self): + """ + 数据处理加工 + :return: 处理后的 DataFrame,列名为中文 + """ + data_NGV = self.data_NGV.copy() # 避免修改原始数据 + + # === 将英文字段名替换为中文字段名 === + # 但只重命名存在的列 + rename_map = {en: cn for en, cn in self.cn_field_map.items() if en in data_NGV.columns} + data_NGV.rename(columns=rename_map, inplace=True) + + # 日期字段处理(使用中文列名) + time_columns = ['过期日'] + data_NGV[time_columns] = data_NGV[time_columns].apply( + lambda col: pd.to_datetime(col, errors='coerce') + .dt.tz_localize('Asia/Shanghai') + .dt.tz_convert('UTC') + ) + + # 新增4列:辅助时间字段 + data_NGV['120天自动流转时间'] = data_NGV['过期日'] - pd.Timedelta(days=60) + data_NGV['60天自动流转时间'] = data_NGV['过期日'] - pd.Timedelta(days=30) + data_NGV['30天自动流转时间'] = data_NGV['过期日'] - pd.Timedelta(days=0) + data_NGV['0天自动流转时间'] = data_NGV['过期日'] + pd.Timedelta(days=90) + + data_NGV['120天是否跟进'] = "主动" + data_NGV['60天是否跟进']= "主动" + data_NGV['30天是否跟进']= "主动" + # 格式化为字符串(去掉时区) + for col in ['过期日', '120天自动流转时间', '60天自动流转时间', '30天自动流转时间', '0天自动流转时间']: + data_NGV[col] = data_NGV[col].dt.strftime('%Y-%m-%d %H:%M:%S') + + # 新增加盟商列 + data_NGV = data_NGV.merge( + self.franchisee[['门店编码', '加盟商']], + on='门店编码', + how='left' + ) + + # 新增上次购买价格列 + # 1. 清洗数据 + df_lp = self.last_price[['门店编码', '类型', '订单商品名称', '价格']].copy() + + # 处理“类型”和“订单商品名称”的缺失值 + df_lp['类型'] = df_lp['类型'].fillna('').astype(str) + df_lp['订单商品名称'] = df_lp['订单商品名称'].fillna('').astype(str) + + # 处理价格:转数字、四舍五入、填0、转字符串 + df_lp['价格'] = ( + pd.to_numeric(df_lp['价格'], errors='coerce') + .round().fillna(0).astype(int).astype(str) + ) + + # 2. 拼接“类型:价格” + df_lp['类型_价格'] = df_lp['类型'] + ':' + df_lp['价格'] + + # 3. 按门店聚合两列 + agg_df = df_lp.groupby('门店编码', as_index=False).agg({ + '类型_价格': lambda x: ';'.join(x), + '订单商品名称': lambda x: ';'.join(x) + }) + + # 4. 合并回主表 + data_NGV = data_NGV.merge(agg_df, on='门店编码', how='left') + + # 5. 填充缺失值为空字符串,并重命名列 + data_NGV['类型_价格'] = data_NGV['类型_价格'].fillna('') + data_NGV['订单商品名称'] = data_NGV['订单商品名称'].fillna('') + + data_NGV.rename(columns={ + '类型_价格': '上次购买价格', + '订单商品名称': '订单商品名称' + }, inplace=True) + + # 成员字段替换(现在列名是中文) + staff_name_cols = [ + "专属运营顾问", + "运营专家", + ] + data_NGV = self.replace_names_with_staff_ids(data_NGV, staff_name_cols, self.staff_id_list) + + return data_NGV + + def dispatch_task(self, data_NGV): + """ + 拆分为三个独立动作(输入 data_NGV 列名为中文): + 1. 获取关联数据(NGV_data_id) + 2. 获取区域客服(regional_customer_service) + 3. 字段映射与格式化(中文 → widget),正确处理子表单 + """ + records = [] + no_customer_service_data = [] + + # === 使用通用函数预处理周期性增购子表单 === + cyclic_subforms = self.build_subform_records( + df=self.cyclic_increasing, + group_by_col="门店编码", + field_mapping=self.subform_field_map, + ) + + # === Step 1: 构建 门店编码 → NGV 数据ID 映射 === + org_code_to_ngv_id = {} + for ngv_item in self.NGV_data_list or []: + org_code = ngv_item.get("_widget_1734062123071") + ngv_id = ngv_item.get("_id") + if org_code and ngv_id: + org_code_to_ngv_id[org_code] = ngv_id + + # === Step 2: 定义获取区域客服的函数 === + def get_regional_customer_service(row): + province = row.get("省份") or row.get("province_name") + city = row.get("城市") or row.get("city_name") + area = row.get("区县") or row.get("district_name") or row.get("area_name") + org_code = row.get("门店编码") + + # 若省市区缺失,尝试从 NGV 补全 + if not all([province, city, area]) or any( + v in [None, '', 'None', 'NA'] for v in [province, city, area] + ): + ngv_record = next( + (item for item in self.NGV_data_list + if item.get("_widget_1734062123071") == org_code), + None + ) + if ngv_record: + province = ngv_record.get("_widget_1734062123090") + city = ngv_record.get("_widget_1734062123092") + area = ngv_record.get("_widget_1734062123094") + logger.info(f"【从NGV补全省市区】门店 {org_code}: {province}, {city}, {area}") + + if not all([province, city, area]) or any( + v in [None, '', 'None', 'NA'] for v in [province, city, area] + ): + logger.warning(f"【省市区信息缺失】门店 {org_code} 省市区不完整,客服设为空") + return None + + customer_service = self.get_customer_service_by_location( + str(province).strip(), + str(city).strip(), + str(area).strip(), + self.province_staff_id_list + ) + + if customer_service and "数据缺失" not in str(customer_service): + logger.info(f"【派发客服】门店 {org_code} 派发给客服: {customer_service}") + return customer_service + else: + logger.warning(f"未找到区域客服,请检查门店编码: {org_code}") + return None + + # === Step 3: 遍历主表每一行,构建最终提交记录 === + for _, row in data_NGV.iterrows(): + row_dict = row.to_dict() + + # 3.1 关联数据(NGV ID) + org_code = row_dict.get("门店编码") + ngv_id = org_code_to_ngv_id.get(org_code) + row_dict["关联数据"] = ngv_id if ngv_id else None + if not ngv_id: + logger.warning(f"未找到关联数据,请检查门店编码: {org_code}") + + # 3.2 区域客服 + customer_service = get_regional_customer_service(row_dict) + row_dict["区域客服"] = customer_service + if not customer_service: + no_customer_service_data.append(row_dict) + + # 3.3 注入周期性增购子表单 + row_dict["周期性增购"] = cyclic_subforms.get(org_code, []) + + # 3.4 转换为 widget 格式 + widget_record = self.row_to_dict(row_dict, self.field_map) + records.append(widget_record) + + # === Step 4: 批量提交 === + if not records: + logger.info("无数据需要派发") + return + + payload = { + "api_key": "675b900991ad2491c69389ca", + "entry_id": "6931063d64187eaf6b927557", + "data_list": records + } + print(payload) + + api_instance.entry_data_batch_create(payload) + logger.info(f"已提交 {len(records)} 条数据进行派发") + + def main(self): + + task_start_time = datetime.now().strftime("%Y-%m-%d %H:%M:%S") + try: + logger.info("任务开始") + # step1: 获取数据 + self.load_all_data() + logger.info("加载数据完成") + # step2:数据处理 + data_NGV = self.process_data() + # step3:数据派发 + self.dispatch_task(data_NGV) + + common_module.send_task_status(task_start_time, "续约回访待办") + except Exception as e: + error_task_logger.error(f"续约回访待办发生错误{e}") + common_module.send_task_error(task_start_time, "续约回访待办", str(e)) + + +if __name__ == '__main__': + RenewalToDo().main() diff --git a/module.py b/module.py index 9647e30..4a1c360 100644 --- a/module.py +++ b/module.py @@ -364,6 +364,18 @@ class Module: print("data_Exception_Task", e) return False + @staticmethod + def renewal_to_do(): + print("GD_match_phone_number") + try: + renewal_to_do = back_ground_module.RenewalToDo() + thread = threading.Thread(target=renewal_to_do.main) + thread.start() + return "data_Exception_Task" + except Exception as e: + print("data_Exception_Task", e) + return False + @staticmethod def text3(): print("text3") diff --git a/task_executor.py b/task_executor.py index d273e1e..bbf5c10 100644 --- a/task_executor.py +++ b/task_executor.py @@ -43,6 +43,7 @@ def execute_task(task_id) -> bool: "非标业绩提报转BI": Module.non_standar_performance_to_BI, "高德匹配手机号": Module.GD_match_phone_number, "省市区人员关系表转BI": Module.province_city_person_relation_to_bi, + "续约回访待办": Module.renewal_to_do, # 添加更多任务函数映射... } diff --git a/test/续约待办派发.py b/test/续约待办派发.py index e8f5664..aa4b6b6 100644 --- a/test/续约待办派发.py +++ b/test/续约待办派发.py @@ -527,7 +527,7 @@ class RenewalToDo: common_module.send_task_status(task_start_time, "续约回访待办") except Exception as e: error_task_logger.error(f"续约回访待办发生错误{e}") - # common_module.send_task_error(task_start_time, "续约回访待办", str(e)) + common_module.send_task_error(task_start_time, "续约回访待办", str(e)) if __name__ == '__main__':