Uploading the AI Crawler System: MindSpider

This commit is contained in:
戒酒的李白
2025-08-27 13:49:07 +08:00
parent 822bad557f
commit 587e709e82
174 changed files with 34562 additions and 25 deletions
@@ -0,0 +1,11 @@
# 声明:本代码仅供学习和研究目的使用。使用者应遵守以下原则:
# 1. 不得用于任何商业用途。
# 2. 使用时应遵守目标平台的使用条款和robots.txt规则。
# 3. 不得进行大规模爬取或对平台造成运营干扰。
# 4. 应合理控制请求频率,避免给目标平台带来不必要的负担。
# 5. 不得用于任何非法或不当的用途。
#
# 详细许可条款请参阅项目根目录下的LICENSE文件。
# 使用本代码即表示您同意遵守上述原则和LICENSE中的所有条款。
@@ -0,0 +1,249 @@
# 声明:本代码仅供学习和研究目的使用。使用者应遵守以下原则:
# 1. 不得用于任何商业用途。
# 2. 使用时应遵守目标平台的使用条款和robots.txt规则。
# 3. 不得进行大规模爬取或对平台造成运营干扰。
# 4. 应合理控制请求频率,避免给目标平台带来不必要的负担。
# 5. 不得用于任何非法或不当的用途。
#
# 详细许可条款请参阅项目根目录下的LICENSE文件。
# 使用本代码即表示您同意遵守上述原则和LICENSE中的所有条款。
import os
import platform
import subprocess
import time
import socket
from typing import Optional, List, Tuple
import asyncio
from pathlib import Path
from tools import utils
class BrowserLauncher:
    """Detect and launch a locally installed Chrome/Edge/Chromium browser.

    The browser is started with a remote-debugging port so that another
    component can attach to it. Candidate install locations are listed for
    Windows and macOS, with a Linux list as the fallback.
    """

    def __init__(self):
        self.system = platform.system()
        # Popen handle of the launched browser; assigned by the caller and
        # consumed by cleanup().
        self.browser_process = None
        self.debug_port = None

    def detect_browser_paths(self) -> List[str]:
        """Return the known browser executables present on this machine.

        Returns:
            Paths that exist and are executable, in the priority order of
            the candidate lists below.
        """
        if self.system == "Windows":
            # Common Chrome/Edge install locations on Windows.
            possible_paths = [
                # Chrome
                os.path.expandvars(r"%PROGRAMFILES%\Google\Chrome\Application\chrome.exe"),
                os.path.expandvars(r"%PROGRAMFILES(X86)%\Google\Chrome\Application\chrome.exe"),
                os.path.expandvars(r"%LOCALAPPDATA%\Google\Chrome\Application\chrome.exe"),
                # Edge
                os.path.expandvars(r"%PROGRAMFILES%\Microsoft\Edge\Application\msedge.exe"),
                os.path.expandvars(r"%PROGRAMFILES(X86)%\Microsoft\Edge\Application\msedge.exe"),
                # Chrome Beta/Dev/Canary
                os.path.expandvars(r"%LOCALAPPDATA%\Google\Chrome Beta\Application\chrome.exe"),
                os.path.expandvars(r"%LOCALAPPDATA%\Google\Chrome Dev\Application\chrome.exe"),
                os.path.expandvars(r"%LOCALAPPDATA%\Google\Chrome SxS\Application\chrome.exe"),
            ]
        elif self.system == "Darwin":  # macOS
            possible_paths = [
                # Chrome
                "/Applications/Google Chrome.app/Contents/MacOS/Google Chrome",
                "/Applications/Google Chrome Beta.app/Contents/MacOS/Google Chrome Beta",
                "/Applications/Google Chrome Dev.app/Contents/MacOS/Google Chrome Dev",
                "/Applications/Google Chrome Canary.app/Contents/MacOS/Google Chrome Canary",
                # Edge
                "/Applications/Microsoft Edge.app/Contents/MacOS/Microsoft Edge",
                "/Applications/Microsoft Edge Beta.app/Contents/MacOS/Microsoft Edge Beta",
                "/Applications/Microsoft Edge Dev.app/Contents/MacOS/Microsoft Edge Dev",
                "/Applications/Microsoft Edge Canary.app/Contents/MacOS/Microsoft Edge Canary",
            ]
        else:
            # Linux and any other system.
            possible_paths = [
                "/usr/bin/google-chrome",
                "/usr/bin/google-chrome-stable",
                "/usr/bin/google-chrome-beta",
                "/usr/bin/google-chrome-unstable",
                "/usr/bin/chromium-browser",
                "/usr/bin/chromium",
                "/snap/bin/chromium",
                "/usr/bin/microsoft-edge",
                "/usr/bin/microsoft-edge-stable",
                "/usr/bin/microsoft-edge-beta",
                "/usr/bin/microsoft-edge-dev",
            ]
        # Keep only candidates that exist and are executable.
        return [
            path for path in possible_paths
            if os.path.isfile(path) and os.access(path, os.X_OK)
        ]

    def find_available_port(self, start_port: int = 9222) -> int:
        """Return the first port in [start_port, start_port + 99] that can be bound.

        Raises:
            RuntimeError: if none of the 100 candidate ports can be bound on
                localhost.
        """
        last_port = start_port + 99
        for port in range(start_port, last_port + 1):
            try:
                with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
                    s.bind(('localhost', port))
                    return port
            except OSError:
                continue
        # The original message printed both numbers with no separator.
        raise RuntimeError(f"无法找到可用的端口,已尝试 {start_port}-{last_port}")

    def launch_browser(self, browser_path: str, debug_port: int, headless: bool = False,
                       user_data_dir: Optional[str] = None) -> subprocess.Popen:
        """Start the browser process with remote debugging enabled.

        Args:
            browser_path: Executable to run.
            debug_port: Value passed as --remote-debugging-port.
            headless: Add the headless flags when True.
            user_data_dir: Optional profile directory passed to the browser.

        Returns:
            The Popen handle of the started process.

        Raises:
            Exception: whatever Popen raises; it is logged and re-raised.
        """
        args = [
            browser_path,
            f"--remote-debugging-port={debug_port}",
            # NOTE(review): 0.0.0.0 exposes the debug port on every interface;
            # together with --disable-web-security/--no-sandbox this is risky
            # on untrusted networks. Kept because remote access looks deliberate.
            "--remote-debugging-address=0.0.0.0",
            "--no-first-run",
            "--no-default-browser-check",
            "--disable-background-timer-throttling",
            "--disable-backgrounding-occluded-windows",
            "--disable-renderer-backgrounding",
            # NOTE(review): --disable-features appears twice in this list;
            # confirm whether the browser merges repeated switches.
            "--disable-features=TranslateUI",
            "--disable-ipc-flooding-protection",
            "--disable-hang-monitor",
            "--disable-prompt-on-repost",
            "--disable-sync",
            "--disable-web-security",
            "--disable-features=VizDisplayCompositor",
            "--disable-dev-shm-usage",
            "--no-sandbox",
        ]
        if headless:
            args.extend([
                "--headless",
                "--disable-gpu",
            ])
        else:
            args.extend([
                "--disable-blink-features=AutomationControlled",
                "--disable-infobars",
            ])
        if user_data_dir:
            args.append(f"--user-data-dir={user_data_dir}")
        utils.logger.info(f"[BrowserLauncher] 启动浏览器: {browser_path}")
        utils.logger.info(f"[BrowserLauncher] 调试端口: {debug_port}")
        utils.logger.info(f"[BrowserLauncher] 无头模式: {headless}")
        try:
            popen_kwargs = {"stdout": subprocess.DEVNULL, "stderr": subprocess.DEVNULL}
            if self.system == "Windows":
                # Separate process group so Ctrl+C in the parent does not
                # reach the browser.
                popen_kwargs["creationflags"] = subprocess.CREATE_NEW_PROCESS_GROUP
            else:
                # Calls setsid() in the child (same effect as the former
                # preexec_fn=os.setsid) so cleanup() can kill the whole group.
                popen_kwargs["start_new_session"] = True
            return subprocess.Popen(args, **popen_kwargs)
        except Exception as e:
            utils.logger.error(f"[BrowserLauncher] 启动浏览器失败: {e}")
            raise

    def wait_for_browser_ready(self, debug_port: int, timeout: int = 30) -> bool:
        """Poll localhost:debug_port until a TCP connection succeeds.

        This call blocks (it sleeps 0.5 s between attempts).

        Returns:
            True once the port accepts a connection, False after `timeout`
            seconds without success.
        """
        utils.logger.info(f"[BrowserLauncher] 等待浏览器在端口 {debug_port} 上准备就绪...")
        deadline = time.monotonic() + timeout
        while time.monotonic() < deadline:
            try:
                with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
                    s.settimeout(1)
                    if s.connect_ex(('localhost', debug_port)) == 0:
                        utils.logger.info(f"[BrowserLauncher] 浏览器已在端口 {debug_port} 上准备就绪")
                        return True
            except OSError:
                # Not reachable yet; retry until the deadline.
                pass
            time.sleep(0.5)
        utils.logger.error(f"[BrowserLauncher] 浏览器在 {timeout} 秒内未能准备就绪")
        return False

    def get_browser_info(self, browser_path: str) -> Tuple[str, str]:
        """Return (name, version) for the browser at `browser_path`.

        The name is guessed from the path; the version is the stdout of
        `<browser> --version`. Either part falls back to an "Unknown ..."
        placeholder instead of raising.
        """
        try:
            lowered = browser_path.lower()
            if "chrome" in lowered:
                name = "Google Chrome"
            elif "edge" in lowered or "msedge" in lowered:
                name = "Microsoft Edge"
            elif "chromium" in lowered:
                name = "Chromium"
            else:
                name = "Unknown Browser"
            try:
                result = subprocess.run([browser_path, "--version"],
                                        capture_output=True, text=True, timeout=5)
                version = result.stdout.strip() if result.stdout else "Unknown Version"
            except Exception:
                # Was a bare `except:`, which also swallowed KeyboardInterrupt.
                version = "Unknown Version"
            return name, version
        except Exception:
            return "Unknown Browser", "Unknown Version"

    def cleanup(self):
        """Kill the launched browser (and its children) and reap the process.

        Failures are logged as warnings, never raised. `browser_process` is
        reset only when the shutdown succeeded.
        """
        process = self.browser_process
        if not process:
            return
        try:
            utils.logger.info("[BrowserLauncher] 正在关闭浏览器进程...")
            if self.system == "Windows":
                # taskkill /T force-terminates the whole process tree.
                subprocess.run(["taskkill", "/F", "/T", "/PID", str(process.pid)],
                               capture_output=True)
            else:
                try:
                    # Signal 9 (SIGKILL) to the whole process group.
                    os.killpg(os.getpgid(process.pid), 9)
                except ProcessLookupError:
                    # Already gone: nothing left to kill.
                    pass
            # Reap the child so it does not linger as a zombie.
            process.wait(timeout=5)
            self.browser_process = None
            utils.logger.info("[BrowserLauncher] 浏览器进程已关闭")
        except Exception as e:
            utils.logger.warning(f"[BrowserLauncher] 关闭浏览器进程时出错: {e}")
