diff --git a/spider/spiderData.py b/spider/spiderData.py index 57a2e26..82987cb 100644 --- a/spider/spiderData.py +++ b/spider/spiderData.py @@ -11,6 +11,9 @@ from bs4 import BeautifulSoup from datetime import datetime from utils.logger import spider_logger as logging from utils.db_manager import DatabaseManager +from cachetools import TTLCache, LRUCache +from typing import List, Dict, Any +import pandas as pd def spiderData(): if not os.path.exists(navAddr): @@ -28,17 +31,70 @@ class SpiderData: } self.base_url = 'https://s.weibo.com' self.db = DatabaseManager() - - def crawl_topic(self, topic, depth=3, interval=5, max_retries=3, timeout=30): - """ - 爬取指定话题的微博内容 - :param topic: 要爬取的话题 - :param depth: 爬取深度(页数) - :param interval: 请求间隔时间(秒) - :param max_retries: 最大重试次数 - :param timeout: 请求超时时间(秒) - """ + # 初始化缓存 + self.data_cache = TTLCache(maxsize=1000, ttl=3600) # 1小时TTL缓存 + self.html_cache = LRUCache(maxsize=100) # LRU缓存最近的100个页面 + + # 批量插入缓冲区 + self.insert_buffer = [] + self.buffer_size = 50 # 每50条数据批量插入一次 + + def _get_cached_page(self, url: str) -> str: + """获取缓存的页面内容""" + return self.html_cache.get(url) + + def _cache_page(self, url: str, content: str): + """缓存页面内容""" + self.html_cache[url] = content + + def _get_cached_data(self, key: str) -> Dict[str, Any]: + """获取缓存的数据""" + return self.data_cache.get(key) + + def _cache_data(self, key: str, data: Dict[str, Any]): + """缓存数据""" + self.data_cache[key] = data + + def _flush_buffer(self): + """将缓冲区数据批量插入数据库""" + if not self.insert_buffer: + return + + try: + connection = self.db.get_connection() + with connection.cursor() as cursor: + # 使用pandas进行高效的批量插入 + df = pd.DataFrame(self.insert_buffer) + + # 构建批量插入SQL + columns = ', '.join(df.columns) + values = ', '.join(['%s'] * len(df.columns)) + sql = f""" + INSERT INTO article ({columns}) + VALUES ({values}) + ON DUPLICATE KEY UPDATE + forward_count = VALUES(forward_count), + comment_count = VALUES(comment_count), + like_count = VALUES(like_count), + crawl_time = VALUES(crawl_time) + """ + + # 执行批量插入 + cursor.executemany(sql, df.values.tolist()) + connection.commit() + + logging.info(f"成功批量插入 {len(self.insert_buffer)} 条数据") + self.insert_buffer.clear() + + except Exception as e: + logging.error(f"批量插入数据失败: {e}") + if connection: + connection.rollback() + + def crawl_topic(self, topic: str, depth: int = 3, interval: int = 5, + max_retries: int = 3, timeout: int = 30): + """爬取指定话题的微博内容""" # 参数验证 if not isinstance(depth, int) or depth < 1 or depth > 10: raise ValueError("爬取深度必须在1-10页之间") @@ -56,9 +112,19 @@ class SpiderData: while retries < max_retries: try: url = f"{self.base_url}/weibo?q={topic}&page={page}" + + # 检查缓存 + cached_content = self._get_cached_page(url) + if cached_content: + self._parse_page(cached_content) + logging.info(f"使用缓存数据: {topic} 第 {page} 页") + break + response = requests.get(url, headers=self.headers, timeout=timeout) if response.status_code == 200: + # 缓存页面内容 + self._cache_page(url, response.text) self._parse_page(response.text) logging.info(f"成功爬取话题 {topic} 第 {page} 页") break @@ -84,13 +150,12 @@ class SpiderData: sleep_time = interval * (1 + random.random()) logging.info(f"等待 {sleep_time:.2f} 秒后继续...") time.sleep(sleep_time) - - def _parse_page(self, html_content): - """ - 解析页面内容并保存数据 - :param html_content: 页面HTML内容 - """ + # 最后刷新缓冲区 + self._flush_buffer() + + def _parse_page(self, html_content: str): + """解析页面内容并保存数据""" try: soup = BeautifulSoup(html_content, 'html.parser') weibo_items = soup.find_all('div', class_='card-wrap') @@ -124,8 +189,12 @@ class SpiderData: 'crawl_time': datetime.now().strftime('%Y-%m-%d %H:%M:%S') } - # 保存到数据库 - self._save_to_database(weibo_data) + # 添加到插入缓冲区 + self.insert_buffer.append(weibo_data) + + # 如果缓冲区达到阈值,执行批量插入 + if len(self.insert_buffer) >= self.buffer_size: + self._flush_buffer() except Exception as e: logging.error(f"解析微博项时出错: {e}") @@ -134,52 +203,12 @@ class SpiderData: except Exception as e: logging.error(f"解析页面时出错: {e}") - def _extract_number(self, text): - """ - 从文本中提取数字 - - :param text: 包含数字的文本 - :return: 提取的数字,如果没有找到则返回0 - """ + def _extract_number(self, text: str) -> int: + """从文本中提取数字""" try: return int(''.join(filter(str.isdigit, text))) except ValueError: return 0 - - def _save_to_database(self, data): - """ - 将数据保存到数据库 - - :param data: 要保存的数据字典 - """ - connection = None - try: - connection = self.db.get_connection() - - with connection.cursor() as cursor: - # 插入文章数据 - sql = """ - INSERT INTO article (content, user_name, publish_time, forward_count, - comment_count, like_count, crawl_time) - VALUES (%s, %s, %s, %s, %s, %s, %s) - """ - cursor.execute(sql, ( - data['content'], - data['user_name'], - data['publish_time'], - data['forward_count'], - data['comment_count'], - data['like_count'], - data['crawl_time'] - )) - - connection.commit() - logging.info(f"成功保存微博数据: {data['content'][:30]}...") - - except Exception as e: - logging.error(f"保存数据时出错: {e}") - if connection: - connection.rollback() if __name__ == '__main__': spiderData() \ No newline at end of file diff --git a/views/spider_control.py b/views/spider_control.py index 7595fc2..bd062a7 100644 --- a/views/spider_control.py +++ b/views/spider_control.py @@ -10,6 +10,10 @@ import logging from spider.spiderData import SpiderData from openai import OpenAI from anthropic import Anthropic +import aiohttp +from concurrent.futures import ThreadPoolExecutor +from ratelimit import limits, sleep_and_retry +from tenacity import retry, stop_after_attempt, wait_exponential # 创建蓝图 spider_bp = Blueprint('spider', __name__) @@ -24,137 +28,150 @@ websocket_connections = set() # 创建消息队列 message_queue = Queue() +# 创建线程池 +thread_pool = ThreadPoolExecutor(max_workers=3) + +# 创建异步事件循环 +loop = asyncio.new_event_loop() +asyncio.set_event_loop(loop) + # 默认配置 DEFAULT_CONFIG = { 'crawlDepth': 3, 'interval': 5, 'maxRetries': 3, - 'timeout': 30 + 'timeout': 30, + 'maxConcurrent': 2 } -def load_config(): - """加载爬虫配置""" - config_path = os.path.join(os.path.dirname(__file__), '../spider/config.json') - try: - if os.path.exists(config_path): - with open(config_path, 'r', encoding='utf-8') as f: - return json.load(f) - except Exception as e: - logger.error(f"加载配置文件失败: {e}") - return DEFAULT_CONFIG +# 限流装饰器 +@sleep_and_retry +@limits(calls=100, period=60) # 每分钟最多100个请求 +def rate_limited_request(): + pass -def save_config(config): - """保存爬虫配置""" - config_path = os.path.join(os.path.dirname(__file__), '../spider/config.json') - try: - with open(config_path, 'w', encoding='utf-8') as f: - json.dump(config, f, ensure_ascii=False, indent=4) - return True - except Exception as e: - logger.error(f"保存配置文件失败: {e}") - return False +class SpiderWorker: + def __init__(self, topics, parameters): + self.topics = topics + self.parameters = parameters + self.total_topics = len(topics) + self.completed_topics = 0 + self.spider = SpiderData() + self.message_buffer = [] + self.message_buffer_size = 10 + self.semaphore = asyncio.Semaphore(parameters.get('maxConcurrent', DEFAULT_CONFIG['maxConcurrent'])) + + async def send_message(self, message): + """异步发送消息,使用缓冲区优化""" + self.message_buffer.append(message) + if len(self.message_buffer) >= self.message_buffer_size: + await self.flush_messages() + + async def flush_messages(self): + """刷新消息缓冲区""" + if not self.message_buffer: + return + + try: + await broadcast_message(self.message_buffer) + self.message_buffer.clear() + except Exception as e: + logger.error(f"发送消息失败: {e}") + + @retry(stop=stop_after_attempt(3), wait=wait_exponential(multiplier=1, min=4, max=10)) + async def crawl_single_topic(self, topic): + """爬取单个话题""" + try: + rate_limited_request() + + await self.send_message({ + 'type': 'log', + 'message': f'开始爬取话题: {topic}' + }) + + async with self.semaphore: + await asyncio.get_event_loop().run_in_executor( + thread_pool, + self.spider.crawl_topic, + topic, + self.parameters['crawlDepth'], + self.parameters['interval'], + self.parameters['maxRetries'], + self.parameters['timeout'] + ) + + self.completed_topics += 1 + progress = int((self.completed_topics / self.total_topics) * 100) + + await self.send_message({ + 'type': 'progress', + 'value': progress + }) + + await self.send_message({ + 'type': 'log', + 'message': f'话题 {topic} 爬取完成' + }) + + except Exception as e: + logger.error(f"爬取话题 {topic} 失败: {e}") + await self.send_message({ + 'type': 'log', + 'message': f'爬取话题 {topic} 时出错: {str(e)}' + }) + raise + + async def run(self): + """运行爬虫任务""" + try: + tasks = [self.crawl_single_topic(topic) for topic in self.topics] + await asyncio.gather(*tasks) + await self.flush_messages() + + await self.send_message({ + 'type': 'log', + 'message': '所有话题爬取完成' + }) + + except Exception as e: + logger.error(f"爬虫任务执行出错: {e}") + await self.send_message({ + 'type': 'log', + 'message': f'爬虫任务执行出错: {str(e)}' + }) + finally: + await self.flush_messages() -async def broadcast_message(message): +async def broadcast_message(messages): """广播消息到所有WebSocket连接""" if not websocket_connections: return for websocket in websocket_connections.copy(): try: - await websocket.send(json.dumps(message)) + if isinstance(messages, list): + for message in messages: + await websocket.send(json.dumps(message)) + else: + await websocket.send(json.dumps(messages)) except websockets.exceptions.ConnectionClosed: websocket_connections.remove(websocket) except Exception as e: logger.error(f"发送WebSocket消息失败: {e}") websocket_connections.remove(websocket) -def spider_worker(topics, parameters): - """爬虫工作线程""" - total_topics = len(topics) - completed_topics = 0 - - async def send_message(message): - """异步发送消息的包装函数""" - loop = asyncio.new_event_loop() - asyncio.set_event_loop(loop) - try: - await broadcast_message(message) - finally: - loop.close() - - try: - spider = SpiderData() - - for topic in topics: - try: - # 更新进度 - progress = int((completed_topics / total_topics) * 100) - asyncio.run(send_message({ - 'type': 'progress', - 'value': progress - })) - - # 发送开始爬取的日志 - asyncio.run(send_message({ - 'type': 'log', - 'message': f'开始爬取话题: {topic}' - })) - - # 执行爬取 - spider.crawl_topic( - topic=topic, - depth=parameters['crawlDepth'], - interval=parameters['interval'], - max_retries=parameters['maxRetries'], - timeout=parameters['timeout'] - ) - - completed_topics += 1 - - # 发送完成爬取的日志 - asyncio.run(send_message({ - 'type': 'log', - 'message': f'话题 {topic} 爬取完成' - })) - - except Exception as e: - # 发送错误日志 - asyncio.run(send_message({ - 'type': 'log', - 'message': f'爬取话题 {topic} 时出错: {str(e)}' - })) - - # 更新最终进度 - asyncio.run(send_message({ - 'type': 'progress', - 'value': 100 - })) - - # 发送完成消息 - asyncio.run(send_message({ - 'type': 'log', - 'message': '所有话题爬取完成' - })) - - except Exception as e: - # 发送错误日志 - asyncio.run(send_message({ - 'type': 'log', - 'message': f'爬虫任务执行出错: {str(e)}' - })) - @spider_bp.route('/spider/control') def spider_control(): """渲染爬虫控制页面""" return render_template('spider_control.html') @spider_bp.route('/api/spider/start', methods=['POST']) -def start_spider(): +async def start_spider(): """启动爬虫任务""" try: data = request.get_json() topics = data.get('topics', []) - parameters = data.get('parameters', DEFAULT_CONFIG) + parameters = {**DEFAULT_CONFIG, **data.get('parameters', {})} if not topics: return jsonify({ @@ -162,13 +179,11 @@ def start_spider(): 'message': '请选择至少一个话题' }) - # 启动爬虫线程 - thread = threading.Thread( - target=spider_worker, - args=(topics, parameters), - daemon=True - ) - thread.start() + # 创建爬虫工作器 + worker = SpiderWorker(topics, parameters) + + # 在事件循环中运行爬虫任务 + asyncio.create_task(worker.run()) return jsonify({ 'success': True,