优化RSS订阅收集器,增加父目录路径支持以便导入utils模块,改进数据库写入错误日志,更新插入操作以忽略重复记录,并调整错误信息的截断长度以保留重要信息。
This commit is contained in:
Generated
+7
@@ -0,0 +1,7 @@
|
|||||||
|
<?xml version="1.0" encoding="UTF-8"?>
|
||||||
|
<project version="4">
|
||||||
|
<component name="DataSourcePerFileMappings">
|
||||||
|
<file url="file://$PROJECT_DIR$/tools/SQL.sql" value="36976640-4e4b-40d7-80c5-f77ff8c735e5" />
|
||||||
|
<file url="file://$PROJECT_DIR$/tools/情报收集.sql" value="36976640-4e4b-40d7-80c5-f77ff8c735e5" />
|
||||||
|
</component>
|
||||||
|
</project>
|
||||||
Binary file not shown.
Binary file not shown.
@@ -5,11 +5,19 @@ import pandas as pd
|
|||||||
import os
|
import os
|
||||||
import pickle
|
import pickle
|
||||||
import time
|
import time
|
||||||
|
import sys
|
||||||
from concurrent.futures import ThreadPoolExecutor, as_completed
|
from concurrent.futures import ThreadPoolExecutor, as_completed
|
||||||
from loguru import logger
|
from loguru import logger
|
||||||
from utils.mysql_agent import MySQLAgent
|
|
||||||
from typing import Dict, List, Optional, Any
|
from typing import Dict, List, Optional, Any
|
||||||
|
|
||||||
|
# Add the parent directory to the Python path to find utils module
|
||||||
|
current_dir = os.path.dirname(os.path.abspath(__file__))
|
||||||
|
parent_dir = os.path.dirname(current_dir)
|
||||||
|
if parent_dir not in sys.path:
|
||||||
|
sys.path.insert(0, parent_dir)
|
||||||
|
|
||||||
|
from utils.mysql_agent import MySQLAgent
|
||||||
|
|
||||||
# 数据库连接配置
|
# 数据库连接配置
|
||||||
local_DB_Config = {
|
local_DB_Config = {
|
||||||
'host': "localhost",
|
'host': "localhost",
|
||||||
@@ -245,7 +253,7 @@ class NewsAPIClient:
|
|||||||
table_name=table_name,
|
table_name=table_name,
|
||||||
df=df,
|
df=df,
|
||||||
chunk_size=500,
|
chunk_size=500,
|
||||||
replace=False
|
ignore_duplicates=True
|
||||||
)
|
)
|
||||||
|
|
||||||
self.logger.info(f"成功写入 {inserted_rows}/{len(df)} 条记录")
|
self.logger.info(f"成功写入 {inserted_rows}/{len(df)} 条记录")
|
||||||
@@ -256,7 +264,15 @@ class NewsAPIClient:
|
|||||||
)
|
)
|
||||||
|
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
self.logger.error(f"数据库写入失败: {str(e)}", exc_info=True)
|
self.logger.error(
|
||||||
|
"数据库写入失败",
|
||||||
|
error=str(e),
|
||||||
|
error_type=type(e).__name__,
|
||||||
|
table_name=table_name,
|
||||||
|
record_count=len(df),
|
||||||
|
sample_records=df.head(2).to_dict('records') if not df.empty else [],
|
||||||
|
exc_info=True
|
||||||
|
)
|
||||||
return self._format_result(False, f"数据库操作失败: {str(e)}")
|
return self._format_result(False, f"数据库操作失败: {str(e)}")
|
||||||
@classmethod
|
@classmethod
|
||||||
def main(cls):
|
def main(cls):
|
||||||
|
|||||||
File diff suppressed because it is too large
Load Diff
@@ -65260,3 +65260,10 @@
|
|||||||
|
|
||||||
2025-09-19 13:57:31.840 | ERROR | mysql_agent:293 - 表 collector_rss_subscriptions 插入失败记录详情
|
2025-09-19 13:57:31.840 | ERROR | mysql_agent:293 - 表 collector_rss_subscriptions 插入失败记录详情
|
||||||
|
|
||||||
|
2025-09-25 09:26:52.783 | ERROR | mysql_agent:293 - 表 collector_rss_subscriptions 插入失败记录详情
|
||||||
|
|
||||||
|
2025-10-16 11:32:41.446 | ERROR | mysql_agent:304 - 表 collector_rss_subscriptions 插入失败记录详情
|
||||||
|
→ module: 'MySQLAgent(Windows)'
|
||||||
|
→ failed_records_summary: [{'index': 7, 'type': 'duplicate', 'error_code': 1062, 'error_message': "Duplicate entry '中外专家学者齐聚广西共探六大前沿材料领域发å' for key 'collector_rss_subscriptions.idx_title_pubtime'"}, {'index': 9, 'type': 'du...
|
||||||
|
→ detailed_failed_records: [{'index': 7, 'type': 'duplicate', 'error_code': 1062, 'error_message': "Duplicate entry '中外专家学者齐聚广西共探六大前沿材料领域发å' for key 'collector_rss_subscriptions.idx_title_pubtime'", 'record': {'文章标题': '中外专家学...
|
||||||
|
|
||||||
|
|||||||
Binary file not shown.
@@ -0,0 +1 @@
|
|||||||
|
|
||||||
Binary file not shown.
Binary file not shown.
Binary file not shown.
+8
-3
@@ -76,7 +76,11 @@ class CrossPlatformLog:
|
|||||||
if key == "extra_output": # 跳过自己,避免递归
|
if key == "extra_output": # 跳过自己,避免递归
|
||||||
continue
|
continue
|
||||||
value_repr = repr(value)
|
value_repr = repr(value)
|
||||||
if len(value_repr) > 200:
|
# 对于错误信息,增加截断长度限制,避免丢失重要信息
|
||||||
|
if key in ["error", "error_message", "sql", "params"]:
|
||||||
|
if len(value_repr) > 500:
|
||||||
|
value_repr = value_repr[:497] + "..."
|
||||||
|
elif len(value_repr) > 200:
|
||||||
value_repr = value_repr[:197] + "..."
|
value_repr = value_repr[:197] + "..."
|
||||||
extra_items.append(f"\n → {key}: {value_repr}")
|
extra_items.append(f"\n → {key}: {value_repr}")
|
||||||
extra_str = "".join(extra_items)
|
extra_str = "".join(extra_items)
|
||||||
@@ -92,9 +96,10 @@ class CrossPlatformLog:
|
|||||||
logger.add(
|
logger.add(
|
||||||
str(error_log),
|
str(error_log),
|
||||||
level="ERROR",
|
level="ERROR",
|
||||||
format="{time:YYYY-MM-DD HH:mm:ss.SSS} | ERROR | {module}:{line} - {message}\n{exception}",
|
format="{time:YYYY-MM-DD HH:mm:ss.SSS} | ERROR | {module}:{line} - {message}{extra[extra_output]}\n{exception}",
|
||||||
rotation="10 MB",
|
rotation="10 MB",
|
||||||
retention="90 days"
|
retention="90 days",
|
||||||
|
enqueue=True
|
||||||
)
|
)
|
||||||
|
|
||||||
@staticmethod
|
@staticmethod
|
||||||
|
|||||||
+22
-4
@@ -87,7 +87,13 @@ class MySQLAgent:
|
|||||||
self.log.warning("Windows连接超时,正在重试...")
|
self.log.warning("Windows连接超时,正在重试...")
|
||||||
return self._retry_connection()
|
return self._retry_connection()
|
||||||
|
|
||||||
self.log.error("连接失败", error=error_msg, exc_info=True)
|
self.log.error("连接失败",
|
||||||
|
error=error_msg,
|
||||||
|
error_type=type(e).__name__,
|
||||||
|
host=self.config.get('host'),
|
||||||
|
port=self.config.get('port'),
|
||||||
|
database=self.config.get('database'),
|
||||||
|
exc_info=True)
|
||||||
raise
|
raise
|
||||||
|
|
||||||
def _retry_connection(self, max_retries: int = 3) -> Any | None:
|
def _retry_connection(self, max_retries: int = 3) -> Any | None:
|
||||||
@@ -129,7 +135,12 @@ class MySQLAgent:
|
|||||||
return df
|
return df
|
||||||
|
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
self.log.error("SQL查询失败", sql=sql, params=params, error=str(e), exc_info=True)
|
self.log.error("SQL查询失败",
|
||||||
|
sql=sql,
|
||||||
|
params=params,
|
||||||
|
error=str(e),
|
||||||
|
error_type=type(e).__name__,
|
||||||
|
exc_info=True)
|
||||||
raise
|
raise
|
||||||
finally:
|
finally:
|
||||||
if 'engine' in locals():
|
if 'engine' in locals():
|
||||||
@@ -309,12 +320,18 @@ class MySQLAgent:
|
|||||||
except Exception as e:
|
except Exception as e:
|
||||||
if conn:
|
if conn:
|
||||||
conn.rollback()
|
conn.rollback()
|
||||||
self.log.error(f"表 {table_name} 批量插入失败", error=str(e), exc_info=True)
|
self.log.error(f"表 {table_name} 批量插入失败",
|
||||||
|
error=str(e),
|
||||||
|
error_type=type(e).__name__,
|
||||||
|
table_name=table_name,
|
||||||
|
total_records=len(df) if not df.empty else 0,
|
||||||
|
exc_info=True)
|
||||||
# 记录事务回滚时的失败记录
|
# 记录事务回滚时的失败记录
|
||||||
if failed_records:
|
if failed_records:
|
||||||
self.log.error(
|
self.log.error(
|
||||||
f"表 {table_name} 事务回滚,已失败的记录",
|
f"表 {table_name} 事务回滚,已失败的记录",
|
||||||
failed_records=failed_records
|
failed_records=failed_records,
|
||||||
|
failed_count=len(failed_records)
|
||||||
)
|
)
|
||||||
raise
|
raise
|
||||||
finally:
|
finally:
|
||||||
@@ -601,6 +618,7 @@ class MySQLAgent:
|
|||||||
sql=sql,
|
sql=sql,
|
||||||
params=params,
|
params=params,
|
||||||
error=str(e),
|
error=str(e),
|
||||||
|
error_type=type(e).__name__,
|
||||||
exc_info=True)
|
exc_info=True)
|
||||||
raise
|
raise
|
||||||
|
|
||||||
|
|||||||
Reference in New Issue
Block a user