diff --git a/back_ground_module/common_module.py b/back_ground_module/common_module.py index d1647c8..1f03ae9 100644 --- a/back_ground_module/common_module.py +++ b/back_ground_module/common_module.py @@ -81,50 +81,77 @@ class CommonModule: def get_ngv_details(self, days_back=1): """ - 从固定的数据库中获取前几天的NGV明细。 - 参数 `days_back` 表示相对于今天的天数偏移量,默认为1(即前一天)。 - 返回包含NGV明细的pandas DataFrame。 + 重构后适配MySQL的NGV明细获取方法(仅处理saas_create_time字段,全字段保留文本类型) + 参数 `days_back`:相对于今天的天数偏移量,默认为1(前一天) + 返回:pandas DataFrame(所有字段为文本类型,仅saas_create_time做日期格式化),失败返回None """ + conn = None + cursor = None try: - # 获得连接 - conn = psycopg2.connect(**self.conn) + # 1. 建立MySQL连接(仅适配MySQL,参数与原逻辑对齐) + conn = pymysql.connect( + host=Config.BI_CONN_host, + database=Config.BI_CONN_INFO_database, + user=Config.BI_CONN_INFO_user, + password=Config.BI_CONN_INFO_password, + charset='utf8mb4', # MySQL中文兼容 + cursorclass=pymysql.cursors.DictCursor # 保持字典游标,字段名映射一致 + ) cursor = conn.cursor() - # 获取指定天数前的日期 + # 2. 日期计算逻辑(完全复用原始逻辑) now_time = datetime.now() - target_time = now_time + timedelta(days=-days_back) - target_date_id = int(target_time.strftime('%Y%m%d')) # 获取目标日期 + target_time = now_time - timedelta(days=days_back) + target_date_id = int(target_time.strftime('%Y%m%d')) - # sql语句查询 - sql = f""" - SELECT * FROM "public"."holo_ads_report_saas_profile_ngv_detail_d" WHERE "date_id" = '{target_date_id}' ; - """ - - # 执行语句并获取结果集 - cursor.execute(sql) + # 3. MySQL兼容的SQL(仅替换语法,逻辑不变) + sql = """ + SELECT * + FROM `jdy_ngv_data_source` + WHERE `date_id` = %s; + """ + cursor.execute(sql, (target_date_id,)) rows = cursor.fetchall() - all_fields = cursor.description - # 执行结果转化为dataframe - col = [i[0] for i in all_fields] - data_NGV = pd.DataFrame(rows, columns=col) + # 4. 数据转换:强制全字段为文本类型(匹配原始数据源特性) + if rows: + # 核心:所有字段转字符串,空值统一为'',避免后续处理异常 + data_NGV = pd.DataFrame(rows).astype(str).replace({'nan': '', 'NaT': ''}) + else: + data_NGV = pd.DataFrame() - # 尝试自动解析日期时间字符串 + # 5. 仅处理saas_create_time字段(完全复用原始转换逻辑) time_format = "%Y-%m-%d %H:%M:%S" if 'saas_create_time' in data_NGV.columns: - data_NGV['saas_create_time'] = pd.to_datetime(data_NGV['saas_create_time'], format=time_format, - errors='coerce') - data_NGV['saas_create_time'] = data_NGV['saas_create_time'].dt.strftime('%Y-%m-%d') + # 步骤1:解析为datetime(消除格式警告) + temp_dt = pd.to_datetime( + data_NGV['saas_create_time'], + format=time_format, # 指定格式,消除UserWarning + errors='coerce' # 解析失败设为NaT + ) + # 步骤2:转换为YYYY-MM-DD格式的字符串,覆盖原始列(与原逻辑一致) + data_NGV['saas_create_time'] = temp_dt.dt.strftime('%Y-%m-%d').fillna('') - # 关闭游标和连接 - cursor.close() - conn.close() + # 6. 其他时间字段完全保留原始文本格式(不做任何处理) + # (date_fmt/expiry_time等字段仅保留从数据库读取的原始字符串) return data_NGV except Exception as e: - error_task_logger.error(f"获取NGV明细失败: {e}") + error_task_logger.error(f"获取NGV明细失败(MySQL适配): {str(e)}", exc_info=True) return None + finally: + # 确保MySQL连接/游标关闭(资源释放) + if cursor: + try: + cursor.close() + except Exception as e: + error_task_logger.warning(f"关闭MySQL游标失败: {str(e)}") + if conn: + try: + conn.close() + except Exception as e: + error_task_logger.warning(f"关闭MySQL连接失败: {str(e)}") def get_yichang_details(self, days_back=1): """ @@ -180,7 +207,14 @@ class CommonModule: """ try: # 获得连接 - conn = psycopg2.connect(**self.conn) + conn = pymysql.connect( + host=Config.BI_CONN_host, + database=Config.BI_CONN_INFO_database, + user=Config.BI_CONN_INFO_user, + password=Config.BI_CONN_INFO_password, + charset='utf8mb4', # MySQL中文兼容 + cursorclass=pymysql.cursors.DictCursor # 保持字典游标,字段名映射一致 + ) cursor = conn.cursor() # 获取指定天数前的日期 @@ -195,15 +229,26 @@ class CommonModule: print("距离今天还有{}天的日期是:{}".format(days_to_add, future_date)) - sql = f"""SELECT * FROM "public"."holo_ads_report_saas_profile_ngv_detail_d" WHERE "date_id" = '{yes_time_nyr}' and "expiry_time" like '%{future_date}%';""" + sql = """ + SELECT * + FROM `jdy_ngv_data_source` + WHERE `date_id` = %s \ + AND `expiry_time` LIKE %s; \ + """ # 执行语句并获取结果集 - cursor.execute(sql) + like_pattern = f"%{future_date}%" + cursor.execute(sql, (yes_time_nyr, like_pattern)) rows = cursor.fetchall() - all_fields = cursor.description # 获取所有字段名 - # 执行结果转化为dataframe - col = [i[0] for i in all_fields] - data_NGV = pd.DataFrame(list(rows), columns=col) + if rows: + # data_NGV = pd.DataFrame(rows).astype(str).replace({'nan': '', 'NaT': ''}) + all_fields = cursor.description # 获取所有字段名 + + # 执行结果转化为dataframe + col = [i[0] for i in all_fields] + data_NGV = pd.DataFrame(list(rows), columns=col).astype(str).replace({'nan': '', 'NaT': ''}) + else: + data_NGV = pd.DataFrame() # 关闭数据库连接 cursor.close() diff --git a/test/ngv写入数据库.ipynb b/test/ngv写入数据库.ipynb new file mode 100644 index 0000000..82cf776 --- /dev/null +++ b/test/ngv写入数据库.ipynb @@ -0,0 +1,314 @@ +{ + "cells": [ + { + "cell_type": "code", + "id": "initial_id", + "metadata": { + "collapsed": true, + "ExecuteTime": { + "end_time": "2026-01-13T07:56:05.597737600Z", + "start_time": "2026-01-13T07:50:57.192717400Z" + } + }, + "source": [ + "import time\n", + "\n", + "import pandas as pd\n", + "import psycopg2\n", + "import mysql.connector\n", + "from mysql.connector import Error\n", + "from datetime import datetime, timedelta\n", + "import numpy as np\n", + "\n", + "# ========== 配置 ==========\n", + "PG_CONN_INFO = {\n", + " \"database\": \"f6_bi\",\n", + " \"user\": \"LTAI5tMJsijFA9BS1R6uBpUT\",\n", + " \"password\": \"PajEQMIRWNRcipd8mYvlud2KHWJr6N\",\n", + " \"host\": \"hgpostcn-cn-m1e4gikbu00l-cn-shanghai.hologres.aliyuncs.com\",\n", + " \"port\": \"80\"\n", + "}\n", + "\n", + "MYSQL_CONFIG = {\n", + " 'host': \"f6-public.rwlb.rds.aliyuncs.com\",\n", + " 'user': \"rw_operation_data_relay\",\n", + " 'password': \"m+q5Z4%IVuF9bf\",\n", + " 'database': \"f6operation_data_relay\"\n", + "}\n", + "\n", + "SOURCE_TABLE = '\"public\".\"holo_ads_report_saas_profile_ngv_detail_d\"'\n", + "PARTITION_COLUMN = \"date_id\"\n", + "TARGET_TABLE_MYSQL = \"jdy_ngv_data_source\"\n", + "BATCH_SIZE = 2000\n", + "\n", + "# ========== 辅助函数 ==========\n", + "def is_datetime_type(pg_type: str) -> bool:\n", + " if not pg_type:\n", + " return False\n", + " pg_type = pg_type.lower()\n", + " return any(kw in pg_type for kw in ['timestamp', 'datetime', 'date'])\n", + "\n", + "def clean_column_name(name, index):\n", + " \"\"\"将列名转为合法字符串,处理 None / nan / 空值\"\"\"\n", + " if name is None:\n", + " return f\"unknown_col_{index}\"\n", + " if isinstance(name, float) and pd.isna(name):\n", + " return f\"unknown_col_{index}\"\n", + " name_str = str(name).strip()\n", + " if not name_str or name_str.lower() in ('nan', 'none', 'null', ''):\n", + " return f\"unknown_col_{index}\"\n", + " return name_str\n", + "\n", + "def get_source_schema():\n", + " conn = psycopg2.connect(**PG_CONN_INFO)\n", + " cur = conn.cursor()\n", + " cur.execute(\"\"\"\n", + " SELECT column_name, data_type\n", + " FROM information_schema.columns\n", + " WHERE table_schema = 'public'\n", + " AND table_name = 'holo_ads_report_saas_profile_ngv_detail_d'\n", + " ORDER BY ordinal_position;\n", + " \"\"\")\n", + " raw_schema = cur.fetchall()\n", + " cur.close()\n", + " conn.close()\n", + "\n", + " # 清洗列名\n", + " cleaned_schema = []\n", + " for i, (col_name, data_type) in enumerate(raw_schema):\n", + " clean_name = clean_column_name(col_name, i)\n", + " cleaned_schema.append((clean_name, data_type or 'text'))\n", + " return cleaned_schema\n", + "\n", + "def create_ngv_table(cursor, schema):\n", + " col_defs = []\n", + " for col_name, pg_type in schema:\n", + " if is_datetime_type(pg_type):\n", + " col_defs.append(f\"`{col_name}` DATETIME\")\n", + " else:\n", + " col_defs.append(f\"`{col_name}` TEXT\") # ✅ 关键:用 TEXT 避免行大小超限\n", + " create_sql = f\"\"\"\n", + " CREATE TABLE IF NOT EXISTS `{TARGET_TABLE_MYSQL}` (\n", + " {\",\\n \".join(col_defs)}\n", + " ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 ROW_FORMAT=DYNAMIC;\n", + " \"\"\"\n", + " cursor.execute(create_sql)\n", + " print(\"✅ MySQL 表 NGV 已创建(时间字段为 DATETIME,其余为 TEXT)\")\n", + "\n", + "def normalize_datetime_cols(df, datetime_cols):\n", + " df = df.copy()\n", + " for col in datetime_cols:\n", + " if col in df.columns:\n", + " df[col] = pd.to_datetime(df[col], errors='coerce')\n", + " df[col] = df[col].dt.strftime('%Y-%m-%d %H:%M:%S').where(df[col].notnull(), None)\n", + " return df.where(pd.notnull(df), None)\n", + "\n", + "# ========== 主流程 ==========\n", + "def main():\n", + " # 1. 生成最近10天的 date_id(字符串格式)\n", + " date_ids = [\n", + " (datetime.now().date() - timedelta(days=i)).strftime(\"%Y%m%d\")\n", + " for i in range(3)\n", + " ]\n", + " print(f\"将同步以下 date_id 分区: {date_ids}\")\n", + "\n", + " # 2. 获取并清洗源表结构\n", + " schema = get_source_schema()\n", + " column_names = [col for col, _ in schema]\n", + " datetime_cols = [col for col, typ in schema if is_datetime_type(typ)]\n", + "\n", + " print(f\"检测到 {len(column_names)} 个字段,其中时间字段: {datetime_cols[:3]}...\")\n", + "\n", + " # 3. 连接 MySQL\n", + " mysql_conn = mysql.connector.connect(**MYSQL_CONFIG)\n", + " mysql_cursor = mysql_conn.cursor()\n", + "\n", + " try:\n", + " # 4. 创建目标表\n", + " create_ngv_table(mysql_cursor, schema)\n", + "\n", + " # 5. 构造插入 SQL\n", + " placeholders = \", \".join([\"%s\"] * len(column_names))\n", + " cols_str = \", \".join([f\"`{c}`\" for c in column_names])\n", + " insert_sql = f\"INSERT INTO `{TARGET_TABLE_MYSQL}` ({cols_str}) VALUES ({placeholders})\"\n", + "\n", + " # 6. 清空目标表\n", + " mysql_cursor.execute(f\"TRUNCATE TABLE `{TARGET_TABLE_MYSQL}`;\")\n", + " print(\"🗑️ 已清空 NGV 表\")\n", + "\n", + " # 7. 按 date_id 分批处理\n", + " # -- 新增:固定列名用于 SELECT --\n", + " fixed_columns = [col for col, _ in schema]\n", + " quoted_fixed_columns = \", \".join([f'\"{c}\"' for c in fixed_columns])\n", + "\n", + " # 动态选择排序字段(必须在 fixed_columns 中)\n", + " exclude_cols = {PARTITION_COLUMN} | set(datetime_cols)\n", + " candidates = [col for col in fixed_columns if col not in exclude_cols]\n", + " order_col = f'\"{candidates[0]}\"' if candidates else f'\"{PARTITION_COLUMN}\"'\n", + "\n", + " if \"org_code\" not in column_names:\n", + " raise ValueError(\"❌ 源表中未找到唯一字段 'org_code'\")\n", + "\n", + " for date_id in date_ids:\n", + " print(f\"\\n>>> 处理 date_id = {date_id}\")\n", + " last_org_code = None # 游标:上一批最大的 org_code\n", + " i = 1\n", + "\n", + " while True:\n", + " time.sleep(3)\n", + " pg_conn = psycopg2.connect(**PG_CONN_INFO)\n", + " pg_cur = pg_conn.cursor()\n", + "\n", + " # 构造 WHERE 条件\n", + " if last_org_code is None:\n", + " where_clause = f'\"{PARTITION_COLUMN}\" = %s'\n", + " params = (date_id,)\n", + " else:\n", + " where_clause = f'\"{PARTITION_COLUMN}\" = %s AND \"org_code\" > %s'\n", + " params = (date_id, last_org_code)\n", + "\n", + " sql = f\"\"\"\n", + " SELECT {quoted_fixed_columns}\n", + " FROM {SOURCE_TABLE}\n", + " WHERE {where_clause}\n", + " ORDER BY \"org_code\"\n", + " LIMIT {BATCH_SIZE};\n", + " \"\"\"\n", + " pg_cur.execute(sql, params)\n", + " rows = pg_cur.fetchall()\n", + " pg_cur.close()\n", + " pg_conn.close()\n", + "\n", + " if not rows:\n", + " break\n", + "\n", + " df_batch = pd.DataFrame(rows, columns=fixed_columns)\n", + " df_batch = normalize_datetime_cols(df_batch, datetime_cols)\n", + " df_batch.to_csv(f\"输出查看{i}.csv\", index=False)\n", + " i += 1\n", + "\n", + " # 更新游标:取本批最后一条的 org_code\n", + " last_org_code = df_batch.iloc[-1][\"org_code\"]\n", + "\n", + " # 清洗并插入 MySQL\n", + " def sanitize_row(row):\n", + " return tuple(\n", + " None if (isinstance(x, float) and pd.isna(x)) or pd.isna(x) else x\n", + " for x in row\n", + " )\n", + "\n", + " data_tuples = [sanitize_row(row) for row in df_batch.values]\n", + " mysql_cursor.executemany(insert_sql, data_tuples)\n", + " mysql_conn.commit()\n", + "\n", + " inserted = len(data_tuples)\n", + " print(f\" date_id={date_id} 已插入 {inserted} 行 (last_org_code={last_org_code})\")\n", + "\n", + " print(f\"✅ date_id={date_id} 同步完成\")\n", + "\n", + " print(f\"\\n🎉 同步完成!数据已写入 MySQL 表 `{TARGET_TABLE_MYSQL}`\")\n", + "\n", + " except Exception as e:\n", + " print(f\"❌ 错误: {e}\")\n", + " mysql_conn.rollback()\n", + " finally:\n", + " mysql_cursor.close()\n", + " mysql_conn.close()\n", + "\n", + "if __name__ == \"__main__\":\n", + " main()" + ], + "outputs": [ + { + "name": "stdout", + "output_type": "stream", + "text": [ + "将同步以下 date_id 分区: ['20260113', '20260112', '20260111']\n", + "检测到 141 个字段,其中时间字段: []...\n", + "✅ MySQL 表 NGV 已创建(时间字段为 DATETIME,其余为 TEXT)\n", + "🗑️ 已清空 NGV 表\n", + "\n", + ">>> 处理 date_id = 20260113\n", + "✅ date_id=20260113 同步完成\n", + "\n", + ">>> 处理 date_id = 20260112\n", + " date_id=20260112 已插入 2000 行 (last_org_code=CHS201812070004175)\n", + " date_id=20260112 已插入 2000 行 (last_org_code=CHS201903250025112)\n", + " date_id=20260112 已插入 2000 行 (last_org_code=CHS201907240033962)\n", + " date_id=20260112 已插入 2000 行 (last_org_code=CHS201910150040257)\n", + " date_id=20260112 已插入 2000 行 (last_org_code=CHS201912160046642)\n", + " date_id=20260112 已插入 2000 行 (last_org_code=CHS202003230057861)\n", + " date_id=20260112 已插入 2000 行 (last_org_code=CHS202006080088028)\n", + " date_id=20260112 已插入 2000 行 (last_org_code=CHS202010040108419)\n", + " date_id=20260112 已插入 2000 行 (last_org_code=CHS202104090119587)\n", + " date_id=20260112 已插入 2000 行 (last_org_code=CHS202106210130145)\n", + " date_id=20260112 已插入 2000 行 (last_org_code=CHS202108300139429)\n", + " date_id=20260112 已插入 2000 行 (last_org_code=CHS202112050146822)\n", + " date_id=20260112 已插入 2000 行 (last_org_code=CHS202204250175005)\n", + " date_id=20260112 已插入 2000 行 (last_org_code=CHS202209300189703)\n", + " date_id=20260112 已插入 2000 行 (last_org_code=CHS202303230219406)\n", + " date_id=20260112 已插入 2000 行 (last_org_code=CHS202308210240694)\n", + " date_id=20260112 已插入 2000 行 (last_org_code=CHS202402260259380)\n", + " date_id=20260112 已插入 2000 行 (last_org_code=CHS202406260273963)\n", + " date_id=20260112 已插入 2000 行 (last_org_code=CHS202411040284912)\n", + " date_id=20260112 已插入 2000 行 (last_org_code=CHS202503300294788)\n", + " date_id=20260112 已插入 2000 行 (last_org_code=CHS202507150304118)\n", + " date_id=20260112 已插入 2000 行 (last_org_code=CHS202511120312981)\n", + " date_id=20260112 已插入 1278 行 (last_org_code=TQB201509180060)\n", + "✅ date_id=20260112 同步完成\n", + "\n", + ">>> 处理 date_id = 20260111\n", + " date_id=20260111 已插入 2000 行 (last_org_code=CHS201812070004175)\n", + " date_id=20260111 已插入 2000 行 (last_org_code=CHS201903250025112)\n", + " date_id=20260111 已插入 2000 行 (last_org_code=CHS201907240033962)\n", + " date_id=20260111 已插入 2000 行 (last_org_code=CHS201910150040257)\n", + " date_id=20260111 已插入 2000 行 (last_org_code=CHS201912160046642)\n", + " date_id=20260111 已插入 2000 行 (last_org_code=CHS202003230057861)\n", + " date_id=20260111 已插入 2000 行 (last_org_code=CHS202006080088042)\n", + " date_id=20260111 已插入 2000 行 (last_org_code=CHS202010050108424)\n", + " date_id=20260111 已插入 2000 行 (last_org_code=CHS202104090119621)\n", + " date_id=20260111 已插入 2000 行 (last_org_code=CHS202106210130146)\n", + " date_id=20260111 已插入 2000 行 (last_org_code=CHS202108300139429)\n", + " date_id=20260111 已插入 2000 行 (last_org_code=CHS202112050146822)\n", + " date_id=20260111 已插入 2000 行 (last_org_code=CHS202204250175005)\n", + " date_id=20260111 已插入 2000 行 (last_org_code=CHS202209300189703)\n", + " date_id=20260111 已插入 2000 行 (last_org_code=CHS202303230219406)\n", + " date_id=20260111 已插入 2000 行 (last_org_code=CHS202308210240694)\n", + " date_id=20260111 已插入 2000 行 (last_org_code=CHS202402260259380)\n", + " date_id=20260111 已插入 2000 行 (last_org_code=CHS202406260273963)\n", + " date_id=20260111 已插入 2000 行 (last_org_code=CHS202411040284912)\n", + " date_id=20260111 已插入 2000 行 (last_org_code=CHS202503300294779)\n", + " date_id=20260111 已插入 2000 行 (last_org_code=CHS202507150304098)\n", + " date_id=20260111 已插入 2000 行 (last_org_code=CHS202511120312975)\n", + " date_id=20260111 已插入 1259 行 (last_org_code=TQB201509180060)\n", + "✅ date_id=20260111 同步完成\n", + "\n", + "🎉 同步完成!数据已写入 MySQL 表 `jdy_ngv_data_source`\n" + ] + } + ], + "execution_count": 7 + } + ], + "metadata": { + "kernelspec": { + "display_name": "Python 3", + "language": "python", + "name": "python3" + }, + "language_info": { + "codemirror_mode": { + "name": "ipython", + "version": 2 + }, + "file_extension": ".py", + "mimetype": "text/x-python", + "name": "python", + "nbconvert_exporter": "python", + "pygments_lexer": "ipython2", + "version": "2.7.6" + } + }, + "nbformat": 4, + "nbformat_minor": 5 +} diff --git a/test/宜搭续约待办转换简道云数据源.py b/test/宜搭续约待办转换简道云数据源.py new file mode 100644 index 0000000..e0a6a22 --- /dev/null +++ b/test/宜搭续约待办转换简道云数据源.py @@ -0,0 +1,524 @@ +import os +import pandas as pd +import json +import ast +import re +from datetime import datetime +from chardet import detect +import unicodedata + + +# ============================== +# 核心修复:处理数组类型数据的工具函数 +# ============================== +def is_empty_value(value): + """判断值是否为空(支持数组、字符串、数值等所有类型)""" + if pd.isna(value): + return True + if value is None: + return True + if isinstance(value, str) and value.strip() == '': + return True + if isinstance(value, (list, tuple, set)) and len(value) == 0: + return True + return False + + +def clean_special_characters(value): + """清理特殊字符(支持数组类型,避免歧义错误)""" + # 处理空值 + if is_empty_value(value): + return value + + # 处理数组类型(如 ['a', 'b']) + if isinstance(value, (list, tuple)): + cleaned_list = [] + for item in value: + if isinstance(item, str): + cleaned_list.append(_clean_single_string(item)) + else: + cleaned_list.append(item) + return cleaned_list + + # 处理字符串类型 + elif isinstance(value, str): + return _clean_single_string(value) + + # 其他类型(数值、布尔等)直接返回 + else: + return value + + +def _clean_single_string(text): + """清理单个字符串的特殊字符(内部调用)""" + try: + # 移除控制字符(保留换行、制表符) + cleaned = ''.join( + char for char in text + if unicodedata.category(char)[0] != 'C' or char in '\n\t' + ) + # 移除特定无法编码的字符(如右至左标记) + special_chars = ['\u202d', '\u202c', '\u202a', '\u202b', '\u200b'] + for char in special_chars: + cleaned = cleaned.replace(char, '') + # 替换全角空格为半角空格 + cleaned = cleaned.replace('\u3000', ' ') + return cleaned + except: + return text + + +def get_safe_encoding(file_path=None): + """获取安全的编码格式(优先UTF-8-SIG)""" + if file_path and os.path.exists(file_path): + with open(file_path, 'rb') as f: + raw_data = f.read(10000) + result = detect(raw_data) + detected = result['encoding'] + if detected in ['utf-8', 'utf-8-sig', 'gbk', 'gb2312']: + return 'utf-8-sig' + return 'utf-8-sig' + + +# ============================== +# 第一步:生成 expanded_yd_data.csv(彻底修复数组问题) +# ============================== +def generate_expanded_csv(output_dir): + """生成展开后的源文件(支持数组类型数据)""" + os.makedirs(output_dir, exist_ok=True) + converted_csv_path = os.path.join(output_dir, "converted_yd_data.csv") + + if not os.path.exists(converted_csv_path): + print(f"❌ 错误:converted_yd_data.csv 不存在,路径:{converted_csv_path}") + return None + + # 读取文件(兼容不同编码) + encoding = get_safe_encoding(converted_csv_path) + try: + df = pd.read_csv(converted_csv_path, encoding=encoding) + print(f"✅ 成功读取 converted_yd_data.csv(编码:{encoding}),数据规模:{df.shape[0]}行 × {df.shape[1]}列") + except Exception as e: + print(f"❌ 读取 converted_yd_data.csv 失败:{str(e)}") + return None + + if 'data' not in df.columns: + print(f"❌ 错误:converted_yd_data.csv 中缺少 'data' 列") + return None + + # 清理非data列的特殊字符(仅处理字符串列) + non_data_cols = [col for col in df.columns if col != 'data'] + for col in non_data_cols: + if df[col].dtype == 'object': # 仅处理字符串类型列 + df[col] = df[col].apply(clean_special_characters) + + # 检查并解析 data 列(核心:处理数组类型) + sample = df['data'].dropna().iloc[0] if not df['data'].dropna().empty else "" + print("Sample of 'data' column (after cleaning):") + print(repr(str(sample))[:200]) # 转为字符串避免数组打印过长 + + if isinstance(sample, str): + print("Detected string format, parsing with ast.literal_eval...") + + def safe_literal_eval(x): + if is_empty_value(x): + return {} + try: + # 先清理字符串,再解析 + cleaned_x = clean_special_characters(x) if isinstance(x, str) else str(x) + parsed = ast.literal_eval(cleaned_x) + # 解析后再次清理(处理数组中的特殊字符) + return clean_special_characters(parsed) + except (ValueError, SyntaxError, TypeError) as e: + print(f"Parse error on: {repr(str(x))[:100]}... Error: {str(e)[:50]}") + return {} + + df['data'] = df['data'].apply(safe_literal_eval) + + # 展开 data 列(处理数组类型,转为字符串存储) + def flatten_expanded_data(expanded_df): + """展平数据,将数组转为字符串""" + for col in expanded_df.columns: + if expanded_df[col].dtype == 'object': + expanded_df[col] = expanded_df[col].apply( + lambda x: ','.join(map(str, x)) if isinstance(x, (list, tuple)) else x + ) + return expanded_df + + expanded = pd.json_normalize(df['data']) + expanded = flatten_expanded_data(expanded) # 数组转字符串(如 ['a','b'] → "a,b") + + # 清理展开后的数据 + for col in expanded.columns: + expanded[col] = expanded[col].apply(clean_special_characters) + + # 合并数据 + other_cols = df.drop(columns=['data']) + new_df = pd.concat([other_cols.reset_index(drop=True), expanded.reset_index(drop=True)], axis=1) + + # 数据过滤(修复数组转字符串后的判断逻辑) + if 'instanceStatus' in new_df.columns: + # 确保过滤条件处理字符串 + new_df['instanceStatus'] = new_df['instanceStatus'].astype(str) + new_df = new_df[new_df["instanceStatus"].str.strip() == "RUNNING"] + print(f"✅ 过滤后(instanceStatus=RUNNING):{new_df.shape[0]}行") + else: + print(f"⚠️ 警告:缺少 'instanceStatus' 列,跳过状态过滤") + + col = "textField_kto3q3ev" + if col in new_df.columns: + # 处理数组转字符串后的空值判断 + new_df[col] = new_df[col].apply( + lambda x: ','.join(map(str, x)) if isinstance(x, (list, tuple)) else x + ).astype(str) + mask2 = (new_df[col].str.strip() == "") | (new_df[col].str.strip() == "nan") + new_df = new_df[mask2] + print(f"✅ 过滤后({col}为空):{new_df.shape[0]}行") + else: + print(f"⚠️ 警告:缺少 '{col}' 列,跳过订单编码过滤") + + # 保存文件(UTF-8-SIG编码) + expanded_csv_path = os.path.join(output_dir, "expanded_yd_data.csv") + try: + new_df.to_csv( + expanded_csv_path, + index=False, + encoding='utf-8-sig', + na_rep='', + errors='replace' + ) + print(f"✅ Expanded data saved to: {expanded_csv_path}(编码:utf-8-sig)") + return expanded_csv_path + except Exception as e: + print(f"❌ 保存 expanded_yd_data.csv 失败:{str(e)}") + return None + + +# ============================== +# 第二步:格式转换核心函数(兼容数组处理结果) +# ============================== +def convert_to_data_ngv_format( + source_csv_path, + target_csv_path, + main_output_path, + additional_output_path, + doc_output_path +): + print("\n=== 开始格式转换(目标格式:data_NGV.csv)===") + try: + # 读取目标文件 + if not os.path.exists(target_csv_path): + raise FileNotFoundError(f"目标文件不存在:{target_csv_path}") + + # 兼容目标文件的编码 + try: + df_target = pd.read_csv(target_csv_path, encoding='utf-8-sig') + target_encoding = 'utf-8-sig' + except: + df_target = pd.read_csv(target_csv_path, encoding='gbk') + target_encoding = 'gbk' + + print(f"✅ 成功读取目标文件:{target_csv_path}(编码:{target_encoding})") + print(f" 数据规模:{df_target.shape[0]}行 × {df_target.shape[1]}列") + + # 读取源文件(已处理数组问题) + df_source = pd.read_csv(source_csv_path, encoding='utf-8-sig') + print(f"✅ 成功读取源文件:{source_csv_path}(编码:utf-8-sig)") + print(f" 数据规模:{df_source.shape[0]}行 × {df_source.shape[1]}列") + + except Exception as e: + print(f"❌ 文件读取失败:{str(e)}") + return False + + # 字段映射(适配 data_NGV 格式) + print("\n=== 建立字段映射关系 ===") + field_mapping = { + 'createTimeGMT': 'saas_create_time', + 'modifiedTimeGMT': 'etl_time', + 'title': 'org_remark', + 'instanceStatus': 'active_status_fmt', + 'processInstanceId': 'id_own_org', + 'actionExecutor': 'technician', + 'originator': 'salesmen', + 'textField_kuj8nx00': 'province_name', + 'textField_kuj8nx01': 'city_name' + } + + # 筛选有效映射 + valid_mapping = {} + for source_field, target_field in field_mapping.items(): + if target_field in df_target.columns and source_field in df_source.columns: + valid_mapping[source_field] = target_field + print(f" {source_field} → {target_field}") + + if len(valid_mapping) == 0: + print(f"⚠️ 警告:未找到有效字段映射,将仅填充默认值") + + # 字段分类 + target_columns = set(df_target.columns) + source_columns = set(df_source.columns) + additional_columns = list(source_columns - target_columns - set(valid_mapping.keys())) + print(f"\n=== 字段统计 ===") + print(f" 目标文件总字段数:{len(target_columns)}") + print(f" 源文件总字段数:{len(source_columns)}") + print(f" 有效映射字段数:{len(valid_mapping)}") + print(f" 额外字段数(放入单独文件):{len(additional_columns)}") + + # 创建结果数据结构 + df_main = df_target.iloc[0:0].copy() + df_main.insert(0, 'main_file_id', '') # 关联ID列 + df_additional = pd.DataFrame(columns=['main_file_id'] + additional_columns) + + # 数据处理工具函数(兼容字符串化的数组) + def extract_org_info(title_text): + if is_empty_value(title_text): + return '', '' + title_str = str(title_text).strip() + org_name_match = re.search(r'门店名称:([^,,\n]+)', title_str) + org_code_match = re.search(r'门店编码:([^,,\n]+)', title_str) + org_name = org_name_match.group(1).strip() if org_name_match else '' + org_code = org_code_match.group(1).strip() if org_code_match else '' + return org_name, org_code + + def extract_technician(executor_text): + if is_empty_value(executor_text): + return '' + executor_str = str(executor_text).strip() + # 匹配字符串化的数组中的中文姓名(如 "{'nameInChinese':'何钊'}") + name_match = re.search(r"'nameInChinese':\s*'([^']+)'", executor_str) + return name_match.group(1).strip() if name_match else '' + + def convert_gmt_time(gmt_str): + if is_empty_value(gmt_str): + return None + try: + time_str = str(gmt_str).replace('T', ' ').replace('Z', '').strip() + return pd.to_datetime(time_str) + except: + return None + + # 逐行处理数据 + print(f"\n=== 开始数据处理(共{len(df_source)}行) ===") + for idx, source_row in df_source.iterrows(): + main_file_id = f"REC-{idx:06d}" + + # 处理主文件数据 + main_row = pd.Series(index=df_main.columns, dtype='object') + main_row['main_file_id'] = main_file_id + + # 填充映射字段(处理字符串化的数组) + for source_field, target_field in valid_mapping.items(): + value = source_row[source_field] + if not is_empty_value(value): + # 将字符串化的数组转为普通字符串(如 "a,b" → "a,b") + if isinstance(value, (list, tuple)): + main_row[target_field] = ','.join(map(str, value)) + else: + main_row[target_field] = str(value).strip() + else: + main_row[target_field] = '' + + # 处理关键业务字段 + create_time = convert_gmt_time(source_row.get('createTimeGMT')) + if create_time: + main_row['date_fmt'] = create_time.strftime('%Y/%m/%d') + main_row['date_id'] = int(create_time.strftime('%Y%m%d')) + main_row['pt'] = main_row['date_id'] + + # 提取门店信息 + org_name, org_code = extract_org_info(source_row.get('title')) + if org_name: + main_row['org_name'] = org_name + main_row['group_name'] = org_name + if org_code: + main_row['org_code'] = org_code + + # 提取处理人 + technician = extract_technician(source_row.get('actionExecutor')) + if technician: + main_row['technician'] = technician + + # 填充默认值 + default_values = { + 'org_type': '一般', + 'org_status': '留存', + 'group_grade': '普通客户(VIP)', + 'is_active': 1, + 'active_status_fmt': '活跃', + 'province_name': main_row.get('province_name', '未知'), + 'city_name': main_row.get('city_name', '未知'), + 'area_name': '未知', + 'is_wechat': 0, + 'is_mini_app': 0, + 'id_own_group': 0, + 'org_code': main_row.get('org_code', '') + } + + for col, default_val in default_values.items(): + if col in main_row.index and is_empty_value(main_row[col]): + main_row[col] = default_val + + df_main.loc[idx] = main_row + + # 处理额外字段文件 + additional_row = pd.Series(index=df_additional.columns, dtype='object') + additional_row['main_file_id'] = main_file_id + + for col in additional_columns: + if col in source_row.index: + value = source_row[col] + if not is_empty_value(value): + # 统一转为字符串存储(兼容数组) + if isinstance(value, (list, tuple)): + additional_row[col] = ','.join(map(str, value)) + else: + additional_row[col] = str(value).strip() + else: + additional_row[col] = '' + + df_additional.loc[idx] = additional_row + + # 进度提示(大数据量优化) + if (idx + 1) % 500 == 0 or (idx + 1) == len(df_source): + print(f" 已处理 {idx + 1}/{len(df_source)} 行") + + # 数据类型优化 + print(f"\n=== 优化数据类型 ===") + numeric_cols = ['date_id', 'pt', 'is_active', 'is_wechat', 'is_mini_app', + 'id_own_group', 'active_user_count', 'limit_user_count'] + + for col in numeric_cols: + if col in df_main.columns: + # 处理字符串格式的数值 + df_main[col] = pd.to_numeric( + df_main[col].astype(str).str.replace(',', ''), # 移除数组转字符串的逗号 + errors='coerce' + ).fillna(0).astype(int) + + # 填充空值 + df_main = df_main.fillna('') + df_additional = df_additional.fillna('') + + # 保存文件 + print(f"\n=== 保存结果文件 ===") + try: + # 保存主文件(匹配 data_NGV 格式) + df_main.to_csv( + main_output_path, + index=False, + encoding='utf-8-sig', + na_rep='', + errors='replace' + ) + print(f"✅ 主文件保存成功: {main_output_path}(编码:utf-8-sig)") + + # 保存额外字段文件 + df_additional.to_csv( + additional_output_path, + index=False, + encoding='utf-8-sig', + na_rep='', + errors='replace' + ) + print(f"✅ 额外字段文件保存成功: {additional_output_path}(编码:utf-8-sig)") + + # 生成说明文档 + generate_relation_doc(doc_output_path, main_output_path, additional_output_path, + len(target_columns), len(source_columns), len(valid_mapping)) + print(f"✅ 关联说明文档保存成功: {doc_output_path}") + + return True + + except Exception as e: + print(f"❌ 文件保存失败: {str(e)}") + return False + + +# ============================== +# 辅助函数:生成关联说明文档 +# ============================== +def generate_relation_doc(doc_path, main_file, additional_file, target_col_count, source_col_count, mapping_count): + doc_content = f"""# 数据格式转换结果说明 +## 目标格式文件:data_NGV.csv + +### 一、文件概述 +1. **主文件(匹配目标格式)** + - 文件名:{os.path.basename(main_file)} + - 格式来源:完全匹配 data_NGV.csv 结构 + - 数据规模:{pd.read_csv(main_file, encoding='utf-8-sig').shape[0]}行 × {target_col_count + 1}列 + - 编码格式:UTF-8-SIG(兼容所有字符和数组数据) + +2. **额外字段文件** + - 文件名:{os.path.basename(additional_file)} + - 包含内容:源文件中 data_NGV.csv 没有的字段 + - 数据规模:{pd.read_csv(additional_file, encoding='utf-8-sig').shape[0]}行 × {pd.read_csv(additional_file, encoding='utf-8-sig').shape[1]}列 + - 编码格式:UTF-8-SIG + +### 二、关键处理说明 +1. **数组数据处理**:源文件中的数组(如 ['a','b'])已转为字符串("a,b")存储,避免格式错误 +2. **特殊字符清理**:自动移除无法编码的控制字符(如右至左标记 \u202d) +3. **编码统一**:所有文件使用 UTF-8-SIG 编码,兼容中文和特殊字符 + +### 三、关联方法 +1. **关联字段**:`main_file_id`(格式:REC-000000) +2. **Excel导入**:数据→自文本/CSV→选择文件→编码选择"UTF-8"→完成 + +### 四、使用建议 +1. 主文件可直接用于业务系统,与 data_NGV.csv 格式完全兼容 +2. 额外字段文件用于原始数据追溯,通过 main_file_id 关联 +3. 数组转字符串后的数据可通过 Excel 的"文本分列"功能恢复为数组 +""" + with open(doc_path, 'w', encoding='utf-8') as f: + f.write(doc_content) + + +# ============================== +# 主执行函数 +# ============================== +def main(): + # 配置文件路径(请根据您的实际位置修改!) + base_dir = "D:\\Idea Project\\SaaS_V1.7\\test\\output" + target_csv_path = "D:\\Idea Project\\SaaS_V1.7\\test\\output\\data_NGV.csv" + + # 生成 expanded_yd_data.csv(解决数组问题) + expanded_csv_path = generate_expanded_csv(base_dir) + if not expanded_csv_path: + print("❌ 生成 expanded_yd_data.csv 失败,终止转换") + return + + # 配置输出路径 + main_output_path = os.path.join(base_dir, "主文件_匹配data_NGV格式.csv") + additional_output_path = os.path.join(base_dir, "额外字段文件_源文件特有列.csv") + doc_output_path = os.path.join(base_dir, "格式转换说明.md") + + # 执行转换 + success = convert_to_data_ngv_format( + source_csv_path=expanded_csv_path, + target_csv_path=target_csv_path, + main_output_path=main_output_path, + additional_output_path=additional_output_path, + doc_output_path=doc_output_path + ) + + if success: + print(f"\n🎉 格式转换全部完成!") + print(f"📁 输出目录:{base_dir}") + print(f"🔔 重要:Excel导入时需选择'UTF-8'编码,数组数据已转为逗号分隔字符串") + else: + print(f"\n❌ 格式转换失败,请查看日志信息排查问题") + + +# ============================== +# 执行入口 +# ============================== +if __name__ == "__main__": + # API模块导入(不影响核心功能) + try: + from yd_api import YDAPI + from api import API + + print("✅ 成功导入 API 模块") + except ImportError: + print("⚠️ 警告:未找到 yd_api 或 api 模块,跳过API初始化(不影响格式转换)") + + # 执行主流程 + main() \ No newline at end of file diff --git a/test/宜搭续约待办转简道云数据源.ipynb b/test/宜搭续约待办转简道云数据源.ipynb new file mode 100644 index 0000000..e949cf9 --- /dev/null +++ b/test/宜搭续约待办转简道云数据源.ipynb @@ -0,0 +1,850 @@ +{ + "cells": [ + { + "metadata": {}, + "cell_type": "markdown", + "source": "# 数据获取", + "id": "be79c113026b04f" + }, + { + "metadata": { + "ExecuteTime": { + "end_time": "2026-01-14T02:38:34.642781900Z", + "start_time": "2026-01-14T02:31:28.896098900Z" + } + }, + "cell_type": "code", + "source": [ + "import os\n", + "from datetime import datetime, timezone, timedelta\n", + "import pandas as pd\n", + "from holidays.countries import saint_martin as record\n", + "from tqdm import tqdm\n", + "import json\n", + "from yd_api import YDAPI\n", + "from api import API\n", + "import time\n", + "\n", + "output_dir = \"output\"\n", + "os.makedirs(output_dir, exist_ok=True)\n", + "\n", + "api_instance = API()\n", + "yd_api_instance = YDAPI()\n", + "\n", + "\n", + "def generate_monthly_ranges(start: str, end: str):\n", + " \"\"\"\n", + " 生成按自然月划分的时间段列表(左闭右开)\n", + " 例如: [('2025-11-01T00:00:00Z', '2025-12-01T00:00:00Z'), ...]\n", + " \"\"\"\n", + " start_dt = datetime.fromisoformat(start.replace(\"Z\", \"+00:00\"))\n", + " end_dt = datetime.fromisoformat(end.replace(\"Z\", \"+00:00\"))\n", + "\n", + " ranges = []\n", + " current = start_dt\n", + "\n", + " while current < end_dt:\n", + " # 下一个月的第一天\n", + " if current.month == 12:\n", + " next_month = current.replace(year=current.year + 1, month=1, day=1)\n", + " else:\n", + " next_month = current.replace(month=current.month + 1, day=1)\n", + " # 不超过 end_dt\n", + " segment_end = min(next_month, end_dt)\n", + " ranges.append((\n", + " current.strftime(\"%Y-%m-%dT00:00:00Z\"),\n", + " segment_end.strftime(\"%Y-%m-%dT00:00:00Z\")\n", + " ))\n", + " current = next_month\n", + "\n", + " return ranges\n", + "\n", + "\n", + "class GetYDData:\n", + "\n", + " def __init__(self):\n", + " self.FORMID = \"FORM-PE866MD1MJMU0WGLYRFLYEN5YN9L1I55Z7ZUK22\"\n", + " self.appType = \"APP_UYZ0KG6L0CCNV80GZ66O\"\n", + " self.systemToken = \"XA966F81JAJOFCVVVKO64E9MIIZV1EWE5SFMKJ2\"\n", + "\n", + " # 第一段:2025-01-01 到 2025-11-01\n", + " first_segment = (\"2025-01-01T00:00:00Z\", \"2025-02-01T00:00:00Z\")\n", + "\n", + " # 第二段:2025-11-01 到当前时间(按月拆分)\n", + " now_utc_str = datetime.now(timezone.utc).strftime(\"%Y-%m-%dT%H:%M:%SZ\")\n", + " monthly_segments = generate_monthly_ranges(\"2025-02-01T00:00:00Z\", now_utc_str)\n", + "\n", + " # 合并所有时间段\n", + " self.time_ranges = [first_segment] + monthly_segments\n", + "\n", + " print(\"📅 计划拉取以下时间段:\")\n", + " for i, (s, e) in enumerate(self.time_ranges, 1):\n", + " print(f\" {i}. {s} → {e}\")\n", + "\n", + " def build_value_to_label_map(self, form_structure):\n", + " value_to_label_map = {}\n", + " fields = form_structure.get(\"result\", [])\n", + " for field in fields:\n", + " field_id = field.get(\"fieldId\")\n", + " component = field.get(\"componentName\")\n", + " props = field.get(\"props\", {})\n", + " data_source = props.get(\"dataSource\", [])\n", + "\n", + " if component in [\"SelectField\", \"RadioField\"] and data_source:\n", + " option_map = {}\n", + " for opt in data_source:\n", + " val = opt.get(\"value\")\n", + " if val is None:\n", + " continue\n", + " text_obj = opt.get(\"text\", {})\n", + " if isinstance(text_obj, dict):\n", + " zh_text = text_obj.get(\"zh_CN\")\n", + " if zh_text is None and \"value\" in text_obj:\n", + " raw = text_obj[\"value\"]\n", + " if isinstance(raw, str) and raw.startswith('\"') and raw.endswith('\"'):\n", + " zh_text = raw[1:-1]\n", + " else:\n", + " zh_text = str(val)\n", + " elif zh_text is None:\n", + " zh_text = str(val)\n", + " else:\n", + " zh_text = str(text_obj)\n", + " option_map[str(val)] = zh_text\n", + " if option_map:\n", + " value_to_label_map[field_id] = option_map\n", + " return value_to_label_map\n", + "\n", + " def convert_record_values(self, record, value_map):\n", + " converted = {}\n", + " for key, val in record.items():\n", + " if key in value_map and val is not None:\n", + " str_val = str(val)\n", + " converted[key] = value_map[key].get(str_val, val)\n", + " else:\n", + " converted[key] = val\n", + " return converted\n", + "\n", + " def fetch_records_in_range(self, token, start_time, end_time):\n", + " \"\"\"拉取指定时间范围内的所有记录\"\"\"\n", + " try:\n", + " first_page = yd_api_instance.read_processes_instances(\n", + " token=token,\n", + " formUuid=self.FORMID,\n", + " page=1,\n", + " n=100,\n", + " appType=self.appType,\n", + " systemToken=self.systemToken,\n", + " instanceStatus=\"\",\n", + " modifiedFromTimeGMT=start_time,\n", + " modifiedToTimeGMT=end_time,\n", + " )\n", + " except Exception as e:\n", + " print(f\"❌ 首页请求失败 ({start_time} ~ {end_time}): {e}\")\n", + " return []\n", + "\n", + " total_count = first_page.get(\"totalCount\", 0)\n", + " total_pages = (total_count // 100) + (1 if total_count % 100 else 0)\n", + " print(f\"📊 [{start_time[:10]} ~ {end_time[:10]}] 总记录数: {total_count}, 共 {total_pages} 页\")\n", + "\n", + " all_records = []\n", + " if total_count > 0:\n", + " all_records.extend(first_page.get(\"data\", []))\n", + " for page in tqdm(range(2, total_pages + 1), desc=f\"{start_time[:7]}\"):\n", + " try:\n", + " resp = yd_api_instance.read_processes_instances(\n", + " token=token,\n", + " formUuid=self.FORMID,\n", + " page=page,\n", + " n=100,\n", + " appType=self.appType,\n", + " systemToken=self.systemToken,\n", + " instanceStatus=\"\",\n", + " modifiedFromTimeGMT=start_time,\n", + " modifiedToTimeGMT=end_time,\n", + " )\n", + " page_data = resp.get(\"data\", [])\n", + " all_records.extend(page_data)\n", + " time.sleep(0.15) # 稍微增加间隔,更安全\n", + " except Exception as e:\n", + " print(f\"⚠️ 第 {page} 页失败 ({start_time[:10]}): {e}\")\n", + " continue\n", + " return all_records\n", + "\n", + " def main(self):\n", + " # Step 1: 获取表单结构\n", + " token = yd_api_instance.generateToken()\n", + " form_struct = yd_api_instance.get_form_structures(\n", + " token=token,\n", + " formUuid=self.FORMID\n", + " )\n", + " value_map = self.build_value_to_label_map(form_struct)\n", + " print(\"\\n✅ 表单选项映射构建完成\")\n", + "\n", + " # Step 2: 按时间段拉取\n", + " all_records = []\n", + " all_records_detils = []\n", + " for start_time, end_time in self.time_ranges:\n", + " print(f\"\\n⏳ 拉取: {start_time} → {end_time}\")\n", + " records = self.fetch_records_in_range(token, start_time, end_time)\n", + " all_records.extend(records)\n", + " try:\n", + " record_data = record.get(\"data\", [])\n", + " all_records_detils.extend(record_data)\n", + " except Exception as e:\n", + " continue\n", + "\n", + " print(f\"\\n📥 总共获取 {len(all_records)} 条流程实例\")\n", + "\n", + " # # Step 3: 转换 formData\n", + " converted_records = []\n", + " for inst in all_records:\n", + " form_data = inst.get(\"formData\", {})\n", + " converted = self.convert_record_values(form_data, value_map)\n", + " converted_records.append(converted)\n", + "\n", + " # Step 4: 保存\n", + " if all_records:\n", + " df = pd.DataFrame(all_records)\n", + " output_path = \"D:\\Idea Project\\SaaS_V1.7\\\\test\\output\\converted_yd_data.csv\"\n", + " df.to_csv(output_path, index=False)\n", + "\n", + " df1 = pd.DataFrame(all_records_detils)\n", + " output_path1 = \"D:\\Idea Project\\SaaS_V1.7\\\\test\\output\\converted_yd_data_detail.csv\"\n", + " df1.to_csv(output_path1, index=False)\n", + " print(f\"\\n✅ 成功保存 {len(all_records)} 条记录至: {output_path}\")\n", + " else:\n", + " print(\"\\n❌ 无有效数据\")\n", + "\n", + "\n", + "if __name__ == \"__main__\":\n", + " GetYDData().main()" + ], + "id": "1974d0f55ce6f25d", + "outputs": [ + { + "name": "stdout", + "output_type": "stream", + "text": [ + "📅 计划拉取以下时间段:\n", + " 1. 2025-01-01T00:00:00Z → 2025-02-01T00:00:00Z\n", + " 2. 2025-02-01T00:00:00Z → 2025-03-01T00:00:00Z\n", + " 3. 2025-03-01T00:00:00Z → 2025-04-01T00:00:00Z\n", + " 4. 2025-04-01T00:00:00Z → 2025-05-01T00:00:00Z\n", + " 5. 2025-05-01T00:00:00Z → 2025-06-01T00:00:00Z\n", + " 6. 2025-06-01T00:00:00Z → 2025-07-01T00:00:00Z\n", + " 7. 2025-07-01T00:00:00Z → 2025-08-01T00:00:00Z\n", + " 8. 2025-08-01T00:00:00Z → 2025-09-01T00:00:00Z\n", + " 9. 2025-09-01T00:00:00Z → 2025-10-01T00:00:00Z\n", + " 10. 2025-10-01T00:00:00Z → 2025-11-01T00:00:00Z\n", + " 11. 2025-11-01T00:00:00Z → 2025-12-01T00:00:00Z\n", + " 12. 2025-12-01T00:00:00Z → 2026-01-01T00:00:00Z\n", + " 13. 2026-01-01T00:00:00Z → 2026-01-14T00:00:00Z\n", + "\n", + "✅ 表单选项映射构建完成\n", + "\n", + "⏳ 拉取: 2025-01-01T00:00:00Z → 2025-02-01T00:00:00Z\n", + "📊 [2025-01-01 ~ 2025-02-01] 总记录数: 649, 共 7 页\n" + ] + }, + { + "name": "stderr", + "output_type": "stream", + "text": [ + "2025-01: 100%|██████████| 6/6 [00:12<00:00, 2.02s/it]\n" + ] + }, + { + "name": "stdout", + "output_type": "stream", + "text": [ + "\n", + "⏳ 拉取: 2025-02-01T00:00:00Z → 2025-03-01T00:00:00Z\n", + "📊 [2025-02-01 ~ 2025-03-01] 总记录数: 936, 共 10 页\n" + ] + }, + { + "name": "stderr", + "output_type": "stream", + "text": [ + "2025-02: 100%|██████████| 9/9 [00:18<00:00, 2.01s/it]\n" + ] + }, + { + "name": "stdout", + "output_type": "stream", + "text": [ + "\n", + "⏳ 拉取: 2025-03-01T00:00:00Z → 2025-04-01T00:00:00Z\n", + "📊 [2025-03-01 ~ 2025-04-01] 总记录数: 1272, 共 13 页\n" + ] + }, + { + "name": "stderr", + "output_type": "stream", + "text": [ + "2025-03: 100%|██████████| 12/12 [00:24<00:00, 2.08s/it]\n" + ] + }, + { + "name": "stdout", + "output_type": "stream", + "text": [ + "\n", + "⏳ 拉取: 2025-04-01T00:00:00Z → 2025-05-01T00:00:00Z\n", + "📊 [2025-04-01 ~ 2025-05-01] 总记录数: 1099, 共 11 页\n" + ] + }, + { + "name": "stderr", + "output_type": "stream", + "text": [ + "2025-04: 100%|██████████| 10/10 [00:20<00:00, 2.10s/it]\n" + ] + }, + { + "name": "stdout", + "output_type": "stream", + "text": [ + "\n", + "⏳ 拉取: 2025-05-01T00:00:00Z → 2025-06-01T00:00:00Z\n", + "📊 [2025-05-01 ~ 2025-06-01] 总记录数: 1327, 共 14 页\n" + ] + }, + { + "name": "stderr", + "output_type": "stream", + "text": [ + "2025-05: 100%|██████████| 13/13 [00:27<00:00, 2.10s/it]\n" + ] + }, + { + "name": "stdout", + "output_type": "stream", + "text": [ + "\n", + "⏳ 拉取: 2025-06-01T00:00:00Z → 2025-07-01T00:00:00Z\n", + "📊 [2025-06-01 ~ 2025-07-01] 总记录数: 1169, 共 12 页\n" + ] + }, + { + "name": "stderr", + "output_type": "stream", + "text": [ + "2025-06: 100%|██████████| 11/11 [00:22<00:00, 2.05s/it]\n" + ] + }, + { + "name": "stdout", + "output_type": "stream", + "text": [ + "\n", + "⏳ 拉取: 2025-07-01T00:00:00Z → 2025-08-01T00:00:00Z\n", + "📊 [2025-07-01 ~ 2025-08-01] 总记录数: 1328, 共 14 页\n" + ] + }, + { + "name": "stderr", + "output_type": "stream", + "text": [ + "2025-07: 100%|██████████| 13/13 [00:25<00:00, 1.99s/it]\n" + ] + }, + { + "name": "stdout", + "output_type": "stream", + "text": [ + "\n", + "⏳ 拉取: 2025-08-01T00:00:00Z → 2025-09-01T00:00:00Z\n", + "📊 [2025-08-01 ~ 2025-09-01] 总记录数: 1551, 共 16 页\n" + ] + }, + { + "name": "stderr", + "output_type": "stream", + "text": [ + "2025-08: 100%|██████████| 15/15 [00:31<00:00, 2.09s/it]\n" + ] + }, + { + "name": "stdout", + "output_type": "stream", + "text": [ + "\n", + "⏳ 拉取: 2025-09-01T00:00:00Z → 2025-10-01T00:00:00Z\n", + "📊 [2025-09-01 ~ 2025-10-01] 总记录数: 1227, 共 13 页\n" + ] + }, + { + "name": "stderr", + "output_type": "stream", + "text": [ + "2025-09: 100%|██████████| 12/12 [00:24<00:00, 2.02s/it]\n" + ] + }, + { + "name": "stdout", + "output_type": "stream", + "text": [ + "\n", + "⏳ 拉取: 2025-10-01T00:00:00Z → 2025-11-01T00:00:00Z\n", + "📊 [2025-10-01 ~ 2025-11-01] 总记录数: 1364, 共 14 页\n" + ] + }, + { + "name": "stderr", + "output_type": "stream", + "text": [ + "2025-10: 100%|██████████| 13/13 [00:25<00:00, 1.94s/it]\n" + ] + }, + { + "name": "stdout", + "output_type": "stream", + "text": [ + "\n", + "⏳ 拉取: 2025-11-01T00:00:00Z → 2025-12-01T00:00:00Z\n", + "📊 [2025-11-01 ~ 2025-12-01] 总记录数: 2219, 共 23 页\n" + ] + }, + { + "name": "stderr", + "output_type": "stream", + "text": [ + "2025-11: 100%|██████████| 22/22 [00:47<00:00, 2.14s/it]\n" + ] + }, + { + "name": "stdout", + "output_type": "stream", + "text": [ + "\n", + "⏳ 拉取: 2025-12-01T00:00:00Z → 2026-01-01T00:00:00Z\n", + "📊 [2025-12-01 ~ 2026-01-01] 总记录数: 3624, 共 37 页\n" + ] + }, + { + "name": "stderr", + "output_type": "stream", + "text": [ + "2025-12: 100%|██████████| 36/36 [01:25<00:00, 2.38s/it]\n" + ] + }, + { + "name": "stdout", + "output_type": "stream", + "text": [ + "\n", + "⏳ 拉取: 2026-01-01T00:00:00Z → 2026-01-14T00:00:00Z\n", + "📊 [2026-01-01 ~ 2026-01-14] 总记录数: 1408, 共 15 页\n" + ] + }, + { + "name": "stderr", + "output_type": "stream", + "text": [ + "2026-01: 100%|██████████| 14/14 [00:30<00:00, 2.20s/it]\n" + ] + }, + { + "name": "stdout", + "output_type": "stream", + "text": [ + "\n", + "📥 总共获取 19173 条流程实例\n", + "\n", + "✅ 成功保存 19173 条记录至: output\\converted_yd_data.csv\n" + ] + } + ], + "execution_count": 9 + }, + { + "metadata": {}, + "cell_type": "markdown", + "source": "# 数据过滤", + "id": "d4493b51868b83e6" + }, + { + "cell_type": "code", + "id": "initial_id", + "metadata": { + "collapsed": true, + "ExecuteTime": { + "end_time": "2026-01-14T05:38:38.667141600Z", + "start_time": "2026-01-14T05:38:26.972349600Z" + } + }, + "source": [ + "import sys\n", + "import os\n", + "import pandas as pd\n", + "import ast\n", + "from datetime import datetime, timedelta\n", + "import pytz\n", + "\n", + "# 获取当前 notebook 所在目录的上级目录,并加入 Python 路径\n", + "nb_path = os.path.abspath('') # 当前 notebook 所在目录\n", + "parent_dir = os.path.dirname(nb_path)\n", + "sys.path.append(parent_dir)\n", + "\n", + "from yd_api import YDAPI\n", + "from api import API\n", + "\n", + "output_dir = \"output\"\n", + "os.makedirs(output_dir, exist_ok=True)\n", + "\n", + "api_instance = API()\n", + "yd_api_instance = YDAPI()\n", + "\n", + "df = pd.read_csv(\"D:\\Idea Project\\SaaS_V1.7\\\\test\\output\\converted_yd_data.csv\").astype(str)\n", + "# df = df[df[\"instanceStatus\"] == \"RUNNING\"]\n", + "\n", + "# 检查 data 列的第一个非空值类型\n", + "sample = df['data'].dropna().iloc[0] if not df['data'].dropna().empty else \"\"\n", + "print(\"Sample of 'data' column:\")\n", + "print(repr(sample)[:200])\n", + "\n", + "if isinstance(sample, str):\n", + " print(\"Detected string format, parsing with ast.literal_eval...\")\n", + "\n", + "\n", + " def safe_literal_eval(x):\n", + " if pd.isna(x) or x == \"\":\n", + " return {}\n", + " try:\n", + " return ast.literal_eval(x)\n", + " except (ValueError, SyntaxError) as e:\n", + " print(f\"Parse error on: {repr(x)[:100]}... Error: {e}\")\n", + " return {}\n", + "\n", + "\n", + " df['data'] = df['data'].apply(safe_literal_eval)\n", + "\n", + "# 展开 data 列\n", + "expanded = pd.json_normalize(df['data'])\n", + "other_cols = df.drop(columns=['data'])\n", + "new_df = pd.concat([other_cols.reset_index(drop=True), expanded.reset_index(drop=True)], axis=1)\n", + "\n", + "# 先转为字符串是为了避免后续列缺失报错,但日期处理需要原始类型\n", + "# 所以我们先做日期过滤,再转字符串(调整顺序)\n", + "\n", + "# ===== 新增:日期过滤逻辑 =====\n", + "date_col = \"dateField_ksirro5l\"\n", + "\n", + "if date_col not in new_df.columns:\n", + " print(f\"⚠️ Warning: Column '{date_col}' not found!\")\n", + "else:\n", + " # 将时间戳转为 datetime(假设是毫秒级 Unix 时间戳)\n", + " def parse_timestamp(val):\n", + " if pd.isna(val) or val == \"\" or val == \"NaT\":\n", + " return pd.NaT\n", + " try:\n", + " # 转换为 int 并除以 1000 得到秒级时间戳(毫秒级)\n", + " ts = int(float(val)) / 1000.0\n", + " return pd.to_datetime(ts, unit='s', utc=True)\n", + " except Exception as e:\n", + " print(f\"Parse error on timestamp: {val} -> Error: {e}\")\n", + " return pd.NaT\n", + "\n", + "\n", + " new_df[date_col] = new_df[date_col].apply(parse_timestamp)\n", + "\n", + " # 设置当前 UTC 时间\n", + " now_utc = pd.Timestamp.now(tz='UTC')\n", + " future_150_days = now_utc + timedelta(days=150)\n", + "\n", + " # 筛选条件:非空,且在 [now, now+150天] 区间内\n", + " # ===== 调整后:日期范围为 [过去30天, 未来120天] =====\n", + " past_30_days = now_utc - timedelta(days=30)\n", + " future_120_days = now_utc + timedelta(days=120)\n", + "\n", + " mask_date = (\n", + " new_df[date_col].notna() &\n", + " (new_df[date_col] >= past_30_days) &\n", + " (new_df[date_col] <= future_120_days)\n", + " )\n", + " new_df = new_df[mask_date]\n", + "\n", + "# 继续原有筛选:RUNNING 状态\n", + "# new_df = new_df[new_df[\"instanceStatus\"] == \"RUNNING\"]\n", + "\n", + "# 订单编码为空\n", + "col_order = \"textField_kto3q3ev\"\n", + "mask2 = (\n", + " new_df[col_order].isnull() |\n", + " (new_df[col_order].astype(str).str.strip() == \"\")\n", + ")\n", + "new_df = new_df[mask2]\n", + "\n", + "# 最后再转为字符串(避免日期被转成奇怪字符串影响保存)\n", + "new_df = new_df.astype(str)\n", + "\n", + "# 保存结果\n", + "new_df.to_csv(\"D:\\Idea Project\\SaaS_V1.7\\\\test\\output\\expanded_yd_data.csv\", index=False, encoding='utf-8-sig')\n", + "print(\"✅ Expanded and filtered data saved to 'expanded_yd_data.csv'\")" + ], + "outputs": [ + { + "name": "stderr", + "output_type": "stream", + "text": [ + "<>:22: SyntaxWarning: invalid escape sequence '\\I'\n", + "<>:106: SyntaxWarning: invalid escape sequence '\\I'\n", + "<>:22: SyntaxWarning: invalid escape sequence '\\I'\n", + "<>:106: SyntaxWarning: invalid escape sequence '\\I'\n", + "C:\\Users\\hp_z66\\AppData\\Local\\Temp\\ipykernel_2148\\1452756513.py:22: SyntaxWarning: invalid escape sequence '\\I'\n", + " df = pd.read_csv(\"D:\\Idea Project\\SaaS_V1.7\\\\test\\output\\converted_yd_data.csv\").astype(str)\n", + "C:\\Users\\hp_z66\\AppData\\Local\\Temp\\ipykernel_2148\\1452756513.py:106: SyntaxWarning: invalid escape sequence '\\I'\n", + " new_df.to_csv(\"D:\\Idea Project\\SaaS_V1.7\\\\test\\output\\expanded_yd_data.csv\", index=False, encoding='utf-8-sig')\n" + ] + }, + { + "name": "stdout", + "output_type": "stream", + "text": [ + "Sample of 'data' column:\n", + "'{\\'employeeField_ljz6416i_id\\': [\\'032861373036352679\\', \\'1253235059942945\\'], \\'textField_kuj8nx00\\': \\'山东省\\', \\'selectField_kuz7mfmd\\': \\'E.L\\', \\'textField_kuj8nx01\\': \\'菏泽市\\', \\'employeeField_ks\n", + "Detected string format, parsing with ast.literal_eval...\n", + "✅ Expanded and filtered data saved to 'expanded_yd_data.csv'\n" + ] + } + ], + "execution_count": 17 + }, + { + "metadata": {}, + "cell_type": "markdown", + "source": "# 数据派发", + "id": "4ef868ec5caa7297" + }, + { + "metadata": {}, + "cell_type": "code", + "execution_count": null, + "source": "# 续约待办派发测试数据", + "id": "475a2fcbd8c1760d", + "outputs": [] + }, + { + "metadata": {}, + "cell_type": "markdown", + "source": "# 字段刷写", + "id": "c4533794f287dc49" + }, + { + "metadata": { + "ExecuteTime": { + "end_time": "2026-01-14T07:09:31.106319900Z", + "start_time": "2026-01-14T07:09:23.173718600Z" + } + }, + "cell_type": "code", + "source": [ + "import os\n", + "from datetime import datetime\n", + "import pandas as pd\n", + "from api import API\n", + "from back_ground_module import CommonModule\n", + "from log_config import configure_task_logger, configure_error_task_logger\n", + "from collections import defaultdict\n", + "from datetime import datetime, timezone, timedelta, date, UTC\n", + "from config import Config\n", + "from yd_api import YDAPI\n", + "\n", + "logger = configure_task_logger()\n", + "error_task_logger = configure_error_task_logger()\n", + "api_instance = API()\n", + "yd_api_instance = YDAPI()\n", + "common_module = CommonModule()\n", + "output_dir = \"output\" # 设置输出目录\n", + "os.makedirs(output_dir, exist_ok=True)\n", + "\n", + "df = pd.read_csv(\"D:\\Idea Project\\SaaS_V1.7\\\\test\\output\\expanded_yd_data.csv\").astype(str)\n", + "df2 = pd.read_excel(\n", + " \"D:\\Idea Project\\SaaS_V1.7\\\\test\\output\\续约服务流程_历史维修记录迁移测试_20260114094227.xlsx\").astype(\n", + " str) # 简道云导出文件\n", + "\n", + "# 从df中获取流程编码获取流程详细信息\n", + "token = yd_api_instance.generateToken()\n", + "FORMID = \"FORM-PE866MD1MJMU0WGLYRFLYEN5YN9L1I55Z7ZUK22\"\n", + "appType = \"APP_UYZ0KG6L0CCNV80GZ66O\"\n", + "systemToken = \"XA966F81JAJOFCVVVKO64E9MIIZV1EWE5SFMKJ2\"\n", + "\n", + "all_instance_data = []\n", + "for index, row in df[:2].iterrows():\n", + " instance_id = row[\"processInstanceId\"]\n", + " instance_info = yd_api_instance.processes_instancesInfos(token, instance_id, appType, systemToken)\n", + " all_instance_data.append(instance_info.get(\"data\"))\n", + "\n", + "ndf = pd.DataFrame(all_instance_data)\n", + "ndf.to_csv(\"D:\\Idea Project\\SaaS_V1.7\\\\test\\output\\yd_process_details.csv\")\n", + "\n", + "# 简道云宜搭map\n", + "# step1 简道云字段唯一值map\n", + "jdy_map = {\"关联数据\": \"_widget_1764820541663\",\n", + " \"公司名称\": \"_widget_1764820541616\",\n", + " \"门店名称\": \"_widget_1764820541617\",\n", + " \"门店编码\": \"_widget_1764820541661\",\n", + " \"加盟商\": \"_widget_1764820541618\",\n", + " \"过期日\": \"_widget_1764820541672\",\n", + " \"Saas版本\": \"_widget_1764820541623\",\n", + " \"上次购买价格\": \"_widget_1764820541624\",\n", + " \"联系人\": \"_widget_1764820541621\",\n", + " \"联系手机号\": \"_widget_1764820541622\",\n", + " \"运营顾问\": \"_widget_1764820541625\",\n", + " \"订单商品名称\": \"_widget_1766730385209\",\n", + " \"120天是否跟进\": \"_widget_1764820541628\",\n", + " \"120天处理人\": \"_widget_1764820541634\",\n", + " \"120天跟进时间\": \"_widget_1765352838631\",\n", + " \"60天是否跟进\": \"_widget_1764820541630\",\n", + " \"60天处理人\": \"_widget_1764820541635\",\n", + " \"60天跟进时间\": \"_widget_1765352838632\",\n", + " \"30天是否跟进\": \"_widget_1764820541632\",\n", + " \"30天处理人\": \"_widget_1764820541636\",\n", + " \"30天跟进时间\": \"_widget_1765352838633\",\n", + " \"是否联系上\": \"_widget_1764820541638\",\n", + " \"现阶段问题\": \"_widget_1764820541641\",\n", + " \"未联系原因\": \"_widget_1765330820509\",\n", + " \"联系情况及问题说明\": \"_widget_1764820541653\",\n", + " \"回访方式\": \"_widget_1764820541697\",\n", + " \"潜在商机\": \"_widget_1764820541657\",\n", + " \"商机归属\": \"_widget_1766633812301\",\n", + " \"商机详情\": \"_widget_1764820541659\",\n", + " \"续约意愿\": \"_widget_1764820541654\",\n", + " \"不续约原因\": \"_widget_1764820541700\",\n", + " \"产品问题\": \"_widget_1764820541707\",\n", + " \"服务问题\": \"_widget_1764820541709\",\n", + " \"门店问题\": \"_widget_1764820541711\",\n", + " \"价格问题\": \"_widget_1764820541713\",\n", + " \"不续约具体情况说明\": \"_widget_1764820541702\",\n", + " \"周期性增购\": \"_widget_1764820541717\",\n", + " \"周期性增购.商品名称\": \"_widget_1764820541717._widget_1764820541719\",\n", + " \"周期性增购.购买数量\": \"_widget_1764820541717._widget_1764820541722\",\n", + " \"周期性增购.购买金额\": \"_widget_1764820541717._widget_1764820541720\",\n", + " \"周期性增购.应续约日\": \"_widget_1764820541717._widget_1764820541721\",\n", + " \"周期性增购.是否愿意续约\": \"_widget_1764820541717._widget_1764820541724\",\n", + " \"周期性增购.不续约原因\": \"_widget_1764820541717._widget_1764820541723\",\n", + " \"周期性增购.续约后订单编码\": \"_widget_1764820541717._widget_1764820541725\",\n", + " \"连锁门店待办同步处理\": \"_widget_1764820541681\",\n", + " \"选择需要同步的门店名称\": \"_widget_1765330820391\",\n", + " \"订单编码\": \"_widget_1764820541674\",\n", + " \"订单支付日期\": \"_widget_1764820541679\",\n", + " \"本次-实付金额(元)\": \"_widget_1764820541676\",\n", + " \"业务类型(续约、升级)\": \"_widget_1764820541680\",\n", + " \"120天自动流转时间\": \"_widget_1764820541865\",\n", + " \"60天自动流转时间\": \"_widget_1765964381895\",\n", + " \"30天自动流转时间\": \"_widget_1765964381896\",\n", + " \"0天自动流转时间\": \"_widget_1765964381897\",\n", + " \"当前所处节点\": \"_widget_1765352838609\",\n", + " \"流程状态\": \"_widget_1765352838610\",\n", + " \"经营模式\": \"_widget_1765964381952\",\n", + " \"程序关停\": \"_widget_1765866283543\",\n", + " \"区域客服\": \"_widget_1764820541715\",\n", + " \"公司等级\": \"_widget_1766130435561\",\n", + " \"运营专家\": \"_widget_1764820541678\",\n", + " \"流水号\": \"_widget_1766376046563\",\n", + " \"是否已同步宜搭\": \"_widget_1766469131897\",\n", + " \"连锁店数据同步辅助:门店列表\": \"_widget_1766633812134\",\n", + " \"公司id\": \"_widget_1766631811839\",\n", + " }\n", + "# step2 宜搭唯一值简道云唯一值map\n", + "yd_jdy_map = {\n", + " \"120天是否跟进\": \"radioField_kuntp6fm\",\n", + " \"120天处理人\": \"textField_livc8bjj\",\n", + " \"120天跟进时间\": \"dateField_lifr1fdv\",\n", + " \"60天是否跟进\": \"radioField_kurxyhvp\",\n", + " \"60天处理人\": \"textField_livc8bjl\",\n", + " \"60天跟进时间\": \"dateField_lifr1fdx\",\n", + " \"30天是否跟进\": \"radioField_kurxyhvq\",\n", + " \"30天处理人\": \"textField_livc8bjm\",\n", + " \"30天跟进时间\": \"dateField_lifr1fdy\",\n", + "}\n", + "\n", + "\n", + "\n", + "\n" + ], + "id": "1a9d527c7026a96c", + "outputs": [ + { + "name": "stderr", + "output_type": "stream", + "text": [ + "<>:20: SyntaxWarning: invalid escape sequence '\\I'\n", + "<>:22: SyntaxWarning: invalid escape sequence '\\I'\n", + "<>:38: SyntaxWarning: invalid escape sequence '\\I'\n", + "<>:20: SyntaxWarning: invalid escape sequence '\\I'\n", + "<>:22: SyntaxWarning: invalid escape sequence '\\I'\n", + "<>:38: SyntaxWarning: invalid escape sequence '\\I'\n", + "C:\\Users\\hp_z66\\AppData\\Local\\Temp\\ipykernel_2148\\1799850687.py:20: SyntaxWarning: invalid escape sequence '\\I'\n", + " df = pd.read_csv(\"D:\\Idea Project\\SaaS_V1.7\\\\test\\output\\expanded_yd_data.csv\").astype(str)\n", + "C:\\Users\\hp_z66\\AppData\\Local\\Temp\\ipykernel_2148\\1799850687.py:22: SyntaxWarning: invalid escape sequence '\\I'\n", + " \"D:\\Idea Project\\SaaS_V1.7\\\\test\\output\\续约服务流程_历史维修记录迁移测试_20260114094227.xlsx\").astype(\n", + "C:\\Users\\hp_z66\\AppData\\Local\\Temp\\ipykernel_2148\\1799850687.py:38: SyntaxWarning: invalid escape sequence '\\I'\n", + " ndf.to_csv(\"D:\\Idea Project\\SaaS_V1.7\\\\test\\output\\yd_process_details.csv\")\n" + ] + } + ], + "execution_count": 22 + }, + { + "metadata": { + "ExecuteTime": { + "end_time": "2026-01-14T05:42:53.361991700Z", + "start_time": "2026-01-14T05:42:53.310045900Z" + } + }, + "cell_type": "code", + "source": "token", + "id": "ad8d93d00c8a3be4", + "outputs": [ + { + "data": { + "text/plain": [ + "'e22e9617f9c23d20b6575cf7ecdcbbd2'" + ] + }, + "execution_count": 19, + "metadata": {}, + "output_type": "execute_result" + } + ], + "execution_count": 19 + }, + { + "metadata": {}, + "cell_type": "markdown", + "source": "# 流程同步", + "id": "2c6a1c0a573f24cb" + }, + { + "metadata": {}, + "cell_type": "code", + "outputs": [], + "execution_count": null, + "source": "", + "id": "7652deba86d75e24" + } + ], + "metadata": { + "kernelspec": { + "display_name": "Python 3", + "language": "python", + "name": "python3" + }, + "language_info": { + "codemirror_mode": { + "name": "ipython", + "version": 2 + }, + "file_extension": ".py", + "mimetype": "text/x-python", + "name": "python", + "nbconvert_exporter": "python", + "pygments_lexer": "ipython2", + "version": "2.7.6" + } + }, + "nbformat": 4, + "nbformat_minor": 5 +} diff --git a/test/宜搭续约待办转简道云源数据新.py b/test/宜搭续约待办转简道云源数据新.py new file mode 100644 index 0000000..02f0a82 --- /dev/null +++ b/test/宜搭续约待办转简道云源数据新.py @@ -0,0 +1,59 @@ +import os +import pandas as pd +import json +import ast # 👈 新增导入 +from yd_api import YDAPI +from api import API + +output_dir = "output" +os.makedirs(output_dir, exist_ok=True) + +api_instance = API() +yd_api_instance = YDAPI() + +df = pd.read_csv(os.path.join(output_dir, "converted_yd_data.csv")) + +# 检查 data 列的第一个非空值类型 +sample = df['data'].dropna().iloc[0] if not df['data'].dropna().empty else "" +print("Sample of 'data' column:") +print(repr(sample)[:200]) # 打印前200字符,看是单引号还是双引号 + +# 如果是字符串(且是 Python dict 格式,单引号),用 ast.literal_eval +if isinstance(sample, str): + print("Detected string format, parsing with ast.literal_eval...") + + + # 安全地将字符串转为字典 + def safe_literal_eval(x): + if pd.isna(x) or x == "": + return {} + try: + return ast.literal_eval(x) + except (ValueError, SyntaxError) as e: + print(f"Parse error on: {repr(x)[:100]}... Error: {e}") + return {} + + + df['data'] = df['data'].apply(safe_literal_eval) + +# 展开 data 列 +expanded = pd.json_normalize(df['data']) + +# 保留其他列(注意:原列是 'data',不是 'raw_data') +other_cols = df.drop(columns=['data']) + +# 合并 +new_df = pd.concat([other_cols.reset_index(drop=True), expanded.reset_index(drop=True)], axis=1).astype(str) +# 过滤进行中 +new_df = new_df[new_df["instanceStatus"] == "RUNNING"] +# 订单编码为空 +col = "textField_kto3q3ev" +mask2 = ( + new_df[col].isnull() | + (new_df[col].astype(str).str.strip() == "") +) +new_df = new_df[mask2] +# 保存结果 +new_df.to_csv(os.path.join(output_dir, "expanded_yd_data.csv"), index=False, encoding='utf-8-sig') +print("✅ Expanded data saved to 'expanded_yd_data.csv'") + diff --git a/test/宜搭获取续约待办数据.py b/test/宜搭获取续约待办数据.py index cd1334c..d2972ed 100644 --- a/test/宜搭获取续约待办数据.py +++ b/test/宜搭获取续约待办数据.py @@ -1,6 +1,7 @@ import os from datetime import datetime, timezone, timedelta import pandas as pd +from holidays.countries import saint_martin as record from tqdm import tqdm import json from yd_api import YDAPI @@ -48,11 +49,11 @@ class GetYDData: self.systemToken = "XA966F81JAJOFCVVVKO64E9MIIZV1EWE5SFMKJ2" # 第一段:2025-01-01 到 2025-11-01 - first_segment = ("2025-01-01T00:00:00Z", "2025-11-01T00:00:00Z") + first_segment = ("2025-01-01T00:00:00Z", "2025-02-01T00:00:00Z") # 第二段:2025-11-01 到当前时间(按月拆分) now_utc_str = datetime.now(timezone.utc).strftime("%Y-%m-%dT%H:%M:%SZ") - monthly_segments = generate_monthly_ranges("2025-11-01T00:00:00Z", now_utc_str) + monthly_segments = generate_monthly_ranges("2025-02-01T00:00:00Z", now_utc_str) # 合并所有时间段 self.time_ranges = [first_segment] + monthly_segments @@ -114,7 +115,7 @@ class GetYDData: n=100, appType=self.appType, systemToken=self.systemToken, - instanceStatus="RUNNING", + instanceStatus="", modifiedFromTimeGMT=start_time, modifiedToTimeGMT=end_time, ) @@ -138,7 +139,7 @@ class GetYDData: n=100, appType=self.appType, systemToken=self.systemToken, - instanceStatus="RUNNING", + instanceStatus="", modifiedFromTimeGMT=start_time, modifiedToTimeGMT=end_time, ) @@ -162,14 +163,20 @@ class GetYDData: # Step 2: 按时间段拉取 all_records = [] + all_records_detils = [] for start_time, end_time in self.time_ranges: print(f"\n⏳ 拉取: {start_time} → {end_time}") records = self.fetch_records_in_range(token, start_time, end_time) all_records.extend(records) + try: + record_data = record.get("data", []) + all_records_detils.extend(record_data) + except Exception as e: + continue print(f"\n📥 总共获取 {len(all_records)} 条流程实例") - # Step 3: 转换 formData + # # Step 3: 转换 formData converted_records = [] for inst in all_records: form_data = inst.get("formData", {}) @@ -177,11 +184,15 @@ class GetYDData: converted_records.append(converted) # Step 4: 保存 - if converted_records: - df = pd.DataFrame(converted_records) + if all_records: + df = pd.DataFrame(all_records) output_path = os.path.join(output_dir, "converted_yd_data.csv") - df.to_csv(output_path, index=False, encoding="utf_8_sig") - print(f"\n✅ 成功保存 {len(converted_records)} 条记录至: {output_path}") + df.to_csv(output_path, index=False) + + df1 = pd.DataFrame(all_records_detils) + output_path1 = os.path.join(output_dir, "converted_yd_data_detail.csv") + df1.to_csv(output_path1, index=False) + print(f"\n✅ 成功保存 {len(all_records)} 条记录至: {output_path}") else: print("\n❌ 无有效数据") diff --git a/test/续约待办派发.py b/test/续约待办派发.py index aa4b6b6..579d7b7 100644 --- a/test/续约待办派发.py +++ b/test/续约待办派发.py @@ -122,7 +122,6 @@ class RenewalToDo: def load_all_data(self): """ 从各类来源加载数据上加载数据 - :return: """ # 数据库获取续约回访数据 diff --git a/test/续约待办派发测试数据.py b/test/续约待办派发测试数据.py index 9b28bf1..caef20a 100644 --- a/test/续约待办派发测试数据.py +++ b/test/续约待办派发测试数据.py @@ -5,7 +5,8 @@ from api import API from back_ground_module import CommonModule from log_config import configure_task_logger, configure_error_task_logger from collections import defaultdict - +from datetime import datetime, timezone, timedelta, date, UTC +from config import Config logger = configure_task_logger() error_task_logger = configure_error_task_logger() api_instance = API() @@ -14,7 +15,81 @@ output_dir = "output" # 设置输出目录 os.makedirs(output_dir, exist_ok=True) +import pandas as pd +import psycopg2 +from datetime import datetime, timedelta, date +import os +def get_renewal_details(): + """ + 从固定的数据库中获取续约待办数据,先拉全量(指定 date_id),再用 CSV 中的门店编码在内存中过滤 + """ + try: + # 1. 从 CSV 文件中读取门店编码 + csv_path = r"D:\Idea Project\SaaS_V1.7\test\output\expanded_yd_data.csv" + if not os.path.exists(csv_path): + error_task_logger.error(f"CSV 文件不存在: {csv_path}") + return pd.DataFrame() + + store_df = pd.read_csv(csv_path, dtype=str) + if "textField_ksydghqw" not in store_df.columns: + error_task_logger.error("CSV 文件中缺少列 'textField_ksydghqw'") + return pd.DataFrame() + + store_codes = set(store_df["textField_ksydghqw"].dropna().unique()) # 转为 set 提升查找效率 + if not store_codes: + error_task_logger.warning("CSV 中未找到有效的门店编码") + return pd.DataFrame() + + # 2. 连接数据库 + conn = psycopg2.connect(**Config.CONN_INFO) + cursor = conn.cursor() + + # 获取前两天的 date_id(整数格式 YYYYMMDD) + now_time = datetime.now() + yes_time = now_time + timedelta(days=-2) + yes_time_nyr = int(yes_time.strftime('%Y%m%d')) + + # 3. 构造 SQL 查询:不再包含 org_code 过滤 + sql = """ + SELECT * + FROM "public"."holo_ads_report_saas_profile_ngv_detail_d" + WHERE "date_id" = %s + """ + + # 执行查询(只传 date_id) + cursor.execute(sql, (yes_time_nyr,)) + rows = cursor.fetchall() + all_fields = cursor.description + + # 转换为 DataFrame + col = [i[0] for i in all_fields] + data_NGV = pd.DataFrame(rows, columns=col) if rows else pd.DataFrame(columns=col) + + # 关闭连接 + cursor.close() + conn.close() + + # 4. 在内存中用 store_codes 过滤 org_code + if "org_code" not in data_NGV.columns: + error_task_logger.error("数据库结果中缺少 'org_code' 字段") + return pd.DataFrame() + + # 确保 org_code 是字符串类型(与 store_codes 一致) + data_NGV = data_NGV.copy() # 避免 SettingWithCopyWarning + data_NGV["org_code"] = data_NGV["org_code"].astype(str) + + # 过滤:只保留 org_code 在 store_codes 中的行 + filtered_data = data_NGV[data_NGV["org_code"].isin(store_codes)].reset_index(drop=True) + + return filtered_data + + except Exception as e: + error_task_logger.error(f"获取续约待办数据时出错: {e}", exc_info=True) + return pd.DataFrame() + class RenewalToDo: + """续约回访待办派发""" + def __init__(self): self.renewal_data_list = None self.cyclic_increasing = None @@ -84,6 +159,8 @@ class RenewalToDo: "流程状态": "_widget_1765352838610", "经营模式": "_widget_1765964381952", "公司等级": "_widget_1766130435561", + "公司id": "_widget_1766631811839", + "订单商品名称": "_widget_1766730385209", "提交人": "creator", "提交时间": "createTime", "更新时间": "updateTime" @@ -101,6 +178,7 @@ class RenewalToDo: "group_grade": "公司等级", "technician": "运营专家", "manage_model": "经营模式", + "id_own_group": "公司id", } self.subform_field_map = { "商品名称": "_widget_1764820541719", @@ -112,7 +190,7 @@ class RenewalToDo: "续约后订单编码": "_widget_1764820541725", # 根据实际需要添加更多字段 } - self.renewal_list_map ={ + self.renewal_list_map = { } @@ -123,7 +201,9 @@ class RenewalToDo: """ # 数据库获取续约回访数据 - self.data_NGV = pd.read_csv(os.path.join(output_dir, "data_NGV.csv"), encoding="gbk") + self.data_NGV = common_module.get_renewal_details() + # self.data_NGV = get_renewal_details() # 历史数据 + self.data_NGV.to_csv("D:\\Idea Project\\SaaS_V1.7\\test\\output\data_NGV1.csv") # 获取加盟商信息 self.franchisee = common_module.get_renewal_franchisee_details() @@ -344,6 +424,10 @@ class RenewalToDo: data_NGV['60天自动流转时间'] = data_NGV['过期日'] - pd.Timedelta(days=30) data_NGV['30天自动流转时间'] = data_NGV['过期日'] - pd.Timedelta(days=0) data_NGV['0天自动流转时间'] = data_NGV['过期日'] + pd.Timedelta(days=90) + + data_NGV['120天是否跟进'] = "主动" + data_NGV['60天是否跟进'] = "主动" + data_NGV['30天是否跟进'] = "主动" # 格式化为字符串(去掉时区) for col in ['过期日', '120天自动流转时间', '60天自动流转时间', '30天自动流转时间', '0天自动流转时间']: data_NGV[col] = data_NGV[col].dt.strftime('%Y-%m-%d %H:%M:%S') @@ -356,24 +440,39 @@ class RenewalToDo: ) # 新增上次购买价格列 - # 1. 清洗并拼接类型+价格 - df_lp = self.last_price[['门店编码', '类型', '价格']].copy() + # 1. 清洗数据 + df_lp = self.last_price[['门店编码', '类型', '订单商品名称', '价格']].copy() + + # 处理“类型”和“订单商品名称”的缺失值 df_lp['类型'] = df_lp['类型'].fillna('').astype(str) + df_lp['订单商品名称'] = df_lp['订单商品名称'].fillna('').astype(str) + + # 处理价格:转数字、四舍五入、填0、转字符串 df_lp['价格'] = ( pd.to_numeric(df_lp['价格'], errors='coerce') .round().fillna(0).astype(int).astype(str) ) - df_lp['类型_价格'] = df_lp['类型'] + df_lp['价格'] - # 2. 按门店聚合,分号连接 - agg_df = df_lp.groupby('门店编码', as_index=False)['类型_价格'].apply(';'.join) + # 2. 拼接“类型:价格” + df_lp['类型_价格'] = df_lp['类型'] + ':' + df_lp['价格'] - # 3. 合并回主表 - data_NGV = data_NGV.merge(agg_df, on='门店编码', how='left').fillna({'类型_价格': ''}) - data_NGV.rename(columns={'类型_价格': '上次购买价格'}, inplace=True) + # 3. 按门店聚合两列 + agg_df = df_lp.groupby('门店编码', as_index=False).agg({ + '类型_价格': lambda x: ';'.join(x), + '订单商品名称': lambda x: ';'.join(x) + }) - # 4. 处理没有匹配记录的门店(填空或默认值) - data_NGV['上次购买价格'] = data_NGV['上次购买价格'].fillna('') + # 4. 合并回主表 + data_NGV = data_NGV.merge(agg_df, on='门店编码', how='left') + + # 5. 填充缺失值为空字符串,并重命名列 + data_NGV['类型_价格'] = data_NGV['类型_价格'].fillna('') + data_NGV['订单商品名称'] = data_NGV['订单商品名称'].fillna('') + + data_NGV.rename(columns={ + '类型_价格': '上次购买价格', + '订单商品名称': '订单商品名称' + }, inplace=True) # 成员字段替换(现在列名是中文) staff_name_cols = [ @@ -482,7 +581,7 @@ class RenewalToDo: payload = { "api_key": "675b900991ad2491c69389ca", - "entry_id": "6931063d64187eaf6b927557", + "entry_id": "6965eec36b73376aa0b5bff8", "data_list": records } print(payload) @@ -502,11 +601,8 @@ class RenewalToDo: data_NGV = self.process_data() # step3:数据派发 self.dispatch_task(data_NGV) - # step4:过期日发生变化更新已有表单 - # step5:自动同意原表单 - - common_module.send_task_status(task_start_time, "续约回访待办") + # common_module.send_task_status(task_start_time, "续约回访待办") except Exception as e: error_task_logger.error(f"续约回访待办发生错误{e}") # common_module.send_task_error(task_start_time, "续约回访待办", str(e)) diff --git a/tools/BI.ipynb b/tools/BI.ipynb index 1573e79..a87e861 100644 --- a/tools/BI.ipynb +++ b/tools/BI.ipynb @@ -551,8 +551,8 @@ { "metadata": { "ExecuteTime": { - "end_time": "2025-08-21T02:06:22.956060Z", - "start_time": "2025-08-21T02:06:22.797879Z" + "end_time": "2026-01-13T03:20:16.053485200Z", + "start_time": "2026-01-13T03:20:15.845522200Z" } }, "cell_type": "code", @@ -577,7 +577,7 @@ "} # 衡时数据库链接配置-mysql\n", "\n", "# 表名\n", - "table_name = \"test\" # 请替换为实际的表名\n", + "table_name = \"jdy_ngv_data_source\" # 请替换为实际的表名\n", "\n", "# 连接数据库\n", "connection = mysql.connector.connect(\n", @@ -605,7 +605,7 @@ "name": "stdout", "output_type": "stream", "text": [ - "成功删除表 test\n" + "成功删除表 jdy_ngv_data_source\n" ] } ],