NGV换源

This commit is contained in:
2026-01-14 15:13:44 +08:00
parent 1ef81def0f
commit 25795f4a2d
9 changed files with 1964 additions and 66 deletions
+79 -34
View File
@@ -81,50 +81,77 @@ class CommonModule:
def get_ngv_details(self, days_back=1):
"""
从固定的数据库中获取前几天的NGV明细。
参数 `days_back` 表示相对于今天的天数偏移量,默认为1前一天)
返回包含NGV明细的pandas DataFrame
重构后适配MySQL的NGV明细获取方法(仅处理saas_create_time字段,全字段保留文本类型)
参数 `days_back`相对于今天的天数偏移量,默认为1(前一天)
返回pandas DataFrame(所有字段为文本类型,仅saas_create_time做日期格式化),失败返回None
"""
conn = None
cursor = None
try:
# 获得连接
conn = psycopg2.connect(**self.conn)
# 1. 建立MySQL连接(仅适配MySQL,参数与原逻辑对齐)
conn = pymysql.connect(
host=Config.BI_CONN_host,
database=Config.BI_CONN_INFO_database,
user=Config.BI_CONN_INFO_user,
password=Config.BI_CONN_INFO_password,
charset='utf8mb4', # MySQL中文兼容
cursorclass=pymysql.cursors.DictCursor # 保持字典游标,字段名映射一致
)
cursor = conn.cursor()
# 获取指定天数前的日期
# 2. 日期计算逻辑(完全复用原始逻辑)
now_time = datetime.now()
target_time = now_time + timedelta(days=-days_back)
target_date_id = int(target_time.strftime('%Y%m%d')) # 获取目标日期
target_time = now_time - timedelta(days=days_back)
target_date_id = int(target_time.strftime('%Y%m%d'))
# sql语句查询
sql = f"""
SELECT * FROM "public"."holo_ads_report_saas_profile_ngv_detail_d" WHERE "date_id" = '{target_date_id}' ;
"""
# 执行语句并获取结果集
cursor.execute(sql)
# 3. MySQL兼容的SQL(仅替换语法,逻辑不变)
sql = """
SELECT *
FROM `jdy_ngv_data_source`
WHERE `date_id` = %s;
"""
cursor.execute(sql, (target_date_id,))
rows = cursor.fetchall()
all_fields = cursor.description
# 执行结果转化为dataframe
col = [i[0] for i in all_fields]
data_NGV = pd.DataFrame(rows, columns=col)
# 4. 数据转换:强制全字段为文本类型(匹配原始数据源特性)
if rows:
# 核心:所有字段转字符串,空值统一为'',避免后续处理异常
data_NGV = pd.DataFrame(rows).astype(str).replace({'nan': '', 'NaT': ''})
else:
data_NGV = pd.DataFrame()
# 尝试自动解析日期时间字符串
# 5. 仅处理saas_create_time字段(完全复用原始转换逻辑)
time_format = "%Y-%m-%d %H:%M:%S"
if 'saas_create_time' in data_NGV.columns:
data_NGV['saas_create_time'] = pd.to_datetime(data_NGV['saas_create_time'], format=time_format,
errors='coerce')
data_NGV['saas_create_time'] = data_NGV['saas_create_time'].dt.strftime('%Y-%m-%d')
# 步骤1:解析为datetime(消除格式警告)
temp_dt = pd.to_datetime(
data_NGV['saas_create_time'],
format=time_format, # 指定格式,消除UserWarning
errors='coerce' # 解析失败设为NaT
)
# 步骤2:转换为YYYY-MM-DD格式的字符串,覆盖原始列(与原逻辑一致)
data_NGV['saas_create_time'] = temp_dt.dt.strftime('%Y-%m-%d').fillna('')
# 关闭游标和连接
cursor.close()
conn.close()
# 6. 其他时间字段完全保留原始文本格式(不做任何处理)
# date_fmt/expiry_time等字段仅保留从数据库读取的原始字符串)
return data_NGV
except Exception as e:
error_task_logger.error(f"获取NGV明细失败: {e}")
error_task_logger.error(f"获取NGV明细失败(MySQL适配): {str(e)}", exc_info=True)
return None
finally:
# 确保MySQL连接/游标关闭(资源释放)
if cursor:
try:
cursor.close()
except Exception as e:
error_task_logger.warning(f"关闭MySQL游标失败: {str(e)}")
if conn:
try:
conn.close()
except Exception as e:
error_task_logger.warning(f"关闭MySQL连接失败: {str(e)}")
def get_yichang_details(self, days_back=1):
"""
@@ -180,7 +207,14 @@ class CommonModule:
"""
try:
# 获得连接
conn = psycopg2.connect(**self.conn)
conn = pymysql.connect(
host=Config.BI_CONN_host,
database=Config.BI_CONN_INFO_database,
user=Config.BI_CONN_INFO_user,
password=Config.BI_CONN_INFO_password,
charset='utf8mb4', # MySQL中文兼容
cursorclass=pymysql.cursors.DictCursor # 保持字典游标,字段名映射一致
)
cursor = conn.cursor()
# 获取指定天数前的日期
@@ -195,15 +229,26 @@ class CommonModule:
print("距离今天还有{}天的日期是:{}".format(days_to_add, future_date))
sql = f"""SELECT * FROM "public"."holo_ads_report_saas_profile_ngv_detail_d" WHERE "date_id" = '{yes_time_nyr}' and "expiry_time" like '%{future_date}%';"""
sql = """
SELECT *
FROM `jdy_ngv_data_source`
WHERE `date_id` = %s \
AND `expiry_time` LIKE %s; \
"""
# 执行语句并获取结果集
cursor.execute(sql)
like_pattern = f"%{future_date}%"
cursor.execute(sql, (yes_time_nyr, like_pattern))
rows = cursor.fetchall()
all_fields = cursor.description # 获取所有字段名
# 执行结果转化为dataframe
col = [i[0] for i in all_fields]
data_NGV = pd.DataFrame(list(rows), columns=col)
if rows:
# data_NGV = pd.DataFrame(rows).astype(str).replace({'nan': '', 'NaT': ''})
all_fields = cursor.description # 获取所有字段名
# 执行结果转化为dataframe
col = [i[0] for i in all_fields]
data_NGV = pd.DataFrame(list(rows), columns=col).astype(str).replace({'nan': '', 'NaT': ''})
else:
data_NGV = pd.DataFrame()
# 关闭数据库连接
cursor.close()
+314
View File
@@ -0,0 +1,314 @@
{
"cells": [
{
"cell_type": "code",
"id": "initial_id",
"metadata": {
"collapsed": true,
"ExecuteTime": {
"end_time": "2026-01-13T07:56:05.597737600Z",
"start_time": "2026-01-13T07:50:57.192717400Z"
}
},
"source": [
"import time\n",
"\n",
"import pandas as pd\n",
"import psycopg2\n",
"import mysql.connector\n",
"from mysql.connector import Error\n",
"from datetime import datetime, timedelta\n",
"import numpy as np\n",
"\n",
"# ========== 配置 ==========\n",
"PG_CONN_INFO = {\n",
" \"database\": \"f6_bi\",\n",
" \"user\": \"LTAI5tMJsijFA9BS1R6uBpUT\",\n",
" \"password\": \"PajEQMIRWNRcipd8mYvlud2KHWJr6N\",\n",
" \"host\": \"hgpostcn-cn-m1e4gikbu00l-cn-shanghai.hologres.aliyuncs.com\",\n",
" \"port\": \"80\"\n",
"}\n",
"\n",
"MYSQL_CONFIG = {\n",
" 'host': \"f6-public.rwlb.rds.aliyuncs.com\",\n",
" 'user': \"rw_operation_data_relay\",\n",
" 'password': \"m+q5Z4%IVuF9bf\",\n",
" 'database': \"f6operation_data_relay\"\n",
"}\n",
"\n",
"SOURCE_TABLE = '\"public\".\"holo_ads_report_saas_profile_ngv_detail_d\"'\n",
"PARTITION_COLUMN = \"date_id\"\n",
"TARGET_TABLE_MYSQL = \"jdy_ngv_data_source\"\n",
"BATCH_SIZE = 2000\n",
"\n",
"# ========== 辅助函数 ==========\n",
"def is_datetime_type(pg_type: str) -> bool:\n",
" if not pg_type:\n",
" return False\n",
" pg_type = pg_type.lower()\n",
" return any(kw in pg_type for kw in ['timestamp', 'datetime', 'date'])\n",
"\n",
"def clean_column_name(name, index):\n",
" \"\"\"将列名转为合法字符串,处理 None / nan / 空值\"\"\"\n",
" if name is None:\n",
" return f\"unknown_col_{index}\"\n",
" if isinstance(name, float) and pd.isna(name):\n",
" return f\"unknown_col_{index}\"\n",
" name_str = str(name).strip()\n",
" if not name_str or name_str.lower() in ('nan', 'none', 'null', ''):\n",
" return f\"unknown_col_{index}\"\n",
" return name_str\n",
"\n",
"def get_source_schema():\n",
" conn = psycopg2.connect(**PG_CONN_INFO)\n",
" cur = conn.cursor()\n",
" cur.execute(\"\"\"\n",
" SELECT column_name, data_type\n",
" FROM information_schema.columns\n",
" WHERE table_schema = 'public'\n",
" AND table_name = 'holo_ads_report_saas_profile_ngv_detail_d'\n",
" ORDER BY ordinal_position;\n",
" \"\"\")\n",
" raw_schema = cur.fetchall()\n",
" cur.close()\n",
" conn.close()\n",
"\n",
" # 清洗列名\n",
" cleaned_schema = []\n",
" for i, (col_name, data_type) in enumerate(raw_schema):\n",
" clean_name = clean_column_name(col_name, i)\n",
" cleaned_schema.append((clean_name, data_type or 'text'))\n",
" return cleaned_schema\n",
"\n",
"def create_ngv_table(cursor, schema):\n",
" col_defs = []\n",
" for col_name, pg_type in schema:\n",
" if is_datetime_type(pg_type):\n",
" col_defs.append(f\"`{col_name}` DATETIME\")\n",
" else:\n",
" col_defs.append(f\"`{col_name}` TEXT\") # ✅ 关键:用 TEXT 避免行大小超限\n",
" create_sql = f\"\"\"\n",
" CREATE TABLE IF NOT EXISTS `{TARGET_TABLE_MYSQL}` (\n",
" {\",\\n \".join(col_defs)}\n",
" ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 ROW_FORMAT=DYNAMIC;\n",
" \"\"\"\n",
" cursor.execute(create_sql)\n",
" print(\"✅ MySQL 表 NGV 已创建(时间字段为 DATETIME,其余为 TEXT\")\n",
"\n",
"def normalize_datetime_cols(df, datetime_cols):\n",
" df = df.copy()\n",
" for col in datetime_cols:\n",
" if col in df.columns:\n",
" df[col] = pd.to_datetime(df[col], errors='coerce')\n",
" df[col] = df[col].dt.strftime('%Y-%m-%d %H:%M:%S').where(df[col].notnull(), None)\n",
" return df.where(pd.notnull(df), None)\n",
"\n",
"# ========== 主流程 ==========\n",
"def main():\n",
" # 1. 生成最近10天的 date_id(字符串格式)\n",
" date_ids = [\n",
" (datetime.now().date() - timedelta(days=i)).strftime(\"%Y%m%d\")\n",
" for i in range(3)\n",
" ]\n",
" print(f\"将同步以下 date_id 分区: {date_ids}\")\n",
"\n",
" # 2. 获取并清洗源表结构\n",
" schema = get_source_schema()\n",
" column_names = [col for col, _ in schema]\n",
" datetime_cols = [col for col, typ in schema if is_datetime_type(typ)]\n",
"\n",
" print(f\"检测到 {len(column_names)} 个字段,其中时间字段: {datetime_cols[:3]}...\")\n",
"\n",
" # 3. 连接 MySQL\n",
" mysql_conn = mysql.connector.connect(**MYSQL_CONFIG)\n",
" mysql_cursor = mysql_conn.cursor()\n",
"\n",
" try:\n",
" # 4. 创建目标表\n",
" create_ngv_table(mysql_cursor, schema)\n",
"\n",
" # 5. 构造插入 SQL\n",
" placeholders = \", \".join([\"%s\"] * len(column_names))\n",
" cols_str = \", \".join([f\"`{c}`\" for c in column_names])\n",
" insert_sql = f\"INSERT INTO `{TARGET_TABLE_MYSQL}` ({cols_str}) VALUES ({placeholders})\"\n",
"\n",
" # 6. 清空目标表\n",
" mysql_cursor.execute(f\"TRUNCATE TABLE `{TARGET_TABLE_MYSQL}`;\")\n",
" print(\"🗑️ 已清空 NGV 表\")\n",
"\n",
" # 7. 按 date_id 分批处理\n",
" # -- 新增:固定列名用于 SELECT --\n",
" fixed_columns = [col for col, _ in schema]\n",
" quoted_fixed_columns = \", \".join([f'\"{c}\"' for c in fixed_columns])\n",
"\n",
" # 动态选择排序字段(必须在 fixed_columns 中)\n",
" exclude_cols = {PARTITION_COLUMN} | set(datetime_cols)\n",
" candidates = [col for col in fixed_columns if col not in exclude_cols]\n",
" order_col = f'\"{candidates[0]}\"' if candidates else f'\"{PARTITION_COLUMN}\"'\n",
"\n",
" if \"org_code\" not in column_names:\n",
" raise ValueError(\"❌ 源表中未找到唯一字段 'org_code'\")\n",
"\n",
" for date_id in date_ids:\n",
" print(f\"\\n>>> 处理 date_id = {date_id}\")\n",
" last_org_code = None # 游标:上一批最大的 org_code\n",
" i = 1\n",
"\n",
" while True:\n",
" time.sleep(3)\n",
" pg_conn = psycopg2.connect(**PG_CONN_INFO)\n",
" pg_cur = pg_conn.cursor()\n",
"\n",
" # 构造 WHERE 条件\n",
" if last_org_code is None:\n",
" where_clause = f'\"{PARTITION_COLUMN}\" = %s'\n",
" params = (date_id,)\n",
" else:\n",
" where_clause = f'\"{PARTITION_COLUMN}\" = %s AND \"org_code\" > %s'\n",
" params = (date_id, last_org_code)\n",
"\n",
" sql = f\"\"\"\n",
" SELECT {quoted_fixed_columns}\n",
" FROM {SOURCE_TABLE}\n",
" WHERE {where_clause}\n",
" ORDER BY \"org_code\"\n",
" LIMIT {BATCH_SIZE};\n",
" \"\"\"\n",
" pg_cur.execute(sql, params)\n",
" rows = pg_cur.fetchall()\n",
" pg_cur.close()\n",
" pg_conn.close()\n",
"\n",
" if not rows:\n",
" break\n",
"\n",
" df_batch = pd.DataFrame(rows, columns=fixed_columns)\n",
" df_batch = normalize_datetime_cols(df_batch, datetime_cols)\n",
" df_batch.to_csv(f\"输出查看{i}.csv\", index=False)\n",
" i += 1\n",
"\n",
" # 更新游标:取本批最后一条的 org_code\n",
" last_org_code = df_batch.iloc[-1][\"org_code\"]\n",
"\n",
" # 清洗并插入 MySQL\n",
" def sanitize_row(row):\n",
" return tuple(\n",
" None if (isinstance(x, float) and pd.isna(x)) or pd.isna(x) else x\n",
" for x in row\n",
" )\n",
"\n",
" data_tuples = [sanitize_row(row) for row in df_batch.values]\n",
" mysql_cursor.executemany(insert_sql, data_tuples)\n",
" mysql_conn.commit()\n",
"\n",
" inserted = len(data_tuples)\n",
" print(f\" date_id={date_id} 已插入 {inserted} 行 (last_org_code={last_org_code})\")\n",
"\n",
" print(f\"✅ date_id={date_id} 同步完成\")\n",
"\n",
" print(f\"\\n🎉 同步完成!数据已写入 MySQL 表 `{TARGET_TABLE_MYSQL}`\")\n",
"\n",
" except Exception as e:\n",
" print(f\"❌ 错误: {e}\")\n",
" mysql_conn.rollback()\n",
" finally:\n",
" mysql_cursor.close()\n",
" mysql_conn.close()\n",
"\n",
"if __name__ == \"__main__\":\n",
" main()"
],
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"将同步以下 date_id 分区: ['20260113', '20260112', '20260111']\n",
"检测到 141 个字段,其中时间字段: []...\n",
"✅ MySQL 表 NGV 已创建(时间字段为 DATETIME,其余为 TEXT\n",
"🗑️ 已清空 NGV 表\n",
"\n",
">>> 处理 date_id = 20260113\n",
"✅ date_id=20260113 同步完成\n",
"\n",
">>> 处理 date_id = 20260112\n",
" date_id=20260112 已插入 2000 行 (last_org_code=CHS201812070004175)\n",
" date_id=20260112 已插入 2000 行 (last_org_code=CHS201903250025112)\n",
" date_id=20260112 已插入 2000 行 (last_org_code=CHS201907240033962)\n",
" date_id=20260112 已插入 2000 行 (last_org_code=CHS201910150040257)\n",
" date_id=20260112 已插入 2000 行 (last_org_code=CHS201912160046642)\n",
" date_id=20260112 已插入 2000 行 (last_org_code=CHS202003230057861)\n",
" date_id=20260112 已插入 2000 行 (last_org_code=CHS202006080088028)\n",
" date_id=20260112 已插入 2000 行 (last_org_code=CHS202010040108419)\n",
" date_id=20260112 已插入 2000 行 (last_org_code=CHS202104090119587)\n",
" date_id=20260112 已插入 2000 行 (last_org_code=CHS202106210130145)\n",
" date_id=20260112 已插入 2000 行 (last_org_code=CHS202108300139429)\n",
" date_id=20260112 已插入 2000 行 (last_org_code=CHS202112050146822)\n",
" date_id=20260112 已插入 2000 行 (last_org_code=CHS202204250175005)\n",
" date_id=20260112 已插入 2000 行 (last_org_code=CHS202209300189703)\n",
" date_id=20260112 已插入 2000 行 (last_org_code=CHS202303230219406)\n",
" date_id=20260112 已插入 2000 行 (last_org_code=CHS202308210240694)\n",
" date_id=20260112 已插入 2000 行 (last_org_code=CHS202402260259380)\n",
" date_id=20260112 已插入 2000 行 (last_org_code=CHS202406260273963)\n",
" date_id=20260112 已插入 2000 行 (last_org_code=CHS202411040284912)\n",
" date_id=20260112 已插入 2000 行 (last_org_code=CHS202503300294788)\n",
" date_id=20260112 已插入 2000 行 (last_org_code=CHS202507150304118)\n",
" date_id=20260112 已插入 2000 行 (last_org_code=CHS202511120312981)\n",
" date_id=20260112 已插入 1278 行 (last_org_code=TQB201509180060)\n",
"✅ date_id=20260112 同步完成\n",
"\n",
">>> 处理 date_id = 20260111\n",
" date_id=20260111 已插入 2000 行 (last_org_code=CHS201812070004175)\n",
" date_id=20260111 已插入 2000 行 (last_org_code=CHS201903250025112)\n",
" date_id=20260111 已插入 2000 行 (last_org_code=CHS201907240033962)\n",
" date_id=20260111 已插入 2000 行 (last_org_code=CHS201910150040257)\n",
" date_id=20260111 已插入 2000 行 (last_org_code=CHS201912160046642)\n",
" date_id=20260111 已插入 2000 行 (last_org_code=CHS202003230057861)\n",
" date_id=20260111 已插入 2000 行 (last_org_code=CHS202006080088042)\n",
" date_id=20260111 已插入 2000 行 (last_org_code=CHS202010050108424)\n",
" date_id=20260111 已插入 2000 行 (last_org_code=CHS202104090119621)\n",
" date_id=20260111 已插入 2000 行 (last_org_code=CHS202106210130146)\n",
" date_id=20260111 已插入 2000 行 (last_org_code=CHS202108300139429)\n",
" date_id=20260111 已插入 2000 行 (last_org_code=CHS202112050146822)\n",
" date_id=20260111 已插入 2000 行 (last_org_code=CHS202204250175005)\n",
" date_id=20260111 已插入 2000 行 (last_org_code=CHS202209300189703)\n",
" date_id=20260111 已插入 2000 行 (last_org_code=CHS202303230219406)\n",
" date_id=20260111 已插入 2000 行 (last_org_code=CHS202308210240694)\n",
" date_id=20260111 已插入 2000 行 (last_org_code=CHS202402260259380)\n",
" date_id=20260111 已插入 2000 行 (last_org_code=CHS202406260273963)\n",
" date_id=20260111 已插入 2000 行 (last_org_code=CHS202411040284912)\n",
" date_id=20260111 已插入 2000 行 (last_org_code=CHS202503300294779)\n",
" date_id=20260111 已插入 2000 行 (last_org_code=CHS202507150304098)\n",
" date_id=20260111 已插入 2000 行 (last_org_code=CHS202511120312975)\n",
" date_id=20260111 已插入 1259 行 (last_org_code=TQB201509180060)\n",
"✅ date_id=20260111 同步完成\n",
"\n",
"🎉 同步完成!数据已写入 MySQL 表 `jdy_ngv_data_source`\n"
]
}
],
"execution_count": 7
}
],
"metadata": {
"kernelspec": {
"display_name": "Python 3",
"language": "python",
"name": "python3"
},
"language_info": {
"codemirror_mode": {
"name": "ipython",
"version": 2
},
"file_extension": ".py",
"mimetype": "text/x-python",
"name": "python",
"nbconvert_exporter": "python",
"pygments_lexer": "ipython2",
"version": "2.7.6"
}
},
"nbformat": 4,
"nbformat_minor": 5
}
@@ -0,0 +1,524 @@
import os
import pandas as pd
import json
import ast
import re
from datetime import datetime
from chardet import detect
import unicodedata
# ==============================
# 核心修复:处理数组类型数据的工具函数
# ==============================
def is_empty_value(value):
    """Return True when *value* carries no data.

    A value is considered empty when it is ``None``, a scalar missing-value
    marker recognised by pandas (NaN, NaT, ``pd.NA``), a blank or
    whitespace-only string, or a zero-length list, tuple or set.

    Args:
        value: Any object (scalar, string or container).

    Returns:
        bool: True if the value is empty, False otherwise.
    """
    if value is None:
        return True
    if isinstance(value, str):
        return value.strip() == ''
    if isinstance(value, (list, tuple, set)):
        # Containers are judged by length only. They have to be handled
        # before pd.isna(): for a list pd.isna() returns an element-wise
        # array, and using that array in a boolean context raises ValueError.
        return len(value) == 0
    # Only scalars can be "missing"; dicts, arrays and other objects are
    # treated as non-empty.
    return bool(pd.api.types.is_scalar(value) and pd.isna(value))
