Files
saas/test/ngv写入数据库.ipynb
T
2026-01-14 15:13:44 +08:00

315 lines
14 KiB
Plaintext
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
{
"cells": [
{
"cell_type": "code",
"id": "initial_id",
"metadata": {
"collapsed": true,
"ExecuteTime": {
"end_time": "2026-01-13T07:56:05.597737600Z",
"start_time": "2026-01-13T07:50:57.192717400Z"
}
},
"source": [
"import time\n",
"\n",
"import pandas as pd\n",
"import psycopg2\n",
"import mysql.connector\n",
"from mysql.connector import Error\n",
"from datetime import datetime, timedelta\n",
"import numpy as np\n",
"\n",
"# ========== 配置 ==========\n",
"PG_CONN_INFO = {\n",
" \"database\": \"f6_bi\",\n",
" \"user\": \"LTAI5tMJsijFA9BS1R6uBpUT\",\n",
" \"password\": \"PajEQMIRWNRcipd8mYvlud2KHWJr6N\",\n",
" \"host\": \"hgpostcn-cn-m1e4gikbu00l-cn-shanghai.hologres.aliyuncs.com\",\n",
" \"port\": \"80\"\n",
"}\n",
"\n",
"MYSQL_CONFIG = {\n",
" 'host': \"f6-public.rwlb.rds.aliyuncs.com\",\n",
" 'user': \"rw_operation_data_relay\",\n",
" 'password': \"m+q5Z4%IVuF9bf\",\n",
" 'database': \"f6operation_data_relay\"\n",
"}\n",
"\n",
"SOURCE_TABLE = '\"public\".\"holo_ads_report_saas_profile_ngv_detail_d\"'\n",
"PARTITION_COLUMN = \"date_id\"\n",
"TARGET_TABLE_MYSQL = \"jdy_ngv_data_source\"\n",
"BATCH_SIZE = 2000\n",
"\n",
"# ========== 辅助函数 ==========\n",
"def is_datetime_type(pg_type: str) -> bool:\n",
" if not pg_type:\n",
" return False\n",
" pg_type = pg_type.lower()\n",
" return any(kw in pg_type for kw in ['timestamp', 'datetime', 'date'])\n",
"\n",
"def clean_column_name(name, index):\n",
" \"\"\"将列名转为合法字符串,处理 None / nan / 空值\"\"\"\n",
" if name is None:\n",
" return f\"unknown_col_{index}\"\n",
" if isinstance(name, float) and pd.isna(name):\n",
" return f\"unknown_col_{index}\"\n",
" name_str = str(name).strip()\n",
" if not name_str or name_str.lower() in ('nan', 'none', 'null', ''):\n",
" return f\"unknown_col_{index}\"\n",
" return name_str\n",
"\n",
"def get_source_schema():\n",
" conn = psycopg2.connect(**PG_CONN_INFO)\n",
" cur = conn.cursor()\n",
" cur.execute(\"\"\"\n",
" SELECT column_name, data_type\n",
" FROM information_schema.columns\n",
" WHERE table_schema = 'public'\n",
" AND table_name = 'holo_ads_report_saas_profile_ngv_detail_d'\n",
" ORDER BY ordinal_position;\n",
" \"\"\")\n",
" raw_schema = cur.fetchall()\n",
" cur.close()\n",
" conn.close()\n",
"\n",
" # 清洗列名\n",
" cleaned_schema = []\n",
" for i, (col_name, data_type) in enumerate(raw_schema):\n",
" clean_name = clean_column_name(col_name, i)\n",
" cleaned_schema.append((clean_name, data_type or 'text'))\n",
" return cleaned_schema\n",
"\n",
"def create_ngv_table(cursor, schema):\n",
" col_defs = []\n",
" for col_name, pg_type in schema:\n",
" if is_datetime_type(pg_type):\n",
" col_defs.append(f\"`{col_name}` DATETIME\")\n",
" else:\n",
" col_defs.append(f\"`{col_name}` TEXT\") # ✅ 关键:用 TEXT 避免行大小超限\n",
" create_sql = f\"\"\"\n",
" CREATE TABLE IF NOT EXISTS `{TARGET_TABLE_MYSQL}` (\n",
" {\",\\n \".join(col_defs)}\n",
" ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 ROW_FORMAT=DYNAMIC;\n",
" \"\"\"\n",
" cursor.execute(create_sql)\n",
" print(\"✅ MySQL 表 NGV 已创建(时间字段为 DATETIME,其余为 TEXT\")\n",
"\n",
"def normalize_datetime_cols(df, datetime_cols):\n",
" df = df.copy()\n",
" for col in datetime_cols:\n",
" if col in df.columns:\n",
" df[col] = pd.to_datetime(df[col], errors='coerce')\n",
" df[col] = df[col].dt.strftime('%Y-%m-%d %H:%M:%S').where(df[col].notnull(), None)\n",
" return df.where(pd.notnull(df), None)\n",
"\n",
"# ========== 主流程 ==========\n",
"def main():\n",
" # 1. 生成最近10天的 date_id(字符串格式)\n",
" date_ids = [\n",
" (datetime.now().date() - timedelta(days=i)).strftime(\"%Y%m%d\")\n",
" for i in range(3)\n",
" ]\n",
" print(f\"将同步以下 date_id 分区: {date_ids}\")\n",
"\n",
" # 2. 获取并清洗源表结构\n",
" schema = get_source_schema()\n",
" column_names = [col for col, _ in schema]\n",
" datetime_cols = [col for col, typ in schema if is_datetime_type(typ)]\n",
"\n",
" print(f\"检测到 {len(column_names)} 个字段,其中时间字段: {datetime_cols[:3]}...\")\n",
"\n",
" # 3. 连接 MySQL\n",
" mysql_conn = mysql.connector.connect(**MYSQL_CONFIG)\n",
" mysql_cursor = mysql_conn.cursor()\n",
"\n",
" try:\n",
" # 4. 创建目标表\n",
" create_ngv_table(mysql_cursor, schema)\n",
"\n",
" # 5. 构造插入 SQL\n",
" placeholders = \", \".join([\"%s\"] * len(column_names))\n",
" cols_str = \", \".join([f\"`{c}`\" for c in column_names])\n",
" insert_sql = f\"INSERT INTO `{TARGET_TABLE_MYSQL}` ({cols_str}) VALUES ({placeholders})\"\n",
"\n",
" # 6. 清空目标表\n",
" mysql_cursor.execute(f\"TRUNCATE TABLE `{TARGET_TABLE_MYSQL}`;\")\n",
" print(\"🗑️ 已清空 NGV 表\")\n",
"\n",
" # 7. 按 date_id 分批处理\n",
" # -- 新增:固定列名用于 SELECT --\n",
" fixed_columns = [col for col, _ in schema]\n",
" quoted_fixed_columns = \", \".join([f'\"{c}\"' for c in fixed_columns])\n",
"\n",
" # 动态选择排序字段(必须在 fixed_columns 中)\n",
" exclude_cols = {PARTITION_COLUMN} | set(datetime_cols)\n",
" candidates = [col for col in fixed_columns if col not in exclude_cols]\n",
" order_col = f'\"{candidates[0]}\"' if candidates else f'\"{PARTITION_COLUMN}\"'\n",
"\n",
" if \"org_code\" not in column_names:\n",
" raise ValueError(\"❌ 源表中未找到唯一字段 'org_code'\")\n",
"\n",
" for date_id in date_ids:\n",
" print(f\"\\n>>> 处理 date_id = {date_id}\")\n",
" last_org_code = None # 游标:上一批最大的 org_code\n",
" i = 1\n",
"\n",
" while True:\n",
" time.sleep(3)\n",
" pg_conn = psycopg2.connect(**PG_CONN_INFO)\n",
" pg_cur = pg_conn.cursor()\n",
"\n",
" # 构造 WHERE 条件\n",
" if last_org_code is None:\n",
" where_clause = f'\"{PARTITION_COLUMN}\" = %s'\n",
" params = (date_id,)\n",
" else:\n",
" where_clause = f'\"{PARTITION_COLUMN}\" = %s AND \"org_code\" > %s'\n",
" params = (date_id, last_org_code)\n",
"\n",
" sql = f\"\"\"\n",
" SELECT {quoted_fixed_columns}\n",
" FROM {SOURCE_TABLE}\n",
" WHERE {where_clause}\n",
" ORDER BY \"org_code\"\n",
" LIMIT {BATCH_SIZE};\n",
" \"\"\"\n",
" pg_cur.execute(sql, params)\n",
" rows = pg_cur.fetchall()\n",
" pg_cur.close()\n",
" pg_conn.close()\n",
"\n",
" if not rows:\n",
" break\n",
"\n",
" df_batch = pd.DataFrame(rows, columns=fixed_columns)\n",
" df_batch = normalize_datetime_cols(df_batch, datetime_cols)\n",
" df_batch.to_csv(f\"输出查看{i}.csv\", index=False)\n",
" i += 1\n",
"\n",
" # 更新游标:取本批最后一条的 org_code\n",
" last_org_code = df_batch.iloc[-1][\"org_code\"]\n",
"\n",
" # 清洗并插入 MySQL\n",
" def sanitize_row(row):\n",
" return tuple(\n",
" None if (isinstance(x, float) and pd.isna(x)) or pd.isna(x) else x\n",
" for x in row\n",
" )\n",
"\n",
" data_tuples = [sanitize_row(row) for row in df_batch.values]\n",
" mysql_cursor.executemany(insert_sql, data_tuples)\n",
" mysql_conn.commit()\n",
"\n",
" inserted = len(data_tuples)\n",
" print(f\" date_id={date_id} 已插入 {inserted} 行 (last_org_code={last_org_code})\")\n",
"\n",
" print(f\"✅ date_id={date_id} 同步完成\")\n",
"\n",
" print(f\"\\n🎉 同步完成!数据已写入 MySQL 表 `{TARGET_TABLE_MYSQL}`\")\n",
"\n",
" except Exception as e:\n",
" print(f\"❌ 错误: {e}\")\n",
" mysql_conn.rollback()\n",
" finally:\n",
" mysql_cursor.close()\n",
" mysql_conn.close()\n",
"\n",
"if __name__ == \"__main__\":\n",
" main()"
],
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"将同步以下 date_id 分区: ['20260113', '20260112', '20260111']\n",
"检测到 141 个字段,其中时间字段: []...\n",
"✅ MySQL 表 NGV 已创建(时间字段为 DATETIME,其余为 TEXT\n",
"🗑️ 已清空 NGV 表\n",
"\n",
">>> 处理 date_id = 20260113\n",
"✅ date_id=20260113 同步完成\n",
"\n",
">>> 处理 date_id = 20260112\n",
" date_id=20260112 已插入 2000 行 (last_org_code=CHS201812070004175)\n",
" date_id=20260112 已插入 2000 行 (last_org_code=CHS201903250025112)\n",
" date_id=20260112 已插入 2000 行 (last_org_code=CHS201907240033962)\n",
" date_id=20260112 已插入 2000 行 (last_org_code=CHS201910150040257)\n",
" date_id=20260112 已插入 2000 行 (last_org_code=CHS201912160046642)\n",
" date_id=20260112 已插入 2000 行 (last_org_code=CHS202003230057861)\n",
" date_id=20260112 已插入 2000 行 (last_org_code=CHS202006080088028)\n",
" date_id=20260112 已插入 2000 行 (last_org_code=CHS202010040108419)\n",
" date_id=20260112 已插入 2000 行 (last_org_code=CHS202104090119587)\n",
" date_id=20260112 已插入 2000 行 (last_org_code=CHS202106210130145)\n",
" date_id=20260112 已插入 2000 行 (last_org_code=CHS202108300139429)\n",
" date_id=20260112 已插入 2000 行 (last_org_code=CHS202112050146822)\n",
" date_id=20260112 已插入 2000 行 (last_org_code=CHS202204250175005)\n",
" date_id=20260112 已插入 2000 行 (last_org_code=CHS202209300189703)\n",
" date_id=20260112 已插入 2000 行 (last_org_code=CHS202303230219406)\n",
" date_id=20260112 已插入 2000 行 (last_org_code=CHS202308210240694)\n",
" date_id=20260112 已插入 2000 行 (last_org_code=CHS202402260259380)\n",
" date_id=20260112 已插入 2000 行 (last_org_code=CHS202406260273963)\n",
" date_id=20260112 已插入 2000 行 (last_org_code=CHS202411040284912)\n",
" date_id=20260112 已插入 2000 行 (last_org_code=CHS202503300294788)\n",
" date_id=20260112 已插入 2000 行 (last_org_code=CHS202507150304118)\n",
" date_id=20260112 已插入 2000 行 (last_org_code=CHS202511120312981)\n",
" date_id=20260112 已插入 1278 行 (last_org_code=TQB201509180060)\n",
"✅ date_id=20260112 同步完成\n",
"\n",
">>> 处理 date_id = 20260111\n",
" date_id=20260111 已插入 2000 行 (last_org_code=CHS201812070004175)\n",
" date_id=20260111 已插入 2000 行 (last_org_code=CHS201903250025112)\n",
" date_id=20260111 已插入 2000 行 (last_org_code=CHS201907240033962)\n",
" date_id=20260111 已插入 2000 行 (last_org_code=CHS201910150040257)\n",
" date_id=20260111 已插入 2000 行 (last_org_code=CHS201912160046642)\n",
" date_id=20260111 已插入 2000 行 (last_org_code=CHS202003230057861)\n",
" date_id=20260111 已插入 2000 行 (last_org_code=CHS202006080088042)\n",
" date_id=20260111 已插入 2000 行 (last_org_code=CHS202010050108424)\n",
" date_id=20260111 已插入 2000 行 (last_org_code=CHS202104090119621)\n",
" date_id=20260111 已插入 2000 行 (last_org_code=CHS202106210130146)\n",
" date_id=20260111 已插入 2000 行 (last_org_code=CHS202108300139429)\n",
" date_id=20260111 已插入 2000 行 (last_org_code=CHS202112050146822)\n",
" date_id=20260111 已插入 2000 行 (last_org_code=CHS202204250175005)\n",
" date_id=20260111 已插入 2000 行 (last_org_code=CHS202209300189703)\n",
" date_id=20260111 已插入 2000 行 (last_org_code=CHS202303230219406)\n",
" date_id=20260111 已插入 2000 行 (last_org_code=CHS202308210240694)\n",
" date_id=20260111 已插入 2000 行 (last_org_code=CHS202402260259380)\n",
" date_id=20260111 已插入 2000 行 (last_org_code=CHS202406260273963)\n",
" date_id=20260111 已插入 2000 行 (last_org_code=CHS202411040284912)\n",
" date_id=20260111 已插入 2000 行 (last_org_code=CHS202503300294779)\n",
" date_id=20260111 已插入 2000 行 (last_org_code=CHS202507150304098)\n",
" date_id=20260111 已插入 2000 行 (last_org_code=CHS202511120312975)\n",
" date_id=20260111 已插入 1259 行 (last_org_code=TQB201509180060)\n",
"✅ date_id=20260111 同步完成\n",
"\n",
"🎉 同步完成!数据已写入 MySQL 表 `jdy_ngv_data_source`\n"
]
}
],
"execution_count": 7
}
],
"metadata": {
"kernelspec": {
"display_name": "Python 3",
"language": "python",
"name": "python3"
},
"language_info": {
"codemirror_mode": {
"name": "ipython",
"version": 2
},
"file_extension": ".py",
"mimetype": "text/x-python",
"name": "python",
"nbconvert_exporter": "python",
"pygments_lexer": "ipython2",
"version": "2.7.6"
}
},
"nbformat": 4,
"nbformat_minor": 5
}