Files
saas/test/数据库表迁移.py
2026-04-09 09:53:47 +08:00

139 lines
4.6 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
import pymysql
import sys
import time
# ================== 配置信息 ==================
SOURCE_CONFIG = {
'host': "f6-public.rwlb.rds.aliyuncs.com",
'user': "rw_operation_data_relay",
'password': "m+q5Z4%IVuF9bf",
'database': "f6operation_data_relay",
'connect_timeout': 30,
'read_timeout': 600,
'write_timeout': 600
}
TARGET_CONFIG = {
'host': "db-f6operation-sst.f6car.org",
'user': "rw_operation",
'password': "tDm45eBj@upzLydHc",
'database': "f6operation_data_relay",
'connect_timeout': 30,
'read_timeout': 600,
'write_timeout': 600
}
TABLE_NAME = 'rpt_customized_maintain_detail'
READ_BATCH_SIZE = 1000 # 每次从源库读 5000 行
WRITE_BATCH_SIZE = 1000 # 每次向目标库写 5000 行
# ==============================================
def main():
print("🔧 正在从源数据库读取表结构...")
# === 第一步:获取建表语句 ===
source_conn = None
try:
source_conn = pymysql.connect(**SOURCE_CONFIG)
with source_conn.cursor() as cursor:
cursor.execute(f"SHOW CREATE TABLE `{TABLE_NAME}`")
result = cursor.fetchone()
if not result:
raise Exception(f"{TABLE_NAME} 不存在于源数据库")
create_table_sql = result[1]
except Exception as e:
print(f"❌ 读取表结构失败: {e}")
sys.exit(1)
finally:
if source_conn:
source_conn.close()
# === 第二步:获取总行数(用于进度)===
total_rows = 0
try:
source_conn = pymysql.connect(**SOURCE_CONFIG)
with source_conn.cursor() as cursor:
cursor.execute(f"SELECT COUNT(*) FROM `{TABLE_NAME}`")
total_rows = cursor.fetchone()[0]
except Exception as e:
print(f"⚠️ 无法获取总行数(不影响迁移): {e}")
finally:
if source_conn:
source_conn.close()
print(f"📊 表共约 {total_rows} 行,将分批读取和写入...")
# === 第三步:重建目标表 ===
target_conn = None
columns = []
try:
target_conn = pymysql.connect(**TARGET_CONFIG)
with target_conn.cursor() as cursor:
cursor.execute(f"DROP TABLE IF EXISTS `{TABLE_NAME}`")
new_create_sql = create_table_sql.replace(f"`{SOURCE_CONFIG['database']}`.", "")
cursor.execute(new_create_sql)
# 获取列名(从建表语句解析较复杂,改用查一次空结果)
cursor.execute(f"SELECT * FROM `{TABLE_NAME}` LIMIT 0")
columns = [desc[0] for desc in cursor.description]
target_conn.commit()
print("✅ 目标表已重建")
except Exception as e:
print(f"❌ 重建目标表失败: {e}")
if target_conn:
target_conn.close()
sys.exit(1)
# === 第四步:分批读取 + 分批写入 ===
offset = 0
inserted_total = 0
while True:
time.sleep(5)
# 从源库读取一批
batch_rows = []
try:
source_conn = pymysql.connect(**SOURCE_CONFIG)
with source_conn.cursor(pymysql.cursors.DictCursor) as cursor:
cursor.execute(
f"SELECT * FROM `{TABLE_NAME}` LIMIT %s OFFSET %s",
(READ_BATCH_SIZE, offset)
)
batch_rows = cursor.fetchall()
source_conn.close()
except Exception as e:
print(f"❌ 读取第 {offset//READ_BATCH_SIZE + 1} 批数据失败: {e}")
break
if not batch_rows:
print("🔚 数据读取完成")
break
# 转换为元组
data_tuples = [tuple(row[col] for col in columns) for row in batch_rows]
# 写入目标库(可再分小批,但这里 batch_rows 已是 5000
try:
with target_conn.cursor() as cursor:
placeholders = ', '.join(['%s'] * len(columns))
insert_sql = f"INSERT INTO `{TABLE_NAME}` (`{'`, `'.join(columns)}`) VALUES ({placeholders})"
cursor.executemany(insert_sql, data_tuples)
target_conn.commit()
inserted_total += len(data_tuples)
print(f"📤 已写入 {inserted_total} / {total_rows}")
except Exception as e:
print(f"❌ 写入失败(批次 offset={offset}: {e}")
target_conn.rollback()
break
offset += READ_BATCH_SIZE
# === 清理 ===
if target_conn and target_conn.open:
target_conn.close()
print(f"🎉 迁移完成!共写入 {inserted_total}")
if __name__ == "__main__":
main()