@@ -0,0 +1,341 @@
# 声明:本代码仅供学习和研究目的使用。使用者应遵守以下原则:
# 1. 不得用于任何商业用途。
# 2. 使用时应遵守目标平台的使用条款和robots.txt规则。
# 3. 不得进行大规模爬取或对平台造成运营干扰。
# 4. 应合理控制请求频率,避免给目标平台带来不必要的负担。
# 5. 不得用于任何非法或不当的用途。
#
# 详细许可条款请参阅项目根目录下的LICENSE文件。
# 使用本代码即表示您同意遵守上述原则和LICENSE中的所有条款。
import os
import asyncio
import socket
import httpx
from typing import Optional, Dict, Any
from playwright.async_api import Browser, BrowserContext, Playwright
import config
from tools.browser_launcher import BrowserLauncher
from tools import utils
class CDPBrowserManager:
    """Launch a local browser and attach Playwright to it over CDP."""

    def __init__(self):
        self.launcher = BrowserLauncher()
        self.browser: Optional[Browser] = None
        self.browser_context: Optional[BrowserContext] = None
        self.debug_port: Optional[int] = None

    async def launch_and_connect(
        self,
        playwright: Playwright,
        playwright_proxy: Optional[Dict] = None,
        user_agent: Optional[str] = None,
        headless: bool = False,
    ) -> BrowserContext:
        """Start the browser, connect over CDP and return a browser context.

        Args:
            playwright: Playwright instance used for the CDP connection.
            playwright_proxy: Proxy settings; only triggers a warning when a
                new context is created (see _create_browser_context).
            user_agent: User agent applied when a new context is created.
            headless: Launch the browser headless when True.

        Raises:
            Exception: any failure is logged, cleanup() is run, and the
                original exception is re-raised.
        """
        try:
            # 1. Resolve the browser executable.
            browser_path = await self._get_browser_path()
            # 2. Pick a free debugging port.
            self.debug_port = self.launcher.find_available_port(config.CDP_DEBUG_PORT)
            # 3. Launch the browser.
            await self._launch_browser(browser_path, headless)
            # 4. Connect over CDP.
            await self._connect_via_cdp(playwright)
            # 5. Reuse or create a browser context.
            browser_context = await self._create_browser_context(
                playwright_proxy, user_agent
            )
            self.browser_context = browser_context
            return browser_context
        except Exception as e:
            utils.logger.error(f"[CDPBrowserManager] CDP浏览器启动失败: {e}")
            await self.cleanup()
            raise

    async def _get_browser_path(self) -> str:
        """Return config.CUSTOM_BROWSER_PATH if it is a file, else the first detected browser.

        Raises:
            RuntimeError: if no browser is configured or detected.
        """
        # A user-configured path takes priority.
        if config.CUSTOM_BROWSER_PATH and os.path.isfile(config.CUSTOM_BROWSER_PATH):
            utils.logger.info(
                f"[CDPBrowserManager] 使用自定义浏览器路径: {config.CUSTOM_BROWSER_PATH}"
            )
            return config.CUSTOM_BROWSER_PATH
        browser_paths = self.launcher.detect_browser_paths()
        if not browser_paths:
            raise RuntimeError(
                "未找到可用的浏览器。请确保已安装Chrome或Edge浏览器,"
                "或在配置文件中设置CUSTOM_BROWSER_PATH指定浏览器路径。"
            )
        browser_path = browser_paths[0]  # highest-priority detected browser
        browser_name, browser_version = self.launcher.get_browser_info(browser_path)
        utils.logger.info(
            f"[CDPBrowserManager] 检测到浏览器: {browser_name} ({browser_version})"
        )
        utils.logger.info(f"[CDPBrowserManager] 浏览器路径: {browser_path}")
        return browser_path

    async def _test_cdp_connection(self, debug_port: int) -> bool:
        """Return True if a TCP connection to localhost:debug_port succeeds."""
        try:
            with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
                s.settimeout(5)
                reachable = s.connect_ex(("localhost", debug_port)) == 0
            if reachable:
                utils.logger.info(
                    f"[CDPBrowserManager] CDP端口 {debug_port} 可访问"
                )
            else:
                utils.logger.warning(
                    f"[CDPBrowserManager] CDP端口 {debug_port} 不可访问"
                )
            return reachable
        except Exception as e:
            utils.logger.warning(f"[CDPBrowserManager] CDP连接测试失败: {e}")
            return False

    async def _launch_browser(self, browser_path: str, headless: bool):
        """Start the browser process and wait until its debug port answers.

        Raises:
            RuntimeError: if the port is not reachable within
                config.BROWSER_LAUNCH_TIMEOUT seconds.
        """
        # Use a persistent profile directory only when login state is kept.
        user_data_dir = None
        if config.SAVE_LOGIN_STATE:
            user_data_dir = os.path.join(
                os.getcwd(),
                "browser_data",
                f"cdp_{config.USER_DATA_DIR % config.PLATFORM}",
            )
            os.makedirs(user_data_dir, exist_ok=True)
            utils.logger.info(f"[CDPBrowserManager] 用户数据目录: {user_data_dir}")
        self.launcher.browser_process = self.launcher.launch_browser(
            browser_path=browser_path,
            debug_port=self.debug_port,
            headless=headless,
            user_data_dir=user_data_dir,
        )
        # wait_for_browser_ready() is a blocking sleep/poll loop; run it in
        # the default executor so the event loop is not stalled meanwhile.
        loop = asyncio.get_running_loop()
        ready = await loop.run_in_executor(
            None,
            self.launcher.wait_for_browser_ready,
            self.debug_port,
            config.BROWSER_LAUNCH_TIMEOUT,
        )
        if not ready:
            raise RuntimeError(f"浏览器在 {config.BROWSER_LAUNCH_TIMEOUT} 秒内未能启动")
        # Give the CDP service one extra second after the port opens.
        await asyncio.sleep(1)
        if not await self._test_cdp_connection(self.debug_port):
            utils.logger.warning(
                "[CDPBrowserManager] CDP连接测试失败,但将继续尝试连接"
            )

    async def _get_browser_websocket_url(self, debug_port: int) -> str:
        """Fetch webSocketDebuggerUrl from the browser's /json/version endpoint.

        Raises:
            RuntimeError: on a non-200 response or a missing URL. Every
                failure is logged and re-raised.
        """
        try:
            async with httpx.AsyncClient() as client:
                response = await client.get(
                    f"http://localhost:{debug_port}/json/version", timeout=10
                )
            if response.status_code != 200:
                raise RuntimeError(f"HTTP {response.status_code}: {response.text}")
            ws_url = response.json().get("webSocketDebuggerUrl")
            if not ws_url:
                raise RuntimeError("未找到webSocketDebuggerUrl")
            utils.logger.info(
                f"[CDPBrowserManager] 获取到浏览器WebSocket URL: {ws_url}"
            )
            return ws_url
        except Exception as e:
            utils.logger.error(f"[CDPBrowserManager] 获取WebSocket URL失败: {e}")
            raise

    async def _connect_via_cdp(self, playwright: Playwright):
        """Connect Playwright to the running browser and store it in self.browser.

        Raises:
            RuntimeError: if the connection reports as not connected. Every
                failure is logged and re-raised.
        """
        try:
            ws_url = await self._get_browser_websocket_url(self.debug_port)
            utils.logger.info(f"[CDPBrowserManager] 正在通过CDP连接到浏览器: {ws_url}")
            self.browser = await playwright.chromium.connect_over_cdp(ws_url)
            if not self.browser.is_connected():
                raise RuntimeError("CDP连接失败")
            utils.logger.info("[CDPBrowserManager] 成功连接到浏览器")
            utils.logger.info(
                f"[CDPBrowserManager] 浏览器上下文数量: {len(self.browser.contexts)}"
            )
        except Exception as e:
            utils.logger.error(f"[CDPBrowserManager] CDP连接失败: {e}")
            raise

    async def _create_browser_context(
        self, playwright_proxy: Optional[Dict] = None, user_agent: Optional[str] = None
    ) -> BrowserContext:
        """Return the browser's first existing context, or create a new one.

        `user_agent` is only applied to a newly created context.
        `playwright_proxy` is never applied here; it only produces a warning.

        Raises:
            RuntimeError: if no browser is connected.
        """
        if not self.browser:
            raise RuntimeError("浏览器未连接")
        contexts = self.browser.contexts
        if contexts:
            browser_context = contexts[0]
            utils.logger.info("[CDPBrowserManager] 使用现有的浏览器上下文")
            return browser_context
        context_options = {
            "viewport": {"width": 1920, "height": 1080},
            "accept_downloads": True,
        }
        if user_agent:
            context_options["user_agent"] = user_agent
            utils.logger.info(f"[CDPBrowserManager] 设置用户代理: {user_agent}")
        # The browser is already running, so a proxy cannot be injected here.
        if playwright_proxy:
            utils.logger.warning(
                "[CDPBrowserManager] 警告: CDP模式下代理设置可能不生效,"
                "建议在浏览器启动前配置系统代理或浏览器代理扩展"
            )
        browser_context = await self.browser.new_context(**context_options)
        utils.logger.info("[CDPBrowserManager] 创建新的浏览器上下文")
        return browser_context

    async def add_stealth_script(self, script_path: str = "libs/stealth.min.js"):
        """Register `script_path` as an init script on the current context.

        Does nothing when there is no context or the file is missing;
        failures are logged as warnings.
        """
        if self.browser_context and os.path.exists(script_path):
            try:
                await self.browser_context.add_init_script(path=script_path)
                utils.logger.info(
                    f"[CDPBrowserManager] 已添加反检测脚本: {script_path}"
                )
            except Exception as e:
                utils.logger.warning(f"[CDPBrowserManager] 添加反检测脚本失败: {e}")

    async def add_cookies(self, cookies: list):
        """Add `cookies` to the current context; failures are logged as warnings."""
        if self.browser_context:
            try:
                await self.browser_context.add_cookies(cookies)
                utils.logger.info(f"[CDPBrowserManager] 已添加 {len(cookies)} 个Cookie")
            except Exception as e:
                utils.logger.warning(f"[CDPBrowserManager] 添加Cookie失败: {e}")

    async def get_cookies(self) -> list:
        """Return the current context's cookies, or [] without a context or on error."""
        if not self.browser_context:
            return []
        try:
            return await self.browser_context.cookies()
        except Exception as e:
            utils.logger.warning(f"[CDPBrowserManager] 获取Cookie失败: {e}")
            return []

    async def cleanup(self):
        """Kill the browser process when config.AUTO_CLOSE_BROWSER is set.

        Errors are logged, never raised.
        """
        try:
            # NOTE(review): closing browser_context / browser over CDP was
            # commented out in the original; presumably deliberate so the
            # user's browser session is not torn down - confirm before
            # re-enabling.
            if config.AUTO_CLOSE_BROWSER:
                self.launcher.cleanup()
            else:
                utils.logger.info(
                    "[CDPBrowserManager] 浏览器进程保持运行(AUTO_CLOSE_BROWSER=False)"
                )
        except Exception as e:
            utils.logger.error(f"[CDPBrowserManager] 清理资源时出错: {e}")

    def is_connected(self) -> bool:
        """Return True when a browser object exists and reports as connected."""
        return self.browser is not None and self.browser.is_connected()

    async def get_browser_info(self) -> Dict[str, Any]:
        """Return version, context count, debug port and connection state.

        Returns {} when no browser is connected or the lookup fails.
        """
        if not self.browser:
            return {}
        try:
            return {
                "version": self.browser.version,
                "contexts_count": len(self.browser.contexts),
                "debug_port": self.debug_port,
                "is_connected": self.is_connected(),
            }
        except Exception as e:
            utils.logger.warning(f"[CDPBrowserManager] 获取浏览器信息失败: {e}")
            return {}
