import pymysql import sys import time # ================== 配置信息 ================== SOURCE_CONFIG = { 'host': "f6-public.rwlb.rds.aliyuncs.com", 'user': "rw_operation_data_relay", 'password': "m+q5Z4%IVuF9bf", 'database': "f6operation_data_relay", 'connect_timeout': 30, 'read_timeout': 600, 'write_timeout': 600 } TARGET_CONFIG = { 'host': "db-f6operation-sst.f6car.org", 'user': "rw_operation", 'password': "tDm45eBj@upzLydHc", 'database': "f6operation_data_relay", 'connect_timeout': 30, 'read_timeout': 600, 'write_timeout': 600 } TABLE_NAME = 'rpt_customized_maintain_detail' READ_BATCH_SIZE = 1000 # 每次从源库读 5000 行 WRITE_BATCH_SIZE = 1000 # 每次向目标库写 5000 行 # ============================================== def main(): print("🔧 正在从源数据库读取表结构...") # === 第一步:获取建表语句 === source_conn = None try: source_conn = pymysql.connect(**SOURCE_CONFIG) with source_conn.cursor() as cursor: cursor.execute(f"SHOW CREATE TABLE `{TABLE_NAME}`") result = cursor.fetchone() if not result: raise Exception(f"表 {TABLE_NAME} 不存在于源数据库") create_table_sql = result[1] except Exception as e: print(f"❌ 读取表结构失败: {e}") sys.exit(1) finally: if source_conn: source_conn.close() # === 第二步:获取总行数(用于进度)=== total_rows = 0 try: source_conn = pymysql.connect(**SOURCE_CONFIG) with source_conn.cursor() as cursor: cursor.execute(f"SELECT COUNT(*) FROM `{TABLE_NAME}`") total_rows = cursor.fetchone()[0] except Exception as e: print(f"⚠️ 无法获取总行数(不影响迁移): {e}") finally: if source_conn: source_conn.close() print(f"📊 表共约 {total_rows} 行,将分批读取和写入...") # === 第三步:重建目标表 === target_conn = None columns = [] try: target_conn = pymysql.connect(**TARGET_CONFIG) with target_conn.cursor() as cursor: cursor.execute(f"DROP TABLE IF EXISTS `{TABLE_NAME}`") new_create_sql = create_table_sql.replace(f"`{SOURCE_CONFIG['database']}`.", "") cursor.execute(new_create_sql) # 获取列名(从建表语句解析较复杂,改用查一次空结果) cursor.execute(f"SELECT * FROM `{TABLE_NAME}` LIMIT 0") columns = [desc[0] for desc in cursor.description] target_conn.commit() print("✅ 目标表已重建") except Exception as e: print(f"❌ 重建目标表失败: {e}") if target_conn: target_conn.close() sys.exit(1) # === 第四步:分批读取 + 分批写入 === offset = 0 inserted_total = 0 while True: time.sleep(5) # 从源库读取一批 batch_rows = [] try: source_conn = pymysql.connect(**SOURCE_CONFIG) with source_conn.cursor(pymysql.cursors.DictCursor) as cursor: cursor.execute( f"SELECT * FROM `{TABLE_NAME}` LIMIT %s OFFSET %s", (READ_BATCH_SIZE, offset) ) batch_rows = cursor.fetchall() source_conn.close() except Exception as e: print(f"❌ 读取第 {offset//READ_BATCH_SIZE + 1} 批数据失败: {e}") break if not batch_rows: print("🔚 数据读取完成") break # 转换为元组 data_tuples = [tuple(row[col] for col in columns) for row in batch_rows] # 写入目标库(可再分小批,但这里 batch_rows 已是 5000) try: with target_conn.cursor() as cursor: placeholders = ', '.join(['%s'] * len(columns)) insert_sql = f"INSERT INTO `{TABLE_NAME}` (`{'`, `'.join(columns)}`) VALUES ({placeholders})" cursor.executemany(insert_sql, data_tuples) target_conn.commit() inserted_total += len(data_tuples) print(f"📤 已写入 {inserted_total} / {total_rows} 行") except Exception as e: print(f"❌ 写入失败(批次 offset={offset}): {e}") target_conn.rollback() break offset += READ_BATCH_SIZE # === 清理 === if target_conn and target_conn.open: target_conn.close() print(f"🎉 迁移完成!共写入 {inserted_total} 行") if __name__ == "__main__": main()