From 96c7d2d3b730a0f0742d0fb832169b8580c112c2 Mon Sep 17 00:00:00 2001 From: Doiiars Date: Mon, 3 Nov 2025 22:38:34 +0800 Subject: [PATCH] =?UTF-8?q?1.=20=E5=90=8C=E6=AD=A5MediaCrawler=E4=B8=BA?= =?UTF-8?q?=E6=9C=80=E6=96=B0=E7=89=88=E6=9C=AC=202.=20=E4=BF=AE=E5=A4=8D?= =?UTF-8?q?=E6=95=B0=E6=8D=AE=E5=BA=93not=20null=E9=94=99=E8=AF=AF=203.=20?= =?UTF-8?q?=E6=94=AF=E6=8C=81PG=E6=95=B0=E6=8D=AE=E5=BA=93=204.=20?= =?UTF-8?q?=E8=A7=84=E8=8C=83=E7=8E=AF=E5=A2=83=E5=8F=98=E9=87=8F=E5=8F=8A?= =?UTF-8?q?=E9=85=8D=E7=BD=AE=E4=BD=BF=E7=94=A8=205.=20=E8=A7=84=E8=8C=83?= =?UTF-8?q?=E4=B8=BAuv=E5=AE=89=E8=A3=85=206.=20=E4=BD=BF=E7=94=A8loggru?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../BroadTopicExtraction/database_manager.py | 302 +++++++++--------- MindSpider/BroadTopicExtraction/main.py | 101 +++--- .../BroadTopicExtraction/topic_extractor.py | 11 +- .../DeepSentimentCrawling/keyword_manager.py | 113 ++++--- .../DeepSentimentCrawling/platform_crawler.py | 151 ++++++--- MindSpider/README.md | 41 ++- MindSpider/config.py | 38 ++- MindSpider/main.py | 219 +++++++------ MindSpider/requirements.txt | 4 + MindSpider/schema/db_manager.py | 268 +++++++++------- MindSpider/schema/init_database.py | 63 ++-- 11 files changed, 737 insertions(+), 574 deletions(-) diff --git a/MindSpider/BroadTopicExtraction/database_manager.py b/MindSpider/BroadTopicExtraction/database_manager.py index 2e2bf21..8f1ede7 100644 --- a/MindSpider/BroadTopicExtraction/database_manager.py +++ b/MindSpider/BroadTopicExtraction/database_manager.py @@ -7,11 +7,12 @@ BroadTopicExtraction模块 - 数据库管理器 import sys import json -from datetime import datetime, date +from datetime import datetime, date, timedelta from pathlib import Path from typing import List, Dict, Optional -import pymysql -from pymysql.cursors import DictCursor +from sqlalchemy import create_engine, text +from sqlalchemy.engine import Engine +from loguru import logger # 添加项目根目录到路径 project_root = Path(__file__).parent.parent @@ -22,37 +23,44 @@ try: except ImportError: raise ImportError("无法导入config.py配置文件") +from config import settings + class DatabaseManager: """数据库管理器""" def __init__(self): """初始化数据库管理器""" - self.connection = None + self.engine: Engine = None self.connect() def connect(self): """连接数据库""" try: - self.connection = pymysql.connect( - host=config.DB_HOST, - port=config.DB_PORT, - user=config.DB_USER, - password=config.DB_PASSWORD, - database=config.DB_NAME, - charset=config.DB_CHARSET, - autocommit=True, - cursorclass=DictCursor - ) - print(f"成功连接到数据库: {config.DB_NAME}") + dialect = (settings.DB_DIALECT or "mysql").lower() + if dialect in ("postgresql", "postgres"): + url = f"postgresql+psycopg://{settings.DB_USER}:{settings.DB_PASSWORD}@{settings.DB_HOST}:{settings.DB_PORT}/{settings.DB_NAME}" + else: + url = f"mysql+pymysql://{settings.DB_USER}:{settings.DB_PASSWORD}@{settings.DB_HOST}:{settings.DB_PORT}/{settings.DB_NAME}?charset={settings.DB_CHARSET}" + self.engine = create_engine(url, future=True) + logger.info(f"成功连接到数据库: {settings.DB_NAME}") + except ModuleNotFoundError as e: + missing: str = str(e) + if "psycopg" in missing: + logger.error("数据库连接失败: 未安装PostgreSQL驱动 psycopg。请安装: psycopg[binary]。参考指令:uv pip install psycopg[binary]") + elif "pymysql" in missing: + logger.error("数据库连接失败: 未安装MySQL驱动 pymysql。请安装: pymysql。参考指令:uv pip install pymysql") + else: + logger.error(f"数据库连接失败(缺少驱动): {e}") + raise except Exception as e: - print(f"数据库连接失败: {e}") + logger.error(f"数据库连接失败: {e}") raise def close(self): """关闭数据库连接""" - if self.connection: - self.connection.close() - print("数据库连接已关闭") + if self.engine: + self.engine.dispose() + logger.info("数据库连接已关闭") def __enter__(self): return self @@ -79,48 +87,49 @@ class DatabaseManager: current_timestamp = int(datetime.now().timestamp()) try: - cursor = self.connection.cursor() - - # 先删除当天所有的新闻记录(覆盖模式) - delete_query = "DELETE FROM daily_news WHERE crawl_date = %s" - deleted_count = cursor.execute(delete_query, (crawl_date,)) - if deleted_count > 0: - print(f"覆盖模式:删除了当天已有的 {deleted_count} 条新闻记录") - - # 批量插入新记录 saved_count = 0 + # 先独立事务执行删除,防止后续插入失败导致无法清理 + with self.engine.begin() as conn: + deleted = conn.execute(text("DELETE FROM daily_news WHERE crawl_date = :d"), {"d": crawl_date}).rowcount + if deleted and deleted > 0: + logger.info(f"覆盖模式:删除了当天已有的 {deleted} 条新闻记录") + + # 逐条插入,单条失败不影响后续(每条独立事务) for news_item in news_data: try: - # 简化的新闻ID生成 news_id = f"{news_item.get('source', 'unknown')}_{news_item.get('id', news_item.get('rank', 0))}" - - # 插入新记录 - insert_query = """ - INSERT INTO daily_news ( - news_id, source_platform, title, url, crawl_date, - rank_position, add_ts - ) VALUES (%s, %s, %s, %s, %s, %s, %s) - """ - cursor.execute(insert_query, ( - news_id, - news_item.get('source', 'unknown'), - news_item.get('title', ''), - news_item.get('url', ''), - crawl_date, - news_item.get('rank', None), - current_timestamp - )) + title_val = (news_item.get("title", "") or "") + if len(title_val) > 500: + title_val = title_val[:500] + with self.engine.begin() as conn: + conn.execute( + text( + """ + INSERT INTO daily_news ( + news_id, source_platform, title, url, crawl_date, + rank_position, add_ts, last_modify_ts + ) VALUES (:news_id, :source_platform, :title, :url, :crawl_date, :rank_position, :add_ts, :last_modify_ts) + """ + ), + { + "news_id": news_id, + "source_platform": news_item.get("source", "unknown"), + "title": title_val, + "url": news_item.get("url", ""), + "crawl_date": crawl_date, + "rank_position": news_item.get("rank", None), + "add_ts": current_timestamp, + "last_modify_ts": current_timestamp, + }, + ) saved_count += 1 - except Exception as e: - print(f"保存单条新闻失败: {e}") + logger.warning(f"保存单条新闻失败: {e}") continue - - print(f"成功保存 {saved_count} 条新闻记录") + logger.info(f"成功保存 {saved_count} 条新闻记录") return saved_count - except Exception as e: - print(f"保存新闻数据失败: {e}") + logger.exception(f"保存新闻数据失败: {e}") return 0 def get_daily_news(self, crawl_date: date = None) -> List[Dict]: @@ -136,15 +145,13 @@ class DatabaseManager: if not crawl_date: crawl_date = date.today() - query = """ - SELECT * FROM daily_news - WHERE crawl_date = %s - ORDER BY rank_position ASC - """ - - cursor = self.connection.cursor() - cursor.execute(query, (crawl_date,)) - return cursor.fetchall() + query = ( + "SELECT * FROM daily_news WHERE crawl_date = :d ORDER BY rank_position ASC" + ) + with self.engine.connect() as conn: + result = conn.execute(text(query), {"d": crawl_date}) + rows = result.mappings().all() + return rows # ==================== 话题数据操作 ==================== @@ -166,37 +173,31 @@ class DatabaseManager: current_timestamp = int(datetime.now().timestamp()) try: - cursor = self.connection.cursor() - - # 检查今天是否已有记录 - check_query = "SELECT id FROM daily_topics WHERE extract_date = %s" - cursor.execute(check_query, (extract_date,)) - existing = cursor.fetchone() - keywords_json = json.dumps(keywords, ensure_ascii=False) - - if existing: - # 更新现有记录 - update_query = """ - UPDATE daily_topics - SET keywords = %s, summary = %s, add_ts = %s - WHERE extract_date = %s - """ - cursor.execute(update_query, (keywords_json, summary, current_timestamp, extract_date)) - print(f"更新了 {extract_date} 的话题分析") - else: - # 插入新记录 - insert_query = """ - INSERT INTO daily_topics (extract_date, keywords, summary, add_ts) - VALUES (%s, %s, %s, %s) - """ - cursor.execute(insert_query, (extract_date, keywords_json, summary, current_timestamp)) - print(f"保存了 {extract_date} 的话题分析") - + with self.engine.begin() as conn: + check = conn.execute( + text("SELECT id FROM daily_topics WHERE extract_date = :d AND topic_id = :tid"), + {"d": extract_date, "tid": "summary"}, + ).first() + if check: + conn.execute( + text( + "UPDATE daily_topics SET keywords = :k, topic_description = :s, add_ts = :ts, last_modify_ts = :lmt, topic_name = :tn WHERE extract_date = :d AND topic_id = :tid" + ), + {"k": keywords_json, "s": summary, "ts": current_timestamp, "lmt": current_timestamp, "d": extract_date, "tid": "summary", "tn": "每日新闻分析"}, + ) + logger.info(f"更新了 {extract_date} 的话题分析") + else: + conn.execute( + text( + "INSERT INTO daily_topics (extract_date, topic_id, topic_name, keywords, topic_description, add_ts, last_modify_ts) VALUES (:d, :tid, :tn, :k, :s, :ts, :lmt)" + ), + {"d": extract_date, "tid": "summary", "tn": "每日新闻分析", "k": keywords_json, "s": summary, "ts": current_timestamp, "lmt": current_timestamp}, + ) + logger.info(f"保存了 {extract_date} 的话题分析") return True - except Exception as e: - print(f"保存话题分析失败: {e}") + logger.exception(f"保存话题分析失败: {e}") return False def get_daily_topics(self, extract_date: date = None) -> Optional[Dict]: @@ -213,20 +214,15 @@ class DatabaseManager: extract_date = date.today() try: - cursor = self.connection.cursor() - query = "SELECT * FROM daily_topics WHERE extract_date = %s" - cursor.execute(query, (extract_date,)) - result = cursor.fetchone() - - if result: - # 解析关键词JSON - result['keywords'] = json.loads(result['keywords']) - return result - else: + with self.engine.connect() as conn: + result = conn.execute(text("SELECT * FROM daily_topics WHERE extract_date = :d"), {"d": extract_date}).mappings().first() + if result: + result = dict(result) # 转为可变dict以支持item赋值 + result["keywords"] = json.loads(result["keywords"]) if result.get("keywords") else [] + return result return None - except Exception as e: - print(f"获取话题分析失败: {e}") + logger.exception(f"获取话题分析失败: {e}") return None def get_recent_topics(self, days: int = 7) -> List[Dict]: @@ -240,23 +236,23 @@ class DatabaseManager: 话题分析列表 """ try: - cursor = self.connection.cursor() - query = """ - SELECT * FROM daily_topics - WHERE extract_date >= DATE_SUB(CURDATE(), INTERVAL %s DAY) - ORDER BY extract_date DESC - """ - cursor.execute(query, (days,)) - results = cursor.fetchall() - - # 解析每个结果的关键词JSON - for result in results: - result['keywords'] = json.loads(result['keywords']) - - return results - + start_date = date.today() - timedelta(days=days) + with self.engine.connect() as conn: + results = conn.execute( + text( + """ + SELECT * FROM daily_topics + WHERE extract_date >= :start_date + ORDER BY extract_date DESC + """ + ), + {"start_date": start_date}, + ).mappings().all() + for r in results: + r["keywords"] = json.loads(r["keywords"]) if r.get("keywords") else [] + return results except Exception as e: - print(f"获取最近话题分析失败: {e}") + logger.exception(f"获取最近话题分析失败: {e}") return [] # ==================== 统计查询 ==================== @@ -264,56 +260,48 @@ class DatabaseManager: def get_summary_stats(self, days: int = 7) -> Dict: """获取统计摘要""" try: - cursor = self.connection.cursor() - - # 新闻统计 - news_query = """ - SELECT - crawl_date, - COUNT(*) as news_count, - COUNT(DISTINCT source_platform) as platforms_count - FROM daily_news - WHERE crawl_date >= DATE_SUB(CURDATE(), INTERVAL %s DAY) - GROUP BY crawl_date - ORDER BY crawl_date DESC - """ - cursor.execute(news_query, (days,)) - news_stats = cursor.fetchall() - - # 话题统计 - topics_query = """ - SELECT - extract_date, - keywords, - CHAR_LENGTH(summary) as summary_length - FROM daily_topics - WHERE extract_date >= DATE_SUB(CURDATE(), INTERVAL %s DAY) - ORDER BY extract_date DESC - """ - cursor.execute(topics_query, (days,)) - topics_stats = cursor.fetchall() - - return { - 'news_stats': news_stats, - 'topics_stats': topics_stats - } - + start_date = date.today() - timedelta(days=days) + with self.engine.connect() as conn: + news_stats = conn.execute( + text( + """ + SELECT crawl_date, COUNT(*) as news_count, COUNT(DISTINCT source_platform) as platforms_count + FROM daily_news + WHERE crawl_date >= :start_date + GROUP BY crawl_date + ORDER BY crawl_date DESC + """ + ), + {"start_date": start_date}, + ).all() + topics_stats = conn.execute( + text( + """ + SELECT extract_date, keywords, CHAR_LENGTH(topic_description) as summary_length + FROM daily_topics + WHERE extract_date >= :start_date + ORDER BY extract_date DESC + """ + ), + {"start_date": start_date}, + ).all() + return {"news_stats": news_stats, "topics_stats": topics_stats} except Exception as e: - print(f"获取统计摘要失败: {e}") - return {'news_stats': [], 'topics_stats': []} + logger.exception(f"获取统计摘要失败: {e}") + return {"news_stats": [], "topics_stats": []} if __name__ == "__main__": # 测试数据库管理器 with DatabaseManager() as db: # 测试获取新闻 news = db.get_daily_news() - print(f"今日新闻数量: {len(news)}") + logger.info(f"今日新闻数量: {len(news)}") # 测试获取话题 topics = db.get_daily_topics() if topics: - print(f"今日话题关键词: {topics['keywords']}") + logger.info(f"今日话题关键词: {topics['keywords']}") else: - print("今日暂无话题分析") + logger.info("今日暂无话题分析") - print("简化数据库管理器测试完成!") + logger.info("简化数据库管理器测试完成!") diff --git a/MindSpider/BroadTopicExtraction/main.py b/MindSpider/BroadTopicExtraction/main.py index 7160ed4..438db73 100644 --- a/MindSpider/BroadTopicExtraction/main.py +++ b/MindSpider/BroadTopicExtraction/main.py @@ -11,6 +11,7 @@ import argparse from datetime import datetime, date from pathlib import Path from typing import List, Dict, Optional +from loguru import logger # 添加项目根目录到路径 project_root = Path(__file__).parent.parent @@ -21,8 +22,8 @@ try: from BroadTopicExtraction.topic_extractor import TopicExtractor from BroadTopicExtraction.database_manager import DatabaseManager except ImportError as e: - print(f"导入模块失败: {e}") - print("请确保在项目根目录运行,并且已安装所有依赖") + logger.exception(f"导入模块失败: {e}") + logger.error("请确保在项目根目录运行,并且已安装所有依赖") sys.exit(1) class BroadTopicExtraction: @@ -34,7 +35,7 @@ class BroadTopicExtraction: self.topic_extractor = TopicExtractor() self.db_manager = DatabaseManager() - print("BroadTopicExtraction 初始化完成") + logger.info("BroadTopicExtraction 初始化完成") def close(self): """关闭资源""" @@ -68,21 +69,22 @@ class BroadTopicExtraction: Returns: 包含完整提取结果的字典 """ - print("\n" + "=" * 80) - print("MindSpider AI爬虫 - 每日话题提取") - print("=" * 80) - print(f"执行时间: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}") - print(f"目标日期: {date.today()}") + extraction_result_message = "" + extraction_result_message += "\nMindSpider AI爬虫 - 每日话题提取\n" + extraction_result_message += f"执行时间: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}\n" + extraction_result_message += f"目标日期: {date.today()}\n" if news_sources: - print(f"指定平台: {len(news_sources)} 个") + extraction_result_message += f"指定平台: {len(news_sources)} 个\n" for source in news_sources: source_name = SOURCE_NAMES.get(source, source) - print(f" - {source_name}") + extraction_result_message += f" - {source_name}\n" else: - print(f"爬取平台: 全部 {len(SOURCE_NAMES)} 个平台") + extraction_result_message += f"爬取平台: 全部 {len(SOURCE_NAMES)} 个平台\n" - print(f"关键词数: 最多 {max_keywords} 个") + extraction_result_message += f"关键词数: 最多 {max_keywords} 个\n" + + logger.info(extraction_result_message) extraction_result = { 'success': False, @@ -96,7 +98,7 @@ class BroadTopicExtraction: try: # 步骤1: 收集新闻 - print("\n【步骤1】收集热点新闻...") + logger.info("【步骤1】收集热点新闻...") news_result = await self.news_collector.collect_and_save_news( sources=news_sources ) @@ -112,7 +114,7 @@ class BroadTopicExtraction: raise Exception("新闻收集失败或没有获取到新闻") # 步骤2: 提取关键词和生成总结 - print("\n【步骤2】提取关键词和生成总结...") + logger.info("【步骤2】提取关键词和生成总结...") keywords, summary = self.topic_extractor.extract_keywords_and_summary( news_result['news_list'], max_keywords=max_keywords @@ -126,10 +128,10 @@ class BroadTopicExtraction: } if not keywords: - print("警告: 没有提取到有效关键词") + logger.warning("警告: 没有提取到有效关键词") # 步骤3: 保存到数据库 - print("\n【步骤3】保存分析结果到数据库...") + logger.info("【步骤3】保存分析结果到数据库...") save_success = self.db_manager.save_daily_topics( keywords, summary, date.today() ) @@ -141,56 +143,47 @@ class BroadTopicExtraction: extraction_result['success'] = True extraction_result['end_time'] = datetime.now().isoformat() - print("\n" + "=" * 80) - print("每日话题提取流程完成!") - print("=" * 80) + logger.info("每日话题提取流程完成!") return extraction_result except Exception as e: - print(f"\n话题提取流程失败: {e}") + logger.exception(f"话题提取流程失败: {e}") extraction_result['error'] = str(e) extraction_result['end_time'] = datetime.now().isoformat() return extraction_result def print_extraction_results(self, extraction_result: Dict): """打印提取结果""" - print("\n" + "=" * 80) - print("话题提取结果报告") - print("=" * 80) - - if not extraction_result['success']: - print(f"❌ 提取失败: {extraction_result.get('error', '未知错误')}") - return + extraction_result_message = "" # 新闻收集结果 news_data = extraction_result.get('news_collection', {}) - print(f"📰 新闻收集: {news_data.get('total_news', 0)} 条新闻") - print(f" 成功源数: {news_data.get('successful_sources', 0)}/{news_data.get('total_sources', 0)}") + extraction_result_message += f"\n📰 新闻收集: {news_data.get('total_news', 0)} 条新闻\n" + extraction_result_message += f" 成功源数: {news_data.get('successful_sources', 0)}/{news_data.get('total_sources', 0)}\n" # 话题提取结果 topic_data = extraction_result.get('topic_extraction', {}) keywords = topic_data.get('keywords', []) summary = topic_data.get('summary', '') - print(f"\n🔑 提取关键词: {len(keywords)} 个") + extraction_result_message += f"\n🔑 提取关键词: {len(keywords)} 个\n" if keywords: # 每行显示5个关键词 for i in range(0, len(keywords), 5): keyword_group = keywords[i:i+5] - print(f" {', '.join(keyword_group)}") + extraction_result_message += f" {', '.join(keyword_group)}\n" - print(f"\n📝 新闻总结:") - print(f" {summary}") + extraction_result_message += f"\n📝 新闻总结:\n {summary}\n" # 数据库保存结果 db_data = extraction_result.get('database_save', {}) if db_data.get('success'): - print(f"\n💾 数据库保存: 成功") + extraction_result_message += f"\n💾 数据库保存: 成功\n" else: - print(f"\n💾 数据库保存: 失败") + extraction_result_message += f"\n💾 数据库保存: 失败\n" - print("\n" + "=" * 80) + logger.info(extraction_result_message) def get_keywords_for_crawling(self, extract_date: date = None) -> List[str]: """ @@ -207,7 +200,7 @@ class BroadTopicExtraction: topics_data = self.db_manager.get_daily_topics(extract_date) if not topics_data: - print(f"没有找到 {extract_date or date.today()} 的话题数据") + logger.info(f"没有找到 {extract_date or date.today()} 的话题数据") return [] keywords = topics_data['keywords'] @@ -215,11 +208,11 @@ class BroadTopicExtraction: # 生成搜索关键词 search_keywords = self.topic_extractor.get_search_keywords(keywords) - print(f"准备了 {len(search_keywords)} 个关键词用于爬取") + logger.info(f"准备了 {len(search_keywords)} 个关键词用于爬取") return search_keywords except Exception as e: - print(f"获取爬取关键词失败: {e}") + logger.error(f"获取爬取关键词失败: {e}") return [] def get_daily_analysis(self, target_date: date = None) -> Optional[Dict]: @@ -227,7 +220,7 @@ class BroadTopicExtraction: try: return self.db_manager.get_daily_topics(target_date) except Exception as e: - print(f"获取每日分析失败: {e}") + logger.error(f"获取每日分析失败: {e}") return None def get_recent_analysis(self, days: int = 7) -> List[Dict]: @@ -235,7 +228,7 @@ class BroadTopicExtraction: try: return self.db_manager.get_recent_topics(days) except Exception as e: - print(f"获取最近分析失败: {e}") + logger.error(f"获取最近分析失败: {e}") return [] # ==================== 命令行工具 ==================== @@ -260,17 +253,17 @@ async def run_extraction_command(sources=None, keywords_count=100, show_details= news_data = result.get('news_collection', {}) topic_data = result.get('topic_extraction', {}) - print(f"✅ 话题提取成功完成!") - print(f" 收集新闻: {news_data.get('total_news', 0)} 条") - print(f" 提取关键词: {len(topic_data.get('keywords', []))} 个") - print(f" 生成总结: {len(topic_data.get('summary', ''))} 字符") + logger.info(f"✅ 话题提取成功完成!") + logger.info(f" 收集新闻: {news_data.get('total_news', 0)} 条") + logger.info(f" 提取关键词: {len(topic_data.get('keywords', []))} 个") + logger.info(f" 生成总结: {len(topic_data.get('summary', ''))} 字符") # 获取爬取关键词 crawling_keywords = extractor.get_keywords_for_crawling() if crawling_keywords: - print(f"\n🔑 为DeepSentimentCrawling准备的搜索关键词:") - print(f" {', '.join(crawling_keywords)}") + logger.info(f"\n🔑 为DeepSentimentCrawling准备的搜索关键词:") + logger.info(f" {', '.join(crawling_keywords)}") # 保存关键词到文件 keywords_file = project_root / "data" / "daily_keywords.txt" @@ -279,16 +272,16 @@ async def run_extraction_command(sources=None, keywords_count=100, show_details= with open(keywords_file, 'w', encoding='utf-8') as f: f.write('\n'.join(crawling_keywords)) - print(f" 关键词已保存到: {keywords_file}") + logger.info(f" 关键词已保存到: {keywords_file}") return True else: - print(f"❌ 话题提取失败: {result.get('error', '未知错误')}") + logger.error(f"❌ 话题提取失败: {result.get('error', '未知错误')}") return False except Exception as e: - print(f"❌ 执行过程中发生错误: {e}") + logger.error(f"❌ 执行过程中发生错误: {e}") return False def main(): @@ -304,14 +297,14 @@ def main(): # 显示支持的新闻源 if args.list_sources: - print("支持的新闻源平台:") + logger.info("支持的新闻源平台:") for source, name in SOURCE_NAMES.items(): - print(f" {source:<25} {name}") + logger.info(f" {source:<25} {name}") return # 验证参数 if args.keywords < 1 or args.keywords > 200: - print("关键词数量应在1-200之间") + logger.error("关键词数量应在1-200之间") sys.exit(1) # 运行提取 @@ -325,7 +318,7 @@ def main(): sys.exit(0 if success else 1) except KeyboardInterrupt: - print("\n用户中断操作") + logger.info("用户中断操作") sys.exit(1) if __name__ == "__main__": diff --git a/MindSpider/BroadTopicExtraction/topic_extractor.py b/MindSpider/BroadTopicExtraction/topic_extractor.py index d8329c2..1173e4b 100644 --- a/MindSpider/BroadTopicExtraction/topic_extractor.py +++ b/MindSpider/BroadTopicExtraction/topic_extractor.py @@ -18,19 +18,20 @@ sys.path.append(str(project_root)) try: import config + from config import settings except ImportError: - raise ImportError("无法导入config.py配置文件") + raise ImportError("无法导入settings.py配置文件") class TopicExtractor: """话题提取器""" - + def __init__(self): """初始化话题提取器""" self.client = OpenAI( - api_key=config.DEEPSEEK_API_KEY, - base_url="https://api.deepseek.com" + api_key=settings.MINDSPIDER_API_KEY, + base_url=settings.MINDSPIDER_BASE_URL ) - self.model = "deepseek-chat" + self.model = settings.MINDSPIDER_MODEL_NAME def extract_keywords_and_summary(self, news_list: List[Dict], max_keywords: int = 100) -> Tuple[List[str], str]: """ diff --git a/MindSpider/DeepSentimentCrawling/keyword_manager.py b/MindSpider/DeepSentimentCrawling/keyword_manager.py index 1f1267b..59a19db 100644 --- a/MindSpider/DeepSentimentCrawling/keyword_manager.py +++ b/MindSpider/DeepSentimentCrawling/keyword_manager.py @@ -11,8 +11,8 @@ from datetime import date, timedelta, datetime from pathlib import Path from typing import List, Dict, Optional import random -import pymysql -from pymysql.cursors import DictCursor +from sqlalchemy import create_engine, text +from sqlalchemy.engine import Engine # 添加项目根目录到路径 project_root = Path(__file__).parent.parent @@ -23,30 +23,38 @@ try: except ImportError: raise ImportError("无法导入config.py配置文件") +from config import settings +from loguru import logger + class KeywordManager: """关键词管理器""" def __init__(self): """初始化关键词管理器""" - self.connection = None + self.engine: Engine = None self.connect() def connect(self): """连接数据库""" try: - self.connection = pymysql.connect( - host=config.DB_HOST, - port=config.DB_PORT, - user=config.DB_USER, - password=config.DB_PASSWORD, - database=config.DB_NAME, - charset=config.DB_CHARSET, - autocommit=True, - cursorclass=DictCursor - ) - print(f"关键词管理器成功连接到数据库: {config.DB_NAME}") + dialect = (settings.DB_DIALECT or "mysql").lower() + if dialect in ("postgresql", "postgres"): + url = f"postgresql+psycopg://{settings.DB_USER}:{settings.DB_PASSWORD}@{settings.DB_HOST}:{settings.DB_PORT}/{settings.DB_NAME}" + else: + url = f"mysql+pymysql://{settings.DB_USER}:{settings.DB_PASSWORD}@{settings.DB_HOST}:{settings.DB_PORT}/{settings.DB_NAME}?charset={settings.DB_CHARSET}" + self.engine = create_engine(url, future=True) + logger.info(f"关键词管理器成功连接到数据库: {settings.DB_NAME}") + except ModuleNotFoundError as e: + missing: str = str(e) + if "psycopg" in missing: + logger.error("数据库连接失败: 未安装PostgreSQL驱动 psycopg。请安装: psycopg[binary]。参考指令:uv pip install psycopg[binary]") + elif "pymysql" in missing: + logger.error("数据库连接失败: 未安装MySQL驱动 pymysql。请安装: pymysql。参考指令:uv pip install pymysql") + else: + logger.error(f"数据库连接失败(缺少驱动): {e}") + raise except Exception as e: - print(f"关键词管理器数据库连接失败: {e}") + logger.exception(f"关键词管理器数据库连接失败: {e}") raise def get_latest_keywords(self, target_date: date = None, max_keywords: int = 100) -> List[str]: @@ -63,24 +71,24 @@ class KeywordManager: if not target_date: target_date = date.today() - print(f"正在获取 {target_date} 的关键词...") + logger.info(f"正在获取 {target_date} 的关键词...") # 首先尝试获取指定日期的关键词 topics_data = self.get_daily_topics(target_date) if topics_data and topics_data.get('keywords'): keywords = topics_data['keywords'] - print(f"成功获取 {target_date} 的 {len(keywords)} 个关键词") + logger.info(f"成功获取 {target_date} 的 {len(keywords)} 个关键词") # 如果关键词太多,随机选择指定数量 if len(keywords) > max_keywords: keywords = random.sample(keywords, max_keywords) - print(f"随机选择了 {max_keywords} 个关键词") + logger.info(f"随机选择了 {max_keywords} 个关键词") return keywords # 如果没有当天的关键词,尝试获取最近几天的 - print(f"{target_date} 没有关键词数据,尝试获取最近的关键词...") + logger.info(f"{target_date} 没有关键词数据,尝试获取最近的关键词...") recent_topics = self.get_recent_topics(days=7) if recent_topics: @@ -95,11 +103,11 @@ class KeywordManager: if len(unique_keywords) > max_keywords: unique_keywords = random.sample(unique_keywords, max_keywords) - print(f"从最近7天的数据中获取到 {len(unique_keywords)} 个关键词") + logger.info(f"从最近7天的数据中获取到 {len(unique_keywords)} 个关键词") return unique_keywords # 如果都没有,返回默认关键词 - print("没有找到任何关键词数据,使用默认关键词") + logger.info("没有找到任何关键词数据,使用默认关键词") return self._get_default_keywords() def get_daily_topics(self, extract_date: date = None) -> Optional[Dict]: @@ -116,20 +124,22 @@ class KeywordManager: extract_date = date.today() try: - cursor = self.connection.cursor() - query = "SELECT * FROM daily_topics WHERE extract_date = %s" - cursor.execute(query, (extract_date,)) - result = cursor.fetchone() + with self.engine.connect() as conn: + result = conn.execute( + text("SELECT * FROM daily_topics WHERE extract_date = :d"), + {"d": extract_date}, + ).mappings().first() if result: - # 解析关键词JSON - result['keywords'] = json.loads(result['keywords']) + # 转为可变dict再赋值 + result = dict(result) + result['keywords'] = json.loads(result['keywords']) if result.get('keywords') else [] return result else: return None except Exception as e: - print(f"获取话题分析失败: {e}") + logger.exception(f"获取话题分析失败: {e}") return None def get_recent_topics(self, days: int = 7) -> List[Dict]: @@ -143,23 +153,28 @@ class KeywordManager: 话题分析列表 """ try: - cursor = self.connection.cursor() - query = """ - SELECT * FROM daily_topics - WHERE extract_date >= DATE_SUB(CURDATE(), INTERVAL %s DAY) - ORDER BY extract_date DESC - """ - cursor.execute(query, (days,)) - results = cursor.fetchall() + start_date = date.today() - timedelta(days=days) + with self.engine.connect() as conn: + results = conn.execute( + text( + """ + SELECT * FROM daily_topics + WHERE extract_date >= :start_date + ORDER BY extract_date DESC + """ + ), + {"start_date": start_date}, + ).mappings().all() - # 解析每个结果的关键词JSON + # 转为可变dict列表再处理 + results = [dict(r) for r in results] for result in results: - result['keywords'] = json.loads(result['keywords']) + result['keywords'] = json.loads(result['keywords']) if result.get('keywords') else [] return results except Exception as e: - print(f"获取最近话题分析失败: {e}") + logger.exception(f"获取最近话题分析失败: {e}") return [] def _get_default_keywords(self) -> List[str]: @@ -190,8 +205,8 @@ class KeywordManager: keywords = self.get_latest_keywords(target_date, max_keywords) if keywords: - print(f"为 {len(platforms)} 个平台准备了相同的 {len(keywords)} 个关键词") - print(f"每个关键词将在所有平台上进行爬取") + logger.info(f"为 {len(platforms)} 个平台准备了相同的 {len(keywords)} 个关键词") + logger.info(f"每个关键词将在所有平台上进行爬取") return keywords @@ -210,7 +225,7 @@ class KeywordManager: """ keywords = self.get_latest_keywords(target_date, max_keywords) - print(f"为平台 {platform} 准备了 {len(keywords)} 个关键词(与其他平台相同)") + logger.info(f"为平台 {platform} 准备了 {len(keywords)} 个关键词(与其他平台相同)") return keywords def _filter_keywords_by_platform(self, keywords: List[str], platform: str) -> List[str]: @@ -290,9 +305,9 @@ class KeywordManager: def close(self): """关闭数据库连接""" - if self.connection: - self.connection.close() - print("关键词管理器数据库连接已关闭") + if self.engine: + self.engine.dispose() + logger.info("关键词管理器数据库连接已关闭") def __enter__(self): return self @@ -305,16 +320,16 @@ if __name__ == "__main__": with KeywordManager() as km: # 测试获取关键词 keywords = km.get_latest_keywords(max_keywords=20) - print(f"获取到的关键词: {keywords}") + logger.info(f"获取到的关键词: {keywords}") # 测试平台分配 platforms = ['xhs', 'dy', 'bili'] distribution = km.distribute_keywords_by_platform(keywords, platforms) for platform, kws in distribution.items(): - print(f"{platform}: {kws}") + logger.info(f"{platform}: {kws}") # 测试爬取摘要 summary = km.get_crawling_summary() - print(f"爬取摘要: {summary}") + logger.info(f"爬取摘要: {summary}") - print("关键词管理器测试完成!") + logger.info("关键词管理器测试完成!") diff --git a/MindSpider/DeepSentimentCrawling/platform_crawler.py b/MindSpider/DeepSentimentCrawling/platform_crawler.py index fc6b467..93622b6 100644 --- a/MindSpider/DeepSentimentCrawling/platform_crawler.py +++ b/MindSpider/DeepSentimentCrawling/platform_crawler.py @@ -13,6 +13,7 @@ from datetime import datetime from pathlib import Path from typing import List, Dict, Optional import json +from loguru import logger # 添加项目根目录到路径 project_root = Path(__file__).parent.parent @@ -36,11 +37,15 @@ class PlatformCrawler: if not self.mediacrawler_path.exists(): raise FileNotFoundError(f"MediaCrawler目录不存在: {self.mediacrawler_path}") - print(f"初始化平台爬虫管理器,MediaCrawler路径: {self.mediacrawler_path}") + logger.info(f"初始化平台爬虫管理器,MediaCrawler路径: {self.mediacrawler_path}") def configure_mediacrawler_db(self): - """配置MediaCrawler使用我们的MySQL数据库""" + """配置MediaCrawler使用我们的数据库(MySQL或PostgreSQL)""" try: + # 判断数据库类型 + db_dialect = (config.settings.DB_DIALECT or "mysql").lower() + is_postgresql = db_dialect in ("postgresql", "postgres") + # 修改MediaCrawler的数据库配置 db_config_path = self.mediacrawler_path / "config" / "db_config.py" @@ -48,7 +53,14 @@ class PlatformCrawler: with open(db_config_path, 'r', encoding='utf-8') as f: content = f.read() - # 替换数据库配置 + # PostgreSQL配置值:如果使用PostgreSQL则使用MindSpider配置,否则使用默认值或环境变量 + pg_password = config.settings.DB_PASSWORD if is_postgresql else "bettafish" + pg_user = config.settings.DB_USER if is_postgresql else "bettafish" + pg_host = config.settings.DB_HOST if is_postgresql else "127.0.0.1" + pg_port = config.settings.DB_PORT if is_postgresql else 5432 + pg_db_name = config.settings.DB_NAME if is_postgresql else "bettafish" + + # 替换数据库配置 - 使用MindSpider的数据库配置 new_config = f'''# 声明:本代码仅供学习和研究目的使用。使用者应遵守以下原则: # 1. 不得用于任何商业用途。 # 2. 使用时应遵守目标平台的使用条款和robots.txt规则。 @@ -63,11 +75,19 @@ class PlatformCrawler: import os # mysql config - 使用MindSpider的数据库配置 -MYSQL_DB_PWD = "{config.DB_PASSWORD}" -MYSQL_DB_USER = "{config.DB_USER}" -MYSQL_DB_HOST = "{config.DB_HOST}" -MYSQL_DB_PORT = {config.DB_PORT} -MYSQL_DB_NAME = "{config.DB_NAME}" +MYSQL_DB_PWD = "{config.settings.DB_PASSWORD}" +MYSQL_DB_USER = "{config.settings.DB_USER}" +MYSQL_DB_HOST = "{config.settings.DB_HOST}" +MYSQL_DB_PORT = {config.settings.DB_PORT} +MYSQL_DB_NAME = "{config.settings.DB_NAME}" + +mysql_db_config = {{ + "user": MYSQL_DB_USER, + "password": MYSQL_DB_PWD, + "host": MYSQL_DB_HOST, + "port": MYSQL_DB_PORT, + "db_name": MYSQL_DB_NAME, +}} # redis config @@ -81,17 +101,39 @@ CACHE_TYPE_REDIS = "redis" CACHE_TYPE_MEMORY = "memory" # sqlite config -SQLITE_DB_PATH = os.path.join(os.path.dirname(os.path.dirname(__file__)), "schema", "sqlite_tables.db")''' +SQLITE_DB_PATH = os.path.join(os.path.dirname(os.path.dirname(__file__)), "database", "sqlite_tables.db") + +sqlite_db_config = {{ + "db_path": SQLITE_DB_PATH +}} + +# postgresql config - 使用MindSpider的数据库配置(如果DB_DIALECT是postgresql)或环境变量 +POSTGRESQL_DB_PWD = os.getenv("POSTGRESQL_DB_PWD", "{pg_password}") +POSTGRESQL_DB_USER = os.getenv("POSTGRESQL_DB_USER", "{pg_user}") +POSTGRESQL_DB_HOST = os.getenv("POSTGRESQL_DB_HOST", "{pg_host}") +POSTGRESQL_DB_PORT = os.getenv("POSTGRESQL_DB_PORT", "{pg_port}") +POSTGRESQL_DB_NAME = os.getenv("POSTGRESQL_DB_NAME", "{pg_db_name}") + +postgresql_db_config = {{ + "user": POSTGRESQL_DB_USER, + "password": POSTGRESQL_DB_PWD, + "host": POSTGRESQL_DB_HOST, + "port": POSTGRESQL_DB_PORT, + "db_name": POSTGRESQL_DB_NAME, +}} + +''' # 写入新配置 with open(db_config_path, 'w', encoding='utf-8') as f: f.write(new_config) - print("已配置MediaCrawler使用MindSpider数据库") + db_type = "PostgreSQL" if is_postgresql else "MySQL" + logger.info(f"已配置MediaCrawler使用MindSpider {db_type}数据库") return True except Exception as e: - print(f"配置MediaCrawler数据库失败: {e}") + logger.exception(f"配置MediaCrawler数据库失败: {e}") return False def create_base_config(self, platform: str, keywords: List[str], @@ -109,6 +151,11 @@ SQLITE_DB_PATH = os.path.join(os.path.dirname(os.path.dirname(__file__)), "schem 是否配置成功 """ try: + # 判断数据库类型,确定 SAVE_DATA_OPTION + db_dialect = (config.settings.DB_DIALECT or "mysql").lower() + is_postgresql = db_dialect in ("postgresql", "postgres") + save_data_option = "postgresql" if is_postgresql else "db" + base_config_path = self.mediacrawler_path / "config" / "base_config.py" # 将关键词列表转换为逗号分隔的字符串 @@ -130,7 +177,7 @@ SQLITE_DB_PATH = os.path.join(os.path.dirname(os.path.dirname(__file__)), "schem elif line.startswith('CRAWLER_TYPE = '): new_lines.append(f'CRAWLER_TYPE = "{crawler_type}" # 爬取类型,search(关键词搜索) | detail(帖子详情)| creator(创作者主页数据)') elif line.startswith('SAVE_DATA_OPTION = '): - new_lines.append('SAVE_DATA_OPTION = "db" # csv or db or json or sqlite') + new_lines.append(f'SAVE_DATA_OPTION = "{save_data_option}" # csv or db or json or sqlite or postgresql') elif line.startswith('CRAWLER_MAX_NOTES_COUNT = '): new_lines.append(f'CRAWLER_MAX_NOTES_COUNT = {max_notes}') elif line.startswith('ENABLE_GET_COMMENTS = '): @@ -146,11 +193,11 @@ SQLITE_DB_PATH = os.path.join(os.path.dirname(os.path.dirname(__file__)), "schem with open(base_config_path, 'w', encoding='utf-8') as f: f.write('\n'.join(new_lines)) - print(f"已配置 {platform} 平台,关键词数量: {len(keywords)}") + logger.info(f"已配置 {platform} 平台,爬取类型: {crawler_type},关键词数量: {len(keywords)},最大爬取数量: {max_notes},保存数据方式: {save_data_option}") return True except Exception as e: - print(f"创建基础配置失败: {e}") + logger.exception(f"创建基础配置失败: {e}") return False def run_crawler(self, platform: str, keywords: List[str], @@ -173,8 +220,9 @@ SQLITE_DB_PATH = os.path.join(os.path.dirname(os.path.dirname(__file__)), "schem if not keywords: raise ValueError("关键词列表不能为空") - print(f"\n开始爬取平台: {platform}") - print(f"关键词: {keywords[:5]}{'...' if len(keywords) > 5 else ''} (共{len(keywords)}个)") + start_message = f"\n开始爬取平台: {platform}" + start_message += f"\n关键词: {keywords[:5]}{'...' if len(keywords) > 5 else ''} (共{len(keywords)}个)" + logger.info(start_message) start_time = datetime.now() @@ -187,22 +235,27 @@ SQLITE_DB_PATH = os.path.join(os.path.dirname(os.path.dirname(__file__)), "schem if not self.create_base_config(platform, keywords, "search", max_notes): return {"success": False, "error": "基础配置创建失败"} + # 判断数据库类型,确定 save_data_option + db_dialect = (config.settings.DB_DIALECT or "mysql").lower() + is_postgresql = db_dialect in ("postgresql", "postgres") + save_data_option = "postgresql" if is_postgresql else "db" + # 构建命令 cmd = [ sys.executable, "main.py", "--platform", platform, "--lt", login_type, "--type", "search", - "--save_data_option", "db" + "--save_data_option", save_data_option ] - print(f"执行命令: {' '.join(cmd)}") + logger.info(f"执行命令: {' '.join(cmd)}") # 切换到MediaCrawler目录并执行 result = subprocess.run( cmd, cwd=self.mediacrawler_path, - timeout=1800 # 30分钟超时 + timeout=3600 # 60分钟超时 ) end_time = datetime.now() @@ -226,17 +279,17 @@ SQLITE_DB_PATH = os.path.join(os.path.dirname(os.path.dirname(__file__)), "schem self.crawl_stats[platform] = crawl_stats if result.returncode == 0: - print(f"✅ {platform} 爬取完成,耗时: {duration:.1f}秒") + logger.info(f"✅ {platform} 爬取完成,耗时: {duration:.1f}秒") else: - print(f"❌ {platform} 爬取失败,返回码: {result.returncode}") + logger.error(f"❌ {platform} 爬取失败,返回码: {result.returncode}") return crawl_stats except subprocess.TimeoutExpired: - print(f"❌ {platform} 爬取超时") + logger.exception(f"❌ {platform} 爬取超时") return {"success": False, "error": "爬取超时", "platform": platform} except Exception as e: - print(f"❌ {platform} 爬取异常: {e}") + logger.exception(f"❌ {platform} 爬取异常: {e}") return {"success": False, "error": str(e), "platform": platform} def _parse_crawl_output(self, output_lines: List[str], error_lines: List[str]) -> Dict: @@ -291,10 +344,14 @@ SQLITE_DB_PATH = os.path.join(os.path.dirname(os.path.dirname(__file__)), "schem Returns: 总体爬取统计 """ - print(f"\n🚀 开始全平台关键词爬取") - print(f" 关键词数量: {len(keywords)}") - print(f" 平台数量: {len(platforms)}") - print(f" 总爬取任务: {len(keywords)} × {len(platforms)} = {len(keywords) * len(platforms)}") + + start_message = f"\n🚀 开始全平台关键词爬取" + start_message += f"\n 关键词数量: {len(keywords)}" + start_message += f"\n 平台数量: {len(platforms)}" + start_message += f"\n 登录方式: {login_type}" + start_message += f"\n 每个关键词在每个平台的最大爬取数量: {max_notes_per_keyword}" + start_message += f"\n 总爬取任务: {len(keywords)} × {len(platforms)} = {len(keywords) * len(platforms)}" + logger.info(start_message) total_stats = { "total_keywords": len(keywords), @@ -319,8 +376,8 @@ SQLITE_DB_PATH = os.path.join(os.path.dirname(os.path.dirname(__file__)), "schem # 对每个平台一次性爬取所有关键词 for platform in platforms: - print(f"\n📝 在 {platform} 平台爬取所有关键词") - print(f" 关键词: {', '.join(keywords[:5])}{'...' if len(keywords) > 5 else ''}") + logger.info(f"\n📝 在 {platform} 平台爬取所有关键词") + logger.info(f" 关键词: {', '.join(keywords[:5])}{'...' if len(keywords) > 5 else ''}") try: # 一次性传递所有关键词给平台 @@ -344,7 +401,7 @@ SQLITE_DB_PATH = os.path.join(os.path.dirname(os.path.dirname(__file__)), "schem total_stats["keyword_results"][keyword] = {} total_stats["keyword_results"][keyword][platform] = result - print(f" ✅ 成功: {notes_count} 条内容, {comments_count} 条评论") + logger.info(f" ✅ 成功: {notes_count} 条内容, {comments_count} 条评论") else: total_stats["failed_tasks"] += len(keywords) total_stats["platform_summary"][platform]["failed_keywords"] = len(keywords) @@ -355,7 +412,7 @@ SQLITE_DB_PATH = os.path.join(os.path.dirname(os.path.dirname(__file__)), "schem total_stats["keyword_results"][keyword] = {} total_stats["keyword_results"][keyword][platform] = result - print(f" ❌ 失败: {result.get('error', '未知错误')}") + logger.error(f" ❌ 失败: {result.get('error', '未知错误')}") except Exception as e: total_stats["failed_tasks"] += len(keywords) @@ -368,22 +425,24 @@ SQLITE_DB_PATH = os.path.join(os.path.dirname(os.path.dirname(__file__)), "schem total_stats["keyword_results"][keyword] = {} total_stats["keyword_results"][keyword][platform] = error_result - print(f" ❌ 异常: {e}") + logger.error(f" ❌ 异常: {e}") # 打印详细统计 - print(f"\n📊 全平台关键词爬取完成!") - print(f" 总任务: {total_stats['total_tasks']}") - print(f" 成功: {total_stats['successful_tasks']}") - print(f" 失败: {total_stats['failed_tasks']}") - print(f" 成功率: {total_stats['successful_tasks']/total_stats['total_tasks']*100:.1f}%") - print(f" 总内容: {total_stats['total_notes']} 条") - print(f" 总评论: {total_stats['total_comments']} 条") + finish_message = f"\n📊 全平台关键词爬取完成!" + finish_message += f"\n 总任务: {total_stats['total_tasks']}" + finish_message += f"\n 成功: {total_stats['successful_tasks']}" + finish_message += f"\n 失败: {total_stats['failed_tasks']}" + finish_message += f"\n 成功率: {total_stats['successful_tasks']/total_stats['total_tasks']*100:.1f}%" + finish_message += f"\n 总内容: {total_stats['total_notes']} 条" + finish_message += f"\n 总评论: {total_stats['total_comments']} 条" + logger.info(finish_message) - print(f"\n📈 各平台统计:") + platform_summary_message = f"\n� 各平台统计:" for platform, stats in total_stats["platform_summary"].items(): success_rate = stats["successful_keywords"] / len(keywords) * 100 if keywords else 0 - print(f" {platform}: {stats['successful_keywords']}/{len(keywords)} 关键词成功 ({success_rate:.1f}%), " - f"{stats['total_notes']} 条内容") + platform_summary_message += f"\n {platform}: {stats['successful_keywords']}/{len(keywords)} 关键词成功 ({success_rate:.1f}%), " + platform_summary_message += f"{stats['total_notes']} 条内容" + logger.info(platform_summary_message) return total_stats @@ -403,9 +462,9 @@ SQLITE_DB_PATH = os.path.join(os.path.dirname(os.path.dirname(__file__)), "schem try: with open(log_path, 'w', encoding='utf-8') as f: json.dump(self.crawl_stats, f, ensure_ascii=False, indent=2) - print(f"爬取日志已保存到: {log_path}") + logger.info(f"爬取日志已保存到: {log_path}") except Exception as e: - print(f"保存爬取日志失败: {e}") + logger.exception(f"保存爬取日志失败: {e}") if __name__ == "__main__": # 测试平台爬虫管理器 @@ -415,5 +474,5 @@ if __name__ == "__main__": test_keywords = ["科技", "AI", "编程"] result = crawler.run_crawler("xhs", test_keywords, max_notes=5) - print(f"测试结果: {result}") - print("平台爬虫管理器测试完成!") + logger.info(f"测试结果: {result}") + logger.info("平台爬虫管理器测试完成!") diff --git a/MindSpider/README.md b/MindSpider/README.md index dd8514b..29149f1 100644 --- a/MindSpider/README.md +++ b/MindSpider/README.md @@ -217,26 +217,54 @@ git clone https://github.com/yourusername/MindSpider.git cd MindSpider ``` -### 2. 创建并激活Conda环境 +### 2. 创建并激活环境 + +#### Conda配置方法 + +#### Conda配置方法 ```bash +# 创建名为 pytorch_python11 的conda环境并指定Python版本 conda create -n pytorch_python11 python=3.11 +# 激活该环境 conda activate pytorch_python11 ``` +#### UV配置方法 + +> [UV 是一种快速轻量级 Python 包环境管理工具,适用于低依赖及便捷管理需求。可参考:https://github.com/astral-sh/uv] + +- 安装uv(如未安装) +```bash +pip install uv +``` +- 创建虚拟环境并激活 +```bash +uv venv --python 3.11 # 创建3.11环境 +source .venv/bin/activate # Linux/macOS +# 或 +.venv\Scripts\activate # Windows +``` + + ### 3. 安装依赖 ```bash # 安装Python依赖 pip install -r requirements.txt +或 +# uv版本更加快速 +uv pip install -r requirements.txt + + # 安装Playwright浏览器驱动 playwright install ``` ### 4. 配置系统 -编辑 `config.py` 文件,设置数据库和API配置: +复制.env.example文件为.env文件,放置在项目根目录。编辑 `.env` 文件,设置数据库和API配置: ```python # MySQL数据库配置 @@ -248,7 +276,9 @@ DB_NAME = "mindspider" DB_CHARSET = "utf8mb4" # DeepSeek API密钥 -DEEPSEEK_API_KEY = "your_deepseek_api_key" +MINDSPIDER_BASE_URL=your_api_base_url +MINDSPIDER_API_KEY=sk-your-key +MINDSPIDER_MODEL_NAME=deepseek-chat ``` ### 5. 初始化系统 @@ -418,6 +448,11 @@ python main.py --status ```bash # 重新安装 pip install playwright + +或 + +uv pip install playwright + playwright install ``` diff --git a/MindSpider/config.py b/MindSpider/config.py index 7d36612..05d5206 100644 --- a/MindSpider/config.py +++ b/MindSpider/config.py @@ -3,13 +3,33 @@ 存储数据库连接信息和API密钥 """ -# MySQL数据库配置 -DB_HOST = "your_host" -DB_PORT = 3306 -DB_USER = "your_username" -DB_PASSWORD = "your_password" -DB_NAME = "mindspider" -DB_CHARSET = "utf8mb4" +from pydantic_settings import BaseSettings +from typing import Optional +from pydantic import Field +from pathlib import Path -# DeepSeek API密钥 -DEEPSEEK_API_KEY = "your_deepseek_api_key" +# 计算 .env 优先级:优先当前工作目录,其次项目根目录(MindSpider 的上级目录) +PROJECT_ROOT: Path = Path(__file__).resolve().parents[1] +CWD_ENV: Path = Path.cwd() / ".env" +ENV_FILE: str = str(CWD_ENV if CWD_ENV.exists() else (PROJECT_ROOT / ".env")) + +class Settings(BaseSettings): + """全局配置管理,优先从环境变量和.env加载。支持MySQL/PostgreSQL统一数据库参数命名。""" + DB_DIALECT: str = Field("mysql", description="数据库类型,支持'mysql'或'postgresql'") + DB_HOST: str = Field("your_host", description="数据库主机名或IP地址") + DB_PORT: int = Field(3306, description="数据库端口号") + DB_USER: str = Field("your_username", description="数据库用户名") + DB_PASSWORD: str = Field("your_password", description="数据库密码") + DB_NAME: str = Field("mindspider", description="数据库名称") + DB_CHARSET: str = Field("utf8mb4", description="数据库字符集") + MINDSPIDER_API_KEY: Optional[str] = Field(None, description="MINDSPIDER API密钥") + MINDSPIDER_BASE_URL: Optional[str] = Field("https://api.deepseek.com", description="MINDSPIDER API基础URL,推荐deepseek-chat模型使用https://api.deepseek.com") + MINDSPIDER_MODEL_NAME: Optional[str] = Field("deepseek-chat", description="MINDSPIDER API模型名称, 推荐deepseek-chat") + + class Config: + env_file = ENV_FILE + env_prefix = "" + case_sensitive = False + extra = "allow" + +settings = Settings() diff --git a/MindSpider/main.py b/MindSpider/main.py index aff9271..6d8c956 100644 --- a/MindSpider/main.py +++ b/MindSpider/main.py @@ -11,8 +11,13 @@ import argparse from datetime import date, datetime from pathlib import Path import subprocess +import asyncio import pymysql from pymysql.cursors import DictCursor +from sqlalchemy.ext.asyncio import create_async_engine, AsyncEngine +from sqlalchemy import inspect, text +from config import settings +from loguru import logger # 添加项目根目录到路径 project_root = Path(__file__).parent @@ -21,8 +26,8 @@ sys.path.append(str(project_root)) try: import config except ImportError: - print("错误:无法导入config.py配置文件") - print("请确保项目根目录下存在config.py文件,并包含数据库和API配置信息") + logger.error("错误:无法导入config.py配置文件") + logger.error("请确保项目根目录下存在config.py文件,并包含数据库和API配置信息") sys.exit(1) class MindSpider: @@ -35,99 +40,110 @@ class MindSpider: self.deep_sentiment_path = self.project_root / "DeepSentimentCrawling" self.schema_path = self.project_root / "schema" - print("MindSpider AI爬虫项目") - print(f"项目路径: {self.project_root}") + logger.info("MindSpider AI爬虫项目") + logger.info(f"项目路径: {self.project_root}") def check_config(self) -> bool: """检查基础配置""" - print("\n检查基础配置...") + logger.info("检查基础配置...") - # 检查config.py配置项 + # 检查settings配置项 required_configs = [ 'DB_HOST', 'DB_PORT', 'DB_USER', 'DB_PASSWORD', 'DB_NAME', 'DB_CHARSET', - 'DEEPSEEK_API_KEY' + 'MINDSPIDER_API_KEY', 'MINDSPIDER_BASE_URL', 'MINDSPIDER_MODEL_NAME' ] missing_configs = [] for config_name in required_configs: - if not hasattr(config, config_name) or not getattr(config, config_name): + if not hasattr(settings, config_name) or not getattr(settings, config_name): missing_configs.append(config_name) if missing_configs: - print(f"配置缺失: {', '.join(missing_configs)}") - print("请检查config.py文件中的配置信息") + logger.error(f"配置缺失: {', '.join(missing_configs)}") + logger.error("请检查config.py文件中的配置信息") return False - print("基础配置检查通过") + logger.info("基础配置检查通过") return True def check_database_connection(self) -> bool: """检查数据库连接""" - print("\n检查数据库连接...") + logger.info("检查数据库连接...") - try: - connection = pymysql.connect( - host=config.DB_HOST, - port=config.DB_PORT, - user=config.DB_USER, - password=config.DB_PASSWORD, - database=config.DB_NAME, - charset=config.DB_CHARSET, - cursorclass=DictCursor + def build_async_url() -> str: + dialect = (settings.DB_DIALECT or "mysql").lower() + if dialect == "postgresql": + return f"postgresql+asyncpg://{settings.DB_USER}:{settings.DB_PASSWORD}@{settings.DB_HOST}:{settings.DB_PORT}/{settings.DB_NAME}" + # 默认使用 mysql 异步驱动 asyncmy + return ( + f"mysql+asyncmy://{settings.DB_USER}:{settings.DB_PASSWORD}" + f"@{settings.DB_HOST}:{settings.DB_PORT}/{settings.DB_NAME}?charset={settings.DB_CHARSET}" ) - connection.close() - print("数据库连接正常") + + async def _test_connection(db_url: str) -> None: + engine: AsyncEngine = create_async_engine(db_url, pool_pre_ping=True) + try: + async with engine.connect() as conn: + await conn.execute(text("SELECT 1")) + finally: + await engine.dispose() + + try: + db_url: str = build_async_url() + asyncio.run(_test_connection(db_url)) + logger.info("数据库连接正常") return True except Exception as e: - print(f"数据库连接失败: {e}") + logger.exception(f"数据库连接失败: {e}") return False def check_database_tables(self) -> bool: """检查数据库表是否存在""" - print("\n检查数据库表...") + logger.info("检查数据库表...") - try: - connection = pymysql.connect( - host=config.DB_HOST, - port=config.DB_PORT, - user=config.DB_USER, - password=config.DB_PASSWORD, - database=config.DB_NAME, - charset=config.DB_CHARSET, - cursorclass=DictCursor + def build_async_url() -> str: + dialect = (settings.DB_DIALECT or "mysql").lower() + if dialect == "postgresql": + return f"postgresql+asyncpg://{settings.DB_USER}:{settings.DB_PASSWORD}@{settings.DB_HOST}:{settings.DB_PORT}/{settings.DB_NAME}" + return ( + f"mysql+asyncmy://{settings.DB_USER}:{settings.DB_PASSWORD}" + f"@{settings.DB_HOST}:{settings.DB_PORT}/{settings.DB_NAME}?charset={settings.DB_CHARSET}" ) - - cursor = connection.cursor() - - # 检查核心表是否存在 + + async def _check_tables(db_url: str) -> list[str]: + engine: AsyncEngine = create_async_engine(db_url, pool_pre_ping=True) + try: + async with engine.connect() as conn: + def _get_tables(sync_conn): + return inspect(sync_conn).get_table_names() + tables = await conn.run_sync(_get_tables) + return tables + finally: + await engine.dispose() + + try: + db_url: str = build_async_url() + existing_tables = asyncio.run(_check_tables(db_url)) required_tables = ['daily_news', 'daily_topics'] - cursor.execute("SHOW TABLES") - existing_tables = [row[f'Tables_in_{config.DB_NAME}'] for row in cursor.fetchall()] - - missing_tables = [table for table in required_tables if table not in existing_tables] - - connection.close() - + missing_tables = [t for t in required_tables if t not in existing_tables] if missing_tables: - print(f"缺少数据库表: {', '.join(missing_tables)}") + logger.error(f"缺少数据库表: {', '.join(missing_tables)}") return False - else: - print("数据库表检查通过") - return True - + logger.info("数据库表检查通过") + return True except Exception as e: - print(f"检查数据库表失败: {e}") + logger.exception(f"检查数据库表失败: {e}") return False def initialize_database(self) -> bool: """初始化数据库""" - print("\n初始化数据库...") + logger.info("初始化数据库...") try: # 运行数据库初始化脚本 init_script = self.schema_path / "init_database.py" if not init_script.exists(): - print("错误:找不到数据库初始化脚本") + logger.error("错误:找不到数据库初始化脚本") return False result = subprocess.run( @@ -138,19 +154,19 @@ class MindSpider: ) if result.returncode == 0: - print("数据库初始化成功") + logger.info("数据库初始化成功") return True else: - print(f"数据库初始化失败: {result.stderr}") + logger.error(f"数据库初始化失败: {result.stderr}") return False except Exception as e: - print(f"数据库初始化异常: {e}") + logger.exception(f"数据库初始化异常: {e}") return False def check_dependencies(self) -> bool: """检查依赖环境""" - print("\n检查依赖环境...") + logger.info("检查依赖环境...") # 检查Python包 required_packages = ['pymysql', 'requests', 'playwright'] @@ -163,22 +179,22 @@ class MindSpider: missing_packages.append(package) if missing_packages: - print(f"缺少Python包: {', '.join(missing_packages)}") - print("请运行: pip install -r requirements.txt") + logger.error(f"缺少Python包: {', '.join(missing_packages)}") + logger.info("请运行: pip install -r requirements.txt") return False # 检查MediaCrawler依赖 mediacrawler_path = self.deep_sentiment_path / "MediaCrawler" if not mediacrawler_path.exists(): - print("错误:找不到MediaCrawler目录") + logger.error("错误:找不到MediaCrawler目录") return False - print("依赖环境检查通过") + logger.info("依赖环境检查通过") return True def run_broad_topic_extraction(self, extract_date: date = None, keywords_count: int = 100) -> bool: """运行BroadTopicExtraction模块""" - print(f"\n运行BroadTopicExtraction模块...") + logger.info("运行BroadTopicExtraction模块...") if not extract_date: extract_date = date.today() @@ -186,11 +202,10 @@ class MindSpider: try: cmd = [ sys.executable, "main.py", - "--date", extract_date.strftime("%Y-%m-%d"), "--keywords", str(keywords_count) ] - print(f"执行命令: {' '.join(cmd)}") + logger.info(f"执行命令: {' '.join(cmd)}") result = subprocess.run( cmd, @@ -199,24 +214,24 @@ class MindSpider: ) if result.returncode == 0: - print("BroadTopicExtraction模块执行成功") + logger.info("BroadTopicExtraction模块执行成功") return True else: - print(f"BroadTopicExtraction模块执行失败,返回码: {result.returncode}") + logger.error(f"BroadTopicExtraction模块执行失败,返回码: {result.returncode}") return False except subprocess.TimeoutExpired: - print("BroadTopicExtraction模块执行超时") + logger.error("BroadTopicExtraction模块执行超时") return False except Exception as e: - print(f"BroadTopicExtraction模块执行异常: {e}") + logger.exception(f"BroadTopicExtraction模块执行异常: {e}") return False def run_deep_sentiment_crawling(self, target_date: date = None, platforms: list = None, max_keywords: int = 50, max_notes: int = 50, test_mode: bool = False) -> bool: """运行DeepSentimentCrawling模块""" - print(f"\n运行DeepSentimentCrawling模块...") + logger.info("运行DeepSentimentCrawling模块...") if not target_date: target_date = date.today() @@ -238,7 +253,7 @@ class MindSpider: if test_mode: cmd.append("--test") - print(f"执行命令: {' '.join(cmd)}") + logger.info(f"执行命令: {' '.join(cmd)}") result = subprocess.run( cmd, @@ -247,78 +262,78 @@ class MindSpider: ) if result.returncode == 0: - print("DeepSentimentCrawling模块执行成功") + logger.info("DeepSentimentCrawling模块执行成功") return True else: - print(f"DeepSentimentCrawling模块执行失败,返回码: {result.returncode}") + logger.error(f"DeepSentimentCrawling模块执行失败,返回码: {result.returncode}") return False except subprocess.TimeoutExpired: - print("DeepSentimentCrawling模块执行超时") + logger.error("DeepSentimentCrawling模块执行超时") return False except Exception as e: - print(f"DeepSentimentCrawling模块执行异常: {e}") + logger.exception(f"DeepSentimentCrawling模块执行异常: {e}") return False def run_complete_workflow(self, target_date: date = None, platforms: list = None, keywords_count: int = 100, max_keywords: int = 50, max_notes: int = 50, test_mode: bool = False) -> bool: """运行完整工作流程""" - print(f"\n开始完整的MindSpider工作流程") + logger.info("开始完整的MindSpider工作流程") if not target_date: target_date = date.today() - print(f"目标日期: {target_date}") - print(f"平台列表: {platforms if platforms else '所有支持的平台'}") - print(f"测试模式: {'是' if test_mode else '否'}") + logger.info(f"目标日期: {target_date}") + logger.info(f"平台列表: {platforms if platforms else '所有支持的平台'}") + logger.info(f"测试模式: {'是' if test_mode else '否'}") # 第一步:运行话题提取 - print(f"\n=== 第一步:话题提取 ===") + logger.info("=== 第一步:话题提取 ===") if not self.run_broad_topic_extraction(target_date, keywords_count): - print("话题提取失败,终止流程") + logger.error("话题提取失败,终止流程") return False # 第二步:运行情感爬取 - print(f"\n=== 第二步:情感爬取 ===") + logger.info("=== 第二步:情感爬取 ===") if not self.run_deep_sentiment_crawling(target_date, platforms, max_keywords, max_notes, test_mode): - print("情感爬取失败,但话题提取已完成") + logger.error("情感爬取失败,但话题提取已完成") return False - print(f"\n完整工作流程执行成功!") + logger.info("完整工作流程执行成功!") return True def show_status(self): """显示项目状态""" - print(f"\nMindSpider项目状态:") - print(f"项目路径: {self.project_root}") + logger.info("MindSpider项目状态:") + logger.info(f"项目路径: {self.project_root}") # 配置状态 config_ok = self.check_config() - print(f"配置状态: {'正常' if config_ok else '异常'}") + logger.info(f"配置状态: {'正常' if config_ok else '异常'}") # 数据库状态 if config_ok: db_conn_ok = self.check_database_connection() - print(f"数据库连接: {'正常' if db_conn_ok else '异常'}") + logger.info(f"数据库连接: {'正常' if db_conn_ok else '异常'}") if db_conn_ok: db_tables_ok = self.check_database_tables() - print(f"数据库表: {'正常' if db_tables_ok else '需要初始化'}") + logger.info(f"数据库表: {'正常' if db_tables_ok else '需要初始化'}") # 依赖状态 deps_ok = self.check_dependencies() - print(f"依赖环境: {'正常' if deps_ok else '异常'}") + logger.info(f"依赖环境: {'正常' if deps_ok else '异常'}") # 模块状态 broad_topic_exists = self.broad_topic_path.exists() deep_sentiment_exists = self.deep_sentiment_path.exists() - print(f"BroadTopicExtraction模块: {'存在' if broad_topic_exists else '缺失'}") - print(f"DeepSentimentCrawling模块: {'存在' if deep_sentiment_exists else '缺失'}") + logger.info(f"BroadTopicExtraction模块: {'存在' if broad_topic_exists else '缺失'}") + logger.info(f"DeepSentimentCrawling模块: {'存在' if deep_sentiment_exists else '缺失'}") def setup_project(self) -> bool: """项目初始化设置""" - print(f"\n开始MindSpider项目初始化...") + logger.info("开始MindSpider项目初始化...") # 1. 检查配置 if not self.check_config(): @@ -334,11 +349,11 @@ class MindSpider: # 4. 检查并初始化数据库表 if not self.check_database_tables(): - print("需要初始化数据库表...") + logger.info("需要初始化数据库表...") if not self.initialize_database(): return False - print(f"\nMindSpider项目初始化完成!") + logger.info("MindSpider项目初始化完成!") return True def main(): @@ -373,7 +388,7 @@ def main(): try: target_date = datetime.strptime(args.date, "%Y-%m-%d").date() except ValueError: - print("错误:日期格式不正确,请使用 YYYY-MM-DD 格式") + logger.error("错误:日期格式不正确,请使用 YYYY-MM-DD 格式") return # 创建MindSpider实例 @@ -388,17 +403,17 @@ def main(): # 项目设置 if args.setup: if spider.setup_project(): - print("项目设置完成,可以开始使用MindSpider!") + logger.info("项目设置完成,可以开始使用MindSpider!") else: - print("项目设置失败,请检查配置和环境") + logger.error("项目设置失败,请检查配置和环境") return # 初始化数据库 if args.init_db: if spider.initialize_database(): - print("数据库初始化成功") + logger.info("数据库初始化成功") else: - print("数据库初始化失败") + logger.error("数据库初始化失败") return # 运行模块 @@ -415,16 +430,16 @@ def main(): ) else: # 默认运行完整工作流程 - print("运行完整MindSpider工作流程...") + logger.info("运行完整MindSpider工作流程...") spider.run_complete_workflow( target_date, args.platforms, args.keywords_count, args.max_keywords, args.max_notes, args.test ) except KeyboardInterrupt: - print("\n用户中断操作") + logger.info("用户中断操作") except Exception as e: - print(f"\n执行出错: {e}") + logger.exception(f"执行出错: {e}") if __name__ == "__main__": main() diff --git a/MindSpider/requirements.txt b/MindSpider/requirements.txt index 8f1b411..dd6ec5b 100644 --- a/MindSpider/requirements.txt +++ b/MindSpider/requirements.txt @@ -7,6 +7,8 @@ pymysql==1.1.0 aiomysql==0.2.0 aiosqlite==0.21.0 +asyncpg +sqlalchemy # =============================== # HTTP请求和网络 @@ -42,6 +44,8 @@ wordcloud==1.9.3 matplotlib==3.9.0 parsel==1.9.1 pyexecjs==1.5.1 +typer>=0.12.3 +pyhumps==3.8.0 # =============================== # 工具包 diff --git a/MindSpider/schema/db_manager.py b/MindSpider/schema/db_manager.py index 76303ee..dcf2fc1 100644 --- a/MindSpider/schema/db_manager.py +++ b/MindSpider/schema/db_manager.py @@ -7,10 +7,12 @@ MindSpider AI爬虫项目 - 数据库管理工具 import os import sys -import pymysql +from sqlalchemy import create_engine, text, inspect +from sqlalchemy.engine import Engine import argparse from pathlib import Path from datetime import datetime, timedelta +from loguru import logger # 添加项目根目录到路径 project_root = Path(__file__).parent.parent @@ -19,125 +21,132 @@ sys.path.append(str(project_root)) try: import config except ImportError: - print("错误: 无法导入config.py配置文件") + logger.error("错误: 无法导入config.py配置文件") sys.exit(1) +from MindSpider.config import settings + class DatabaseManager: def __init__(self): - self.connection = None + self.engine: Engine = None self.connect() def connect(self): """连接数据库""" try: - self.connection = pymysql.connect( - host=config.DB_HOST, - port=config.DB_PORT, - user=config.DB_USER, - password=config.DB_PASSWORD, - database=config.DB_NAME, - charset=config.DB_CHARSET, - autocommit=True - ) - print(f"成功连接到数据库: {config.DB_NAME}") + dialect = (settings.DB_DIALECT or "mysql").lower() + if dialect in ("postgresql", "postgres"): + url = f"postgresql+psycopg://{settings.DB_USER}:{settings.DB_PASSWORD}@{settings.DB_HOST}:{settings.DB_PORT}/{settings.DB_NAME}" + else: + url = f"mysql+pymysql://{settings.DB_USER}:{settings.DB_PASSWORD}@{settings.DB_HOST}:{settings.DB_PORT}/{settings.DB_NAME}?charset={settings.DB_CHARSET}" + self.engine = create_engine(url, future=True) + logger.info(f"成功连接到数据库: {settings.DB_NAME}") except Exception as e: - print(f"数据库连接失败: {e}") + logger.error(f"数据库连接失败: {e}") sys.exit(1) def close(self): """关闭数据库连接""" - if self.connection: - self.connection.close() + if self.engine: + self.engine.dispose() def show_tables(self): """显示所有表""" - print("\n" + "=" * 60) - print("数据库表列表") - print("=" * 60) + data_list_message = "" + data_list_message += "\n" + "=" * 60 + data_list_message += "数据库表列表" + data_list_message += "=" * 60 + logger.info(data_list_message) - cursor = self.connection.cursor() - cursor.execute("SHOW TABLES") - tables = cursor.fetchall() + inspector = inspect(self.engine) + tables = inspector.get_table_names() if not tables: - print("数据库中没有表") + logger.info("数据库中没有表") return # 分类显示表 mindspider_tables = [] mediacrawler_tables = [] - for table in tables: - table_name = table[0] + for table_name in tables: if table_name in ['daily_news', 'daily_topics', 'topic_news_relation', 'crawling_tasks']: mindspider_tables.append(table_name) else: mediacrawler_tables.append(table_name) - print("MindSpider核心表:") + data_list_message += "MindSpider核心表:" + data_list_message += "\n" for table in mindspider_tables: - cursor.execute(f"SELECT COUNT(*) FROM {table}") - count = cursor.fetchone()[0] - print(f" - {table:<25} ({count:>6} 条记录)") + with self.engine.connect() as conn: + count = conn.execute(text(f"SELECT COUNT(*) FROM {table}")).scalar_one() + data_list_message += f" - {table:<25} ({count:>6} 条记录)" + data_list_message += "\n" - print("\nMediaCrawler平台表:") + data_list_message += "\nMediaCrawler平台表:" + data_list_message += "\n" for table in mediacrawler_tables: try: - cursor.execute(f"SELECT COUNT(*) FROM {table}") - count = cursor.fetchone()[0] - print(f" - {table:<25} ({count:>6} 条记录)") + with self.engine.connect() as conn: + count = conn.execute(text(f"SELECT COUNT(*) FROM {table}")).scalar_one() + data_list_message += f" - {table:<25} ({count:>6} 条记录)" + data_list_message += "\n" except: - print(f" - {table:<25} (查询失败)") + data_list_message += f" - {table:<25} (查询失败)" + data_list_message += "\n" + logger.info(data_list_message) def show_statistics(self): """显示数据统计""" - print("\n" + "=" * 60) - print("数据统计") - print("=" * 60) - - cursor = self.connection.cursor() + data_statistics_message = "" + data_statistics_message += "\n" + "=" * 60 + data_statistics_message += "数据统计" + data_statistics_message += "=" * 60 + data_statistics_message += "\n" try: # 新闻统计 - cursor.execute("SELECT COUNT(*) FROM daily_news") - news_count = cursor.fetchone()[0] - - cursor.execute("SELECT COUNT(DISTINCT crawl_date) FROM daily_news") - news_days = cursor.fetchone()[0] - - cursor.execute("SELECT COUNT(DISTINCT source_platform) FROM daily_news") - platforms = cursor.fetchone()[0] - - print(f"新闻数据:") - print(f" - 总新闻数: {news_count}") - print(f" - 覆盖天数: {news_days}") - print(f" - 新闻平台: {platforms}") + with self.engine.connect() as conn: + news_count = conn.execute(text("SELECT COUNT(*) FROM daily_news")).scalar_one() + news_days = conn.execute(text("SELECT COUNT(DISTINCT crawl_date) FROM daily_news")).scalar_one() + platforms = conn.execute(text("SELECT COUNT(DISTINCT source_platform) FROM daily_news")).scalar_one() + data_statistics_message += "新闻数据:" + data_statistics_message += "\n" + data_statistics_message += f" - 总新闻数: {news_count}" + data_statistics_message += "\n" + data_statistics_message += f" - 覆盖天数: {news_days}" + data_statistics_message += "\n" + data_statistics_message += f" - 新闻平台: {platforms}" + data_statistics_message += "\n" # 话题统计 - cursor.execute("SELECT COUNT(*) FROM daily_topics") - topic_count = cursor.fetchone()[0] + with self.engine.connect() as conn: + topic_count = conn.execute(text("SELECT COUNT(*) FROM daily_topics")).scalar_one() + topic_days = conn.execute(text("SELECT COUNT(DISTINCT extract_date) FROM daily_topics")).scalar_one() - cursor.execute("SELECT COUNT(DISTINCT extract_date) FROM daily_topics") - topic_days = cursor.fetchone()[0] - - print(f"\n话题数据:") - print(f" - 总话题数: {topic_count}") - print(f" - 提取天数: {topic_days}") + data_statistics_message += "话题数据:" + data_statistics_message += "\n" + data_statistics_message += f" - 总话题数: {topic_count}" + data_statistics_message += "\n" + data_statistics_message += f" - 提取天数: {topic_days}" + data_statistics_message += "\n" # 爬取任务统计 - cursor.execute("SELECT COUNT(*) FROM crawling_tasks") - task_count = cursor.fetchone()[0] + with self.engine.connect() as conn: + task_count = conn.execute(text("SELECT COUNT(*) FROM crawling_tasks")).scalar_one() + task_status = conn.execute(text("SELECT task_status, COUNT(*) FROM crawling_tasks GROUP BY task_status")).all() - cursor.execute("SELECT task_status, COUNT(*) FROM crawling_tasks GROUP BY task_status") - task_status = cursor.fetchall() - - print(f"\n爬取任务:") - print(f" - 总任务数: {task_count}") + data_statistics_message += "爬取任务:" + data_statistics_message += "\n" + data_statistics_message += f" - 总任务数: {task_count}" + data_statistics_message += "\n" for status, count in task_status: - print(f" - {status}: {count}") + data_statistics_message += f" - {status}: {count}" + data_statistics_message += "\n" # 爬取内容统计 - print(f"\n平台内容统计:") + data_statistics_message += "平台内容统计:" + data_statistics_message += "\n" platform_tables = { 'xhs_note': '小红书', 'douyin_aweme': '抖音', @@ -150,60 +159,78 @@ class DatabaseManager: for table, platform in platform_tables.items(): try: - cursor.execute(f"SELECT COUNT(*) FROM {table}") - count = cursor.fetchone()[0] - print(f" - {platform}: {count}") + with self.engine.connect() as conn: + count = conn.execute(text(f"SELECT COUNT(*) FROM {table}")).scalar_one() + data_statistics_message += f" - {platform}: {count}" + data_statistics_message += "\n" except: - print(f" - {platform}: 表不存在") - + data_statistics_message += f" - {platform}: 表不存在" + data_statistics_message += "\n" + logger.info(data_statistics_message) except Exception as e: - print(f"统计查询失败: {e}") + data_statistics_message += f"统计查询失败: {e}" + data_statistics_message += "\n" + logger.error(data_statistics_message) def show_recent_data(self, days=7): """显示最近几天的数据""" - print(f"\n" + "=" * 60) - print(f"最近{days}天的数据") - print("=" * 60) - - cursor = self.connection.cursor() + data_recent_message = "" + data_recent_message += "\n" + "=" * 60 + data_recent_message += "最近" + str(days) + "天的数据" + data_recent_message += "=" * 60 + from datetime import date, timedelta + start_date = date.today() - timedelta(days=days) # 最近的新闻 - cursor.execute(""" - SELECT crawl_date, COUNT(*) as news_count, COUNT(DISTINCT source_platform) as platforms - FROM daily_news - WHERE crawl_date >= DATE_SUB(CURDATE(), INTERVAL %s DAY) - GROUP BY crawl_date - ORDER BY crawl_date DESC - """, (days,)) - - news_data = cursor.fetchall() + with self.engine.connect() as conn: + news_data = conn.execute( + text( + """ + SELECT crawl_date, COUNT(*) as news_count, COUNT(DISTINCT source_platform) as platforms + FROM daily_news + WHERE crawl_date >= :start_date + GROUP BY crawl_date + ORDER BY crawl_date DESC + """ + ), + {"start_date": start_date}, + ).all() if news_data: - print("每日新闻统计:") + data_recent_message += "每日新闻统计:" + data_recent_message += "\n" for date, count, platforms in news_data: - print(f" {date}: {count} 条新闻, {platforms} 个平台") + data_recent_message += f" {date}: {count} 条新闻, {platforms} 个平台" + data_recent_message += "\n" # 最近的话题 - cursor.execute(""" - SELECT extract_date, COUNT(*) as topic_count - FROM daily_topics - WHERE extract_date >= DATE_SUB(CURDATE(), INTERVAL %s DAY) - GROUP BY extract_date - ORDER BY extract_date DESC - """, (days,)) - - topic_data = cursor.fetchall() + with self.engine.connect() as conn: + topic_data = conn.execute( + text( + """ + SELECT extract_date, COUNT(*) as topic_count + FROM daily_topics + WHERE extract_date >= :start_date + GROUP BY extract_date + ORDER BY extract_date DESC + """ + ), + {"start_date": start_date}, + ).all() if topic_data: - print("\n每日话题统计:") + data_recent_message += "每日话题统计:" + data_recent_message += "\n" for date, count in topic_data: - print(f" {date}: {count} 个话题") + data_recent_message += f" {date}: {count} 个话题" + data_recent_message += "\n" + logger.info(data_recent_message) def cleanup_old_data(self, days=90, dry_run=True): """清理旧数据""" - print(f"\n" + "=" * 60) - print(f"清理{days}天前的数据 ({'预览模式' if dry_run else '执行模式'})") - print("=" * 60) + cleanup_message = "" + cleanup_message += "\n" + "=" * 60 + cleanup_message += f"清理{days}天前的数据 ({'预览模式' if dry_run else '执行模式'})" + cleanup_message += "=" * 60 - cursor = self.connection.cursor() cutoff_date = datetime.now() - timedelta(days=days) # 检查要删除的数据 @@ -213,20 +240,25 @@ class DatabaseManager: ("crawling_tasks", f"SELECT COUNT(*) FROM crawling_tasks WHERE scheduled_date < '{cutoff_date.date()}'") ] - for table, query in cleanup_queries: - cursor.execute(query) - count = cursor.fetchone()[0] - if count > 0: - print(f" {table}: {count} 条记录将被删除") - if not dry_run: - delete_query = query.replace("SELECT COUNT(*)", "DELETE") - cursor.execute(delete_query) - print(f" 已删除 {count} 条记录") - else: - print(f" {table}: 无需清理") + with self.engine.begin() as conn: + for table, query in cleanup_queries: + count = conn.execute(text(query)).scalar_one() + if count > 0: + cleanup_message += f" {table}: {count} 条记录将被删除" + cleanup_message += "\n" + if not dry_run: + delete_query = query.replace("SELECT COUNT(*)", "DELETE") + conn.execute(text(delete_query)) + cleanup_message += f" 已删除 {count} 条记录" + cleanup_message += "\n" + else: + cleanup_message += f" {table}: 无需清理" + cleanup_message += "\n" if dry_run: - print("\n这是预览模式,没有实际删除数据。使用 --execute 参数执行实际清理。") + cleanup_message += "\n这是预览模式,没有实际删除数据。使用 --execute 参数执行实际清理。" + cleanup_message += "\n" + logger.info(cleanup_message) def main(): parser = argparse.ArgumentParser(description="MindSpider数据库管理工具") diff --git a/MindSpider/schema/init_database.py b/MindSpider/schema/init_database.py index c0044f7..70e3825 100644 --- a/MindSpider/schema/init_database.py +++ b/MindSpider/schema/init_database.py @@ -9,6 +9,7 @@ import os import sys import pymysql from pathlib import Path +from MindSpider.config import settings # 添加项目根目录到路径 project_root = Path(__file__).parent.parent @@ -26,14 +27,14 @@ def create_database_connection(): """创建数据库连接""" try: connection = pymysql.connect( - host=config.DB_HOST, - port=config.DB_PORT, - user=config.DB_USER, - password=config.DB_PASSWORD, - charset=config.DB_CHARSET, + host=settings.db_host, + port=settings.db_port, + user=settings.db_user, + password=settings.db_password, + charset=settings.db_charset, autocommit=True ) - print(f"成功连接到MySQL服务器: {config.DB_HOST}:{config.DB_PORT}") + print(f"成功连接到MySQL服务器: {settings.db_host}:{settings.db_port}") return connection except Exception as e: print(f"连接数据库失败: {e}") @@ -43,9 +44,9 @@ def create_database(connection): """创建数据库""" try: cursor = connection.cursor() - cursor.execute(f"CREATE DATABASE IF NOT EXISTS `{config.DB_NAME}` CHARACTER SET utf8mb4 COLLATE utf8mb4_unicode_ci") - cursor.execute(f"USE `{config.DB_NAME}`") - print(f"数据库 '{config.DB_NAME}' 创建/选择成功") + cursor.execute(f"CREATE DATABASE IF NOT EXISTS `{settings.db_name}` CHARACTER SET utf8mb4 COLLATE utf8mb4_unicode_ci") + cursor.execute(f"USE `{settings.db_name}`") + print(f"数据库 '{settings.db_name}' 创建/选择成功") return True except Exception as e: print(f"创建数据库失败: {e}") @@ -56,18 +57,18 @@ def execute_sql_file(connection, sql_file_path, description=""): if not os.path.exists(sql_file_path): print(f"警告: SQL文件不存在: {sql_file_path}") return False - + try: cursor = connection.cursor() with open(sql_file_path, 'r', encoding='utf-8') as f: sql_content = f.read() - + # 分割SQL语句(简单实现,按分号分割) sql_statements = [stmt.strip() for stmt in sql_content.split(';') if stmt.strip()] - + success_count = 0 error_count = 0 - + for stmt in sql_statements: if not stmt or stmt.startswith('--'): continue @@ -77,10 +78,10 @@ def execute_sql_file(connection, sql_file_path, description=""): except Exception as e: error_count += 1 print(f"执行SQL语句失败: {str(e)[:100]}...") - + print(f"{description} - 成功执行: {success_count} 条语句, 失败: {error_count} 条语句") return error_count == 0 - + except Exception as e: print(f"执行SQL文件失败 {sql_file_path}: {e}") return False @@ -90,44 +91,44 @@ def main(): print("=" * 60) print("MindSpider AI爬虫项目 - 数据库初始化") print("=" * 60) - + # 检查配置 print("检查数据库配置...") - print(f"数据库主机: {config.DB_HOST}") - print(f"数据库端口: {config.DB_PORT}") - print(f"数据库名称: {config.DB_NAME}") - print(f"数据库用户: {config.DB_USER}") - print(f"字符集: {config.DB_CHARSET}") + print(f"数据库主机: {settings.db_host}") + print(f"数据库端口: {settings.db_port}") + print(f"数据库名称: {settings.db_name}") + print(f"数据库用户: {settings.db_user}") + print(f"字符集: {settings.db_charset}") print() - + # 创建数据库连接 print("正在连接数据库...") connection = create_database_connection() if not connection: print("数据库初始化失败!") return False - + try: # 创建数据库 print("正在创建/选择数据库...") if not create_database(connection): return False - + # 获取SQL文件路径 schema_dir = Path(__file__).parent mediacrawler_sql = schema_dir.parent / "DeepSentimentCrawling" / "MediaCrawler" / "schema" / "tables.sql" mindspider_sql = schema_dir / "mindspider_tables.sql" - + print() print("开始执行SQL脚本...") - + # 1. 执行MediaCrawler的原始表结构 if mediacrawler_sql.exists(): print("1. 创建MediaCrawler基础表...") execute_sql_file(connection, str(mediacrawler_sql), "MediaCrawler基础表") else: print("警告: MediaCrawler SQL文件不存在,跳过基础表创建") - + # 2. 执行MindSpider扩展表结构 print("2. 创建MindSpider扩展表...") if mindspider_sql.exists(): @@ -135,18 +136,18 @@ def main(): else: print("错误: MindSpider SQL文件不存在") return False - + print() print("=" * 60) print("数据库初始化完成!") print("=" * 60) - + # 显示创建的表 cursor = connection.cursor() cursor.execute("SHOW TABLES") tables = cursor.fetchall() - - print(f"数据库 '{config.DB_NAME}' 中共创建了 {len(tables)} 个表:") + + print(f"数据库 '{settings.db_name}' 中共创建了 {len(tables)} 个表:") for table in tables: print(f" - {table[0]}")