@@ -0,0 +1,212 @@
# 声明:本代码仅供学习和研究目的使用。使用者应遵守以下原则:
# 1. 不得用于任何商业用途。
# 2. 使用时应遵守目标平台的使用条款和robots.txt规则。
# 3. 不得进行大规模爬取或对平台造成运营干扰。
# 4. 应合理控制请求频率,避免给目标平台带来不必要的负担。
# 5. 不得用于任何非法或不当的用途。
#
# 详细许可条款请参阅项目根目录下的LICENSE文件。
# 使用本代码即表示您同意遵守上述原则和LICENSE中的所有条款。
# -*- coding: utf-8 -*-
# @Author : relakkes@gmail.com
# @Time : 2023/12/2 12:53
# @Desc : 爬虫相关的工具函数
import base64
import json
import random
import re
import urllib
import urllib.parse
from io import BytesIO
from typing import Dict, List, Optional, Tuple, cast
import httpx
from PIL import Image, ImageDraw, ImageShow
from playwright.async_api import Cookie, Page
from . import utils
async def find_login_qrcode(page: Page, selector: str) -> str:
    """Return the login QR code image found at `selector`.

    When the element's `src` contains an http(s) URL the image is downloaded
    and returned base64-encoded; otherwise the `src` value is returned as is.
    Any failure is logged and yields an empty string.
    """
    try:
        elements = await page.wait_for_selector(
            selector=selector,
        )
        login_qrcode_img = str(await elements.get_property("src"))  # type: ignore
        if "http://" in login_qrcode_img or "https://" in login_qrcode_img:
            async with httpx.AsyncClient(follow_redirects=True) as client:
                utils.logger.info(f"[find_login_qrcode] get qrcode by url:{login_qrcode_img}")
                resp = await client.get(login_qrcode_img, headers={"User-Agent": get_user_agent()})
                if resp.status_code == 200:
                    return base64.b64encode(resp.content).decode('utf-8')
                raise Exception(f"fetch login image url failed, response message:{resp.text}")
        return login_qrcode_img
    except Exception as e:
        # Use the project logger (as the rest of this function does) instead
        # of print(), so the failure lands in the configured log output.
        utils.logger.error(f"[find_login_qrcode] failed to get login qrcode: {e}")
        return ""