def clean_special_characters(value):
    """Remove problematic characters from a string or from the strings in a list/tuple.

    Empty values (as decided by ``is_empty_value``) are returned untouched.
    A string is cleaned with ``_clean_single_string``. For a list or tuple a
    new list is returned in which every string item has been cleaned and every
    other item is kept as is. Any other type is returned unchanged.
    """
    if is_empty_value(value):
        return value

    if isinstance(value, str):
        return _clean_single_string(value)

    if isinstance(value, (list, tuple)):
        return [
            _clean_single_string(element) if isinstance(element, str) else element
            for element in value
        ]

    # Numbers, booleans and every other type pass straight through.
    return value
def _clean_single_string(text):
"""清理单个字符串的特殊字符(内部调用)"""
try:
# 移除控制字符(保留换行、制表符)
cleaned = ''.join(
char for char in text
if unicodedata.category(char)[0] != 'C' or char in '\n\t'
)
# 移除特定无法编码的字符(如右至左标记)
special_chars = ['\u202d', '\u202c', '\u202a', '\u202b', '\u200b']
for char in special_chars:
cleaned = cleaned.replace(char, '')
# 替换全角空格为半角空格
cleaned = cleaned.replace('\u3000', ' ')
return cleaned
except:
return text
def get_safe_encoding(file_path=None):
    """Pick the text encoding to use when reading *file_path*.

    The first 10000 bytes of the file are sniffed with chardet. Files
    detected as GBK/GB2312/GB18030 are reported as ``'gb18030'`` (a superset
    of both older codecs) so that they can actually be decoded; everything
    else, including UTF-8 with or without BOM, is reported as
    ``'utf-8-sig'``.

    Args:
        file_path: Path of the file to inspect. May be ``None`` or point to
            a file that does not exist.

    Returns:
        str: ``'gb18030'`` for GB-family files, otherwise ``'utf-8-sig'``
        (also the answer when no readable path is given).
    """
    default_encoding = 'utf-8-sig'
    if not file_path or not os.path.exists(file_path):
        return default_encoding

    with open(file_path, 'rb') as f:
        raw_data = f.read(10000)

    # chardet may report None (undetermined) and uses mixed-case names such
    # as 'GB2312', so normalise before comparing.
    detected = (detect(raw_data).get('encoding') or '').lower()
    if detected in ('gbk', 'gb2312', 'gb18030'):
        return 'gb18030'
    return default_encoding
