修复news-id冲突问题
This commit is contained in:
@@ -25,14 +25,15 @@ except ImportError:
|
|||||||
|
|
||||||
from config import settings
|
from config import settings
|
||||||
|
|
||||||
|
|
||||||
class DatabaseManager:
|
class DatabaseManager:
|
||||||
"""数据库管理器"""
|
"""数据库管理器"""
|
||||||
|
|
||||||
def __init__(self):
|
def __init__(self):
|
||||||
"""初始化数据库管理器"""
|
"""初始化数据库管理器"""
|
||||||
self.engine: Engine = None
|
self.engine: Engine = None
|
||||||
self.connect()
|
self.connect()
|
||||||
|
|
||||||
def connect(self):
|
def connect(self):
|
||||||
"""连接数据库"""
|
"""连接数据库"""
|
||||||
try:
|
try:
|
||||||
@@ -46,46 +47,47 @@ class DatabaseManager:
|
|||||||
except ModuleNotFoundError as e:
|
except ModuleNotFoundError as e:
|
||||||
missing: str = str(e)
|
missing: str = str(e)
|
||||||
if "psycopg" in missing:
|
if "psycopg" in missing:
|
||||||
logger.error("数据库连接失败: 未安装PostgreSQL驱动 psycopg。请安装: psycopg[binary]。参考指令:uv pip install psycopg[binary]")
|
logger.error(
|
||||||
|
"数据库连接失败: 未安装PostgreSQL驱动 psycopg。请安装: psycopg[binary]。参考指令:uv pip install psycopg[binary]")
|
||||||
elif "pymysql" in missing:
|
elif "pymysql" in missing:
|
||||||
logger.error("数据库连接失败: 未安装MySQL驱动 pymysql。请安装: pymysql。参考指令:uv pip install pymysql")
|
logger.error("数据库连接失败: 未安装MySQL驱动 pymysql。请安装: pymysql。参考指令:uv pip install pymysql")
|
||||||
else:
|
else:
|
||||||
logger.error(f"数据库连接失败(缺少驱动): {e}")
|
logger.error(f"数据库连接失败(缺少驱动): {e}")
|
||||||
raise
|
raise
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
logger.error(f"数据库连接失败: {e}")
|
logger.exception(f"数据库连接失败: {e}")
|
||||||
raise
|
raise
|
||||||
|
|
||||||
def close(self):
|
def close(self):
|
||||||
"""关闭数据库连接"""
|
"""关闭数据库连接"""
|
||||||
if self.engine:
|
if self.engine:
|
||||||
self.engine.dispose()
|
self.engine.dispose()
|
||||||
logger.info("数据库连接已关闭")
|
logger.info("数据库连接已关闭")
|
||||||
|
|
||||||
def __enter__(self):
|
def __enter__(self):
|
||||||
return self
|
return self
|
||||||
|
|
||||||
def __exit__(self, exc_type, exc_val, exc_tb):
|
def __exit__(self, exc_type, exc_val, exc_tb):
|
||||||
self.close()
|
self.close()
|
||||||
|
|
||||||
# ==================== 新闻数据操作 ====================
|
# ==================== 新闻数据操作 ====================
|
||||||
|
|
||||||
def save_daily_news(self, news_data: List[Dict], crawl_date: date = None) -> int:
|
def save_daily_news(self, news_data: List[Dict], crawl_date: date = None) -> int:
|
||||||
"""
|
"""
|
||||||
保存每日新闻数据,如果当天已有数据则覆盖
|
保存每日新闻数据,如果当天已有数据则覆盖
|
||||||
|
|
||||||
Args:
|
Args:
|
||||||
news_data: 新闻数据列表
|
news_data: 新闻数据列表
|
||||||
crawl_date: 爬取日期,默认为今天
|
crawl_date: 爬取日期,默认为今天
|
||||||
|
|
||||||
Returns:
|
Returns:
|
||||||
保存的新闻数量
|
保存的新闻数量
|
||||||
"""
|
"""
|
||||||
if not crawl_date:
|
if not crawl_date:
|
||||||
crawl_date = date.today()
|
crawl_date = date.today()
|
||||||
|
|
||||||
current_timestamp = int(datetime.now().timestamp())
|
current_timestamp = int(datetime.now().timestamp())
|
||||||
|
|
||||||
try:
|
try:
|
||||||
saved_count = 0
|
saved_count = 0
|
||||||
# 先独立事务执行删除,防止后续插入失败导致无法清理
|
# 先独立事务执行删除,防止后续插入失败导致无法清理
|
||||||
@@ -97,7 +99,13 @@ class DatabaseManager:
|
|||||||
# 逐条插入,单条失败不影响后续(每条独立事务)
|
# 逐条插入,单条失败不影响后续(每条独立事务)
|
||||||
for news_item in news_data:
|
for news_item in news_data:
|
||||||
try:
|
try:
|
||||||
news_id = f"{news_item.get('source', 'unknown')}_{news_item.get('id', news_item.get('rank', 0))}"
|
# news_item.get('id') 已经是完整的 news_id(格式:source_item_id)
|
||||||
|
# 为了支持同一条新闻在不同日期出现,将 crawl_date 加入到 news_id 中
|
||||||
|
base_news_id = news_item.get(
|
||||||
|
'id') or f"{news_item.get('source', 'unknown')}_rank_{news_item.get('rank', 0)}"
|
||||||
|
# 将日期格式化为字符串并加入到 news_id 中,确保全局唯一性
|
||||||
|
news_id = f"{base_news_id}_{crawl_date.strftime('%Y%m%d')}"
|
||||||
|
|
||||||
title_val = (news_item.get("title", "") or "")
|
title_val = (news_item.get("title", "") or "")
|
||||||
if len(title_val) > 500:
|
if len(title_val) > 500:
|
||||||
title_val = title_val[:500]
|
title_val = title_val[:500]
|
||||||
@@ -124,27 +132,27 @@ class DatabaseManager:
|
|||||||
)
|
)
|
||||||
saved_count += 1
|
saved_count += 1
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
logger.warning(f"保存单条新闻失败: {e}")
|
logger.exception(f"保存单条新闻失败: {e}")
|
||||||
continue
|
continue
|
||||||
logger.info(f"成功保存 {saved_count} 条新闻记录")
|
logger.info(f"成功保存 {saved_count} 条新闻记录")
|
||||||
return saved_count
|
return saved_count
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
logger.exception(f"保存新闻数据失败: {e}")
|
logger.exception(f"保存新闻数据失败: {e}")
|
||||||
return 0
|
return 0
|
||||||
|
|
||||||
def get_daily_news(self, crawl_date: date = None) -> List[Dict]:
|
def get_daily_news(self, crawl_date: date = None) -> List[Dict]:
|
||||||
"""
|
"""
|
||||||
获取每日新闻数据
|
获取每日新闻数据
|
||||||
|
|
||||||
Args:
|
Args:
|
||||||
crawl_date: 爬取日期,默认为今天
|
crawl_date: 爬取日期,默认为今天
|
||||||
|
|
||||||
Returns:
|
Returns:
|
||||||
新闻列表
|
新闻列表
|
||||||
"""
|
"""
|
||||||
if not crawl_date:
|
if not crawl_date:
|
||||||
crawl_date = date.today()
|
crawl_date = date.today()
|
||||||
|
|
||||||
query = (
|
query = (
|
||||||
"SELECT * FROM daily_news WHERE crawl_date = :d ORDER BY rank_position ASC"
|
"SELECT * FROM daily_news WHERE crawl_date = :d ORDER BY rank_position ASC"
|
||||||
)
|
)
|
||||||
@@ -152,39 +160,43 @@ class DatabaseManager:
|
|||||||
result = conn.execute(text(query), {"d": crawl_date})
|
result = conn.execute(text(query), {"d": crawl_date})
|
||||||
rows = result.mappings().all()
|
rows = result.mappings().all()
|
||||||
return rows
|
return rows
|
||||||
|
|
||||||
# ==================== 话题数据操作 ====================
|
# ==================== 话题数据操作 ====================
|
||||||
|
|
||||||
def save_daily_topics(self, keywords: List[str], summary: str, extract_date: date = None) -> bool:
|
def save_daily_topics(self, keywords: List[str], summary: str, extract_date: date = None) -> bool:
|
||||||
"""
|
"""
|
||||||
保存每日话题分析
|
保存每日话题分析
|
||||||
|
|
||||||
Args:
|
Args:
|
||||||
keywords: 话题关键词列表
|
keywords: 话题关键词列表
|
||||||
summary: 新闻分析总结
|
summary: 新闻分析总结
|
||||||
extract_date: 提取日期,默认为今天
|
extract_date: 提取日期,默认为今天
|
||||||
|
|
||||||
Returns:
|
Returns:
|
||||||
是否保存成功
|
是否保存成功
|
||||||
"""
|
"""
|
||||||
if not extract_date:
|
if not extract_date:
|
||||||
extract_date = date.today()
|
extract_date = date.today()
|
||||||
|
|
||||||
current_timestamp = int(datetime.now().timestamp())
|
current_timestamp = int(datetime.now().timestamp())
|
||||||
|
|
||||||
try:
|
try:
|
||||||
keywords_json = json.dumps(keywords, ensure_ascii=False)
|
keywords_json = json.dumps(keywords, ensure_ascii=False)
|
||||||
|
# 为了支持外键引用,topic_id 需要全局唯一,所以将日期加入到 topic_id 中
|
||||||
|
topic_id = f"summary_{extract_date.strftime('%Y%m%d')}"
|
||||||
|
|
||||||
with self.engine.begin() as conn:
|
with self.engine.begin() as conn:
|
||||||
check = conn.execute(
|
check = conn.execute(
|
||||||
text("SELECT id FROM daily_topics WHERE extract_date = :d AND topic_id = :tid"),
|
text("SELECT id FROM daily_topics WHERE extract_date = :d AND topic_id = :tid"),
|
||||||
{"d": extract_date, "tid": "summary"},
|
{"d": extract_date, "tid": topic_id},
|
||||||
).first()
|
).first()
|
||||||
if check:
|
if check:
|
||||||
conn.execute(
|
conn.execute(
|
||||||
text(
|
text(
|
||||||
"UPDATE daily_topics SET keywords = :k, topic_description = :s, add_ts = :ts, last_modify_ts = :lmt, topic_name = :tn WHERE extract_date = :d AND topic_id = :tid"
|
"UPDATE daily_topics SET keywords = :k, topic_description = :s, add_ts = :ts, last_modify_ts = :lmt, topic_name = :tn WHERE extract_date = :d AND topic_id = :tid"
|
||||||
),
|
),
|
||||||
{"k": keywords_json, "s": summary, "ts": current_timestamp, "lmt": current_timestamp, "d": extract_date, "tid": "summary", "tn": "每日新闻分析"},
|
{"k": keywords_json, "s": summary, "ts": current_timestamp, "lmt": current_timestamp,
|
||||||
|
"d": extract_date, "tid": topic_id, "tn": "每日新闻分析"},
|
||||||
)
|
)
|
||||||
logger.info(f"更新了 {extract_date} 的话题分析")
|
logger.info(f"更新了 {extract_date} 的话题分析")
|
||||||
else:
|
else:
|
||||||
@@ -192,30 +204,32 @@ class DatabaseManager:
|
|||||||
text(
|
text(
|
||||||
"INSERT INTO daily_topics (extract_date, topic_id, topic_name, keywords, topic_description, add_ts, last_modify_ts) VALUES (:d, :tid, :tn, :k, :s, :ts, :lmt)"
|
"INSERT INTO daily_topics (extract_date, topic_id, topic_name, keywords, topic_description, add_ts, last_modify_ts) VALUES (:d, :tid, :tn, :k, :s, :ts, :lmt)"
|
||||||
),
|
),
|
||||||
{"d": extract_date, "tid": "summary", "tn": "每日新闻分析", "k": keywords_json, "s": summary, "ts": current_timestamp, "lmt": current_timestamp},
|
{"d": extract_date, "tid": topic_id, "tn": "每日新闻分析", "k": keywords_json, "s": summary,
|
||||||
|
"ts": current_timestamp, "lmt": current_timestamp},
|
||||||
)
|
)
|
||||||
logger.info(f"保存了 {extract_date} 的话题分析")
|
logger.info(f"保存了 {extract_date} 的话题分析")
|
||||||
return True
|
return True
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
logger.exception(f"保存话题分析失败: {e}")
|
logger.exception(f"保存话题分析失败: {e}")
|
||||||
return False
|
return False
|
||||||
|
|
||||||
def get_daily_topics(self, extract_date: date = None) -> Optional[Dict]:
|
def get_daily_topics(self, extract_date: date = None) -> Optional[Dict]:
|
||||||
"""
|
"""
|
||||||
获取每日话题分析
|
获取每日话题分析
|
||||||
|
|
||||||
Args:
|
Args:
|
||||||
extract_date: 提取日期,默认为今天
|
extract_date: 提取日期,默认为今天
|
||||||
|
|
||||||
Returns:
|
Returns:
|
||||||
话题分析数据,如果不存在返回None
|
话题分析数据,如果不存在返回None
|
||||||
"""
|
"""
|
||||||
if not extract_date:
|
if not extract_date:
|
||||||
extract_date = date.today()
|
extract_date = date.today()
|
||||||
|
|
||||||
try:
|
try:
|
||||||
with self.engine.connect() as conn:
|
with self.engine.connect() as conn:
|
||||||
result = conn.execute(text("SELECT * FROM daily_topics WHERE extract_date = :d"), {"d": extract_date}).mappings().first()
|
result = conn.execute(text("SELECT * FROM daily_topics WHERE extract_date = :d"),
|
||||||
|
{"d": extract_date}).mappings().first()
|
||||||
if result:
|
if result:
|
||||||
result = dict(result) # 转为可变dict以支持item赋值
|
result = dict(result) # 转为可变dict以支持item赋值
|
||||||
result["keywords"] = json.loads(result["keywords"]) if result.get("keywords") else []
|
result["keywords"] = json.loads(result["keywords"]) if result.get("keywords") else []
|
||||||
@@ -224,14 +238,14 @@ class DatabaseManager:
|
|||||||
except Exception as e:
|
except Exception as e:
|
||||||
logger.exception(f"获取话题分析失败: {e}")
|
logger.exception(f"获取话题分析失败: {e}")
|
||||||
return None
|
return None
|
||||||
|
|
||||||
def get_recent_topics(self, days: int = 7) -> List[Dict]:
|
def get_recent_topics(self, days: int = 7) -> List[Dict]:
|
||||||
"""
|
"""
|
||||||
获取最近几天的话题分析
|
获取最近几天的话题分析
|
||||||
|
|
||||||
Args:
|
Args:
|
||||||
days: 天数
|
days: 天数
|
||||||
|
|
||||||
Returns:
|
Returns:
|
||||||
话题分析列表
|
话题分析列表
|
||||||
"""
|
"""
|
||||||
@@ -254,9 +268,9 @@ class DatabaseManager:
|
|||||||
except Exception as e:
|
except Exception as e:
|
||||||
logger.exception(f"获取最近话题分析失败: {e}")
|
logger.exception(f"获取最近话题分析失败: {e}")
|
||||||
return []
|
return []
|
||||||
|
|
||||||
# ==================== 统计查询 ====================
|
# ==================== 统计查询 ====================
|
||||||
|
|
||||||
def get_summary_stats(self, days: int = 7) -> Dict:
|
def get_summary_stats(self, days: int = 7) -> Dict:
|
||||||
"""获取统计摘要"""
|
"""获取统计摘要"""
|
||||||
try:
|
try:
|
||||||
@@ -290,18 +304,19 @@ class DatabaseManager:
|
|||||||
logger.exception(f"获取统计摘要失败: {e}")
|
logger.exception(f"获取统计摘要失败: {e}")
|
||||||
return {"news_stats": [], "topics_stats": []}
|
return {"news_stats": [], "topics_stats": []}
|
||||||
|
|
||||||
|
|
||||||
if __name__ == "__main__":
|
if __name__ == "__main__":
|
||||||
# 测试数据库管理器
|
# 测试数据库管理器
|
||||||
with DatabaseManager() as db:
|
with DatabaseManager() as db:
|
||||||
# 测试获取新闻
|
# 测试获取新闻
|
||||||
news = db.get_daily_news()
|
news = db.get_daily_news()
|
||||||
logger.info(f"今日新闻数量: {len(news)}")
|
logger.info(f"今日新闻数量: {len(news)}")
|
||||||
|
|
||||||
# 测试获取话题
|
# 测试获取话题
|
||||||
topics = db.get_daily_topics()
|
topics = db.get_daily_topics()
|
||||||
if topics:
|
if topics:
|
||||||
logger.info(f"今日话题关键词: {topics['keywords']}")
|
logger.info(f"今日话题关键词: {topics['keywords']}")
|
||||||
else:
|
else:
|
||||||
logger.info("今日暂无话题分析")
|
logger.info("今日暂无话题分析")
|
||||||
|
|
||||||
logger.info("简化数据库管理器测试完成!")
|
logger.info("简化数据库管理器测试完成!")
|
||||||
|
|||||||
Reference in New Issue
Block a user