async def find_qrcode_img_from_canvas(page: Page, canvas_selector: str) -> str:
    """Screenshot a canvas element and return the image base64-encoded.

    Args:
        page: Page that contains the canvas.
        canvas_selector: Selector of the canvas element to wait for.

    Returns:
        The element screenshot as a base64 string.
    """
    # Wait for the canvas to appear, then capture just that element.
    canvas_element = await page.wait_for_selector(canvas_selector)
    image_bytes = await canvas_element.screenshot()
    return base64.b64encode(image_bytes).decode('utf-8')
def show_qrcode(qr_code) -> None:  # type: ignore
    """Decode a base64 QR code image and display it with a white margin.

    Args:
        qr_code: Base64 image data, optionally prefixed by a data-URI header
            (everything up to the first comma is dropped).
    """
    if "," in qr_code:
        qr_code = qr_code.split(",")[1]
    qr_code = base64.b64decode(qr_code)
    image = Image.open(BytesIO(qr_code))
    # Paste the code onto a larger white canvas and draw a thin black frame
    # around it to improve scanning accuracy.
    width, height = image.size
    new_image = Image.new('RGB', (width + 20, height + 20), color=(255, 255, 255))
    new_image.paste(image, (10, 10))
    draw = ImageDraw.Draw(new_image)
    draw.rectangle((0, 0, width + 19, height + 19), outline=(0, 0, 255 * 0), width=1)
    # pop() instead of `del`: the option is already gone on a second call (or
    # may not exist at all), and `del` would raise KeyError.
    ImageShow.UnixViewer.options.pop("save_all", None)
    new_image.show()
def get_user_agent() -> str:
    """Return a randomly chosen desktop Chrome user-agent string."""
    desktop_agents = (
        "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/114.0.0.0 Safari/537.36",
        "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/104.0.5112.79 Safari/537.36",
        "Mozilla/5.0 (Windows NT 10.0) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/104.0.0.0 Safari/537.36",
        "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_14_6) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/104.0.0.0 Safari/537.36",
        "Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/103.0.5060.53 Safari/537.36",
        "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_3) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/99.0.4844.84 Safari/537.36",
        "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/123.0.0.0 Safari/537.36",
        "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/122.0.5112.79 Safari/537.36",
        "Mozilla/5.0 (Windows NT 10.0) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/121.0.0.0 Safari/537.36",
        "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_14_6) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36",
        "Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/119.0.5060.53 Safari/537.36",
        "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_3) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/118.0.4844.84 Safari/537.36",
        "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/117.0.0.0 Safari/537.36",
        "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/116.0.5112.79 Safari/537.36",
        "Mozilla/5.0 (Windows NT 10.0) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/115.0.0.0 Safari/537.36",
        "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_14_6) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/113.0.0.0 Safari/537.36",
        "Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/112.0.5060.53 Safari/537.36",
        "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_3) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/111.0.4844.84 Safari/537.36",
        "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/110.0.0.0 Safari/537.36",
        "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/109.0.5112.79 Safari/537.36",
    )
    return random.choice(desktop_agents)