# ==============================
# 第一步:生成 expanded_yd_data.csv(彻底修复数组问题)
# ==============================
def generate_expanded_csv(output_dir):
    """Build ``expanded_yd_data.csv`` from ``converted_yd_data.csv`` in *output_dir*.

    Steps:
      1. Read ``converted_yd_data.csv`` and clean its text columns.
      2. Parse every ``data`` cell (a Python-literal dict stored as text) with
         ``ast.literal_eval`` and flatten the dicts into columns; list/tuple
         values become comma-joined strings.
      3. Keep only rows whose ``instanceStatus`` is ``RUNNING`` and whose
         ``textField_kto3q3ev`` is blank (each filter is skipped with a
         warning when its column is missing).
      4. Write the result as UTF-8 with BOM.

    Args:
        output_dir: Directory that holds the input file and receives the
            output file. It is created when missing.

    Returns:
        The path of the written ``expanded_yd_data.csv``, or ``None`` when the
        input file is missing or unreadable, has no ``data`` column, or the
        output cannot be written.
    """
    os.makedirs(output_dir, exist_ok=True)
    converted_csv_path = os.path.join(output_dir, "converted_yd_data.csv")
    if not os.path.exists(converted_csv_path):
        print(f"❌ 错误:converted_yd_data.csv 不存在,路径:{converted_csv_path}")
        return None

    # Read the input with the sniffed encoding.
    encoding = get_safe_encoding(converted_csv_path)
    try:
        df = pd.read_csv(converted_csv_path, encoding=encoding)
    except Exception as e:
        print(f"❌ 读取 converted_yd_data.csv 失败:{str(e)}")
        return None
    print(f"✅ 成功读取 converted_yd_data.csv(编码:{encoding}),数据规模:{df.shape[0]}× {df.shape[1]}")

    if 'data' not in df.columns:
        print(f"❌ 错误:converted_yd_data.csv 中缺少 'data'")
        return None

    # Clean the plain text columns; the 'data' column is parsed separately.
    for col in df.columns:
        if col != 'data' and df[col].dtype == 'object':
            df[col] = df[col].apply(clean_special_characters)

    non_null_data = df['data'].dropna()
    sample = non_null_data.iloc[0] if not non_null_data.empty else ""
    print("Sample of 'data' column (after cleaning):")
    print(repr(str(sample))[:200])  # truncated so a long cell does not flood the log
    if isinstance(sample, str):
        print("Detected string format, parsing with ast.literal_eval...")

    def safe_literal_eval(x):
        """Parse one 'data' cell into a dict; anything unusable becomes {}."""
        if isinstance(x, dict):
            return x
        if is_empty_value(x):
            return {}
        try:
            cleaned_x = clean_special_characters(x) if isinstance(x, str) else str(x)
            parsed = ast.literal_eval(cleaned_x)
        except (ValueError, SyntaxError, TypeError) as e:
            print(f"Parse error on: {repr(str(x))[:100]}... Error: {str(e)[:50]}")
            return {}
        # json_normalize needs one mapping per row. A cell that holds a list,
        # number or other literal would make it fail, so treat it as empty.
        return parsed if isinstance(parsed, dict) else {}

    # Always parse: every cell must be a dict before it can be flattened,
    # whatever type the first sample happened to have.
    df['data'] = df['data'].apply(safe_literal_eval)
    expanded = pd.json_normalize(df['data'].tolist())

    def flatten_and_clean(cell):
        """Turn list/tuple cells into 'a,b' strings, then strip special characters."""
        if isinstance(cell, (list, tuple)):
            cell = ','.join(map(str, cell))
        return clean_special_characters(cell)

    for col in expanded.columns:
        if expanded[col].dtype == 'object':
            expanded[col] = expanded[col].apply(flatten_and_clean)

    # Put the untouched columns and the flattened columns side by side.
    other_cols = df.drop(columns=['data'])
    new_df = pd.concat([other_cols.reset_index(drop=True), expanded.reset_index(drop=True)], axis=1)

    if 'instanceStatus' in new_df.columns:
        new_df['instanceStatus'] = new_df['instanceStatus'].astype(str)
        # .copy() so the column assignment below works on an independent frame.
        new_df = new_df[new_df["instanceStatus"].str.strip() == "RUNNING"].copy()
        print(f"✅ 过滤后(instanceStatus=RUNNING):{new_df.shape[0]}")
    else:
        print(f"⚠️ 警告:缺少 'instanceStatus' 列,跳过状态过滤")

    col = "textField_kto3q3ev"
    if col in new_df.columns:
        # Lists were already flattened above, so a plain string cast is enough.
        new_df[col] = new_df[col].astype(str)
        stripped = new_df[col].str.strip()
        # Missing values show up as the text 'nan' after the string cast.
        new_df = new_df[(stripped == "") | (stripped == "nan")]
        print(f"✅ 过滤后({col}为空):{new_df.shape[0]}")
    else:
        print(f"⚠️ 警告:缺少 '{col}' 列,跳过订单编码过滤")

    expanded_csv_path = os.path.join(output_dir, "expanded_yd_data.csv")
    try:
        new_df.to_csv(
            expanded_csv_path,
            index=False,
            encoding='utf-8-sig',
            na_rep='',
            errors='replace'
        )
    except Exception as e:
        print(f"❌ 保存 expanded_yd_data.csv 失败:{str(e)}")
        return None
    print(f"✅ Expanded data saved to: {expanded_csv_path}(编码:utf-8-sig")
    return expanded_csv_path
# ==============================
# 第二步:格式转换核心函数(兼容数组处理结果)
# ==============================
def convert_to_data_ngv_format(
        source_csv_path,
        target_csv_path,
        main_output_path,
        additional_output_path,
        doc_output_path
):
    """Reshape the expanded source CSV into the column layout of ``data_NGV.csv``.

    Two CSV files and one markdown note are written:

    * the main file has the columns of the target file plus a leading
      ``main_file_id`` column (``REC-000000`` style);
    * the additional file holds every source column that is neither a target
      column nor a mapped field, keyed by the same ``main_file_id``;
    * the note is produced by ``generate_relation_doc``.

    Args:
        source_csv_path: Expanded source CSV (read as utf-8-sig).
        target_csv_path: Existing ``data_NGV.csv`` whose header defines the
            main output layout (utf-8-sig, falling back to gbk).
        main_output_path: Where to write the main CSV.
        additional_output_path: Where to write the additional-columns CSV.
        doc_output_path: Where to write the markdown note.

    Returns:
        bool: True when all outputs were written, False when reading the
        inputs or writing the outputs failed.
    """
    print("\n=== 开始格式转换(目标格式:data_NGV.csv===")
    try:
        if not os.path.exists(target_csv_path):
            raise FileNotFoundError(f"目标文件不存在:{target_csv_path}")
        # The target file may be UTF-8 (with BOM) or GBK.
        try:
            df_target = pd.read_csv(target_csv_path, encoding='utf-8-sig')
            target_encoding = 'utf-8-sig'
        except UnicodeDecodeError:
            df_target = pd.read_csv(target_csv_path, encoding='gbk')
            target_encoding = 'gbk'
        print(f"✅ 成功读取目标文件:{target_csv_path}(编码:{target_encoding}")
        print(f" 数据规模:{df_target.shape[0]}× {df_target.shape[1]}")
        df_source = pd.read_csv(source_csv_path, encoding='utf-8-sig')
        print(f"✅ 成功读取源文件:{source_csv_path}(编码:utf-8-sig")
        print(f" 数据规模:{df_source.shape[0]}× {df_source.shape[1]}")
    except Exception as e:
        print(f"❌ 文件读取失败:{str(e)}")
        return False

    # Source field -> target field.
    print("\n=== 建立字段映射关系 ===")
    field_mapping = {
        'createTimeGMT': 'saas_create_time',
        'modifiedTimeGMT': 'etl_time',
        'title': 'org_remark',
        'instanceStatus': 'active_status_fmt',
        'processInstanceId': 'id_own_org',
        'actionExecutor': 'technician',
        'originator': 'salesmen',
        'textField_kuj8nx00': 'province_name',
        'textField_kuj8nx01': 'city_name'
    }
    # Keep only the pairs where both sides actually exist.
    valid_mapping = {}
    for source_field, target_field in field_mapping.items():
        if target_field in df_target.columns and source_field in df_source.columns:
            valid_mapping[source_field] = target_field
            print(f" {source_field}{target_field}")
    if len(valid_mapping) == 0:
        print(f"⚠️ 警告:未找到有效字段映射,将仅填充默认值")

    target_columns = set(df_target.columns)
    source_columns = set(df_source.columns)
    # Preserve the source column order so the output layout is reproducible
    # (plain set arithmetic would give an arbitrary order).
    additional_columns = [
        col for col in df_source.columns
        if col not in target_columns and col not in valid_mapping
    ]
    print(f"\n=== 字段统计 ===")
    print(f" 目标文件总字段数:{len(target_columns)}")
    print(f" 源文件总字段数:{len(source_columns)}")
    print(f" 有效映射字段数:{len(valid_mapping)}")
    print(f" 额外字段数(放入单独文件):{len(additional_columns)}")

    main_columns = ['main_file_id'] + list(df_target.columns)
    additional_header = ['main_file_id'] + additional_columns

    def to_text(value):
        """Render a cell as stripped text; lists become 'a,b'; empty becomes ''."""
        if isinstance(value, (list, tuple)):
            return ','.join(map(str, value))
        return '' if is_empty_value(value) else str(value).strip()

    def extract_org_info(title_text):
        """Pull (org_name, org_code) out of the title text; '' when absent."""
        if is_empty_value(title_text):
            return '', ''
        title_str = str(title_text).strip()
        org_name_match = re.search(r'门店名称:([^,\n]+)', title_str)
        org_code_match = re.search(r'门店编码:([^,\n]+)', title_str)
        org_name = org_name_match.group(1).strip() if org_name_match else ''
        org_code = org_code_match.group(1).strip() if org_code_match else ''
        return org_name, org_code

    def extract_technician(executor_text):
        """Return the 'nameInChinese' value found in the executor text, or ''."""
        if is_empty_value(executor_text):
            return ''
        executor_str = str(executor_text).strip()
        name_match = re.search(r"'nameInChinese':\s*'([^']+)'", executor_str)
        return name_match.group(1).strip() if name_match else ''

    def convert_gmt_time(gmt_str):
        """Parse an ISO-like 'YYYY-MM-DDTHH:MM:SSZ' text; None when unusable."""
        if is_empty_value(gmt_str):
            return None
        time_str = str(gmt_str).replace('T', ' ').replace('Z', '').strip()
        try:
            parsed = pd.to_datetime(time_str)
        except (ValueError, TypeError, OverflowError):
            return None
        # NaT cannot be formatted, so report it as "no time".
        return None if pd.isna(parsed) else parsed

    # Defaults applied to main-file fields that are still empty after mapping.
    default_values = {
        'org_type': '一般',
        'org_status': '留存',
        'group_grade': '普通客户(VIP',
        'is_active': 1,
        'active_status_fmt': '活跃',
        'province_name': '未知',
        'city_name': '未知',
        'area_name': '未知',
        'is_wechat': 0,
        'is_mini_app': 0,
        'id_own_group': 0,
        'org_code': ''
    }

    # Rows are collected in lists and turned into frames once at the end;
    # growing a DataFrame row by row copies it on every insert.
    main_records = []
    additional_records = []
    total_rows = len(df_source)
    print(f"\n=== 开始数据处理(共{len(df_source)}行) ===")
    for idx, source_row in df_source.iterrows():
        main_file_id = f"REC-{idx:06d}"

        main_row = dict.fromkeys(main_columns)
        main_row['main_file_id'] = main_file_id
        for source_field, target_field in valid_mapping.items():
            main_row[target_field] = to_text(source_row[source_field])

        # Derived business fields. Keys that are not target columns are
        # dropped when the frame is built.
        create_time = convert_gmt_time(source_row.get('createTimeGMT'))
        if create_time is not None:
            main_row['date_fmt'] = create_time.strftime('%Y/%m/%d')
            main_row['date_id'] = int(create_time.strftime('%Y%m%d'))
            main_row['pt'] = main_row['date_id']

        org_name, org_code = extract_org_info(source_row.get('title'))
        if org_name:
            main_row['org_name'] = org_name
            main_row['group_name'] = org_name
        if org_code:
            main_row['org_code'] = org_code

        technician = extract_technician(source_row.get('actionExecutor'))
        if technician:
            main_row['technician'] = technician

        for col, default_val in default_values.items():
            if col in main_row and is_empty_value(main_row[col]):
                main_row[col] = default_val
        main_records.append(main_row)

        additional_row = {'main_file_id': main_file_id}
        for col in additional_columns:
            additional_row[col] = to_text(source_row[col])
        additional_records.append(additional_row)

        if (idx + 1) % 500 == 0 or (idx + 1) == total_rows:
            print(f" 已处理 {idx + 1}/{len(df_source)}")

    df_main = pd.DataFrame(main_records, columns=main_columns, dtype=object)
    df_additional = pd.DataFrame(additional_records, columns=additional_header, dtype=object)

    print(f"\n=== 优化数据类型 ===")
    numeric_cols = ['date_id', 'pt', 'is_active', 'is_wechat', 'is_mini_app',
                    'id_own_group', 'active_user_count', 'limit_user_count']
    for col in numeric_cols:
        if col in df_main.columns:
            # Values may be text; drop thousands separators / joined-list
            # commas and fall back to 0 when not numeric.
            df_main[col] = pd.to_numeric(
                df_main[col].astype(str).str.replace(',', ''),
                errors='coerce'
            ).fillna(0).astype(int)

    df_main = df_main.fillna('')
    df_additional = df_additional.fillna('')

    print(f"\n=== 保存结果文件 ===")
    try:
        df_main.to_csv(
            main_output_path,
            index=False,
            encoding='utf-8-sig',
            na_rep='',
            errors='replace'
        )
        print(f"✅ 主文件保存成功: {main_output_path}(编码:utf-8-sig")
        df_additional.to_csv(
            additional_output_path,
            index=False,
            encoding='utf-8-sig',
            na_rep='',
            errors='replace'
        )
        print(f"✅ 额外字段文件保存成功: {additional_output_path}(编码:utf-8-sig")
        generate_relation_doc(doc_output_path, main_output_path, additional_output_path,
                              len(target_columns), len(source_columns), len(valid_mapping))
        print(f"✅ 关联说明文档保存成功: {doc_output_path}")
        return True
    except Exception as e:
        print(f"❌ 文件保存失败: {str(e)}")
        return False
