diff --git a/.idea/sqldialects.xml b/.idea/sqldialects.xml new file mode 100644 index 0000000..250a1dc --- /dev/null +++ b/.idea/sqldialects.xml @@ -0,0 +1,7 @@ + + + + + + + \ No newline at end of file diff --git a/storage/mysql_agent.py b/storage/mysql_agent.py index 65c4119..69f932f 100644 --- a/storage/mysql_agent.py +++ b/storage/mysql_agent.py @@ -199,7 +199,7 @@ class MySQLAgent: def insert_from_df(self, table_name: str, df: pd.DataFrame, chunk_size: int = 1000, replace: bool = False) -> int: """ - 将DataFrame数据插入到数据库表 + 将DataFrame数据插入到数据库表(修复版) Args: table_name (str): 目标表名 @@ -226,11 +226,28 @@ class MySQLAgent: method = 'replace' if replace else 'append' total_rows = 0 - with self.get_connection() as conn: - # 各平台不同的分批策略 - if platform.system() == 'Windows': - chunk_size = min(chunk_size, 500) # Windows上减小批次 + # 创建临时SQLAlchemy引擎(不创建新连接池) + from sqlalchemy import create_engine + from sqlalchemy.pool import StaticPool + # 获取当前连接并包装 + conn = self.get_connection() + + # 修复连接对象缺少character_set_name的问题 + if not hasattr(conn, 'character_set_name'): + conn.character_set_name = lambda: self.config.get('charset', 'utf8mb4') + + engine = create_engine( + "mysql+pymysql://", + creator=lambda: conn, + poolclass=StaticPool, # 使用静态池避免创建新连接 + connect_args={ + 'charset': self.config.get('charset', 'utf8mb4'), + 'autocommit': True + } + ) + + try: for i in range(0, len(df), chunk_size): chunk = df.iloc[i:i + chunk_size] @@ -241,7 +258,7 @@ class MySQLAgent: chunk.to_sql( table_name, - conn, + engine, if_exists=method, index=False, method='multi' @@ -252,10 +269,14 @@ class MySQLAgent: rows=len(chunk), total_inserted=total_rows) - self.log.info("Data inserted successfully", - table=table_name, - total_rows=total_rows) - return total_rows + self.log.info("Data inserted successfully", + table=table_name, + total_rows=total_rows) + return total_rows + finally: + # 确保连接正确关闭 + engine.dispose() + conn.close() except Exception as e: self.log.error("Data insertion failed", @@ -516,7 +537,7 @@ class MySQLAgent: self.log.debug("Transaction started") return conn except Exception as e: - self.log.error("Begin transaction failed", error=str(e), exc_info=True) + self.log.error("Begin transaction_failed", error=str(e)) raise def commit_transaction(self, conn: pymysql.connections.Connection) -> None: @@ -616,7 +637,7 @@ def get_default_config(): 'port': 3306, 'user': 'root', 'password': '123123', - 'database': 'intelligence_system', + 'database': 'intelligence', 'max_connections': 5 }