def get_mobile_user_agent() -> str:
    """Return a randomly chosen user-agent string from the mobile list."""
    # NOTE(review): the "Windows NT 10.0 ... OPR/99.0.0.0" entry looks like a
    # desktop Opera UA rather than a mobile one - confirm it is intended.
    mobile_agents = (
        "Mozilla/5.0 (iPhone; CPU iPhone OS 16_5 like Mac OS X) AppleWebKit/605.1.15 (KHTML, like Gecko) Version/16.5 Mobile/15E148 Safari/604.1",
        "Mozilla/5.0 (iPad; CPU OS 16_5 like Mac OS X) AppleWebKit/605.1.15 (KHTML, like Gecko) Version/16.5 Mobile/15E148 Safari/604.1",
        "Mozilla/5.0 (iPhone; CPU iPhone OS 16_5 like Mac OS X) AppleWebKit/605.1.15 (KHTML, like Gecko) CriOS/114.0.5735.99 Mobile/15E148 Safari/604.1",
        "Mozilla/5.0 (iPad; CPU OS 16_5 like Mac OS X) AppleWebKit/605.1.15 (KHTML, like Gecko) CriOS/114.0.5735.124 Mobile/15E148 Safari/604.1",
        "Mozilla/5.0 (Linux; Android 10; K) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/114.0.0.0 Mobile Safari/537.36",
        "Mozilla/5.0 (Linux; Android 13; SAMSUNG SM-S918B) AppleWebKit/537.36 (KHTML, like Gecko) SamsungBrowser/21.0 Chrome/110.0.5481.154 Mobile Safari/537.36",
        "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/113.0.0.0 Safari/537.36 OPR/99.0.0.0",
        "Mozilla/5.0 (Linux; Android 10; JNY-LX1; HMSCore 6.11.0.302) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/99.0.4844.88 HuaweiBrowser/13.0.5.303 Mobile Safari/537.36",
    )
    return random.choice(mobile_agents)
def convert_cookies(cookies: Optional[List[Cookie]]) -> Tuple[str, Dict]:
    """Convert a list of cookie mappings into a header string and a dict.

    Returns:
        ("name=value;name2=value2", {name: value, ...}); ("", {}) when
        `cookies` is empty or None. Duplicate names all appear in the string,
        while the dict keeps the last value.
    """
    if not cookies:
        return "", {}
    pairs = [(item.get('name'), item.get('value')) for item in cookies]
    cookies_str = ";".join(f"{name}={value}" for name, value in pairs)
    return cookies_str, dict(pairs)
def convert_str_cookie_to_dict(cookie_str: str) -> Dict:
    """Parse a "name=value; name2=value2" cookie string into a dict.

    Each pair is split on its first "=" only, so values that themselves
    contain "=" (e.g. base64 padding) are kept intact instead of being
    dropped. Empty segments and segments without "=" are skipped.
    """
    cookie_dict: Dict[str, str] = {}
    if not cookie_str:
        return cookie_dict
    for pair in cookie_str.split(";"):
        name, separator, value = pair.strip().partition("=")
        if not separator:
            # Blank segment or a bare token with no value.
            continue
        cookie_dict[name] = value
    return cookie_dict
def match_interact_info_count(count_str: str) -> int:
    """Return the first run of digits in `count_str` as an int, or 0 if none."""
    if not count_str:
        return 0
    digits = re.search(r'\d+', count_str)
    return int(digits.group()) if digits else 0
def format_proxy_info(ip_proxy_info) -> Tuple[Optional[Dict], Optional[str]]:
    """Build proxy settings for Playwright (dict) and httpx (URL string).

    Returns:
        (playwright_proxy, httpx_proxy). The httpx URL always uses the
        "http://" scheme and embeds credentials only when both user and
        password are set.
    """
    # Imported here to avoid a circular import at module load time.
    from proxy.proxy_ip_pool import IpInfoModel
    proxy = cast(IpInfoModel, ip_proxy_info)
    # presumably `protocol` already includes the "://" part - verify against
    # IpInfoModel.
    playwright_proxy = {
        "server": f"{proxy.protocol}{proxy.ip}:{proxy.port}",
        "username": proxy.user,
        "password": proxy.password,
    }
    # httpx 0.28.1 expects the proxy as a URL string rather than a dict.
    has_credentials = proxy.user and proxy.password
    if has_credentials:
        httpx_proxy = f"http://{proxy.user}:{proxy.password}@{proxy.ip}:{proxy.port}"
    else:
        httpx_proxy = f"http://{proxy.ip}:{proxy.port}"
    return playwright_proxy, httpx_proxy
def extract_text_from_html(html: str) -> str:
    """Strip tags from `html` and return the remaining text.

    <script> and <style> elements are removed together with their content;
    every other tag is removed but its content is kept. The result is
    stripped of surrounding whitespace.
    """
    if not html:
        return ""
    without_blocks = re.sub(r'<(script|style)[^>]*>.*?</\1>', '', html, flags=re.DOTALL)
    without_tags = re.sub(r'<[^>]+>', '', without_blocks)
    return without_tags.strip()
def extract_url_params_to_dict(url: str) -> Dict:
    """Return the query-string parameters of `url` as a dict ({} for an empty url)."""
    if not url:
        return dict()
    query = urllib.parse.urlparse(url).query
    return dict(urllib.parse.parse_qsl(query))
@@ -0,0 +1,81 @@
# 声明:本代码仅供学习和研究目的使用。使用者应遵守以下原则:
# 1. 不得用于任何商业用途。
# 2. 使用时应遵守目标平台的使用条款和robots.txt规则。
# 3. 不得进行大规模爬取或对平台造成运营干扰。
# 4. 应合理控制请求频率,避免给目标平台带来不必要的负担。
# 5. 不得用于任何非法或不当的用途。
#
# 详细许可条款请参阅项目根目录下的LICENSE文件。
# 使用本代码即表示您同意遵守上述原则和LICENSE中的所有条款。
#!/usr/bin/env python
# -*- coding: utf-8 -*-
# copy from https://github.com/aneasystone/selenium-test/blob/master/12-slider-captcha.py
# thanks to aneasystone for his great work
import math
from typing import List, Tuple
import numpy as np
# https://github.com/gdsmith/jquery.easing/blob/master/jquery.easing.js
def ease_in_quad(x):
    """Quadratic ease-in: return x squared."""
    squared = x * x
    return squared
def ease_out_quad(x):
    """Quadratic ease-out: 1 - (1 - x)^2."""
    remaining = 1 - x
    return 1 - remaining * remaining