# ==============================
# 辅助函数:生成关联说明文档
# ==============================
def generate_relation_doc(doc_path, main_file, additional_file, target_col_count, source_col_count, mapping_count):
    """Write a markdown note describing the two CSV outputs and how to join them.

    Row and column counts are read back from the CSV files that were just
    written (both are expected to be utf-8-sig encoded).

    Args:
        doc_path: Path of the markdown file to write (UTF-8).
        main_file: Path of the main output CSV.
        additional_file: Path of the additional-columns CSV.
        target_col_count: Number of columns in the target layout; the note
            reports this plus one for the ``main_file_id`` column.
        source_col_count: Accepted for interface compatibility; not used in
            the generated text.
        mapping_count: Accepted for interface compatibility; not used in the
            generated text.
    """
    main_rows = pd.read_csv(main_file, encoding='utf-8-sig').shape[0]
    # Read the additional file once and reuse both dimensions.
    additional_rows, additional_cols = pd.read_csv(additional_file, encoding='utf-8-sig').shape
    # NOTE: the backslash in "\\u202d" is escaped on purpose so the note shows
    # the code point as text instead of embedding the invisible control
    # character itself.
    doc_content = f"""# 数据格式转换结果说明
## 目标格式文件:data_NGV.csv
### 一、文件概述
1. **主文件(匹配目标格式)**
- 文件名:{os.path.basename(main_file)}
- 格式来源:完全匹配 data_NGV.csv 结构
- 数据规模:{main_rows}× {target_col_count + 1}
- 编码格式:UTF-8-SIG(兼容所有字符和数组数据)
2. **额外字段文件**
- 文件名:{os.path.basename(additional_file)}
- 包含内容:源文件中 data_NGV.csv 没有的字段
- 数据规模:{additional_rows}× {additional_cols}
- 编码格式:UTF-8-SIG
### 二、关键处理说明
1. **数组数据处理**:源文件中的数组(如 ['a','b'])已转为字符串("a,b")存储,避免格式错误
2. **特殊字符清理**:自动移除无法编码的控制字符(如右至左标记 \\u202d
3. **编码统一**:所有文件使用 UTF-8-SIG 编码,兼容中文和特殊字符
### 三、关联方法
1. **关联字段**`main_file_id`(格式:REC-000000
2. **Excel导入**:数据→自文本/CSV→选择文件→编码选择"UTF-8"→完成
### 四、使用建议
1. 主文件可直接用于业务系统,与 data_NGV.csv 格式完全兼容
2. 额外字段文件用于原始数据追溯,通过 main_file_id 关联
3. 数组转字符串后的数据可通过 Excel 的"文本分列"功能恢复为数组
"""
    with open(doc_path, 'w', encoding='utf-8') as f:
        f.write(doc_content)
# ==============================
# 主执行函数
# ==============================
def main(base_dir="D:\\Idea Project\\SaaS_V1.7\\test\\output", target_csv_path=None):
    """Run the two-step conversion: expand the source CSV, then reshape it.

    Args:
        base_dir: Working directory that contains ``converted_yd_data.csv``
            and receives every output file. Defaults to the original
            developer path, so calling ``main()`` behaves as before.
        target_csv_path: Path of the ``data_NGV.csv`` template. Defaults to
            ``data_NGV.csv`` inside *base_dir*.
    """
    if target_csv_path is None:
        target_csv_path = os.path.join(base_dir, "data_NGV.csv")

    # Step 1: build expanded_yd_data.csv.
    expanded_csv_path = generate_expanded_csv(base_dir)
    if not expanded_csv_path:
        print("❌ 生成 expanded_yd_data.csv 失败,终止转换")
        return

    # Step 2: reshape into the data_NGV layout.
    main_output_path = os.path.join(base_dir, "主文件_匹配data_NGV格式.csv")
    additional_output_path = os.path.join(base_dir, "额外字段文件_源文件特有列.csv")
    doc_output_path = os.path.join(base_dir, "格式转换说明.md")
    success = convert_to_data_ngv_format(
        source_csv_path=expanded_csv_path,
        target_csv_path=target_csv_path,
        main_output_path=main_output_path,
        additional_output_path=additional_output_path,
        doc_output_path=doc_output_path
    )
    if success:
        print(f"\n🎉 格式转换全部完成!")
        print(f"📁 输出目录:{base_dir}")
        print(f"🔔 重要:Excel导入时需选择'UTF-8'编码,数组数据已转为逗号分隔字符串")
    else:
        print(f"\n❌ 格式转换失败,请查看日志信息排查问题")
# ==============================
# 执行入口
# ==============================
if __name__ == "__main__":
    # The API modules are optional: they are only probed here and their
    # absence does not stop the format conversion.
    try:
        from yd_api import YDAPI
        from api import API
    except ImportError:
        print("⚠️ 警告:未找到 yd_api 或 api 模块,跳过API初始化(不影响格式转换)")
    else:
        print("✅ 成功导入 API 模块")
    # Run the conversion pipeline.
    main()
