Spider System Performance & Stability Enhancement

This commit is contained in:
戒酒的李白
2025-03-04 10:45:37 +08:00
parent e0719583fc
commit f85298c021
2 changed files with 211 additions and 167 deletions
+89 -60
View File
@@ -11,6 +11,9 @@ from bs4 import BeautifulSoup
from datetime import datetime from datetime import datetime
from utils.logger import spider_logger as logging from utils.logger import spider_logger as logging
from utils.db_manager import DatabaseManager from utils.db_manager import DatabaseManager
from cachetools import TTLCache, LRUCache
from typing import List, Dict, Any
import pandas as pd
def spiderData(): def spiderData():
if not os.path.exists(navAddr): if not os.path.exists(navAddr):
@@ -28,17 +31,70 @@ class SpiderData:
} }
self.base_url = 'https://s.weibo.com' self.base_url = 'https://s.weibo.com'
self.db = DatabaseManager() self.db = DatabaseManager()
def crawl_topic(self, topic, depth=3, interval=5, max_retries=3, timeout=30):
"""
爬取指定话题的微博内容
:param topic: 要爬取的话题 # 初始化缓存
:param depth: 爬取深度(页数) self.data_cache = TTLCache(maxsize=1000, ttl=3600) # 1小时TTL缓存
:param interval: 请求间隔时间(秒) self.html_cache = LRUCache(maxsize=100) # LRU缓存最近的100个页面
:param max_retries: 最大重试次数
:param timeout: 请求超时时间(秒) # 批量插入缓冲区
""" self.insert_buffer = []
self.buffer_size = 50 # 每50条数据批量插入一次
def _get_cached_page(self, url: str) -> str:
"""获取缓存的页面内容"""
return self.html_cache.get(url)
def _cache_page(self, url: str, content: str):
"""缓存页面内容"""
self.html_cache[url] = content
def _get_cached_data(self, key: str) -> Dict[str, Any]:
"""获取缓存的数据"""
return self.data_cache.get(key)
def _cache_data(self, key: str, data: Dict[str, Any]):
"""缓存数据"""
self.data_cache[key] = data
def _flush_buffer(self):
"""将缓冲区数据批量插入数据库"""
if not self.insert_buffer:
return
try:
connection = self.db.get_connection()
with connection.cursor() as cursor:
# 使用pandas进行高效的批量插入
df = pd.DataFrame(self.insert_buffer)
# 构建批量插入SQL
columns = ', '.join(df.columns)
values = ', '.join(['%s'] * len(df.columns))
sql = f"""
INSERT INTO article ({columns})
VALUES ({values})
ON DUPLICATE KEY UPDATE
forward_count = VALUES(forward_count),
comment_count = VALUES(comment_count),
like_count = VALUES(like_count),
crawl_time = VALUES(crawl_time)
"""
# 执行批量插入
cursor.executemany(sql, df.values.tolist())
connection.commit()
logging.info(f"成功批量插入 {len(self.insert_buffer)} 条数据")
self.insert_buffer.clear()
except Exception as e:
logging.error(f"批量插入数据失败: {e}")
if connection:
connection.rollback()
def crawl_topic(self, topic: str, depth: int = 3, interval: int = 5,
max_retries: int = 3, timeout: int = 30):
"""爬取指定话题的微博内容"""
# 参数验证 # 参数验证
if not isinstance(depth, int) or depth < 1 or depth > 10: if not isinstance(depth, int) or depth < 1 or depth > 10:
raise ValueError("爬取深度必须在1-10页之间") raise ValueError("爬取深度必须在1-10页之间")
@@ -56,9 +112,19 @@ class SpiderData:
while retries < max_retries: while retries < max_retries:
try: try:
url = f"{self.base_url}/weibo?q={topic}&page={page}" url = f"{self.base_url}/weibo?q={topic}&page={page}"
# 检查缓存
cached_content = self._get_cached_page(url)
if cached_content:
self._parse_page(cached_content)
logging.info(f"使用缓存数据: {topic}{page}")
break
response = requests.get(url, headers=self.headers, timeout=timeout) response = requests.get(url, headers=self.headers, timeout=timeout)
if response.status_code == 200: if response.status_code == 200:
# 缓存页面内容
self._cache_page(url, response.text)
self._parse_page(response.text) self._parse_page(response.text)
logging.info(f"成功爬取话题 {topic}{page}") logging.info(f"成功爬取话题 {topic}{page}")
break break
@@ -84,13 +150,12 @@ class SpiderData:
sleep_time = interval * (1 + random.random()) sleep_time = interval * (1 + random.random())
logging.info(f"等待 {sleep_time:.2f} 秒后继续...") logging.info(f"等待 {sleep_time:.2f} 秒后继续...")
time.sleep(sleep_time) time.sleep(sleep_time)
def _parse_page(self, html_content):
"""
解析页面内容并保存数据
:param html_content: 页面HTML内容 # 最后刷新缓冲区
""" self._flush_buffer()
def _parse_page(self, html_content: str):
"""解析页面内容并保存数据"""
try: try:
soup = BeautifulSoup(html_content, 'html.parser') soup = BeautifulSoup(html_content, 'html.parser')
weibo_items = soup.find_all('div', class_='card-wrap') weibo_items = soup.find_all('div', class_='card-wrap')
@@ -124,8 +189,12 @@ class SpiderData:
'crawl_time': datetime.now().strftime('%Y-%m-%d %H:%M:%S') 'crawl_time': datetime.now().strftime('%Y-%m-%d %H:%M:%S')
} }
# 保存到数据库 # 添加到插入缓冲区
self._save_to_database(weibo_data) self.insert_buffer.append(weibo_data)
# 如果缓冲区达到阈值,执行批量插入
if len(self.insert_buffer) >= self.buffer_size:
self._flush_buffer()
except Exception as e: except Exception as e:
logging.error(f"解析微博项时出错: {e}") logging.error(f"解析微博项时出错: {e}")
@@ -134,52 +203,12 @@ class SpiderData:
except Exception as e: except Exception as e:
logging.error(f"解析页面时出错: {e}") logging.error(f"解析页面时出错: {e}")
def _extract_number(self, text): def _extract_number(self, text: str) -> int:
""" """从文本中提取数字"""
从文本中提取数字
:param text: 包含数字的文本
:return: 提取的数字,如果没有找到则返回0
"""
try: try:
return int(''.join(filter(str.isdigit, text))) return int(''.join(filter(str.isdigit, text)))
except ValueError: except ValueError:
return 0 return 0
def _save_to_database(self, data):
"""
将数据保存到数据库
:param data: 要保存的数据字典
"""
connection = None
try:
connection = self.db.get_connection()
with connection.cursor() as cursor:
# 插入文章数据
sql = """
INSERT INTO article (content, user_name, publish_time, forward_count,
comment_count, like_count, crawl_time)
VALUES (%s, %s, %s, %s, %s, %s, %s)
"""
cursor.execute(sql, (
data['content'],
data['user_name'],
data['publish_time'],
data['forward_count'],
data['comment_count'],
data['like_count'],
data['crawl_time']
))
connection.commit()
logging.info(f"成功保存微博数据: {data['content'][:30]}...")
except Exception as e:
logging.error(f"保存数据时出错: {e}")
if connection:
connection.rollback()
if __name__ == '__main__': if __name__ == '__main__':
spiderData() spiderData()
+122 -107
View File
@@ -10,6 +10,10 @@ import logging
from spider.spiderData import SpiderData from spider.spiderData import SpiderData
from openai import OpenAI from openai import OpenAI
from anthropic import Anthropic from anthropic import Anthropic
import aiohttp
from concurrent.futures import ThreadPoolExecutor
from ratelimit import limits, sleep_and_retry
from tenacity import retry, stop_after_attempt, wait_exponential
# 创建蓝图 # 创建蓝图
spider_bp = Blueprint('spider', __name__) spider_bp = Blueprint('spider', __name__)
@@ -24,137 +28,150 @@ websocket_connections = set()
# 创建消息队列 # 创建消息队列
message_queue = Queue() message_queue = Queue()
# 创建线程池
thread_pool = ThreadPoolExecutor(max_workers=3)
# 创建异步事件循环
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
# 默认配置 # 默认配置
DEFAULT_CONFIG = { DEFAULT_CONFIG = {
'crawlDepth': 3, 'crawlDepth': 3,
'interval': 5, 'interval': 5,
'maxRetries': 3, 'maxRetries': 3,
'timeout': 30 'timeout': 30,
'maxConcurrent': 2
} }
def load_config(): # 限流装饰器
"""加载爬虫配置""" @sleep_and_retry
config_path = os.path.join(os.path.dirname(__file__), '../spider/config.json') @limits(calls=100, period=60) # 每分钟最多100个请求
try: def rate_limited_request():
if os.path.exists(config_path): pass
with open(config_path, 'r', encoding='utf-8') as f:
return json.load(f)
except Exception as e:
logger.error(f"加载配置文件失败: {e}")
return DEFAULT_CONFIG
def save_config(config): class SpiderWorker:
"""保存爬虫配置""" def __init__(self, topics, parameters):
config_path = os.path.join(os.path.dirname(__file__), '../spider/config.json') self.topics = topics
try: self.parameters = parameters
with open(config_path, 'w', encoding='utf-8') as f: self.total_topics = len(topics)
json.dump(config, f, ensure_ascii=False, indent=4) self.completed_topics = 0
return True self.spider = SpiderData()
except Exception as e: self.message_buffer = []
logger.error(f"保存配置文件失败: {e}") self.message_buffer_size = 10
return False self.semaphore = asyncio.Semaphore(parameters.get('maxConcurrent', DEFAULT_CONFIG['maxConcurrent']))
async def send_message(self, message):
"""异步发送消息,使用缓冲区优化"""
self.message_buffer.append(message)
if len(self.message_buffer) >= self.message_buffer_size:
await self.flush_messages()
async def flush_messages(self):
"""刷新消息缓冲区"""
if not self.message_buffer:
return
try:
await broadcast_message(self.message_buffer)
self.message_buffer.clear()
except Exception as e:
logger.error(f"发送消息失败: {e}")
@retry(stop=stop_after_attempt(3), wait=wait_exponential(multiplier=1, min=4, max=10))
async def crawl_single_topic(self, topic):
"""爬取单个话题"""
try:
rate_limited_request()
await self.send_message({
'type': 'log',
'message': f'开始爬取话题: {topic}'
})
async with self.semaphore:
await asyncio.get_event_loop().run_in_executor(
thread_pool,
self.spider.crawl_topic,
topic,
self.parameters['crawlDepth'],
self.parameters['interval'],
self.parameters['maxRetries'],
self.parameters['timeout']
)
self.completed_topics += 1
progress = int((self.completed_topics / self.total_topics) * 100)
await self.send_message({
'type': 'progress',
'value': progress
})
await self.send_message({
'type': 'log',
'message': f'话题 {topic} 爬取完成'
})
except Exception as e:
logger.error(f"爬取话题 {topic} 失败: {e}")
await self.send_message({
'type': 'log',
'message': f'爬取话题 {topic} 时出错: {str(e)}'
})
raise
async def run(self):
"""运行爬虫任务"""
try:
tasks = [self.crawl_single_topic(topic) for topic in self.topics]
await asyncio.gather(*tasks)
await self.flush_messages()
await self.send_message({
'type': 'log',
'message': '所有话题爬取完成'
})
except Exception as e:
logger.error(f"爬虫任务执行出错: {e}")
await self.send_message({
'type': 'log',
'message': f'爬虫任务执行出错: {str(e)}'
})
finally:
await self.flush_messages()
async def broadcast_message(message): async def broadcast_message(messages):
"""广播消息到所有WebSocket连接""" """广播消息到所有WebSocket连接"""
if not websocket_connections: if not websocket_connections:
return return
for websocket in websocket_connections.copy(): for websocket in websocket_connections.copy():
try: try:
await websocket.send(json.dumps(message)) if isinstance(messages, list):
for message in messages:
await websocket.send(json.dumps(message))
else:
await websocket.send(json.dumps(messages))
except websockets.exceptions.ConnectionClosed: except websockets.exceptions.ConnectionClosed:
websocket_connections.remove(websocket) websocket_connections.remove(websocket)
except Exception as e: except Exception as e:
logger.error(f"发送WebSocket消息失败: {e}") logger.error(f"发送WebSocket消息失败: {e}")
websocket_connections.remove(websocket) websocket_connections.remove(websocket)
def spider_worker(topics, parameters):
"""爬虫工作线程"""
total_topics = len(topics)
completed_topics = 0
async def send_message(message):
"""异步发送消息的包装函数"""
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
try:
await broadcast_message(message)
finally:
loop.close()
try:
spider = SpiderData()
for topic in topics:
try:
# 更新进度
progress = int((completed_topics / total_topics) * 100)
asyncio.run(send_message({
'type': 'progress',
'value': progress
}))
# 发送开始爬取的日志
asyncio.run(send_message({
'type': 'log',
'message': f'开始爬取话题: {topic}'
}))
# 执行爬取
spider.crawl_topic(
topic=topic,
depth=parameters['crawlDepth'],
interval=parameters['interval'],
max_retries=parameters['maxRetries'],
timeout=parameters['timeout']
)
completed_topics += 1
# 发送完成爬取的日志
asyncio.run(send_message({
'type': 'log',
'message': f'话题 {topic} 爬取完成'
}))
except Exception as e:
# 发送错误日志
asyncio.run(send_message({
'type': 'log',
'message': f'爬取话题 {topic} 时出错: {str(e)}'
}))
# 更新最终进度
asyncio.run(send_message({
'type': 'progress',
'value': 100
}))
# 发送完成消息
asyncio.run(send_message({
'type': 'log',
'message': '所有话题爬取完成'
}))
except Exception as e:
# 发送错误日志
asyncio.run(send_message({
'type': 'log',
'message': f'爬虫任务执行出错: {str(e)}'
}))
@spider_bp.route('/spider/control') @spider_bp.route('/spider/control')
def spider_control(): def spider_control():
"""渲染爬虫控制页面""" """渲染爬虫控制页面"""
return render_template('spider_control.html') return render_template('spider_control.html')
@spider_bp.route('/api/spider/start', methods=['POST']) @spider_bp.route('/api/spider/start', methods=['POST'])
def start_spider(): async def start_spider():
"""启动爬虫任务""" """启动爬虫任务"""
try: try:
data = request.get_json() data = request.get_json()
topics = data.get('topics', []) topics = data.get('topics', [])
parameters = data.get('parameters', DEFAULT_CONFIG) parameters = {**DEFAULT_CONFIG, **data.get('parameters', {})}
if not topics: if not topics:
return jsonify({ return jsonify({
@@ -162,13 +179,11 @@ def start_spider():
'message': '请选择至少一个话题' 'message': '请选择至少一个话题'
}) })
# 启动爬虫线程 # 创建爬虫工作器
thread = threading.Thread( worker = SpiderWorker(topics, parameters)
target=spider_worker,
args=(topics, parameters), # 在事件循环中运行爬虫任务
daemon=True asyncio.create_task(worker.run())
)
thread.start()
return jsonify({ return jsonify({
'success': True, 'success': True,