Files
bettafish-company/MindSpider/BroadTopicExtraction/get_today_news.py
T
2025-08-27 13:49:07 +08:00

301 lines
9.8 KiB
Python

#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
BroadTopicExtraction模块 - 新闻获取和收集
整合新闻API调用和数据库存储功能
"""
import sys
import asyncio
import httpx
import json
from datetime import datetime, date
from pathlib import Path
from typing import List, Dict, Optional
# 添加项目根目录到路径
project_root = Path(__file__).parent.parent
sys.path.append(str(project_root))
try:
from BroadTopicExtraction.database_manager import DatabaseManager
except ImportError as e:
raise ImportError(f"导入模块失败: {e}")
# 新闻API基础URL
BASE_URL = "https://newsnow.busiyi.world"
# 新闻源中文名称映射
SOURCE_NAMES = {
"weibo": "微博热搜",
"zhihu": "知乎热榜",
"bilibili-hot-search": "B站热搜",
"toutiao": "今日头条",
"douyin": "抖音热榜",
"github-trending-today": "GitHub趋势",
"coolapk": "酷安热榜",
"tieba": "百度贴吧",
"wallstreetcn": "华尔街见闻",
"thepaper": "澎湃新闻",
"cls-hot": "财联社",
"xueqiu": "雪球热榜",
"kuaishou": "快手热榜"
}
class NewsCollector:
"""新闻收集器 - 整合API调用和数据库存储"""
def __init__(self):
"""初始化新闻收集器"""
self.db_manager = DatabaseManager()
self.supported_sources = list(SOURCE_NAMES.keys())
def close(self):
"""关闭资源"""
if self.db_manager:
self.db_manager.close()
def __enter__(self):
return self
def __exit__(self, exc_type, exc_val, exc_tb):
self.close()
async def __aenter__(self):
return self
async def __aexit__(self, exc_type, exc_val, exc_tb):
self.close()
# ==================== 新闻API调用 ====================
async def fetch_news(self, source: str) -> dict:
"""从指定源获取最新新闻"""
url = f"{BASE_URL}/api/s?id={source}&latest"
headers = {"Accept": "application/json"}
try:
async with httpx.AsyncClient(timeout=30.0) as client:
response = await client.get(url, headers=headers)
response.raise_for_status()
# 解析JSON响应
data = json.loads(response.text)
return {
"source": source,
"status": "success",
"data": data,
"timestamp": datetime.now().isoformat()
}
except httpx.TimeoutException:
return {
"source": source,
"status": "timeout",
"error": "请求超时",
"timestamp": datetime.now().isoformat()
}
except httpx.HTTPStatusError as e:
return {
"source": source,
"status": "http_error",
"error": f"HTTP错误: {e.response.status_code}",
"timestamp": datetime.now().isoformat()
}
except Exception as e:
return {
"source": source,
"status": "error",
"error": f"未知错误: {str(e)}",
"timestamp": datetime.now().isoformat()
}
async def get_popular_news(self, sources: List[str] = None) -> List[dict]:
"""获取热门新闻"""
if sources is None:
sources = list(SOURCE_NAMES.keys())
print(f"正在获取 {len(sources)} 个新闻源的最新内容...")
print("=" * 80)
results = []
for source in sources:
source_name = SOURCE_NAMES.get(source, source)
print(f"正在获取 {source_name} 的新闻...")
result = await self.fetch_news(source)
results.append(result)
if result["status"] == "success":
data = result["data"]
if 'items' in data and isinstance(data['items'], list):
count = len(data['items'])
print(f"{source_name}: 获取成功,共 {count} 条新闻")
else:
print(f"{source_name}: 获取成功")
else:
print(f"{source_name}: {result.get('error', '获取失败')}")
# 避免请求过快
await asyncio.sleep(0.5)
return results
# ==================== 数据处理和存储 ====================
async def collect_and_save_news(self, sources: Optional[List[str]] = None) -> Dict:
"""
收集并保存每日热点新闻
Args:
sources: 指定的新闻源列表,None表示使用所有支持的源
Returns:
包含收集结果的字典
"""
print(f"开始收集每日热点新闻...")
print(f"时间: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}")
# 选择新闻源
if sources is None:
# 使用所有支持的新闻源
sources = list(SOURCE_NAMES.keys())
print(f"将从 {len(sources)} 个新闻源收集数据:")
for source in sources:
source_name = SOURCE_NAMES.get(source, source)
print(f" - {source_name}")
try:
# 获取新闻数据
results = await self.get_popular_news(sources)
# 处理结果
processed_data = self._process_news_results(results)
# 保存到数据库(覆盖模式)
if processed_data['news_list']:
saved_count = self.db_manager.save_daily_news(
processed_data['news_list'],
date.today()
)
processed_data['saved_count'] = saved_count
# 打印统计信息
self._print_collection_summary(processed_data)
return processed_data
except Exception as e:
print(f"收集新闻失败: {e}")
return {
'success': False,
'error': str(e),
'news_list': [],
'total_news': 0
}
def _process_news_results(self, results: List[Dict]) -> Dict:
"""处理新闻获取结果"""
news_list = []
successful_sources = 0
total_news = 0
for result in results:
source = result['source']
status = result['status']
if status == 'success':
successful_sources += 1
data = result['data']
if 'items' in data and isinstance(data['items'], list):
source_news_count = len(data['items'])
total_news += source_news_count
# 处理该源的新闻
for i, item in enumerate(data['items'], 1):
processed_news = self._process_news_item(item, source, i)
if processed_news:
news_list.append(processed_news)
return {
'success': True,
'news_list': news_list,
'successful_sources': successful_sources,
'total_sources': len(results),
'total_news': total_news,
'collection_time': datetime.now().isoformat()
}
def _process_news_item(self, item: Dict, source: str, rank: int) -> Optional[Dict]:
"""处理单条新闻"""
try:
if isinstance(item, dict):
title = item.get('title', '无标题').strip()
url = item.get('url', '')
# 生成新闻ID
news_id = f"{source}_{item.get('id', f'rank_{rank}')}"
return {
'id': news_id,
'title': title,
'url': url,
'source': source,
'rank': rank
}
else:
# 处理字符串类型的新闻
title = str(item)[:100] if len(str(item)) > 100 else str(item)
return {
'id': f"{source}_rank_{rank}",
'title': title,
'url': '',
'source': source,
'rank': rank
}
except Exception as e:
print(f"处理新闻项失败: {e}")
return None
def _print_collection_summary(self, data: Dict):
"""打印收集摘要"""
print("\n" + "=" * 50)
print("新闻收集摘要")
print("=" * 50)
print(f"总新闻源: {data['total_sources']}")
print(f"成功源数: {data['successful_sources']}")
print(f"总新闻数: {data['total_news']}")
if 'saved_count' in data:
print(f"已保存数: {data['saved_count']}")
print("=" * 50)
def get_today_news(self) -> List[Dict]:
"""获取今天的新闻"""
try:
return self.db_manager.get_daily_news(date.today())
except Exception as e:
print(f"获取今日新闻失败: {e}")
return []
async def main():
"""测试新闻收集器"""
print("测试新闻收集器...")
async with NewsCollector() as collector:
# 收集新闻
result = await collector.collect_and_save_news(
sources=["weibo", "zhihu"] # 测试用,只使用两个源
)
if result['success']:
print(f"收集成功!共获取 {result['total_news']} 条新闻")
else:
print(f"收集失败: {result.get('error', '未知错误')}")
if __name__ == "__main__":
asyncio.run(main())