@@ -0,0 +1,850 @@
{
"cells": [
{
"metadata": {},
"cell_type": "markdown",
"source": "# 数据获取",
"id": "be79c113026b04f"
},
{
"metadata": {
"ExecuteTime": {
"end_time": "2026-01-14T02:38:34.642781900Z",
"start_time": "2026-01-14T02:31:28.896098900Z"
}
},
"cell_type": "code",
"source": [
"import os\n",
"from datetime import datetime, timezone, timedelta\n",
"import pandas as pd\n",
"from holidays.countries import saint_martin as record\n",
"from tqdm import tqdm\n",
"import json\n",
"from yd_api import YDAPI\n",
"from api import API\n",
"import time\n",
"\n",
"output_dir = \"output\"\n",
"os.makedirs(output_dir, exist_ok=True)\n",
"\n",
"api_instance = API()\n",
"yd_api_instance = YDAPI()\n",
"\n",
"\n",
"def generate_monthly_ranges(start: str, end: str):\n",
" \"\"\"\n",
" 生成按自然月划分的时间段列表(左闭右开)\n",
" 例如: [('2025-11-01T00:00:00Z', '2025-12-01T00:00:00Z'), ...]\n",
" \"\"\"\n",
" start_dt = datetime.fromisoformat(start.replace(\"Z\", \"+00:00\"))\n",
" end_dt = datetime.fromisoformat(end.replace(\"Z\", \"+00:00\"))\n",
"\n",
" ranges = []\n",
" current = start_dt\n",
"\n",
" while current < end_dt:\n",
" # 下一个月的第一天\n",
" if current.month == 12:\n",
" next_month = current.replace(year=current.year + 1, month=1, day=1)\n",
" else:\n",
" next_month = current.replace(month=current.month + 1, day=1)\n",
" # 不超过 end_dt\n",
" segment_end = min(next_month, end_dt)\n",
" ranges.append((\n",
" current.strftime(\"%Y-%m-%dT00:00:00Z\"),\n",
" segment_end.strftime(\"%Y-%m-%dT00:00:00Z\")\n",
" ))\n",
" current = next_month\n",
"\n",
" return ranges\n",
"\n",
"\n",
"class GetYDData:\n",
"\n",
" def __init__(self):\n",
" self.FORMID = \"FORM-PE866MD1MJMU0WGLYRFLYEN5YN9L1I55Z7ZUK22\"\n",
" self.appType = \"APP_UYZ0KG6L0CCNV80GZ66O\"\n",
" self.systemToken = \"XA966F81JAJOFCVVVKO64E9MIIZV1EWE5SFMKJ2\"\n",
"\n",
" # 第一段:2025-01-01 到 2025-11-01\n",
" first_segment = (\"2025-01-01T00:00:00Z\", \"2025-02-01T00:00:00Z\")\n",
"\n",
" # 第二段:2025-11-01 到当前时间(按月拆分)\n",
" now_utc_str = datetime.now(timezone.utc).strftime(\"%Y-%m-%dT%H:%M:%SZ\")\n",
" monthly_segments = generate_monthly_ranges(\"2025-02-01T00:00:00Z\", now_utc_str)\n",
"\n",
" # 合并所有时间段\n",
" self.time_ranges = [first_segment] + monthly_segments\n",
"\n",
" print(\"📅 计划拉取以下时间段:\")\n",
" for i, (s, e) in enumerate(self.time_ranges, 1):\n",
" print(f\" {i}. {s} → {e}\")\n",
"\n",
" def build_value_to_label_map(self, form_structure):\n",
" value_to_label_map = {}\n",
" fields = form_structure.get(\"result\", [])\n",
" for field in fields:\n",
" field_id = field.get(\"fieldId\")\n",
" component = field.get(\"componentName\")\n",
" props = field.get(\"props\", {})\n",
" data_source = props.get(\"dataSource\", [])\n",
"\n",
" if component in [\"SelectField\", \"RadioField\"] and data_source:\n",
" option_map = {}\n",
" for opt in data_source:\n",
" val = opt.get(\"value\")\n",
" if val is None:\n",
" continue\n",
" text_obj = opt.get(\"text\", {})\n",
" if isinstance(text_obj, dict):\n",
" zh_text = text_obj.get(\"zh_CN\")\n",
" if zh_text is None and \"value\" in text_obj:\n",
" raw = text_obj[\"value\"]\n",
" if isinstance(raw, str) and raw.startswith('\"') and raw.endswith('\"'):\n",
" zh_text = raw[1:-1]\n",
" else:\n",
" zh_text = str(val)\n",
" elif zh_text is None:\n",
" zh_text = str(val)\n",
" else:\n",
" zh_text = str(text_obj)\n",
" option_map[str(val)] = zh_text\n",
" if option_map:\n",
" value_to_label_map[field_id] = option_map\n",
" return value_to_label_map\n",
"\n",
" def convert_record_values(self, record, value_map):\n",
" converted = {}\n",
" for key, val in record.items():\n",
" if key in value_map and val is not None:\n",
" str_val = str(val)\n",
" converted[key] = value_map[key].get(str_val, val)\n",
" else:\n",
" converted[key] = val\n",
" return converted\n",
"\n",
" def fetch_records_in_range(self, token, start_time, end_time):\n",
" \"\"\"拉取指定时间范围内的所有记录\"\"\"\n",
" try:\n",
" first_page = yd_api_instance.read_processes_instances(\n",
" token=token,\n",
" formUuid=self.FORMID,\n",
" page=1,\n",
" n=100,\n",
" appType=self.appType,\n",
" systemToken=self.systemToken,\n",
" instanceStatus=\"\",\n",
" modifiedFromTimeGMT=start_time,\n",
" modifiedToTimeGMT=end_time,\n",
" )\n",
" except Exception as e:\n",
" print(f\"❌ 首页请求失败 ({start_time} {end_time}): {e}\")\n",
" return []\n",
"\n",
" total_count = first_page.get(\"totalCount\", 0)\n",
" total_pages = (total_count // 100) + (1 if total_count % 100 else 0)\n",
" print(f\"📊 [{start_time[:10]} {end_time[:10]}] 总记录数: {total_count}, 共 {total_pages} 页\")\n",
"\n",
" all_records = []\n",
" if total_count > 0:\n",
" all_records.extend(first_page.get(\"data\", []))\n",
" for page in tqdm(range(2, total_pages + 1), desc=f\"{start_time[:7]}\"):\n",
" try:\n",
" resp = yd_api_instance.read_processes_instances(\n",
" token=token,\n",
" formUuid=self.FORMID,\n",
" page=page,\n",
" n=100,\n",
" appType=self.appType,\n",
" systemToken=self.systemToken,\n",
" instanceStatus=\"\",\n",
" modifiedFromTimeGMT=start_time,\n",
" modifiedToTimeGMT=end_time,\n",
" )\n",
" page_data = resp.get(\"data\", [])\n",
" all_records.extend(page_data)\n",
" time.sleep(0.15) # 稍微增加间隔,更安全\n",
" except Exception as e:\n",
" print(f\"⚠️ 第 {page} 页失败 ({start_time[:10]}): {e}\")\n",
" continue\n",
" return all_records\n",
"\n",
" def main(self):\n",
" # Step 1: 获取表单结构\n",
" token = yd_api_instance.generateToken()\n",
" form_struct = yd_api_instance.get_form_structures(\n",
" token=token,\n",
" formUuid=self.FORMID\n",
" )\n",
" value_map = self.build_value_to_label_map(form_struct)\n",
" print(\"\\n✅ 表单选项映射构建完成\")\n",
"\n",
" # Step 2: 按时间段拉取\n",
" all_records = []\n",
" all_records_detils = []\n",
" for start_time, end_time in self.time_ranges:\n",
" print(f\"\\n⏳ 拉取: {start_time} → {end_time}\")\n",
" records = self.fetch_records_in_range(token, start_time, end_time)\n",
" all_records.extend(records)\n",
" try:\n",
" record_data = record.get(\"data\", [])\n",
" all_records_detils.extend(record_data)\n",
" except Exception as e:\n",
" continue\n",
"\n",
" print(f\"\\n📥 总共获取 {len(all_records)} 条流程实例\")\n",
"\n",
" # # Step 3: 转换 formData\n",
" converted_records = []\n",
" for inst in all_records:\n",
" form_data = inst.get(\"formData\", {})\n",
" converted = self.convert_record_values(form_data, value_map)\n",
" converted_records.append(converted)\n",
"\n",
" # Step 4: 保存\n",
" if all_records:\n",
" df = pd.DataFrame(all_records)\n",
" output_path = \"D:\\Idea Project\\SaaS_V1.7\\\\test\\output\\converted_yd_data.csv\"\n",
" df.to_csv(output_path, index=False)\n",
"\n",
" df1 = pd.DataFrame(all_records_detils)\n",
" output_path1 = \"D:\\Idea Project\\SaaS_V1.7\\\\test\\output\\converted_yd_data_detail.csv\"\n",
" df1.to_csv(output_path1, index=False)\n",
" print(f\"\\n✅ 成功保存 {len(all_records)} 条记录至: {output_path}\")\n",
" else:\n",
" print(\"\\n❌ 无有效数据\")\n",
"\n",
"\n",
"if __name__ == \"__main__\":\n",
" GetYDData().main()"
],
"id": "1974d0f55ce6f25d",
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"📅 计划拉取以下时间段:\n",
" 1. 2025-01-01T00:00:00Z → 2025-02-01T00:00:00Z\n",
" 2. 2025-02-01T00:00:00Z → 2025-03-01T00:00:00Z\n",
" 3. 2025-03-01T00:00:00Z → 2025-04-01T00:00:00Z\n",
" 4. 2025-04-01T00:00:00Z → 2025-05-01T00:00:00Z\n",
" 5. 2025-05-01T00:00:00Z → 2025-06-01T00:00:00Z\n",
" 6. 2025-06-01T00:00:00Z → 2025-07-01T00:00:00Z\n",
" 7. 2025-07-01T00:00:00Z → 2025-08-01T00:00:00Z\n",
" 8. 2025-08-01T00:00:00Z → 2025-09-01T00:00:00Z\n",
" 9. 2025-09-01T00:00:00Z → 2025-10-01T00:00:00Z\n",
" 10. 2025-10-01T00:00:00Z → 2025-11-01T00:00:00Z\n",
" 11. 2025-11-01T00:00:00Z → 2025-12-01T00:00:00Z\n",
" 12. 2025-12-01T00:00:00Z → 2026-01-01T00:00:00Z\n",
" 13. 2026-01-01T00:00:00Z → 2026-01-14T00:00:00Z\n",
"\n",
"✅ 表单选项映射构建完成\n",
"\n",
"⏳ 拉取: 2025-01-01T00:00:00Z → 2025-02-01T00:00:00Z\n",
"📊 [2025-01-01 2025-02-01] 总记录数: 649, 共 7 页\n"
]
},
{
"name": "stderr",
"output_type": "stream",
"text": [
"2025-01: 100%|██████████| 6/6 [00:12<00:00, 2.02s/it]\n"
]
},
{
"name": "stdout",
"output_type": "stream",
"text": [
"\n",
"⏳ 拉取: 2025-02-01T00:00:00Z → 2025-03-01T00:00:00Z\n",
"📊 [2025-02-01 2025-03-01] 总记录数: 936, 共 10 页\n"
]
},
{
"name": "stderr",
"output_type": "stream",
"text": [
"2025-02: 100%|██████████| 9/9 [00:18<00:00, 2.01s/it]\n"
]
},
{
"name": "stdout",
"output_type": "stream",
"text": [
"\n",
"⏳ 拉取: 2025-03-01T00:00:00Z → 2025-04-01T00:00:00Z\n",
"📊 [2025-03-01 2025-04-01] 总记录数: 1272, 共 13 页\n"
]
},
{
"name": "stderr",
"output_type": "stream",
"text": [
"2025-03: 100%|██████████| 12/12 [00:24<00:00, 2.08s/it]\n"
]
},
{
"name": "stdout",
"output_type": "stream",
"text": [
"\n",
"⏳ 拉取: 2025-04-01T00:00:00Z → 2025-05-01T00:00:00Z\n",
"📊 [2025-04-01 2025-05-01] 总记录数: 1099, 共 11 页\n"
]
},
{
"name": "stderr",
"output_type": "stream",
"text": [
"2025-04: 100%|██████████| 10/10 [00:20<00:00, 2.10s/it]\n"
]
},
{
"name": "stdout",
"output_type": "stream",
"text": [
"\n",
"⏳ 拉取: 2025-05-01T00:00:00Z → 2025-06-01T00:00:00Z\n",
"📊 [2025-05-01 2025-06-01] 总记录数: 1327, 共 14 页\n"
]
},
{
"name": "stderr",
"output_type": "stream",
"text": [
"2025-05: 100%|██████████| 13/13 [00:27<00:00, 2.10s/it]\n"
]
},
{
"name": "stdout",
"output_type": "stream",
"text": [
"\n",
"⏳ 拉取: 2025-06-01T00:00:00Z → 2025-07-01T00:00:00Z\n",
"📊 [2025-06-01 2025-07-01] 总记录数: 1169, 共 12 页\n"
]
},
{
"name": "stderr",
"output_type": "stream",
"text": [
"2025-06: 100%|██████████| 11/11 [00:22<00:00, 2.05s/it]\n"
]
},
{
"name": "stdout",
"output_type": "stream",
"text": [
"\n",
"⏳ 拉取: 2025-07-01T00:00:00Z → 2025-08-01T00:00:00Z\n",
"📊 [2025-07-01 2025-08-01] 总记录数: 1328, 共 14 页\n"
]
},
{
"name": "stderr",
"output_type": "stream",
"text": [
"2025-07: 100%|██████████| 13/13 [00:25<00:00, 1.99s/it]\n"
]
},
{
"name": "stdout",
"output_type": "stream",
"text": [
"\n",
"⏳ 拉取: 2025-08-01T00:00:00Z → 2025-09-01T00:00:00Z\n",
"📊 [2025-08-01 2025-09-01] 总记录数: 1551, 共 16 页\n"
]
},
{
"name": "stderr",
"output_type": "stream",
"text": [
"2025-08: 100%|██████████| 15/15 [00:31<00:00, 2.09s/it]\n"
]
},
{
"name": "stdout",
"output_type": "stream",
"text": [
"\n",
"⏳ 拉取: 2025-09-01T00:00:00Z → 2025-10-01T00:00:00Z\n",
"📊 [2025-09-01 2025-10-01] 总记录数: 1227, 共 13 页\n"
]
},
{
"name": "stderr",
"output_type": "stream",
"text": [
"2025-09: 100%|██████████| 12/12 [00:24<00:00, 2.02s/it]\n"
]
},
{
"name": "stdout",
"output_type": "stream",
"text": [
"\n",
"⏳ 拉取: 2025-10-01T00:00:00Z → 2025-11-01T00:00:00Z\n",
"📊 [2025-10-01 2025-11-01] 总记录数: 1364, 共 14 页\n"
]
},
{
"name": "stderr",
"output_type": "stream",
"text": [
"2025-10: 100%|██████████| 13/13 [00:25<00:00, 1.94s/it]\n"
]
},
{
"name": "stdout",
"output_type": "stream",
"text": [
"\n",
"⏳ 拉取: 2025-11-01T00:00:00Z → 2025-12-01T00:00:00Z\n",
"📊 [2025-11-01 2025-12-01] 总记录数: 2219, 共 23 页\n"
]
},
{
"name": "stderr",
"output_type": "stream",
"text": [
"2025-11: 100%|██████████| 22/22 [00:47<00:00, 2.14s/it]\n"
]
},
{
"name": "stdout",
"output_type": "stream",
"text": [
"\n",
"⏳ 拉取: 2025-12-01T00:00:00Z → 2026-01-01T00:00:00Z\n",
"📊 [2025-12-01 2026-01-01] 总记录数: 3624, 共 37 页\n"
]
},
{
"name": "stderr",
"output_type": "stream",
"text": [
"2025-12: 100%|██████████| 36/36 [01:25<00:00, 2.38s/it]\n"
]
},
{
"name": "stdout",
"output_type": "stream",
"text": [
"\n",
"⏳ 拉取: 2026-01-01T00:00:00Z → 2026-01-14T00:00:00Z\n",
"📊 [2026-01-01 2026-01-14] 总记录数: 1408, 共 15 页\n"
]
},
{
"name": "stderr",
"output_type": "stream",
"text": [
"2026-01: 100%|██████████| 14/14 [00:30<00:00, 2.20s/it]\n"
]
},
{
"name": "stdout",
"output_type": "stream",
"text": [
"\n",
"📥 总共获取 19173 条流程实例\n",
"\n",
"✅ 成功保存 19173 条记录至: output\\converted_yd_data.csv\n"
]
}
],
"execution_count": 9
},
{
"metadata": {},
"cell_type": "markdown",
"source": "# 数据过滤",
"id": "d4493b51868b83e6"
},
{
"cell_type": "code",
"id": "initial_id",
"metadata": {
"collapsed": true,
"ExecuteTime": {
"end_time": "2026-01-14T05:38:38.667141600Z",
"start_time": "2026-01-14T05:38:26.972349600Z"
}
},
"source": [
"import sys\n",
"import os\n",
"import pandas as pd\n",
"import ast\n",
"from datetime import datetime, timedelta\n",
"import pytz\n",
"\n",
"# 获取当前 notebook 所在目录的上级目录,并加入 Python 路径\n",
"nb_path = os.path.abspath('') # 当前 notebook 所在目录\n",
"parent_dir = os.path.dirname(nb_path)\n",
"sys.path.append(parent_dir)\n",
"\n",
"from yd_api import YDAPI\n",
"from api import API\n",
"\n",
"output_dir = \"output\"\n",
"os.makedirs(output_dir, exist_ok=True)\n",
"\n",
"api_instance = API()\n",
"yd_api_instance = YDAPI()\n",
"\n",
"df = pd.read_csv(\"D:\\Idea Project\\SaaS_V1.7\\\\test\\output\\converted_yd_data.csv\").astype(str)\n",
"# df = df[df[\"instanceStatus\"] == \"RUNNING\"]\n",
"\n",
"# 检查 data 列的第一个非空值类型\n",
"sample = df['data'].dropna().iloc[0] if not df['data'].dropna().empty else \"\"\n",
"print(\"Sample of 'data' column:\")\n",
"print(repr(sample)[:200])\n",
"\n",
"if isinstance(sample, str):\n",
" print(\"Detected string format, parsing with ast.literal_eval...\")\n",
"\n",
"\n",
" def safe_literal_eval(x):\n",
" if pd.isna(x) or x == \"\":\n",
" return {}\n",
" try:\n",
" return ast.literal_eval(x)\n",
" except (ValueError, SyntaxError) as e:\n",
" print(f\"Parse error on: {repr(x)[:100]}... Error: {e}\")\n",
" return {}\n",
"\n",
"\n",
" df['data'] = df['data'].apply(safe_literal_eval)\n",
"\n",
"# 展开 data 列\n",
"expanded = pd.json_normalize(df['data'])\n",
"other_cols = df.drop(columns=['data'])\n",
"new_df = pd.concat([other_cols.reset_index(drop=True), expanded.reset_index(drop=True)], axis=1)\n",
"\n",
"# 先转为字符串是为了避免后续列缺失报错,但日期处理需要原始类型\n",
"# 所以我们先做日期过滤,再转字符串(调整顺序)\n",
"\n",
"# ===== 新增:日期过滤逻辑 =====\n",
"date_col = \"dateField_ksirro5l\"\n",
"\n",
"if date_col not in new_df.columns:\n",
" print(f\"⚠️ Warning: Column '{date_col}' not found!\")\n",
"else:\n",
" # 将时间戳转为 datetime(假设是毫秒级 Unix 时间戳)\n",
" def parse_timestamp(val):\n",
" if pd.isna(val) or val == \"\" or val == \"NaT\":\n",
" return pd.NaT\n",
" try:\n",
" # 转换为 int 并除以 1000 得到秒级时间戳(毫秒级)\n",
" ts = int(float(val)) / 1000.0\n",
" return pd.to_datetime(ts, unit='s', utc=True)\n",
" except Exception as e:\n",
" print(f\"Parse error on timestamp: {val} -> Error: {e}\")\n",
" return pd.NaT\n",
"\n",
"\n",
" new_df[date_col] = new_df[date_col].apply(parse_timestamp)\n",
"\n",
" # 设置当前 UTC 时间\n",
" now_utc = pd.Timestamp.now(tz='UTC')\n",
" future_150_days = now_utc + timedelta(days=150)\n",
"\n",
" # 筛选条件:非空,且在 [now, now+150天] 区间内\n",
" # ===== 调整后:日期范围为 [过去30天, 未来120天] =====\n",
" past_30_days = now_utc - timedelta(days=30)\n",
" future_120_days = now_utc + timedelta(days=120)\n",
"\n",
" mask_date = (\n",
" new_df[date_col].notna() &\n",
" (new_df[date_col] >= past_30_days) &\n",
" (new_df[date_col] <= future_120_days)\n",
" )\n",
" new_df = new_df[mask_date]\n",
"\n",
"# 继续原有筛选:RUNNING 状态\n",
"# new_df = new_df[new_df[\"instanceStatus\"] == \"RUNNING\"]\n",
"\n",
"# 订单编码为空\n",
"col_order = \"textField_kto3q3ev\"\n",
"mask2 = (\n",
" new_df[col_order].isnull() |\n",
" (new_df[col_order].astype(str).str.strip() == \"\")\n",
")\n",
"new_df = new_df[mask2]\n",
"\n",
"# 最后再转为字符串(避免日期被转成奇怪字符串影响保存)\n",
"new_df = new_df.astype(str)\n",
"\n",
"# 保存结果\n",
"new_df.to_csv(\"D:\\Idea Project\\SaaS_V1.7\\\\test\\output\\expanded_yd_data.csv\", index=False, encoding='utf-8-sig')\n",
"print(\"✅ Expanded and filtered data saved to 'expanded_yd_data.csv'\")"
],
"outputs": [
{
"name": "stderr",
"output_type": "stream",
"text": [
"<>:22: SyntaxWarning: invalid escape sequence '\\I'\n",
"<>:106: SyntaxWarning: invalid escape sequence '\\I'\n",
"<>:22: SyntaxWarning: invalid escape sequence '\\I'\n",
"<>:106: SyntaxWarning: invalid escape sequence '\\I'\n",
"C:\\Users\\hp_z66\\AppData\\Local\\Temp\\ipykernel_2148\\1452756513.py:22: SyntaxWarning: invalid escape sequence '\\I'\n",
" df = pd.read_csv(\"D:\\Idea Project\\SaaS_V1.7\\\\test\\output\\converted_yd_data.csv\").astype(str)\n",
"C:\\Users\\hp_z66\\AppData\\Local\\Temp\\ipykernel_2148\\1452756513.py:106: SyntaxWarning: invalid escape sequence '\\I'\n",
" new_df.to_csv(\"D:\\Idea Project\\SaaS_V1.7\\\\test\\output\\expanded_yd_data.csv\", index=False, encoding='utf-8-sig')\n"
]
},
{
"name": "stdout",
"output_type": "stream",
"text": [
"Sample of 'data' column:\n",
"'{\\'employeeField_ljz6416i_id\\': [\\'032861373036352679\\', \\'1253235059942945\\'], \\'textField_kuj8nx00\\': \\'山东省\\', \\'selectField_kuz7mfmd\\': \\'E.L\\', \\'textField_kuj8nx01\\': \\'菏泽市\\', \\'employeeField_ks\n",
"Detected string format, parsing with ast.literal_eval...\n",
"✅ Expanded and filtered data saved to 'expanded_yd_data.csv'\n"
]
}
],
"execution_count": 17
},
{
"metadata": {},
"cell_type": "markdown",
"source": "# 数据派发",
"id": "4ef868ec5caa7297"
},
{
"metadata": {},
"cell_type": "code",
"execution_count": null,
"source": "# 续约待办派发测试数据",
"id": "475a2fcbd8c1760d",
"outputs": []
},
{
"metadata": {},
"cell_type": "markdown",
"source": "# 字段刷写",
"id": "c4533794f287dc49"
},
{
"metadata": {
"ExecuteTime": {
"end_time": "2026-01-14T07:09:31.106319900Z",
"start_time": "2026-01-14T07:09:23.173718600Z"
}
},
"cell_type": "code",
"source": [
"import os\n",
"from datetime import datetime\n",
"import pandas as pd\n",
"from api import API\n",
"from back_ground_module import CommonModule\n",
"from log_config import configure_task_logger, configure_error_task_logger\n",
"from collections import defaultdict\n",
"from datetime import datetime, timezone, timedelta, date, UTC\n",
"from config import Config\n",
"from yd_api import YDAPI\n",
"\n",
"logger = configure_task_logger()\n",
"error_task_logger = configure_error_task_logger()\n",
"api_instance = API()\n",
"yd_api_instance = YDAPI()\n",
"common_module = CommonModule()\n",
"output_dir = \"output\" # 设置输出目录\n",
"os.makedirs(output_dir, exist_ok=True)\n",
"\n",
"df = pd.read_csv(\"D:\\Idea Project\\SaaS_V1.7\\\\test\\output\\expanded_yd_data.csv\").astype(str)\n",
"df2 = pd.read_excel(\n",
" \"D:\\Idea Project\\SaaS_V1.7\\\\test\\output\\续约服务流程_历史维修记录迁移测试_20260114094227.xlsx\").astype(\n",
" str) # 简道云导出文件\n",
"\n",
"# 从df中获取流程编码获取流程详细信息\n",
"token = yd_api_instance.generateToken()\n",
"FORMID = \"FORM-PE866MD1MJMU0WGLYRFLYEN5YN9L1I55Z7ZUK22\"\n",
"appType = \"APP_UYZ0KG6L0CCNV80GZ66O\"\n",
"systemToken = \"XA966F81JAJOFCVVVKO64E9MIIZV1EWE5SFMKJ2\"\n",
"\n",
"all_instance_data = []\n",
"for index, row in df[:2].iterrows():\n",
" instance_id = row[\"processInstanceId\"]\n",
" instance_info = yd_api_instance.processes_instancesInfos(token, instance_id, appType, systemToken)\n",
" all_instance_data.append(instance_info.get(\"data\"))\n",
"\n",
"ndf = pd.DataFrame(all_instance_data)\n",
"ndf.to_csv(\"D:\\Idea Project\\SaaS_V1.7\\\\test\\output\\yd_process_details.csv\")\n",
"\n",
"# 简道云宜搭map\n",
"# step1 简道云字段唯一值map\n",
"jdy_map = {\"关联数据\": \"_widget_1764820541663\",\n",
" \"公司名称\": \"_widget_1764820541616\",\n",
" \"门店名称\": \"_widget_1764820541617\",\n",
" \"门店编码\": \"_widget_1764820541661\",\n",
" \"加盟商\": \"_widget_1764820541618\",\n",
" \"过期日\": \"_widget_1764820541672\",\n",
" \"Saas版本\": \"_widget_1764820541623\",\n",
" \"上次购买价格\": \"_widget_1764820541624\",\n",
" \"联系人\": \"_widget_1764820541621\",\n",
" \"联系手机号\": \"_widget_1764820541622\",\n",
" \"运营顾问\": \"_widget_1764820541625\",\n",
" \"订单商品名称\": \"_widget_1766730385209\",\n",
" \"120天是否跟进\": \"_widget_1764820541628\",\n",
" \"120天处理人\": \"_widget_1764820541634\",\n",
" \"120天跟进时间\": \"_widget_1765352838631\",\n",
" \"60天是否跟进\": \"_widget_1764820541630\",\n",
" \"60天处理人\": \"_widget_1764820541635\",\n",
" \"60天跟进时间\": \"_widget_1765352838632\",\n",
" \"30天是否跟进\": \"_widget_1764820541632\",\n",
" \"30天处理人\": \"_widget_1764820541636\",\n",
" \"30天跟进时间\": \"_widget_1765352838633\",\n",
" \"是否联系上\": \"_widget_1764820541638\",\n",
" \"现阶段问题\": \"_widget_1764820541641\",\n",
" \"未联系原因\": \"_widget_1765330820509\",\n",
" \"联系情况及问题说明\": \"_widget_1764820541653\",\n",
" \"回访方式\": \"_widget_1764820541697\",\n",
" \"潜在商机\": \"_widget_1764820541657\",\n",
" \"商机归属\": \"_widget_1766633812301\",\n",
" \"商机详情\": \"_widget_1764820541659\",\n",
" \"续约意愿\": \"_widget_1764820541654\",\n",
" \"不续约原因\": \"_widget_1764820541700\",\n",
" \"产品问题\": \"_widget_1764820541707\",\n",
" \"服务问题\": \"_widget_1764820541709\",\n",
" \"门店问题\": \"_widget_1764820541711\",\n",
" \"价格问题\": \"_widget_1764820541713\",\n",
" \"不续约具体情况说明\": \"_widget_1764820541702\",\n",
" \"周期性增购\": \"_widget_1764820541717\",\n",
" \"周期性增购.商品名称\": \"_widget_1764820541717._widget_1764820541719\",\n",
" \"周期性增购.购买数量\": \"_widget_1764820541717._widget_1764820541722\",\n",
" \"周期性增购.购买金额\": \"_widget_1764820541717._widget_1764820541720\",\n",
" \"周期性增购.应续约日\": \"_widget_1764820541717._widget_1764820541721\",\n",
" \"周期性增购.是否愿意续约\": \"_widget_1764820541717._widget_1764820541724\",\n",
" \"周期性增购.不续约原因\": \"_widget_1764820541717._widget_1764820541723\",\n",
" \"周期性增购.续约后订单编码\": \"_widget_1764820541717._widget_1764820541725\",\n",
" \"连锁门店待办同步处理\": \"_widget_1764820541681\",\n",
" \"选择需要同步的门店名称\": \"_widget_1765330820391\",\n",
" \"订单编码\": \"_widget_1764820541674\",\n",
" \"订单支付日期\": \"_widget_1764820541679\",\n",
" \"本次-实付金额(元)\": \"_widget_1764820541676\",\n",
" \"业务类型(续约、升级)\": \"_widget_1764820541680\",\n",
" \"120天自动流转时间\": \"_widget_1764820541865\",\n",
" \"60天自动流转时间\": \"_widget_1765964381895\",\n",
" \"30天自动流转时间\": \"_widget_1765964381896\",\n",
" \"0天自动流转时间\": \"_widget_1765964381897\",\n",
" \"当前所处节点\": \"_widget_1765352838609\",\n",
" \"流程状态\": \"_widget_1765352838610\",\n",
" \"经营模式\": \"_widget_1765964381952\",\n",
" \"程序关停\": \"_widget_1765866283543\",\n",
" \"区域客服\": \"_widget_1764820541715\",\n",
" \"公司等级\": \"_widget_1766130435561\",\n",
" \"运营专家\": \"_widget_1764820541678\",\n",
" \"流水号\": \"_widget_1766376046563\",\n",
" \"是否已同步宜搭\": \"_widget_1766469131897\",\n",
" \"连锁店数据同步辅助:门店列表\": \"_widget_1766633812134\",\n",
" \"公司id\": \"_widget_1766631811839\",\n",
" }\n",
"# step2 宜搭唯一值简道云唯一值map\n",
"yd_jdy_map = {\n",
" \"120天是否跟进\": \"radioField_kuntp6fm\",\n",
" \"120天处理人\": \"textField_livc8bjj\",\n",
" \"120天跟进时间\": \"dateField_lifr1fdv\",\n",
" \"60天是否跟进\": \"radioField_kurxyhvp\",\n",
" \"60天处理人\": \"textField_livc8bjl\",\n",
" \"60天跟进时间\": \"dateField_lifr1fdx\",\n",
" \"30天是否跟进\": \"radioField_kurxyhvq\",\n",
" \"30天处理人\": \"textField_livc8bjm\",\n",
" \"30天跟进时间\": \"dateField_lifr1fdy\",\n",
"}\n",
"\n",
"\n",
"\n",
"\n"
],
"id": "1a9d527c7026a96c",
"outputs": [
{
"name": "stderr",
"output_type": "stream",
"text": [
"<>:20: SyntaxWarning: invalid escape sequence '\\I'\n",
"<>:22: SyntaxWarning: invalid escape sequence '\\I'\n",
"<>:38: SyntaxWarning: invalid escape sequence '\\I'\n",
"<>:20: SyntaxWarning: invalid escape sequence '\\I'\n",
"<>:22: SyntaxWarning: invalid escape sequence '\\I'\n",
"<>:38: SyntaxWarning: invalid escape sequence '\\I'\n",
"C:\\Users\\hp_z66\\AppData\\Local\\Temp\\ipykernel_2148\\1799850687.py:20: SyntaxWarning: invalid escape sequence '\\I'\n",
" df = pd.read_csv(\"D:\\Idea Project\\SaaS_V1.7\\\\test\\output\\expanded_yd_data.csv\").astype(str)\n",
"C:\\Users\\hp_z66\\AppData\\Local\\Temp\\ipykernel_2148\\1799850687.py:22: SyntaxWarning: invalid escape sequence '\\I'\n",
" \"D:\\Idea Project\\SaaS_V1.7\\\\test\\output\\续约服务流程_历史维修记录迁移测试_20260114094227.xlsx\").astype(\n",
"C:\\Users\\hp_z66\\AppData\\Local\\Temp\\ipykernel_2148\\1799850687.py:38: SyntaxWarning: invalid escape sequence '\\I'\n",
" ndf.to_csv(\"D:\\Idea Project\\SaaS_V1.7\\\\test\\output\\yd_process_details.csv\")\n"
]
}
],
"execution_count": 22
},
{
"metadata": {
"ExecuteTime": {
"end_time": "2026-01-14T05:42:53.361991700Z",
"start_time": "2026-01-14T05:42:53.310045900Z"
}
},
"cell_type": "code",
"source": "token",
"id": "ad8d93d00c8a3be4",
"outputs": [
{
"data": {
"text/plain": [
"'e22e9617f9c23d20b6575cf7ecdcbbd2'"
]
},
"execution_count": 19,
"metadata": {},
"output_type": "execute_result"
}
],
"execution_count": 19
},
{
"metadata": {},
"cell_type": "markdown",
"source": "# 流程同步",
"id": "2c6a1c0a573f24cb"
},
{
"metadata": {},
"cell_type": "code",
"outputs": [],
"execution_count": null,
"source": "",
"id": "7652deba86d75e24"
}
],
"metadata": {
"kernelspec": {
"display_name": "Python 3",
"language": "python",
"name": "python3"
},
"language_info": {
"codemirror_mode": {
"name": "ipython",
"version": 2
},
"file_extension": ".py",
"mimetype": "text/x-python",
"name": "python",
"nbconvert_exporter": "python",
"pygments_lexer": "ipython2",
"version": "2.7.6"
}
},
"nbformat": 4,
"nbformat_minor": 5
}
@@ -0,0 +1,59 @@
import os
import pandas as pd
import json
import ast # 👈 新增导入
from yd_api import YDAPI
from api import API
# Flat script: expand the serialised 'data' column of converted_yd_data.csv
# into real columns, keep RUNNING instances whose order code is empty, and
# save the result as expanded_yd_data.csv.
output_dir = "output"
os.makedirs(output_dir, exist_ok=True)