def ease_out_quart(x):
    """Quartic ease-out: 1 - (1 - x)^4."""
    remaining = 1 - x
    return 1 - pow(remaining, 4)
def ease_out_expo(x):
    """Exponential ease-out: 1 - 2^(-10x), pinned to exactly 1 at x == 1."""
    return 1 if x == 1 else 1 - pow(2, -10 * x)
def ease_out_bounce(x):
    """Bounce ease-out: piecewise parabolas, one per segment of the input range."""
    n1 = 7.5625
    d1 = 2.75
    if x < 1 / d1:
        return n1 * x * x
    if x < 2 / d1:
        shifted = x - 1.5 / d1
        return n1 * shifted * shifted + 0.75
    if x < 2.5 / d1:
        shifted = x - 2.25 / d1
        return n1 * shifted * shifted + 0.9375
    shifted = x - 2.625 / d1
    return n1 * shifted * shifted + 0.984375
def ease_out_elastic(x):
    """Elastic ease-out: a decaying sine around 1, with exact endpoints."""
    if x == 0:
        return 0
    if x == 1:
        return 1
    c4 = (2 * math.pi) / 3
    return pow(2, -10 * x) * math.sin((x * 10 - 0.75) * c4) + 1
def get_tracks(distance, seconds, ease_func) -> Tuple[List[int], List[int]]:
    """Sample an easing curve every 0.1 s and return offsets plus per-step moves.

    Args:
        distance: Total distance the easing curve is scaled to.
        seconds: Duration; time is sampled with np.arange(0.0, seconds, 0.1).
        ease_func: Name of an easing function defined in this module, or
            (additionally) any callable mapping progress -> eased progress.

    Returns:
        (offsets, tracks): cumulative rounded offsets and the difference
        between consecutive offsets. Both lists start with 0.

    Raises:
        KeyError: if `ease_func` is a name not defined in this module.
    """
    # Resolve the easing function once instead of on every iteration.
    ease = ease_func if callable(ease_func) else globals()[ease_func]
    tracks = [0]
    offsets = [0]
    for t in np.arange(0.0, seconds, 0.1):
        offset = round(ease(t / seconds) * distance)
        tracks.append(offset - offsets[-1])
        offsets.append(offset)
    return offsets, tracks
if __name__ == '__main__':
    # Demo: print the per-step moves of an exponential ease-out track.
    _, demo_tracks = get_tracks(129, 3, "ease_out_expo")
    print(demo_tracks)
@@ -0,0 +1,175 @@
# 声明:本代码仅供学习和研究目的使用。使用者应遵守以下原则:
# 1. 不得用于任何商业用途。
# 2. 使用时应遵守目标平台的使用条款和robots.txt规则。
# 3. 不得进行大规模爬取或对平台造成运营干扰。
# 4. 应合理控制请求频率,避免给目标平台带来不必要的负担。
# 5. 不得用于任何非法或不当的用途。
#
# 详细许可条款请参阅项目根目录下的LICENSE文件。
# 使用本代码即表示您同意遵守上述原则和LICENSE中的所有条款。
# -*- coding: utf-8 -*-
# @Author : relakkes@gmail.com
# @Time : 2023/12/2 12:55
# @Desc : 滑块相关的工具包
import os
from typing import List
from urllib.parse import urlparse
import cv2
import httpx
import numpy as np
class Slide:
    """Locate the gap of a slider captcha by edge-based template matching.

    copy from https://blog.csdn.net/weixin_43582101 thanks for author
    update: relakkes
    """

    def __init__(self, gap, bg, gap_size=None, bg_size=None, out=None):
        """
        :param gap: path or URL of the slider (gap piece) image
        :param bg: path or URL of the background image containing the gap
        :param gap_size: resize target for a downloaded gap image, default (68, 68)
        :param bg_size: resize target for a downloaded background, default (340, 212)
        :param out: where the annotated match image is written,
            default <cwd>/temp_image/out.jpg
        """
        self.img_dir = os.path.join(os.getcwd(), 'temp_image')
        os.makedirs(self.img_dir, exist_ok=True)
        bg_resize = bg_size if bg_size else (340, 212)
        gap_size = gap_size if gap_size else (68, 68)
        self.bg = self.check_is_img_path(bg, 'bg', resize=bg_resize)
        self.gap = self.check_is_img_path(gap, 'gap', resize=gap_size)
        self.out = out if out else os.path.join(self.img_dir, 'out.jpg')

    @staticmethod
    def check_is_img_path(img, img_type, resize):
        """Return a local path for `img`, downloading it first when it is a URL.

        Values not starting with "http" are returned unchanged. URLs are
        fetched, optionally resized to `resize`, and written to
        ./temp_image/<img_type>.jpg.

        :raises Exception: if the download does not return HTTP 200 or the
            payload cannot be decoded as an image.
        """
        if not img.startswith('http'):
            return img
        headers = {
            "Accept": "text/html,application/xhtml+xml,application/xml;q=0.9,image/avif,image/webp,image/apng,*/*;"
                      "q=0.8,application/signed-exchange;v=b3;q=0.9",
            "Accept-Encoding": "gzip, deflate, br",
            "Accept-Language": "zh-CN,zh;q=0.9,en-GB;q=0.8,en;q=0.7,ja;q=0.6",
            # Was "AbstractCache-Control", which is not a real header name.
            "Cache-Control": "max-age=0",
            "Connection": "keep-alive",
            "Host": urlparse(img).hostname,
            "Upgrade-Insecure-Requests": "1",
            "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) "
                          "Chrome/91.0.4472.164 Safari/537.36",
        }
        img_res = httpx.get(img, headers=headers)
        if img_res.status_code != 200:
            raise Exception(f"保存{img_type}图片失败")
        img_path = f'./temp_image/{img_type}.jpg'
        image = np.asarray(bytearray(img_res.content), dtype="uint8")
        image = cv2.imdecode(image, cv2.IMREAD_COLOR)
        if image is None:
            # imdecode returns None for undecodable data; fail with the same
            # error as a bad download instead of a cryptic OpenCV error later.
            raise Exception(f"保存{img_type}图片失败")
        if resize:
            image = cv2.resize(image, dsize=resize)
        cv2.imwrite(img_path, image)
        return img_path

    @staticmethod
    def clear_white(img):
        """Crop the image at path `img` to the bounding box of its coloured pixels.

        A pixel counts as content when its channel values are not all equal.
        NOTE(review): the minimum bounds start at 255, so the crop presumably
        assumes images no larger than 255 px per side - confirm.
        """
        img = cv2.imread(img)
        rows, cols, channel = img.shape
        min_x = 255
        min_y = 255
        max_x = 0
        max_y = 0
        for x in range(1, rows):
            for y in range(1, cols):
                if len(set(img[x, y])) >= 2:
                    if x <= min_x:
                        min_x = x
                    elif x >= max_x:
                        max_x = x
                    if y <= min_y:
                        min_y = y
                    elif y >= max_y:
                        max_y = y
        return img[min_x:max_x, min_y:max_y]

    def template_match(self, tpl, target):
        """Find `tpl` inside `target` and return the x coordinate of the best match.

        The matched region is outlined on `target` and written to `self.out`.
        """
        th, tw = tpl.shape[:2]
        result = cv2.matchTemplate(target, tpl, cv2.TM_CCOEFF_NORMED)
        # Location of the minimum and maximum of the score matrix; the
        # maximum is the best match for TM_CCOEFF_NORMED.
        min_val, max_val, min_loc, max_loc = cv2.minMaxLoc(result)
        tl = max_loc
        br = (tl[0] + tw, tl[1] + th)
        # Outline the matched area: top-left corner, bottom-right corner,
        # colour (0, 0, 255), line thickness 2.
        cv2.rectangle(target, tl, br, (0, 0, 255), 2)
        cv2.imwrite(self.out, target)
        return tl[0]

    @staticmethod
    def image_edge_detection(img):
        """Return the Canny edge map of `img` (thresholds 100 and 200)."""
        return cv2.Canny(img, 100, 200)

    def discern(self):
        """Return the x position of the gap in the background image."""
        img1 = self.clear_white(self.gap)
        img1 = cv2.cvtColor(img1, cv2.COLOR_RGB2GRAY)
        slide = self.image_edge_detection(img1)
        # NOTE(review): COLOR_RGB2GRAY is a cvtColor conversion code, not an
        # imread flag, so this probably does not load a grayscale image.
        # Left unchanged because a different read mode could shift the match.
        back = cv2.imread(self.bg, cv2.COLOR_RGB2GRAY)
        back = self.image_edge_detection(back)
        slide_pic = cv2.cvtColor(slide, cv2.COLOR_GRAY2RGB)
        back_pic = cv2.cvtColor(back, cv2.COLOR_GRAY2RGB)
        # The x coordinate is where the slider belongs in the background.
        return self.template_match(slide_pic, back_pic)
