309 lines
10 KiB
Python
309 lines
10 KiB
Python
#!/usr/bin/env python3
|
|
# -*- coding: utf-8 -*-
|
|
"""
|
|
BroadTopicExtraction模块 - 新闻获取和收集
|
|
整合新闻API调用和数据库存储功能
|
|
"""
|
|
|
|
import sys
|
|
import asyncio
|
|
import httpx
|
|
import json
|
|
from datetime import datetime, date
|
|
from pathlib import Path
|
|
from typing import List, Dict, Optional
|
|
from loguru import logger
|
|
|
|
# Make the project root importable so the package import below resolves when
# this file is executed directly as a script. Guard against appending the same
# entry again if the module is imported more than once.
project_root = Path(__file__).parent.parent
if str(project_root) not in sys.path:
    sys.path.append(str(project_root))

try:
    from BroadTopicExtraction.database_manager import DatabaseManager
except ImportError as e:
    # Chain the original error so the real cause stays in the traceback.
    raise ImportError(f"导入模块失败: {e}") from e
|
|
|
|
# Root URL of the NewsNow aggregation API; endpoint paths are appended to it.
BASE_URL = "https://newsnow.busiyi.world"

# Supported NewsNow source IDs mapped to their Chinese display names.
# Insertion order is significant: it is the default collection order.
SOURCE_NAMES: Dict[str, str] = {
    "weibo": "微博热搜",
    "zhihu": "知乎热榜",
    "bilibili-hot-search": "B站热搜",
    "toutiao": "今日头条",
    "douyin": "抖音热榜",
    "github-trending-today": "GitHub趋势",
    "coolapk": "酷安热榜",
    "tieba": "百度贴吧",
    "wallstreetcn": "华尔街见闻",
    "thepaper": "澎湃新闻",
    "cls-hot": "财联社",
    "xueqiu": "雪球热榜",
}
|
|
|
|
class NewsCollector:
    """Collect trending news from the NewsNow API and persist it.

    Combines HTTP fetching (``fetch_news`` / ``get_popular_news``) with
    storage through ``DatabaseManager`` (``collect_and_save_news`` /
    ``get_today_news``). Usable as a sync or async context manager; both
    forms call ``close()`` on exit.
    """

    # Per-request timeout (seconds) for the NewsNow HTTP client.
    _HTTP_TIMEOUT = 30.0
    # Pause (seconds) between consecutive source requests to avoid
    # requesting too quickly.
    _REQUEST_INTERVAL = 0.5

    def __init__(self):
        """Create the database manager and record the supported source IDs."""
        self.db_manager = DatabaseManager()
        self.supported_sources = list(SOURCE_NAMES.keys())

    def close(self):
        """Close the database manager; safe to call more than once."""
        db_manager = getattr(self, "db_manager", None)
        if db_manager:
            try:
                db_manager.close()
            finally:
                # Drop the reference so an explicit close() followed by a
                # context-manager exit does not close the manager twice.
                self.db_manager = None

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        self.close()

    async def __aenter__(self):
        return self

    async def __aexit__(self, exc_type, exc_val, exc_tb):
        self.close()

    # ==================== News API calls ====================

    @classmethod
    def _new_http_client(cls) -> "httpx.AsyncClient":
        """Build an async HTTP client with the collector's timeout/redirect settings."""
        return httpx.AsyncClient(timeout=cls._HTTP_TIMEOUT, follow_redirects=True)

    @staticmethod
    def _extract_items(data) -> Optional[list]:
        """Return ``data['items']`` when *data* is a dict holding a list, else None."""
        if isinstance(data, dict) and isinstance(data.get("items"), list):
            return data["items"]
        return None

    async def fetch_news(self, source: str, client: Optional["httpx.AsyncClient"] = None) -> dict:
        """Fetch the latest items for one news source.

        Args:
            source: NewsNow source ID (e.g. a key of ``SOURCE_NAMES``).
            client: Optional shared ``httpx.AsyncClient`` to reuse
                connections. When omitted, a temporary client is created
                and closed for this single request.

        Returns:
            A dict with ``source``, ``status`` (``"success"``, ``"timeout"``,
            ``"http_error"`` or ``"error"``) and an ISO ``timestamp``. On
            success it also carries the decoded JSON under ``data``;
            otherwise it carries an ``error`` message. Failures are reported
            in the returned dict rather than raised.
        """
        url = f"{BASE_URL}/api/s?id={source}&latest"
        headers = {
            "Accept": "application/json, text/plain, */*",
            "Accept-Language": "zh-CN,zh;q=0.9,en;q=0.8",
            "User-Agent": (
                "Mozilla/5.0 (Windows NT 10.0; Win64; x64) "
                "AppleWebKit/537.36 (KHTML, like Gecko) "
                "Chrome/124.0.0.0 Safari/537.36"
            ),
            "Referer": BASE_URL,
            "Connection": "keep-alive",
        }

        try:
            if client is None:
                async with self._new_http_client() as own_client:
                    response = await own_client.get(url, headers=headers)
            else:
                response = await client.get(url, headers=headers)
            # Non-streaming requests have their body fully read by get(),
            # so the response stays usable after a temporary client closes.
            response.raise_for_status()

            data = response.json()
            return {
                "source": source,
                "status": "success",
                "data": data,
                "timestamp": datetime.now().isoformat()
            }
        except httpx.TimeoutException:
            return {
                "source": source,
                "status": "timeout",
                "error": f"请求超时: {source}({url})",
                "timestamp": datetime.now().isoformat()
            }
        except httpx.HTTPStatusError as e:
            return {
                "source": source,
                "status": "http_error",
                "error": f"HTTP错误: {source}({url}) - {e.response.status_code}",
                "timestamp": datetime.now().isoformat()
            }
        except Exception as e:
            return {
                "source": source,
                "status": "error",
                "error": f"未知错误: {source}({url}) - {str(e)}",
                "timestamp": datetime.now().isoformat()
            }

    async def get_popular_news(self, sources: Optional[List[str]] = None) -> List[dict]:
        """Fetch every requested source sequentially and log per-source status.

        Args:
            sources: Source IDs to fetch; None means every key of
                ``SOURCE_NAMES``. Any iterable of IDs is accepted.

        Returns:
            One ``fetch_news`` result dict per source, in request order.
        """
        if sources is None:
            sources = list(SOURCE_NAMES.keys())
        else:
            sources = list(sources)

        logger.info(f"正在获取 {len(sources)} 个新闻源的最新内容...")
        logger.info("=" * 80)

        results = []
        # One client for the whole batch so HTTP connections are reused.
        async with self._new_http_client() as client:
            for index, source in enumerate(sources):
                if index:
                    # Throttle only between requests, not after the last one.
                    await asyncio.sleep(self._REQUEST_INTERVAL)

                source_name = SOURCE_NAMES.get(source, source)
                logger.info(f"正在获取 {source_name} 的新闻...")
                result = await self.fetch_news(source, client=client)
                results.append(result)

                if result["status"] == "success":
                    items = self._extract_items(result["data"])
                    if items is not None:
                        logger.info(f"✓ {source_name}: 获取成功,共 {len(items)} 条新闻")
                    else:
                        logger.info(f"✓ {source_name}: 获取成功")
                else:
                    logger.error(f"✗ {source_name}: {result.get('error', '获取失败')}")

        return results

    # ==================== Data processing and storage ====================

    async def collect_and_save_news(self, sources: Optional[List[str]] = None) -> Dict:
        """Collect today's trending news and save it to the database.

        Args:
            sources: Source IDs to collect from; None means every supported
                source. Any iterable of IDs is accepted.

        Returns:
            The ``_process_news_results`` summary, plus ``saved_count`` when
            anything was saved. On failure, returns
            ``{'success': False, 'error', 'news_list': [], 'total_news': 0}``.
        """
        # Materialise once so len() and repeated iteration work for any iterable.
        sources = list(SOURCE_NAMES.keys()) if sources is None else list(sources)

        message_lines = [
            "",
            "开始收集每日热点新闻...",
            f"时间: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}",
            f"将从 {len(sources)} 个新闻源收集数据:",
        ]
        message_lines.extend(f" - {SOURCE_NAMES.get(source, source)}" for source in sources)
        logger.info("\n".join(message_lines) + "\n")

        try:
            results = await self.get_popular_news(sources)
            processed_data = self._process_news_results(results)

            # Persist under today's date. The original note calls this
            # "overwrite mode"; the actual semantics live in
            # DatabaseManager.save_daily_news.
            if processed_data['news_list']:
                processed_data['saved_count'] = self.db_manager.save_daily_news(
                    processed_data['news_list'],
                    date.today()
                )

            self._print_collection_summary(processed_data)
            return processed_data

        except Exception as e:
            logger.exception(f"收集新闻失败: {e}")
            return {
                'success': False,
                'error': str(e),
                'news_list': [],
                'total_news': 0
            }

    def _process_news_results(self, results: List[Dict]) -> Dict:
        """Flatten per-source fetch results into one news list with counters.

        ``total_news`` counts raw items from successful sources;
        ``news_list`` holds only the items that were normalised successfully.
        """
        news_list = []
        successful_sources = 0
        total_news = 0

        for result in results:
            if result['status'] != 'success':
                continue
            successful_sources += 1

            items = self._extract_items(result['data'])
            if items is None:
                continue
            total_news += len(items)

            source = result['source']
            for rank, item in enumerate(items, 1):
                processed_news = self._process_news_item(item, source, rank)
                if processed_news:
                    news_list.append(processed_news)

        return {
            'success': True,
            'news_list': news_list,
            'successful_sources': successful_sources,
            'total_sources': len(results),
            'total_news': total_news,
            'collection_time': datetime.now().isoformat()
        }

    def _process_news_item(self, item: Dict, source: str, rank: int) -> Optional[Dict]:
        """Normalise one raw news item into the stored record shape.

        Args:
            item: Raw item from the API payload. A dict is read for
                ``title``/``url``/``id``; anything else is stringified.
            source: Source ID the item came from.
            rank: 1-based position of the item within its source.

        Returns:
            A dict with ``id``, ``title``, ``url``, ``source`` and ``rank``,
            or None if the item could not be processed.
        """
        try:
            if isinstance(item, dict):
                raw_title = item.get('title')
                title = str(raw_title).strip() if raw_title is not None else ''
                # Missing, null or blank titles fall back to a placeholder.
                title = title or '无标题'
                url = item.get('url') or ''

                # A null/empty id would yield colliding "<source>_None" ids,
                # so fall back to the rank-based id in that case as well.
                item_id = item.get('id')
                if item_id is None or item_id == '':
                    item_id = f'rank_{rank}'
                news_id = f"{source}_{item_id}"
            else:
                # Non-dict items: use their string form, capped at 100 chars.
                title = str(item)[:100]
                url = ''
                news_id = f"{source}_rank_{rank}"

            return {
                'id': news_id,
                'title': title,
                'url': url,
                'source': source,
                'rank': rank
            }

        except Exception as e:
            logger.exception(f"处理新闻项失败: {e}")
            return None

    def _print_collection_summary(self, data: Dict):
        """Log source/news counts, plus the saved count when present."""
        summary_lines = [
            "",
            f"总新闻源: {data['total_sources']}",
            f"成功源数: {data['successful_sources']}",
            f"总新闻数: {data['total_news']}",
        ]
        if 'saved_count' in data:
            summary_lines.append(f"已保存数: {data['saved_count']}")
        logger.info("\n".join(summary_lines) + "\n")

    def get_today_news(self) -> List[Dict]:
        """Return today's news from the database, or [] if the lookup fails."""
        try:
            return self.db_manager.get_daily_news(date.today())
        except Exception as e:
            logger.exception(f"获取今日新闻失败: {e}")
            return []
|
|
|
|
async def main():
    """Smoke-test NewsCollector by collecting and saving two sources."""
    logger.info("测试新闻收集器...")

    # Limit the test run to two sources to keep it quick.
    test_sources = ["weibo", "zhihu"]

    async with NewsCollector() as collector:
        outcome = await collector.collect_and_save_news(sources=test_sources)

        if not outcome['success']:
            logger.error(f"收集失败: {outcome.get('error', '未知错误')}")
            return

        logger.info(f"收集成功!共获取 {outcome['total_news']} 条新闻")


if __name__ == "__main__":
    asyncio.run(main())
|