# Client objects are instantiated exactly as in the original script.
api_instance = API()
yd_api_instance = YDAPI()

df = pd.read_csv(os.path.join(output_dir, "converted_yd_data.csv"))

# Inspect the first non-null value of the 'data' column to see how it was
# serialised (Python-repr dict with single quotes vs. something else).
non_null_data = df['data'].dropna()
sample = non_null_data.iloc[0] if not non_null_data.empty else ""
print("Sample of 'data' column:")
print(repr(sample)[:200])  # first 200 chars are enough to see the quoting

# A string sample means the dicts were written with str(); parse them back.
if isinstance(sample, str):
    print("Detected string format, parsing with ast.literal_eval...")

    def safe_literal_eval(x):
        """Parse one serialised cell; return {} for blank or unparsable cells."""
        if pd.isna(x) or x == "":
            return {}
        try:
            return ast.literal_eval(x)
        except (ValueError, SyntaxError) as e:
            print(f"Parse error on: {repr(x)[:100]}... Error: {e}")
            return {}

    df['data'] = df['data'].apply(safe_literal_eval)

# Expand the 'data' dicts into one column per key.
expanded = pd.json_normalize(df['data'])

# Keep every other column (the source column is 'data', not 'raw_data').
other_cols = df.drop(columns=['data'])