def get_track_simple(distance) -> List[int]:
    """Build a simple accelerate-then-decelerate list of per-step moves.

    Constant-speed drags are easy to detect, so the speed increases until
    4/5 of `distance` has been covered and decreases afterwards.

    :param distance: total distance to cover
    :return: rounded move for each 0.2 time-unit step (empty if distance <= 0)
    """
    moves: List[int] = []
    covered = 0
    # Switch from accelerating to decelerating past this point.
    slowdown_at = distance * 4 / 5
    # Length of one time step.
    t = 0.2
    # Current speed; starts at 1.
    v = 1
    while covered < distance:
        # Acceleration is 4 before the threshold and -3 after it.
        a = 4 if covered < slowdown_at else -3
        v0 = v
        # Speed at the end of this step.
        v = v0 + a * t  # type: ignore
        # Distance moved during this step.
        move = v0 * t + 1 / 2 * a * t * t
        covered += move  # type: ignore
        moves.append(round(move))
    return moves
def get_tracks(distance: int, level: str = "easy") -> List[int]:
    """Return per-step moves for dragging a slider over `distance`.

    "easy" uses get_track_simple; any other level uses the exponential
    ease-out curve from the sibling `easing` module over 2 seconds.
    """
    if level != "easy":
        from . import easing
        _, steps = easing.get_tracks(distance, seconds=2, ease_func="ease_out_expo")
        return steps
    return get_track_simple(distance)
@@ -0,0 +1,117 @@
# 声明:本代码仅供学习和研究目的使用。使用者应遵守以下原则:
# 1. 不得用于任何商业用途。
# 2. 使用时应遵守目标平台的使用条款和robots.txt规则。
# 3. 不得进行大规模爬取或对平台造成运营干扰。
# 4. 应合理控制请求频率,避免给目标平台带来不必要的负担。
# 5. 不得用于任何非法或不当的用途。
#
# 详细许可条款请参阅项目根目录下的LICENSE文件。
# 使用本代码即表示您同意遵守上述原则和LICENSE中的所有条款。
# -*- coding: utf-8 -*-
# @Author : relakkes@gmail.com
# @Time : 2023/12/2 12:52
# @Desc : 时间相关的工具函数
import time
from datetime import datetime, timedelta, timezone
def get_current_timestamp() -> int:
    """
    Return the current Unix time in milliseconds (13 digits), e.g. 1701493264496.
    :return: millisecond timestamp
    """
    now_seconds = time.time()
    return int(now_seconds * 1000)
def get_current_time() -> str:
    """
    Return the current local date and time, e.g. '2023-12-02 13:01:23'.
    :return: formatted local date-time string
    """
    now = time.localtime()
    return time.strftime('%Y-%m-%d %X', now)
def get_current_date() -> str:
    """
    Return the current local date, e.g. '2023-12-02'.
    :return: formatted local date string
    """
    today = time.localtime()
    return time.strftime('%Y-%m-%d', today)
def get_time_str_from_unix_time(unixtime):
    """
    Convert a Unix timestamp to a local date-time string.

    Values above 1000000000000 are treated as milliseconds. Numeric strings
    are accepted for both seconds and milliseconds (previously a string in
    seconds raised TypeError while a string in milliseconds worked).
    :param unixtime: timestamp in seconds or milliseconds (number or numeric string)
    :return: 'YYYY-MM-DD ' followed by the locale's time representation
    """
    if not isinstance(unixtime, (int, float)):
        unixtime = int(unixtime)
    if int(unixtime) > 1000000000000:
        unixtime = int(unixtime) / 1000
    return time.strftime('%Y-%m-%d %X', time.localtime(unixtime))
def get_date_str_from_unix_time(unixtime):
    """
    Convert a Unix timestamp to a local date string.

    Values above 1000000000000 are treated as milliseconds. Numeric strings
    are accepted for both seconds and milliseconds (previously a string in
    seconds raised TypeError while a string in milliseconds worked).
    :param unixtime: timestamp in seconds or milliseconds (number or numeric string)
    :return: date formatted as 'YYYY-MM-DD'
    """
    if not isinstance(unixtime, (int, float)):
        unixtime = int(unixtime)
    if int(unixtime) > 1000000000000:
        unixtime = int(unixtime) / 1000
    return time.strftime('%Y-%m-%d', time.localtime(unixtime))
def get_unix_time_from_time_str(time_str):
    """
    Convert a local 'YYYY-MM-DD HH:MM:SS' string to a Unix timestamp in seconds.
    :param time_str: date-time text (converted with str() before parsing)
    :return: integer timestamp, or 0 when the text cannot be converted
    """
    format_str = "%Y-%m-%d %H:%M:%S"
    try:
        tm_object = time.strptime(str(time_str), format_str)
        return int(time.mktime(tm_object))
    except (ValueError, OverflowError):
        # strptime rejects malformed text with ValueError; mktime signals
        # out-of-range values with OverflowError or ValueError.
        return 0
def get_unix_timestamp():
    """Return the current Unix time in whole seconds."""
    now_seconds = time.time()
    return int(now_seconds)
