diff --git a/MindSpider/DeepSentimentCrawling/MediaCrawler/tools/words.py b/MindSpider/DeepSentimentCrawling/MediaCrawler/tools/words.py index 50ab00f..15e9590 100644 --- a/MindSpider/DeepSentimentCrawling/MediaCrawler/tools/words.py +++ b/MindSpider/DeepSentimentCrawling/MediaCrawler/tools/words.py @@ -35,6 +35,8 @@ class AsyncWordCloudGenerator: jieba.add_word(word) def load_stop_words(self): + if not os.path.exists(self.stop_words_file): + return set() with open(self.stop_words_file, 'r', encoding='utf-8') as f: return set(f.read().strip().split('\n')) diff --git a/MindSpider/schema/db_manager.py b/MindSpider/schema/db_manager.py index dcf2fc1..568df6e 100644 --- a/MindSpider/schema/db_manager.py +++ b/MindSpider/schema/db_manager.py @@ -24,7 +24,7 @@ except ImportError: logger.error("错误: 无法导入config.py配置文件") sys.exit(1) -from MindSpider.config import settings +from config import settings class DatabaseManager: def __init__(self): diff --git a/MindSpider/schema/init_database.py b/MindSpider/schema/init_database.py index 70e3825..d561625 100644 --- a/MindSpider/schema/init_database.py +++ b/MindSpider/schema/init_database.py @@ -1,169 +1,119 @@ -#!/usr/bin/env python3 -# -*- coding: utf-8 -*- """ -MindSpider AI爬虫项目 - 数据库初始化脚本 -用于创建项目所需的所有数据库表 +MindSpider 数据库初始化(SQLAlchemy 2.x 异步引擎) + +此脚本创建 MindSpider 扩展表(与 MediaCrawler 原始表分离)。 +支持 MySQL 与 PostgreSQL,需已有可连接的数据库实例。 + +数据模型定义位置: +- MindSpider/schema/models_sa.py """ +from __future__ import annotations + +import asyncio import os +from typing import Optional + +from loguru import logger + +from sqlalchemy.ext.asyncio import create_async_engine +from sqlalchemy import text + +from models_sa import Base + +# 导入 models_bigdata 以确保所有表类被注册到 Base.metadata +# models_bigdata 现在也使用 models_sa 的 Base,所以所有表都在同一个 metadata 中 +import models_bigdata # noqa: F401 # 导入以注册所有表类 import sys -import pymysql from pathlib import Path -from MindSpider.config import settings # 添加项目根目录到路径 project_root = Path(__file__).parent.parent sys.path.append(str(project_root)) -# 导入配置 -try: - import config -except ImportError: - print("错误: 无法导入config.py配置文件") - print("请确保config.py文件存在于项目根目录") - sys.exit(1) +from config import settings -def create_database_connection(): - """创建数据库连接""" - try: - connection = pymysql.connect( - host=settings.db_host, - port=settings.db_port, - user=settings.db_user, - password=settings.db_password, - charset=settings.db_charset, - autocommit=True - ) - print(f"成功连接到MySQL服务器: {settings.db_host}:{settings.db_port}") - return connection - except Exception as e: - print(f"连接数据库失败: {e}") - return None +def _env(key: str, default: Optional[str] = None) -> Optional[str]: + v = os.getenv(key) + return v if v not in (None, "") else default -def create_database(connection): - """创建数据库""" - try: - cursor = connection.cursor() - cursor.execute(f"CREATE DATABASE IF NOT EXISTS `{settings.db_name}` CHARACTER SET utf8mb4 COLLATE utf8mb4_unicode_ci") - cursor.execute(f"USE `{settings.db_name}`") - print(f"数据库 '{settings.db_name}' 创建/选择成功") - return True - except Exception as e: - print(f"创建数据库失败: {e}") - return False -def execute_sql_file(connection, sql_file_path, description=""): - """执行SQL文件""" - if not os.path.exists(sql_file_path): - print(f"警告: SQL文件不存在: {sql_file_path}") - return False +def _build_database_url() -> str: + # 优先 DATABASE_URL + database_url = settings.DATABASE_URL if hasattr(settings, "DATABASE_URL") else None + if database_url: + return database_url - try: - cursor = connection.cursor() - with open(sql_file_path, 'r', encoding='utf-8') as f: - sql_content = f.read() + dialect = (settings.DB_DIALECT or "mysql").lower() + host = settings.DB_HOST or "localhost" + port = str(settings.DB_PORT or ("3306" if dialect == "mysql" else "5432")) + user = settings.DB_USER or "root" + password = settings.DB_PASSWORD or "" + db_name = settings.DB_NAME or "mindspider" - # 分割SQL语句(简单实现,按分号分割) - sql_statements = [stmt.strip() for stmt in sql_content.split(';') if stmt.strip()] + if dialect in ("postgresql", "postgres"): + return f"postgresql+asyncpg://{user}:{password}@{host}:{port}/{db_name}" - success_count = 0 - error_count = 0 + return f"mysql+aiomysql://{user}:{password}@{host}:{port}/{db_name}" - for stmt in sql_statements: - if not stmt or stmt.startswith('--'): - continue - try: - cursor.execute(stmt) - success_count += 1 - except Exception as e: - error_count += 1 - print(f"执行SQL语句失败: {str(e)[:100]}...") - print(f"{description} - 成功执行: {success_count} 条语句, 失败: {error_count} 条语句") - return error_count == 0 +async def _create_views_if_needed(engine_dialect: str): + # 视图为可选;仅当业务需要时创建。两端使用通用 SQL 聚合避免方言函数。 + # 如不需要视图,可跳过。 + engine_dialect = engine_dialect.lower() + v_topic_crawling_stats = ( + "CREATE OR REPLACE VIEW v_topic_crawling_stats AS\n" + "SELECT dt.topic_id, dt.topic_name, dt.extract_date, dt.processing_status,\n" + " COUNT(DISTINCT ct.task_id) AS total_tasks,\n" + " SUM(CASE WHEN ct.task_status = 'completed' THEN 1 ELSE 0 END) AS completed_tasks,\n" + " SUM(CASE WHEN ct.task_status = 'failed' THEN 1 ELSE 0 END) AS failed_tasks,\n" + " SUM(COALESCE(ct.total_crawled,0)) AS total_content_crawled,\n" + " SUM(COALESCE(ct.success_count,0)) AS total_success_count,\n" + " SUM(COALESCE(ct.error_count,0)) AS total_error_count\n" + "FROM daily_topics dt\n" + "LEFT JOIN crawling_tasks ct ON dt.topic_id = ct.topic_id\n" + "GROUP BY dt.topic_id, dt.topic_name, dt.extract_date, dt.processing_status" + ) - except Exception as e: - print(f"执行SQL文件失败 {sql_file_path}: {e}") - return False + v_daily_summary = ( + "CREATE OR REPLACE VIEW v_daily_summary AS\n" + "SELECT dn.crawl_date AS crawl_date,\n" + " COUNT(DISTINCT dn.news_id) AS total_news,\n" + " COUNT(DISTINCT dn.source_platform) AS platforms_covered,\n" + " (SELECT COUNT(*) FROM daily_topics WHERE extract_date = dn.crawl_date) AS topics_extracted,\n" + " (SELECT COUNT(*) FROM crawling_tasks WHERE scheduled_date = dn.crawl_date) AS tasks_created\n" + "FROM daily_news dn\n" + "GROUP BY dn.crawl_date\n" + "ORDER BY dn.crawl_date DESC" + ) -def main(): - """主函数""" - print("=" * 60) - print("MindSpider AI爬虫项目 - 数据库初始化") - print("=" * 60) + # PostgreSQL 的 CREATE OR REPLACE VIEW 也可用;两端均执行 + from sqlalchemy.ext.asyncio import AsyncEngine + engine: AsyncEngine = create_async_engine(_build_database_url()) + async with engine.begin() as conn: + await conn.execute(text(v_topic_crawling_stats)) + await conn.execute(text(v_daily_summary)) + await engine.dispose() - # 检查配置 - print("检查数据库配置...") - print(f"数据库主机: {settings.db_host}") - print(f"数据库端口: {settings.db_port}") - print(f"数据库名称: {settings.db_name}") - print(f"数据库用户: {settings.db_user}") - print(f"字符集: {settings.db_charset}") - print() - # 创建数据库连接 - print("正在连接数据库...") - connection = create_database_connection() - if not connection: - print("数据库初始化失败!") - return False +async def main() -> None: + database_url = _build_database_url() + engine = create_async_engine(database_url, pool_pre_ping=True, pool_recycle=1800) - try: - # 创建数据库 - print("正在创建/选择数据库...") - if not create_database(connection): - return False + # 由于 models_bigdata 和 models_sa 现在共享同一个 Base,所有表都在同一个 metadata 中 + # 只需创建一次,SQLAlchemy 会自动处理表之间的依赖关系 + async with engine.begin() as conn: + await conn.run_sync(Base.metadata.create_all) - # 获取SQL文件路径 - schema_dir = Path(__file__).parent - mediacrawler_sql = schema_dir.parent / "DeepSentimentCrawling" / "MediaCrawler" / "schema" / "tables.sql" - mindspider_sql = schema_dir / "mindspider_tables.sql" + # 保持原有视图创建和释放逻辑 + dialect_name = engine.url.get_backend_name() + await _create_views_if_needed(dialect_name) - print() - print("开始执行SQL脚本...") + await engine.dispose() + logger.info("[init_database_sa] 数据表与视图创建完成") - # 1. 执行MediaCrawler的原始表结构 - if mediacrawler_sql.exists(): - print("1. 创建MediaCrawler基础表...") - execute_sql_file(connection, str(mediacrawler_sql), "MediaCrawler基础表") - else: - print("警告: MediaCrawler SQL文件不存在,跳过基础表创建") - - # 2. 执行MindSpider扩展表结构 - print("2. 创建MindSpider扩展表...") - if mindspider_sql.exists(): - execute_sql_file(connection, str(mindspider_sql), "MindSpider扩展表") - else: - print("错误: MindSpider SQL文件不存在") - return False - - print() - print("=" * 60) - print("数据库初始化完成!") - print("=" * 60) - - # 显示创建的表 - cursor = connection.cursor() - cursor.execute("SHOW TABLES") - tables = cursor.fetchall() - - print(f"数据库 '{settings.db_name}' 中共创建了 {len(tables)} 个表:") - for table in tables: - print(f" - {table[0]}") - - print() - print("数据库初始化成功完成!您现在可以开始使用MindSpider了。") - return True - - except Exception as e: - print(f"数据库初始化过程中发生错误: {e}") - return False - - finally: - if connection: - connection.close() - print("数据库连接已关闭") if __name__ == "__main__": - success = main() - sys.exit(0 if success else 1) + asyncio.run(main()) + + diff --git a/MindSpider/schema/init_database_sa.py b/MindSpider/schema/init_database_sa.py deleted file mode 100644 index d561625..0000000 --- a/MindSpider/schema/init_database_sa.py +++ /dev/null @@ -1,119 +0,0 @@ -""" -MindSpider 数据库初始化(SQLAlchemy 2.x 异步引擎) - -此脚本创建 MindSpider 扩展表(与 MediaCrawler 原始表分离)。 -支持 MySQL 与 PostgreSQL,需已有可连接的数据库实例。 - -数据模型定义位置: -- MindSpider/schema/models_sa.py -""" - -from __future__ import annotations - -import asyncio -import os -from typing import Optional - -from loguru import logger - -from sqlalchemy.ext.asyncio import create_async_engine -from sqlalchemy import text - -from models_sa import Base - -# 导入 models_bigdata 以确保所有表类被注册到 Base.metadata -# models_bigdata 现在也使用 models_sa 的 Base,所以所有表都在同一个 metadata 中 -import models_bigdata # noqa: F401 # 导入以注册所有表类 -import sys -from pathlib import Path - -# 添加项目根目录到路径 -project_root = Path(__file__).parent.parent -sys.path.append(str(project_root)) - -from config import settings - -def _env(key: str, default: Optional[str] = None) -> Optional[str]: - v = os.getenv(key) - return v if v not in (None, "") else default - - -def _build_database_url() -> str: - # 优先 DATABASE_URL - database_url = settings.DATABASE_URL if hasattr(settings, "DATABASE_URL") else None - if database_url: - return database_url - - dialect = (settings.DB_DIALECT or "mysql").lower() - host = settings.DB_HOST or "localhost" - port = str(settings.DB_PORT or ("3306" if dialect == "mysql" else "5432")) - user = settings.DB_USER or "root" - password = settings.DB_PASSWORD or "" - db_name = settings.DB_NAME or "mindspider" - - if dialect in ("postgresql", "postgres"): - return f"postgresql+asyncpg://{user}:{password}@{host}:{port}/{db_name}" - - return f"mysql+aiomysql://{user}:{password}@{host}:{port}/{db_name}" - - -async def _create_views_if_needed(engine_dialect: str): - # 视图为可选;仅当业务需要时创建。两端使用通用 SQL 聚合避免方言函数。 - # 如不需要视图,可跳过。 - engine_dialect = engine_dialect.lower() - v_topic_crawling_stats = ( - "CREATE OR REPLACE VIEW v_topic_crawling_stats AS\n" - "SELECT dt.topic_id, dt.topic_name, dt.extract_date, dt.processing_status,\n" - " COUNT(DISTINCT ct.task_id) AS total_tasks,\n" - " SUM(CASE WHEN ct.task_status = 'completed' THEN 1 ELSE 0 END) AS completed_tasks,\n" - " SUM(CASE WHEN ct.task_status = 'failed' THEN 1 ELSE 0 END) AS failed_tasks,\n" - " SUM(COALESCE(ct.total_crawled,0)) AS total_content_crawled,\n" - " SUM(COALESCE(ct.success_count,0)) AS total_success_count,\n" - " SUM(COALESCE(ct.error_count,0)) AS total_error_count\n" - "FROM daily_topics dt\n" - "LEFT JOIN crawling_tasks ct ON dt.topic_id = ct.topic_id\n" - "GROUP BY dt.topic_id, dt.topic_name, dt.extract_date, dt.processing_status" - ) - - v_daily_summary = ( - "CREATE OR REPLACE VIEW v_daily_summary AS\n" - "SELECT dn.crawl_date AS crawl_date,\n" - " COUNT(DISTINCT dn.news_id) AS total_news,\n" - " COUNT(DISTINCT dn.source_platform) AS platforms_covered,\n" - " (SELECT COUNT(*) FROM daily_topics WHERE extract_date = dn.crawl_date) AS topics_extracted,\n" - " (SELECT COUNT(*) FROM crawling_tasks WHERE scheduled_date = dn.crawl_date) AS tasks_created\n" - "FROM daily_news dn\n" - "GROUP BY dn.crawl_date\n" - "ORDER BY dn.crawl_date DESC" - ) - - # PostgreSQL 的 CREATE OR REPLACE VIEW 也可用;两端均执行 - from sqlalchemy.ext.asyncio import AsyncEngine - engine: AsyncEngine = create_async_engine(_build_database_url()) - async with engine.begin() as conn: - await conn.execute(text(v_topic_crawling_stats)) - await conn.execute(text(v_daily_summary)) - await engine.dispose() - - -async def main() -> None: - database_url = _build_database_url() - engine = create_async_engine(database_url, pool_pre_ping=True, pool_recycle=1800) - - # 由于 models_bigdata 和 models_sa 现在共享同一个 Base,所有表都在同一个 metadata 中 - # 只需创建一次,SQLAlchemy 会自动处理表之间的依赖关系 - async with engine.begin() as conn: - await conn.run_sync(Base.metadata.create_all) - - # 保持原有视图创建和释放逻辑 - dialect_name = engine.url.get_backend_name() - await _create_views_if_needed(dialect_name) - - await engine.dispose() - logger.info("[init_database_sa] 数据表与视图创建完成") - - -if __name__ == "__main__": - asyncio.run(main()) - - diff --git a/requirements.txt b/requirements.txt index 90665d4..b442e3a 100644 --- a/requirements.txt +++ b/requirements.txt @@ -34,6 +34,8 @@ pymysql==1.1.0 aiomysql==0.2.0 aiosqlite==0.21.0 redis>=4.6.0 +SQLAlchemy==2.0.35 +asyncpg==0.29.0 # ===== 爬虫相关 ===== playwright==1.45.0 @@ -64,6 +66,7 @@ tqdm>=4.65.0 tenacity==8.2.2 loguru>=0.7.0 pydantic==2.5.2 +pydantic-settings==2.2.1 # ===== 开发工具(可选) ===== pytest>=7.4.0