# Merge side by side. The string conversion is deliberately postponed:
# converting first would turn missing values into the text 'nan', and the
# "order code is empty" filter below would then wrongly drop those rows.
new_df = pd.concat(
    [other_cols.reset_index(drop=True), expanded.reset_index(drop=True)],
    axis=1,
)

# Keep only instances that are still in progress.
new_df = new_df[new_df["instanceStatus"] == "RUNNING"]

# Keep only rows whose order code is empty (missing or blank).
col = "textField_kto3q3ev"
if col in new_df.columns:
    mask2 = (
        new_df[col].isnull() |
        (new_df[col].astype(str).str.strip() == "")
    )
    new_df = new_df[mask2]
# If the column is absent, no record carries an order code at all, so every
# remaining row already satisfies the "empty order code" condition.

# Stringify last so the saved file keeps the original all-text format.
new_df = new_df.astype(str)

# Save the result.
new_df.to_csv(os.path.join(output_dir, "expanded_yd_data.csv"), index=False, encoding='utf-8-sig')
print("✅ Expanded data saved to 'expanded_yd_data.csv'")
+20 -9
View File
@@ -1,6 +1,7 @@
import os
from datetime import datetime, timezone, timedelta
import pandas as pd
from holidays.countries import saint_martin as record
from tqdm import tqdm
import json
from yd_api import YDAPI
@@ -48,11 +49,11 @@ class GetYDData:
self.systemToken = "XA966F81JAJOFCVVVKO64E9MIIZV1EWE5SFMKJ2"
# 第一段:2025-01-01 到 2025-11-01
first_segment = ("2025-01-01T00:00:00Z", "2025-11-01T00:00:00Z")
first_segment = ("2025-01-01T00:00:00Z", "2025-02-01T00:00:00Z")
# 第二段:2025-11-01 到当前时间(按月拆分)
now_utc_str = datetime.now(timezone.utc).strftime("%Y-%m-%dT%H:%M:%SZ")
monthly_segments = generate_monthly_ranges("2025-11-01T00:00:00Z", now_utc_str)
monthly_segments = generate_monthly_ranges("2025-02-01T00:00:00Z", now_utc_str)
# 合并所有时间段
self.time_ranges = [first_segment] + monthly_segments
@@ -114,7 +115,7 @@ class GetYDData:
n=100,
appType=self.appType,
systemToken=self.systemToken,
instanceStatus="RUNNING",
instanceStatus="",
modifiedFromTimeGMT=start_time,
modifiedToTimeGMT=end_time,
)
@@ -138,7 +139,7 @@ class GetYDData:
n=100,
appType=self.appType,
systemToken=self.systemToken,
instanceStatus="RUNNING",
instanceStatus="",
modifiedFromTimeGMT=start_time,
modifiedToTimeGMT=end_time,
)
@@ -162,14 +163,20 @@ class GetYDData:
# Step 2: 按时间段拉取
all_records = []
all_records_detils = []
for start_time, end_time in self.time_ranges:
print(f"\n⏳ 拉取: {start_time}{end_time}")
records = self.fetch_records_in_range(token, start_time, end_time)
all_records.extend(records)
try:
record_data = record.get("data", [])
all_records_detils.extend(record_data)
except Exception as e:
continue
print(f"\n📥 总共获取 {len(all_records)} 条流程实例")
# Step 3: 转换 formData
# # Step 3: 转换 formData
converted_records = []
for inst in all_records:
form_data = inst.get("formData", {})
@@ -177,11 +184,15 @@ class GetYDData:
converted_records.append(converted)
# Step 4: 保存
if converted_records:
df = pd.DataFrame(converted_records)
if all_records:
df = pd.DataFrame(all_records)
output_path = os.path.join(output_dir, "converted_yd_data.csv")
df.to_csv(output_path, index=False, encoding="utf_8_sig")
print(f"\n✅ 成功保存 {len(converted_records)} 条记录至: {output_path}")
df.to_csv(output_path, index=False)
df1 = pd.DataFrame(all_records_detils)
output_path1 = os.path.join(output_dir, "converted_yd_data_detail.csv")
df1.to_csv(output_path1, index=False)
print(f"\n✅ 成功保存 {len(all_records)} 条记录至: {output_path}")
else:
print("\n❌ 无有效数据")
-1
View File
@@ -122,7 +122,6 @@ class RenewalToDo:
def load_all_data(self):
"""
从各类来源加载数据上加载数据
:return:
"""
# 数据库获取续约回访数据
+114 -18
View File
@@ -5,7 +5,8 @@ from api import API
from back_ground_module import CommonModule
from log_config import configure_task_logger, configure_error_task_logger
from collections import defaultdict
from datetime import datetime, timezone, timedelta, date, UTC
from config import Config
logger = configure_task_logger()
error_task_logger = configure_error_task_logger()
api_instance = API()
@@ -14,7 +15,81 @@ output_dir = "output" # 设置输出目录
os.makedirs(output_dir, exist_ok=True)
import pandas as pd
import psycopg2
from datetime import datetime, timedelta, date
import os
def get_renewal_details():
    """Fetch renewal to-do rows for the stores listed in the expanded CSV.

    The store codes are read from column ``textField_ksydghqw`` of
    ``expanded_yd_data.csv``. All rows of
    ``holo_ads_report_saas_profile_ngv_detail_d`` for the ``date_id`` of two
    days ago are then loaded and filtered in memory by ``org_code``.

    Returns:
        pandas.DataFrame: rows whose ``org_code`` is one of the CSV store
        codes. An empty DataFrame is returned when the CSV is missing or
        unusable, when the query result has no ``org_code`` column, or when
        any exception occurs (the exception is logged, never raised).
    """
    conn = None
    cursor = None
    try:
        # 1. Read the store codes from the CSV file.
        csv_path = r"D:\Idea Project\SaaS_V1.7\test\output\expanded_yd_data.csv"
        if not os.path.exists(csv_path):
            error_task_logger.error(f"CSV 文件不存在: {csv_path}")
            return pd.DataFrame()

        store_df = pd.read_csv(csv_path, dtype=str)
        if "textField_ksydghqw" not in store_df.columns:
            error_task_logger.error("CSV 文件中缺少列 'textField_ksydghqw'")
            return pd.DataFrame()

        # A set gives O(1) membership checks for the isin() filter below.
        store_codes = set(store_df["textField_ksydghqw"].dropna().unique())
        if not store_codes:
            error_task_logger.warning("CSV 中未找到有效的门店编码")
            return pd.DataFrame()

        # 2. Connect to the database.
        conn = psycopg2.connect(**Config.CONN_INFO)
        cursor = conn.cursor()

        # date_id of two days ago, as an integer in YYYYMMDD form.
        now_time = datetime.now()
        yes_time = now_time + timedelta(days=-2)
        yes_time_nyr = int(yes_time.strftime('%Y%m%d'))

        # 3. Query by date_id only; org_code is filtered in memory afterwards.
        sql = """
            SELECT *
            FROM "public"."holo_ads_report_saas_profile_ngv_detail_d"
            WHERE "date_id" = %s
        """
        cursor.execute(sql, (yes_time_nyr,))
        rows = cursor.fetchall()
        all_fields = cursor.description

        # Build the DataFrame, keeping the column names even with no rows.
        col = [i[0] for i in all_fields]
        data_NGV = pd.DataFrame(rows, columns=col) if rows else pd.DataFrame(columns=col)

        # 4. Filter org_code against the store codes in memory.
        if "org_code" not in data_NGV.columns:
            error_task_logger.error("数据库结果中缺少 'org_code' 字段")
            return pd.DataFrame()

        # Store codes were read as strings, so compare as strings.
        data_NGV["org_code"] = data_NGV["org_code"].astype(str)
        filtered_data = data_NGV[data_NGV["org_code"].isin(store_codes)].reset_index(drop=True)
        return filtered_data

    except Exception as e:
        error_task_logger.error(f"获取续约待办数据时出错: {e}", exc_info=True)
        return pd.DataFrame()

    finally:
        # Release the cursor and connection on every path (success, early
        # return, or exception); previously they leaked whenever the query
        # failed. Close errors are logged so this function still never raises.
        for resource in (cursor, conn):
            if resource is None:
                continue
            try:
                resource.close()
            except Exception as close_err:
                error_task_logger.warning(f"关闭数据库资源时出错: {close_err}")