def rfc2822_to_china_datetime(rfc2822_time):
    """Parse a 'Sat Dec 23 17:12:54 +0800 2023'-style string and convert it to UTC+8.

    :param rfc2822_time: text matching "%a %b %d %H:%M:%S %z %Y"
    :return: timezone-aware datetime expressed in UTC+8
    """
    china_tz = timezone(timedelta(hours=8))
    parsed = datetime.strptime(rfc2822_time, "%a %b %d %H:%M:%S %z %Y")
    return parsed.astimezone(china_tz)
def rfc2822_to_timestamp(rfc2822_time):
    """Parse a 'Sat Dec 23 17:12:54 +0800 2023'-style string into a Unix timestamp.

    The UTC offset in the text is honoured. The previous implementation
    overwrote it with `replace(tzinfo=timezone.utc)`, which shifted the
    result by the offset (8 hours for +0800).

    :param rfc2822_time: text matching "%a %b %d %H:%M:%S %z %Y"
    :return: integer Unix timestamp in seconds
    """
    rfc2822_format = "%a %b %d %H:%M:%S %z %Y"
    # %z makes the parsed datetime timezone-aware, so timestamp() already
    # accounts for the offset; no tzinfo replacement is needed.
    dt_object = datetime.strptime(rfc2822_time, rfc2822_format)
    return int(dt_object.timestamp())
if __name__ == '__main__':
    # Example: convert a sample timestamp string and print it in UTC+8.
    _rfc2822_time = "Sat Dec 23 17:12:54 +0800 2023"
    china_time = rfc2822_to_china_datetime(_rfc2822_time)
    print(china_time)
@@ -0,0 +1,42 @@
# 声明:本代码仅供学习和研究目的使用。使用者应遵守以下原则:
# 1. 不得用于任何商业用途。
# 2. 使用时应遵守目标平台的使用条款和robots.txt规则。
# 3. 不得进行大规模爬取或对平台造成运营干扰。
# 4. 应合理控制请求频率,避免给目标平台带来不必要的负担。
# 5. 不得用于任何非法或不当的用途。
#
# 详细许可条款请参阅项目根目录下的LICENSE文件。
# 使用本代码即表示您同意遵守上述原则和LICENSE中的所有条款。
import argparse
import logging
from .crawler_util import *
from .slider_util import *
from .time_util import *
def init_loging_config():
    """Configure root logging at INFO level and return the "MediaCrawler" logger."""
    log_level = logging.INFO
    log_format = "%(asctime)s %(name)s %(levelname)s (%(filename)s:%(lineno)d) - %(message)s"
    logging.basicConfig(level=log_level, format=log_format, datefmt='%Y-%m-%d %H:%M:%S')
    crawler_logger = logging.getLogger("MediaCrawler")
    crawler_logger.setLevel(log_level)
    return crawler_logger


# Shared logger used across the project as `utils.logger`.
logger = init_loging_config()
def str2bool(v):
    """Convert a command-line style value to bool.

    Booleans are returned unchanged. Strings are compared case-insensitively
    against the accepted true/false spellings.

    Raises:
        argparse.ArgumentTypeError: if the text is not a recognised spelling.
    """
    if isinstance(v, bool):
        return v
    if v.lower() in ('yes', 'true', 't', 'y', '1'):
        return True
    if v.lower() in ('no', 'false', 'f', 'n', '0'):
        return False
    raise argparse.ArgumentTypeError('Boolean value expected.')
@@ -0,0 +1,83 @@
# 声明:本代码仅供学习和研究目的使用。使用者应遵守以下原则:
# 1. 不得用于任何商业用途。
# 2. 使用时应遵守目标平台的使用条款和robots.txt规则。
# 3. 不得进行大规模爬取或对平台造成运营干扰。
# 4. 应合理控制请求频率,避免给目标平台带来不必要的负担。
# 5. 不得用于任何非法或不当的用途。
#
# 详细许可条款请参阅项目根目录下的LICENSE文件。
# 使用本代码即表示您同意遵守上述原则和LICENSE中的所有条款。
import asyncio
import json
import logging
from collections import Counter
import aiofiles
import jieba
import matplotlib.pyplot as plt
from wordcloud import WordCloud
import config
from tools import utils
# Module-wide lock held while a word cloud is being plotted.
plot_lock = asyncio.Lock()


class AsyncWordCloudGenerator:
    """Compute word frequencies with jieba and render them as a word cloud."""

    def __init__(self):
        logging.getLogger('jieba').setLevel(logging.WARNING)
        self.stop_words_file = config.STOP_WORDS_FILE
        self.lock = asyncio.Lock()
        self.stop_words = self.load_stop_words()
        self.custom_words = config.CUSTOM_WORDS
        # Register the custom words with jieba; only the keys are used here.
        for word in self.custom_words:
            jieba.add_word(word)

    def load_stop_words(self):
        """Return the stop words in `self.stop_words_file` (one per line) as a set."""
        with open(self.stop_words_file, 'r', encoding='utf-8') as f:
            return set(f.read().strip().split('\n'))

    async def generate_word_frequency_and_cloud(self, data, save_words_prefix):
        """Write word frequencies to JSON, then render a word cloud if the lock is free.

        Args:
            data: Iterable of mappings that each provide a 'content' string.
            save_words_prefix: Path prefix of the output files;
                "_word_freq.json" and "_word_cloud.png" are appended.
        """
        all_text = ' '.join(item['content'] for item in data)
        words = [word for word in jieba.lcut(all_text) if word not in self.stop_words and len(word.strip()) > 0]
        word_freq = Counter(words)
        freq_file = f"{save_words_prefix}_word_freq.json"
        async with aiofiles.open(freq_file, 'w', encoding='utf-8') as file:
            await file.write(json.dumps(word_freq, ensure_ascii=False, indent=4))
        # Skip the plot instead of queueing behind one already in progress.
        if plot_lock.locked():
            utils.logger.info("Skipping word cloud generation as the lock is held.")
            return
        await self.generate_word_cloud(word_freq, save_words_prefix)

    async def generate_word_cloud(self, word_freq, save_words_prefix):
        """Render the 20 most frequent words to "<prefix>_word_cloud.png".

        The module-level plot lock is held for the duration and is always
        released, even if rendering raises. Previously an exception left the
        lock held, so every later cloud was skipped.
        """
        async with plot_lock:
            top_20_word_freq = {word: freq for word, freq in
                                sorted(word_freq.items(), key=lambda item: item[1], reverse=True)[:20]}
            wordcloud = WordCloud(
                font_path=config.FONT_PATH,
                width=800,
                height=400,
                background_color='white',
                max_words=200,
                stopwords=self.stop_words,
                colormap='viridis',
                contour_color='steelblue',
                contour_width=1
            ).generate_from_frequencies(top_20_word_freq)
            # Save the word cloud image.
            plt.figure(figsize=(10, 5), facecolor='white')
            plt.imshow(wordcloud, interpolation='bilinear')
            plt.axis('off')
            plt.tight_layout(pad=0)
            plt.savefig(f"{save_words_prefix}_word_cloud.png", format='png', dpi=300)
            plt.close()