class RenewalToDo:
"""续约回访待办派发"""
def __init__(self):
self.renewal_data_list = None
self.cyclic_increasing = None
@@ -84,6 +159,8 @@ class RenewalToDo:
"流程状态": "_widget_1765352838610",
"经营模式": "_widget_1765964381952",
"公司等级": "_widget_1766130435561",
"公司id": "_widget_1766631811839",
"订单商品名称": "_widget_1766730385209",
"提交人": "creator",
"提交时间": "createTime",
"更新时间": "updateTime"
@@ -101,6 +178,7 @@ class RenewalToDo:
"group_grade": "公司等级",
"technician": "运营专家",
"manage_model": "经营模式",
"id_own_group": "公司id",
}
self.subform_field_map = {
"商品名称": "_widget_1764820541719",
@@ -112,7 +190,7 @@ class RenewalToDo:
"续约后订单编码": "_widget_1764820541725",
# 根据实际需要添加更多字段
}
self.renewal_list_map ={
self.renewal_list_map = {
}
@@ -123,7 +201,9 @@ class RenewalToDo:
"""
# 数据库获取续约回访数据
self.data_NGV = pd.read_csv(os.path.join(output_dir, "data_NGV.csv"), encoding="gbk")
self.data_NGV = common_module.get_renewal_details()
# self.data_NGV = get_renewal_details() # 历史数据
self.data_NGV.to_csv("D:\\Idea Project\\SaaS_V1.7\\test\\output\data_NGV1.csv")
# 获取加盟商信息
self.franchisee = common_module.get_renewal_franchisee_details()
@@ -344,6 +424,10 @@ class RenewalToDo:
data_NGV['60天自动流转时间'] = data_NGV['过期日'] - pd.Timedelta(days=30)
data_NGV['30天自动流转时间'] = data_NGV['过期日'] - pd.Timedelta(days=0)
data_NGV['0天自动流转时间'] = data_NGV['过期日'] + pd.Timedelta(days=90)
data_NGV['120天是否跟进'] = "主动"
data_NGV['60天是否跟进'] = "主动"
data_NGV['30天是否跟进'] = "主动"
# 格式化为字符串(去掉时区)
for col in ['过期日', '120天自动流转时间', '60天自动流转时间', '30天自动流转时间', '0天自动流转时间']:
data_NGV[col] = data_NGV[col].dt.strftime('%Y-%m-%d %H:%M:%S')
@@ -356,24 +440,39 @@ class RenewalToDo:
)
# 新增上次购买价格列
# 1. 清洗并拼接类型+价格
df_lp = self.last_price[['门店编码', '类型', '价格']].copy()
# 1. 清洗数据
df_lp = self.last_price[['门店编码', '类型', '订单商品名称', '价格']].copy()
# 处理“类型”和“订单商品名称”的缺失值
df_lp['类型'] = df_lp['类型'].fillna('').astype(str)
df_lp['订单商品名称'] = df_lp['订单商品名称'].fillna('').astype(str)
# 处理价格:转数字、四舍五入、填0、转字符串
df_lp['价格'] = (
pd.to_numeric(df_lp['价格'], errors='coerce')
.round().fillna(0).astype(int).astype(str)
)
df_lp['类型_价格'] = df_lp['类型'] + df_lp['价格']
# 2. 按门店聚合,分号连接
agg_df = df_lp.groupby('门店编码', as_index=False)['类型_价格'].apply(';'.join)
# 2. 拼接“类型:价格”
df_lp['类型_价格'] = df_lp['类型'] + ':' + df_lp['价格']
# 3. 合并回主表
data_NGV = data_NGV.merge(agg_df, on='门店编码', how='left').fillna({'类型_价格': ''})
data_NGV.rename(columns={'类型_价格': '上次购买价格'}, inplace=True)
# 3. 按门店聚合两列
agg_df = df_lp.groupby('门店编码', as_index=False).agg({
'类型_价格': lambda x: ';'.join(x),
'订单商品名称': lambda x: ';'.join(x)
})
# 4. 处理没有匹配记录的门店(填空或默认值)
data_NGV['上次购买价格'] = data_NGV['上次购买价格'].fillna('')
# 4. 合并回主表
data_NGV = data_NGV.merge(agg_df, on='门店编码', how='left')
# 5. 填充缺失值为空字符串,并重命名列
data_NGV['类型_价格'] = data_NGV['类型_价格'].fillna('')
data_NGV['订单商品名称'] = data_NGV['订单商品名称'].fillna('')
data_NGV.rename(columns={
'类型_价格': '上次购买价格',
'订单商品名称': '订单商品名称'
}, inplace=True)
# 成员字段替换(现在列名是中文)
staff_name_cols = [
@@ -482,7 +581,7 @@ class RenewalToDo:
payload = {
"api_key": "675b900991ad2491c69389ca",
"entry_id": "6931063d64187eaf6b927557",
"entry_id": "6965eec36b73376aa0b5bff8",
"data_list": records
}
print(payload)
@@ -502,11 +601,8 @@ class RenewalToDo:
data_NGV = self.process_data()
# step3:数据派发
self.dispatch_task(data_NGV)
# step4:过期日发生变化更新已有表单
# step5:自动同意原表单
common_module.send_task_status(task_start_time, "续约回访待办")
# common_module.send_task_status(task_start_time, "续约回访待办")
except Exception as e:
error_task_logger.error(f"续约回访待办发生错误{e}")
# common_module.send_task_error(task_start_time, "续约回访待办", str(e))
+4 -4
View File
@@ -551,8 +551,8 @@
{
"metadata": {
"ExecuteTime": {
"end_time": "2025-08-21T02:06:22.956060Z",
"start_time": "2025-08-21T02:06:22.797879Z"
"end_time": "2026-01-13T03:20:16.053485200Z",
"start_time": "2026-01-13T03:20:15.845522200Z"
}
},
"cell_type": "code",
@@ -577,7 +577,7 @@
"} # 衡时数据库链接配置-mysql\n",
"\n",
"# 表名\n",
"table_name = \"test\" # 请替换为实际的表名\n",
"table_name = \"jdy_ngv_data_source\" # 请替换为实际的表名\n",
"\n",
"# 连接数据库\n",
"connection = mysql.connector.connect(\n",
@@ -605,7 +605,7 @@
"name": "stdout",
"output_type": "stream",
"text": [
"成功删除表 test\n"
"成功删除表 jdy_ngv_data_source\n"
]
}
],