The framework has been restructured again, and the Flask framework has been abandoned.

This commit is contained in:
戒酒的李白
2025-08-22 13:52:05 +08:00
parent 15b3a3343b
commit 0c31be4287
279 changed files with 2725 additions and 1648837 deletions
@@ -1,13 +0,0 @@
# 声明:本代码仅供学习和研究目的使用。使用者应遵守以下原则:
# 1. 不得用于任何商业用途。
# 2. 使用时应遵守目标平台的使用条款和robots.txt规则。
# 3. 不得进行大规模爬取或对平台造成运营干扰。
# 4. 应合理控制请求频率,避免给目标平台带来不必要的负担。
# 5. 不得用于任何非法或不当的用途。
#
# 详细许可条款请参阅项目根目录下的LICENSE文件。
# 使用本代码即表示您同意遵守上述原则和LICENSE中的所有条款。
# -*- coding: utf-8 -*-
from .core import ZhihuCrawler
@@ -1,568 +0,0 @@
# 声明:本代码仅供学习和研究目的使用。使用者应遵守以下原则:
# 1. 不得用于任何商业用途。
# 2. 使用时应遵守目标平台的使用条款和robots.txt规则。
# 3. 不得进行大规模爬取或对平台造成运营干扰。
# 4. 应合理控制请求频率,避免给目标平台带来不必要的负担。
# 5. 不得用于任何非法或不当的用途。
#
# 详细许可条款请参阅项目根目录下的LICENSE文件。
# 使用本代码即表示您同意遵守上述原则和LICENSE中的所有条款。
# -*- coding: utf-8 -*-
import asyncio
import json
from typing import Any, Callable, Dict, List, Optional, Union
from urllib.parse import urlencode
import httpx
from httpx import Response
from playwright.async_api import BrowserContext, Page
from tenacity import retry, stop_after_attempt, wait_fixed
import config
from base.base_crawler import AbstractApiClient
from constant import zhihu as zhihu_constant
from model.m_zhihu import ZhihuComment, ZhihuContent, ZhihuCreator
from tools import utils
from .exception import DataFetchError, ForbiddenError
from .field import SearchSort, SearchTime, SearchType
from .help import ZhihuExtractor, sign
class ZhiHuClient(AbstractApiClient):
def __init__(
    self,
    timeout=10,
    proxy=None,
    *,
    headers: Dict[str, str],
    playwright_page: Page,
    cookie_dict: Dict[str, str],
):
    """
    Build an API client from the headers and cookies of a browser session.

    Args:
        timeout: Timeout value passed to every httpx request.
        proxy: Proxy setting passed to ``httpx.AsyncClient``.
        headers: Default request headers; the "cookie" entry is read when
            requests are signed.
        playwright_page: Accepted by the signature but not stored by this
            constructor.
        cookie_dict: Cookies as a name -> value mapping.
    """
    # Session state taken from the browser.
    self.default_headers = headers
    self.cookie_dict = cookie_dict
    # Transport settings consumed by ``request``.
    self.timeout = timeout
    self.proxy = proxy
    # Helper that converts raw responses into model objects.
    self._extractor = ZhihuExtractor()
async def _pre_headers(self, url: str) -> Dict:
    """
    Build signed request headers for ``url``.

    Args:
        url: Request URL including its query parameters; it is passed to
            ``sign`` together with the cookie header.

    Returns:
        A copy of the default headers with ``x-zst-81`` and ``x-zse-96`` set.

    Raises:
        Exception: If the ``d_c0`` cookie is missing or empty.
    """
    if not self.cookie_dict.get("d_c0"):
        raise Exception("d_c0 not found in cookies")
    signature = sign(url, self.default_headers["cookie"])
    signed_headers = self.default_headers.copy()
    for header_name in ("x-zst-81", "x-zse-96"):
        signed_headers[header_name] = signature[header_name]
    return signed_headers
@retry(stop=stop_after_attempt(3), wait=wait_fixed(1), reraise=True)
async def request(self, method, url, **kwargs) -> Union[str, Any]:
    """
    Send one HTTP request through httpx and normalise the response.

    The call is attempted up to three times, one second apart.
    ``reraise=True`` makes tenacity re-raise the last underlying exception
    once the attempts are exhausted, so callers that catch
    ``DataFetchError`` / ``ForbiddenError`` receive those types instead of
    ``tenacity.RetryError``.

    Args:
        method: HTTP method name.
        url: Full request URL.
        **kwargs: Extra arguments forwarded to ``httpx.AsyncClient.request``
            (headers, body, ...). The key ``return_response`` is consumed
            here: when truthy the response text is returned instead of
            the decoded JSON.

    Returns:
        The response text when ``return_response`` is truthy, otherwise the
        decoded JSON body. An empty dict is returned for HTTP 404.

    Raises:
        ForbiddenError: The server answered with HTTP 403.
        DataFetchError: Any other non-200 status except 404, a JSON body
            with a truthy "error" entry, or a body that is not valid JSON.
    """
    return_response = kwargs.pop('return_response', False)
    async with httpx.AsyncClient(proxy=self.proxy) as client:
        response = await client.request(method, url, timeout=self.timeout, **kwargs)

        if response.status_code != 200:
            utils.logger.error(f"[ZhiHuClient.request] Requset Url: {url}, Request error: {response.text}")
            if response.status_code == 403:
                raise ForbiddenError(response.text)
            if response.status_code == 404:
                # A content item without comments also answers 404.
                return {}
            raise DataFetchError(response.text)

        if return_response:
            return response.text

        # Keep the try body limited to the call that can fail to decode.
        try:
            data: Dict = response.json()
        except json.JSONDecodeError:
            utils.logger.error(f"[ZhiHuClient.request] Request error: {response.text}")
            raise DataFetchError(response.text)

        if data.get("error"):
            utils.logger.error(f"[ZhiHuClient.request] Request error: {data}")
            raise DataFetchError(data.get("error", {}).get("message"))
        return data
async def get(self, uri: str, params=None, **kwargs) -> Union[Response, Dict, str]:
    """
    Issue a signed GET request.

    Args:
        uri: Request path.
        params: Query parameters; they are appended to the path only when
            given as a dict.
        **kwargs: Forwarded unchanged to ``request``.

    Returns:
        Whatever ``request`` returns for the call.
    """
    query_string = '?' + urlencode(params) if isinstance(params, dict) else ''
    final_uri = uri + query_string
    # The signature covers the path together with its query string.
    signed_headers = await self._pre_headers(final_uri)
    # Paths containing "/p/" are served from the zhuanlan host.
    if "/p/" in uri:
        base_url = zhihu_constant.ZHIHU_ZHUANLAN_URL
    else:
        base_url = zhihu_constant.ZHIHU_URL
    return await self.request(method="GET", url=base_url + final_uri, headers=signed_headers, **kwargs)
async def pong(self) -> bool:
    """
    Check whether the current login state is still valid.

    Returns:
        True when the current-user endpoint returns both "uid" and "name";
        False otherwise, including when any exception is raised.
    """
    utils.logger.info("[ZhiHuClient.pong] Begin to pong zhihu...")
    try:
        user_info = await self.get_current_user_info()
        if not (user_info.get("uid") and user_info.get("name")):
            utils.logger.error(f"[ZhiHuClient.pong] Ping zhihu failed, response data: {user_info}")
            return False
        utils.logger.info("[ZhiHuClient.pong] Ping zhihu successfully")
        return True
    except Exception as e:
        # Any failure is reported as "not logged in" so the caller can log in again.
        utils.logger.error(f"[ZhiHuClient.pong] Ping zhihu failed: {e}, and try to login again...")
        return False
async def update_cookies(self, browser_context: BrowserContext):
    """
    Refresh the client's cookies from the browser context.

    Args:
        browser_context: Browser context whose cookies are read.

    The cookie header string and the cookie dict are both replaced.
    """
    browser_cookies = await browser_context.cookies()
    converted = utils.convert_cookies(browser_cookies)
    self.default_headers["cookie"], self.cookie_dict = converted
async def get_current_user_info(self) -> Dict:
    """
    Fetch the profile of the currently logged-in user.

    Returns:
        The response of ``/api/v4/me`` as returned by ``get``.
    """
    return await self.get("/api/v4/me", {"include": "email,is_active,is_bind_phone"})
async def get_note_by_keyword(
    self,
    keyword: str,
    page: int = 1,
    page_size: int = 20,
    sort: SearchSort = SearchSort.DEFAULT,
    note_type: SearchType = SearchType.DEFAULT,
    search_time: SearchTime = SearchTime.DEFAULT,
) -> List[ZhihuContent]:
    """
    Search contents by keyword.

    Args:
        keyword: Search keyword.
        page: 1-based page number.
        page_size: Number of results per page.
        sort: Result ordering.
        note_type: Result type filter.
        search_time: Time-range filter.

    Returns:
        The contents extracted from the search response.
    """
    # "offset" and "lc_idx" both carry the index of the first item of the page.
    first_index = (page - 1) * page_size
    query = {
        "gk_version": "gz-gaokao",
        "t": "general",
        "q": keyword,
        "correction": 1,
        "offset": first_index,
        "limit": page_size,
        "filter_fields": "",
        "lc_idx": first_index,
        "show_all_topics": 0,
        "search_source": "Filter",
        "time_interval": search_time.value,
        "sort": sort.value,
        "vertical": note_type.value,
    }
    raw_result = await self.get("/api/v4/search_v3", query)
    utils.logger.info(f"[ZhiHuClient.get_note_by_keyword] Search result: {raw_result}")
    return self._extractor.extract_contents_from_search(raw_result)
async def get_root_comments(
    self,
    content_id: str,
    content_type: str,
    offset: str = "",
    limit: int = 10,
    order_by: str = "score",
) -> Dict:
    """
    Fetch one page of first-level comments of a content item.

    Args:
        content_id: Content ID.
        content_type: Content type (answer, article, zvideo); an "s" is
            appended to form the URL segment.
        offset: Paging offset sent as the "offset" parameter.
        limit: Page size.
        order_by: Value sent as the "order" parameter.

    Returns:
        The response as returned by ``get``.
    """
    endpoint = "/api/v4/comment_v5/{}s/{}/root_comment".format(content_type, content_id)
    query = {"order": order_by, "offset": offset, "limit": limit}
    return await self.get(endpoint, query)
async def get_child_comments(
    self,
    root_comment_id: str,
    offset: str = "",
    limit: int = 10,
    order_by: str = "sort",
) -> Dict:
    """
    Fetch one page of replies under a first-level comment.

    Args:
        root_comment_id: ID of the first-level comment.
        offset: Paging offset sent as the "offset" parameter.
        limit: Page size.
        order_by: Value sent as the "order" parameter.

    Returns:
        The response as returned by ``get``.
    """
    endpoint = "/api/v4/comment_v5/comment/{}/child_comment".format(root_comment_id)
    query = {"order": order_by, "offset": offset, "limit": limit}
    return await self.get(endpoint, query)
async def get_note_all_comments(
    self,
    content: ZhihuContent,
    crawl_interval: float = 1.0,
    callback: Optional[Callable] = None,
) -> List[ZhihuComment]:
    """
    Page through all first-level comments of a content item.

    Replies are fetched per page through ``get_comments_all_sub_comments``
    but are not part of the returned list.

    Args:
        content: Content object (answer, article or video).
        crawl_interval: Seconds to sleep after each page.
        callback: Optional coroutine function awaited with each page of comments.

    Returns:
        All first-level comments collected.
    """
    collected: List[ZhihuComment] = []
    page_size = 10
    cursor = ""
    while True:
        response = await self.get_root_comments(content.content_id, content.content_type, cursor, page_size)
        if not response:
            break
        paging = response.get("paging", {})
        finished = paging.get("is_end")
        cursor = self._extractor.extract_offset(paging)
        page_comments = self._extractor.extract_comments(content, response.get("data"))
        if not page_comments:
            break
        if callback:
            await callback(page_comments)
        collected.extend(page_comments)
        await self.get_comments_all_sub_comments(content, page_comments, crawl_interval=crawl_interval, callback=callback)
        await asyncio.sleep(crawl_interval)
        if finished:
            break
    return collected
async def get_comments_all_sub_comments(
    self,
    content: ZhihuContent,
    comments: List[ZhihuComment],
    crawl_interval: float = 1.0,
    callback: Optional[Callable] = None,
) -> List[ZhihuComment]:
    """
    Page through the replies of every comment in ``comments``.

    Args:
        content: Content object (answer, article or video).
        comments: First-level comments whose replies should be fetched.
        crawl_interval: Seconds to sleep after each page.
        callback: Optional coroutine function awaited with each page of replies.

    Returns:
        All replies collected; an empty list when
        ``config.ENABLE_GET_SUB_COMMENTS`` is falsy.
    """
    if not config.ENABLE_GET_SUB_COMMENTS:
        return []

    collected: List[ZhihuComment] = []
    page_size = 10
    for parent_comment in comments:
        # Comments without replies need no request.
        if parent_comment.sub_comment_count == 0:
            continue
        cursor = ""
        while True:
            response = await self.get_child_comments(parent_comment.comment_id, cursor, page_size)
            if not response:
                break
            paging = response.get("paging", {})
            finished = paging.get("is_end")
            cursor = self._extractor.extract_offset(paging)
            replies = self._extractor.extract_comments(content, response.get("data"))
            if not replies:
                break
            if callback:
                await callback(replies)
            collected.extend(replies)
            await asyncio.sleep(crawl_interval)
            if finished:
                break
    return collected
async def get_creator_info(self, url_token: str) -> Optional[ZhihuCreator]:
    """
    Fetch a creator's profile page and extract the creator information.

    Args:
        url_token: Creator token used in the ``/people/`` URL.

    Returns:
        The result of ``ZhihuExtractor.extract_creator`` for the page.
    """
    profile_html: str = await self.get(f"/people/{url_token}", return_response=True)
    return self._extractor.extract_creator(url_token, profile_html)
async def get_creator_answers(self, url_token: str, offset: int = 0, limit: int = 20) -> Dict:
    """
    Fetch one page of a creator's answers.

    Args:
        url_token: Creator token.
        offset: Index of the first answer to return.
        limit: Page size.

    Returns:
        The response as returned by ``get``.
    """
    # Field selector for the "include" parameter, split at its ";" separators.
    include_fields = (
        "data[*].is_normal,admin_closed_comment,reward_info,is_collapsed,annotation_action,annotation_detail,collapse_reason,collapsed_by,suggest_edit,comment_count,can_comment,content,editable_content,attachment,voteup_count,reshipment_settings,comment_permission,created_time,updated_time,review_info,excerpt,paid_info,reaction_instruction,is_labeled,label_info,relationship.is_authorized,voting,is_author,is_thanked,is_nothelp;"
        "data[*].vessay_info;"
        "data[*].author.badge[?(type=best_answerer)].topics;"
        "data[*].author.vip_info;"
        "data[*].question.has_publishing_draft,relationship"
    )
    query = {
        "include": include_fields,
        "offset": offset,
        "limit": limit,
        "order_by": "created",
    }
    return await self.get(f"/api/v4/members/{url_token}/answers", query)
async def get_creator_articles(self, url_token: str, offset: int = 0, limit: int = 20) -> Dict:
    """
    Fetch one page of a creator's articles.

    Args:
        url_token: Creator token.
        offset: Index of the first article to return.
        limit: Page size.

    Returns:
        The response as returned by ``get``.
    """
    # Field selector for the "include" parameter, split at its ";" separators.
    include_fields = (
        "data[*].comment_count,suggest_edit,is_normal,thumbnail_extra_info,thumbnail,can_comment,comment_permission,admin_closed_comment,content,voteup_count,created,updated,upvoted_followees,voting,review_info,reaction_instruction,is_labeled,label_info;"
        "data[*].vessay_info;"
        "data[*].author.badge[?(type=best_answerer)].topics;"
        "data[*].author.vip_info;"
    )
    query = {
        "include": include_fields,
        "offset": offset,
        "limit": limit,
        "order_by": "created",
    }
    return await self.get(f"/api/v4/members/{url_token}/articles", query)
async def get_creator_videos(self, url_token: str, offset: int = 0, limit: int = 20) -> Dict:
    """
    Fetch one page of a creator's videos.

    Args:
        url_token: Creator token.
        offset: Index of the first video to return.
        limit: Page size.

    Returns:
        The response as returned by ``get``.
    """
    query = {
        "include": "similar_zvideo,creation_relationship,reaction_instruction",
        "offset": offset,
        "limit": limit,
        "similar_aggregation": "true",
    }
    return await self.get(f"/api/v4/members/{url_token}/zvideos", query)
async def get_all_anwser_by_creator(self, creator: ZhihuCreator, crawl_interval: float = 1.0, callback: Optional[Callable] = None) -> List[ZhihuContent]:
    """
    Page through all answers of a creator.

    Args:
        creator: Creator whose ``url_token`` identifies the member.
        crawl_interval: Seconds to sleep after each page.
        callback: Optional coroutine function awaited with each page of contents.

    Returns:
        All contents collected.
    """
    collected: List[ZhihuContent] = []
    page_size = 20
    offset = 0
    while True:
        page = await self.get_creator_answers(creator.url_token, offset, page_size)
        if not page:
            break
        utils.logger.info(f"[ZhiHuClient.get_all_anwser_by_creator] Get creator {creator.url_token} answers: {page}")
        finished = page.get("paging", {}).get("is_end")
        contents = self._extractor.extract_content_list_from_creator(page.get("data"))
        if callback:
            await callback(contents)
        collected.extend(contents)
        offset += page_size
        await asyncio.sleep(crawl_interval)
        if finished:
            break
    return collected
async def get_all_articles_by_creator(
    self,
    creator: ZhihuCreator,
    crawl_interval: float = 1.0,
    callback: Optional[Callable] = None,
) -> List[ZhihuContent]:
    """
    Page through all articles of a creator.

    Args:
        creator: Creator whose ``url_token`` identifies the member.
        crawl_interval: Seconds to sleep after each page.
        callback: Optional coroutine function awaited with each page of contents.

    Returns:
        All contents collected.
    """
    collected: List[ZhihuContent] = []
    page_size = 20
    offset = 0
    while True:
        page = await self.get_creator_articles(creator.url_token, offset, page_size)
        if not page:
            break
        finished = page.get("paging", {}).get("is_end")
        contents = self._extractor.extract_content_list_from_creator(page.get("data"))
        if callback:
            await callback(contents)
        collected.extend(contents)
        offset += page_size
        await asyncio.sleep(crawl_interval)
        if finished:
            break
    return collected
async def get_all_videos_by_creator(
    self,
    creator: ZhihuCreator,
    crawl_interval: float = 1.0,
    callback: Optional[Callable] = None,
) -> List[ZhihuContent]:
    """
    Page through all videos of a creator.

    Args:
        creator: Creator whose ``url_token`` identifies the member.
        crawl_interval: Seconds to sleep after each page.
        callback: Optional coroutine function awaited with each page of contents.

    Returns:
        All contents collected.
    """
    collected: List[ZhihuContent] = []
    page_size = 20
    offset = 0
    while True:
        page = await self.get_creator_videos(creator.url_token, offset, page_size)
        if not page:
            break
        finished = page.get("paging", {}).get("is_end")
        contents = self._extractor.extract_content_list_from_creator(page.get("data"))
        if callback:
            await callback(contents)
        collected.extend(contents)
        offset += page_size
        await asyncio.sleep(crawl_interval)
        if finished:
            break
    return collected
async def get_answer_info(
    self,
    question_id: str,
    answer_id: str,
) -> Optional[ZhihuContent]:
    """
    Fetch an answer page and extract its content.

    Args:
        question_id: ID of the question the answer belongs to.
        answer_id: ID of the answer.

    Returns:
        The result of ``extract_answer_content_from_html`` for the page.
    """
    answer_html = await self.get(f"/question/{question_id}/answer/{answer_id}", return_response=True)
    return self._extractor.extract_answer_content_from_html(answer_html)
async def get_article_info(self, article_id: str) -> Optional[ZhihuContent]:
    """
    Fetch an article page and extract its content.

    Args:
        article_id: ID of the article.

    Returns:
        The result of ``extract_article_content_from_html`` for the page.
    """
    article_html = await self.get(f"/p/{article_id}", return_response=True)
    return self._extractor.extract_article_content_from_html(article_html)
async def get_video_info(self, video_id: str) -> Optional[ZhihuContent]:
    """
    Fetch a video page and extract its content.

    Args:
        video_id: ID of the video.

    Returns:
        The result of ``extract_zvideo_content_from_html`` for the page.
    """
    video_html = await self.get(f"/zvideo/{video_id}", return_response=True)
    return self._extractor.extract_zvideo_content_from_html(video_html)
@@ -1,455 +0,0 @@
# 声明:本代码仅供学习和研究目的使用。使用者应遵守以下原则:
# 1. 不得用于任何商业用途。
# 2. 使用时应遵守目标平台的使用条款和robots.txt规则。
# 3. 不得进行大规模爬取或对平台造成运营干扰。
# 4. 应合理控制请求频率,避免给目标平台带来不必要的负担。
# 5. 不得用于任何非法或不当的用途。
#
# 详细许可条款请参阅项目根目录下的LICENSE文件。
# 使用本代码即表示您同意遵守上述原则和LICENSE中的所有条款。
# -*- coding: utf-8 -*-
import asyncio
import os
import random
from asyncio import Task
from typing import Dict, List, Optional, Tuple, cast
from playwright.async_api import (
BrowserContext,
BrowserType,
Page,
Playwright,
async_playwright,
)
import config
from constant import zhihu as constant
from base.base_crawler import AbstractCrawler
from model.m_zhihu import ZhihuContent, ZhihuCreator
from proxy.proxy_ip_pool import IpInfoModel, create_ip_pool
from store import zhihu as zhihu_store
from tools import utils
from tools.cdp_browser import CDPBrowserManager
from var import crawler_type_var, source_keyword_var
from .client import ZhiHuClient
from .exception import DataFetchError
from .help import ZhihuExtractor, judge_zhihu_url
from .login import ZhiHuLogin
class ZhihuCrawler(AbstractCrawler):
context_page: Page
zhihu_client: ZhiHuClient
browser_context: BrowserContext
cdp_manager: Optional[CDPBrowserManager]
def __init__(self) -> None:
    """Set the site URL, the fixed user agent, the extractor and an unset CDP manager."""
    # No CDP manager until ``launch_browser_with_cdp`` creates one.
    self.cdp_manager = None
    self._extractor = ZhihuExtractor()
    # A fixed desktop Chrome user agent is used instead of utils.get_user_agent().
    self.user_agent = "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/128.0.0.0 Safari/537.36"
    self.index_url = "https://www.zhihu.com"
async def start(self) -> None:
"""
Run the crawler end to end.

Steps, in the order they appear below: optionally obtain a proxy, launch
a browser (CDP or standard mode), open the Zhihu home page, create the API
client, log in when ``pong`` reports an invalid session, visit the search
page to refresh cookies, then dispatch on ``config.CRAWLER_TYPE``
("search", "detail" or "creator").

NOTE(review): indentation was lost in this view of the file, so the exact
nesting (for example whether the stealth init script is added in both
launch modes or only in standard mode) cannot be confirmed from here.

Returns:
    None
"""
playwright_proxy_format, httpx_proxy_format = None, None
if config.ENABLE_IP_PROXY:
ip_proxy_pool = await create_ip_pool(
config.IP_PROXY_POOL_COUNT, enable_validate_ip=True
)
ip_proxy_info: IpInfoModel = await ip_proxy_pool.get_proxy()
playwright_proxy_format, httpx_proxy_format = utils.format_proxy_info(
ip_proxy_info
)
async with async_playwright() as playwright:
# Choose the browser launch mode from the configuration.
if config.ENABLE_CDP_MODE:
utils.logger.info("[ZhihuCrawler] 使用CDP模式启动浏览器")
self.browser_context = await self.launch_browser_with_cdp(
playwright,
playwright_proxy_format,
self.user_agent,
headless=config.CDP_HEADLESS,
)
else:
utils.logger.info("[ZhihuCrawler] 使用标准模式启动浏览器")
# Launch a browser context.
# NOTE(review): the proxy argument is None here, while CDP mode above
# receives playwright_proxy_format — confirm this is intended.
chromium = playwright.chromium
self.browser_context = await self.launch_browser(
chromium, None, self.user_agent, headless=config.HEADLESS
)
# stealth.min.js is a js script to prevent the website from detecting the crawler.
await self.browser_context.add_init_script(path="libs/stealth.min.js")
self.context_page = await self.browser_context.new_page()
await self.context_page.goto(self.index_url, wait_until="domcontentloaded")
# Create a client to interact with the zhihu website.
self.zhihu_client = await self.create_zhihu_client(httpx_proxy_format)
if not await self.zhihu_client.pong():
login_obj = ZhiHuLogin(
login_type=config.LOGIN_TYPE,
login_phone="", # input your phone number
browser_context=self.browser_context,
context_page=self.context_page,
cookie_str=config.COOKIES,
)
await login_obj.begin()
await self.zhihu_client.update_cookies(
browser_context=self.browser_context
)
# Zhihu's search API only accepts cookies obtained after the search page
# has been opened; the cookies from the home page alone are not enough.
utils.logger.info(
"[ZhihuCrawler.start] Zhihu跳转到搜索页面获取搜索页面的Cookies,该过程需要5秒左右"
)
await self.context_page.goto(
f"{self.index_url}/search?q=python&search_source=Guess&utm_content=search_hot&type=content"
)
# Wait five seconds before reading the cookies again.
await asyncio.sleep(5)
await self.zhihu_client.update_cookies(browser_context=self.browser_context)
crawler_type_var.set(config.CRAWLER_TYPE)
if config.CRAWLER_TYPE == "search":
# Search for notes and retrieve their comment information.
await self.search()
elif config.CRAWLER_TYPE == "detail":
# Get the information and comments of the specified post
await self.get_specified_notes()
elif config.CRAWLER_TYPE == "creator":
# Get creator's information and their notes and comments
await self.get_creators_and_notes()
else:
# Any other crawler type does nothing.
pass
utils.logger.info("[ZhihuCrawler.start] Zhihu Crawler finished ...")
async def search(self) -> None:
    """
    Search every configured keyword and crawl the comments of the results.

    ``config.KEYWORDS`` is split on ",". Pages before ``config.START_PAGE``
    are skipped. A ``DataFetchError`` ends the whole search.
    """
    utils.logger.info("[ZhihuCrawler.search] Begin search zhihu keywords")
    page_capacity = 20  # zhihu limit page fixed value
    # Raise the configured maximum to at least one full page.
    if config.CRAWLER_MAX_NOTES_COUNT < page_capacity:
        config.CRAWLER_MAX_NOTES_COUNT = page_capacity
    first_page = config.START_PAGE
    for keyword in config.KEYWORDS.split(","):
        source_keyword_var.set(keyword)
        utils.logger.info(f"[ZhihuCrawler.search] Current search keyword: {keyword}")
        page = 1
        while True:
            if (page - first_page + 1) * page_capacity > config.CRAWLER_MAX_NOTES_COUNT:
                break
            if page < first_page:
                utils.logger.info(f"[ZhihuCrawler.search] Skip page {page}")
                page += 1
                continue
            try:
                utils.logger.info(f"[ZhihuCrawler.search] search zhihu keyword: {keyword}, page: {page}")
                content_list: List[ZhihuContent] = await self.zhihu_client.get_note_by_keyword(
                    keyword=keyword,
                    page=page,
                )
                utils.logger.info(f"[ZhihuCrawler.search] Search contents :{content_list}")
                if not content_list:
                    utils.logger.info("No more content!")
                    break
                page += 1
                # Store each result, then crawl comments for the whole page.
                for content in content_list:
                    await zhihu_store.update_zhihu_content(content)
                await self.batch_get_content_comments(content_list)
            except DataFetchError:
                utils.logger.error("[ZhihuCrawler.search] Search content error")
                return
async def batch_get_content_comments(self, content_list: List[ZhihuContent]):
    """
    Crawl the comments of several contents concurrently.

    Args:
        content_list: Contents whose comments should be crawled.

    Does nothing when ``config.ENABLE_GET_COMMENTS`` is falsy. Concurrency
    is capped by a semaphore of size ``config.MAX_CONCURRENCY_NUM``.
    """
    if not config.ENABLE_GET_COMMENTS:
        utils.logger.info("[ZhihuCrawler.batch_get_content_comments] Crawling comment mode is not enabled")
        return

    limiter = asyncio.Semaphore(config.MAX_CONCURRENCY_NUM)
    # One task per content item, named after its content id.
    comment_tasks: List[Task] = [
        asyncio.create_task(self.get_comments(item, limiter), name=item.content_id)
        for item in content_list
    ]
    await asyncio.gather(*comment_tasks)
async def get_comments(
    self, content_item: ZhihuContent, semaphore: asyncio.Semaphore
):
    """
    Crawl all comments of one content item while holding ``semaphore``.

    Args:
        content_item: Content whose comments should be crawled.
        semaphore: Limits how many of these crawls run at once.

    Each page of comments is passed to
    ``zhihu_store.batch_update_zhihu_note_comments``; the interval between
    pages is a random value from ``random.random()``.
    """
    async with semaphore:
        utils.logger.info(f"[ZhihuCrawler.get_comments] Begin get note id comments {content_item.content_id}")
        page_delay = random.random()
        await self.zhihu_client.get_note_all_comments(
            content=content_item,
            crawl_interval=page_delay,
            callback=zhihu_store.batch_update_zhihu_note_comments,
        )
async def get_creators_and_notes(self) -> None:
    """
    Crawl each configured creator, their answers and the answers' comments.

    For every URL in ``config.ZHIHU_CREATOR_URL_LIST`` the creator token is
    taken from the last path segment (query string and trailing slashes are
    ignored), the creator is stored, all answers are crawled and stored
    through ``zhihu_store.batch_update_zhihu_contents``, and finally the
    comments of those answers are crawled. Creators that cannot be found
    are skipped.
    """
    utils.logger.info(
        "[ZhihuCrawler.get_creators_and_notes] Begin get zhihu creators"
    )
    for user_link in config.ZHIHU_CREATOR_URL_LIST:
        utils.logger.info(
            f"[ZhihuCrawler.get_creators_and_notes] Begin get creator {user_link}"
        )
        # Drop query params and trailing slashes so the last segment is the
        # token even for links like ".../people/<token>/?utm=..."
        user_url_token = user_link.split("?")[0].rstrip("/").split("/")[-1]

        # get creator detail info from web html content
        creator_info: Optional[ZhihuCreator] = await self.zhihu_client.get_creator_info(
            url_token=user_url_token
        )
        if not creator_info:
            utils.logger.info(
                f"[ZhihuCrawler.get_creators_and_notes] Creator {user_url_token} not found"
            )
            continue

        utils.logger.info(
            f"[ZhihuCrawler.get_creators_and_notes] Creator info: {creator_info}"
        )
        await zhihu_store.save_creator(creator=creator_info)

        # Only answers are crawled by default. To crawl articles or videos
        # instead, call zhihu_client.get_all_articles_by_creator or
        # zhihu_client.get_all_videos_by_creator with the same arguments.
        all_content_list = await self.zhihu_client.get_all_anwser_by_creator(
            creator=creator_info,
            crawl_interval=random.random(),
            callback=zhihu_store.batch_update_zhihu_contents,
        )

        # Get all comments of the creator's contents
        await self.batch_get_content_comments(all_content_list)
async def get_note_detail(
    self, full_note_url: str, semaphore: asyncio.Semaphore
) -> Optional[ZhihuContent]:
    """
    Fetch the detail of one content URL while holding ``semaphore``.

    Args:
        full_note_url: URL of an answer, article or video.
        semaphore: Limits how many of these fetches run at once.

    Returns:
        The content returned by the matching client call, or None when
        ``judge_zhihu_url`` yields none of the three known types.
    """
    async with semaphore:
        utils.logger.info(f"[ZhihuCrawler.get_specified_notes] Begin get specified note {full_note_url}")

        # judge note type
        note_type: str = judge_zhihu_url(full_note_url)
        url_parts = full_note_url.split("/")

        if note_type == constant.ANSWER_NAME:
            # Answer URLs carry the question id third from the end.
            question_id, answer_id = url_parts[-3], url_parts[-1]
            utils.logger.info(f"[ZhihuCrawler.get_specified_notes] Get answer info, question_id: {question_id}, answer_id: {answer_id}")
            return await self.zhihu_client.get_answer_info(question_id, answer_id)

        if note_type == constant.ARTICLE_NAME:
            article_id = url_parts[-1]
            utils.logger.info(f"[ZhihuCrawler.get_specified_notes] Get article info, article_id: {article_id}")
            return await self.zhihu_client.get_article_info(article_id)

        if note_type == constant.VIDEO_NAME:
            video_id = url_parts[-1]
            utils.logger.info(f"[ZhihuCrawler.get_specified_notes] Get video info, video_id: {video_id}")
            return await self.zhihu_client.get_video_info(video_id)

        return None
async def get_specified_notes(self):
    """
    Crawl the contents listed in ``config.ZHIHU_SPECIFIED_ID_LIST`` and their comments.

    Query parameters are removed from each URL before the detail is
    fetched. Details are fetched concurrently, limited by a single shared
    semaphore of size ``config.MAX_CONCURRENCY_NUM``. Every content found
    is stored; comments are then crawled for all of them.
    """
    # One semaphore shared by all detail tasks: creating a new semaphore
    # per task would let every task run at once and limit nothing.
    semaphore = asyncio.Semaphore(config.MAX_CONCURRENCY_NUM)
    get_note_detail_task_list = [
        self.get_note_detail(
            full_note_url=full_note_url.split("?")[0],  # remove query params
            semaphore=semaphore,
        )
        for full_note_url in config.ZHIHU_SPECIFIED_ID_LIST
    ]
    note_details = await asyncio.gather(*get_note_detail_task_list)

    need_get_comment_notes: List[ZhihuContent] = []
    # gather keeps input order, so results line up with the configured URLs.
    for source_url, note_detail in zip(config.ZHIHU_SPECIFIED_ID_LIST, note_details):
        if not note_detail:
            utils.logger.info(
                f"[ZhihuCrawler.get_specified_notes] Note {source_url} not found"
            )
            continue
        note_detail = cast(ZhihuContent, note_detail)  # only for type check
        need_get_comment_notes.append(note_detail)
        await zhihu_store.update_zhihu_content(note_detail)

    await self.batch_get_content_comments(need_get_comment_notes)
async def create_zhihu_client(self, httpx_proxy: Optional[str]) -> ZhiHuClient:
    """
    Create the Zhihu API client from the current browser cookies.

    Args:
        httpx_proxy: Proxy setting passed to the client.

    Returns:
        A ``ZhiHuClient`` carrying the default headers and cookies.
    """
    utils.logger.info("[ZhihuCrawler.create_zhihu_client] Begin create zhihu API client ...")
    browser_cookies = await self.browser_context.cookies()
    cookie_str, cookie_dict = utils.convert_cookies(browser_cookies)
    default_headers = {
        "accept": "*/*",
        "accept-language": "zh-CN,zh;q=0.9",
        "cookie": cookie_str,
        "priority": "u=1, i",
        "referer": "https://www.zhihu.com/search?q=python&time_interval=a_year&type=content",
        "user-agent": self.user_agent,
        "x-api-version": "3.0.91",
        "x-app-za": "OS=Web",
        "x-requested-with": "fetch",
        "x-zse-93": "101_3_3.0",
    }
    return ZhiHuClient(
        proxy=httpx_proxy,
        headers=default_headers,
        playwright_page=self.context_page,
        cookie_dict=cookie_dict,
    )
async def launch_browser(
    self,
    chromium: BrowserType,
    playwright_proxy: Optional[Dict],
    user_agent: Optional[str],
    headless: bool = True,
) -> BrowserContext:
    """
    Launch Chromium and return a browser context.

    Args:
        chromium: Playwright browser type used for launching.
        playwright_proxy: Proxy setting passed to Playwright.
        user_agent: User agent for the context.
        headless: Whether to launch without a visible window.

    Returns:
        A persistent context stored under ``browser_data`` when
        ``config.SAVE_LOGIN_STATE`` is truthy, otherwise a fresh context.
    """
    utils.logger.info("[ZhihuCrawler.launch_browser] Begin create browser context ...")
    viewport = {"width": 1920, "height": 1080}

    if not config.SAVE_LOGIN_STATE:
        browser = await chromium.launch(headless=headless, proxy=playwright_proxy)  # type: ignore
        return await browser.new_context(viewport=viewport, user_agent=user_agent)

    # feat issue #14
    # The login state is saved so that a login is not needed on every run.
    profile_dir = os.path.join(
        os.getcwd(), "browser_data", config.USER_DATA_DIR % config.PLATFORM
    )  # type: ignore
    return await chromium.launch_persistent_context(
        user_data_dir=profile_dir,
        accept_downloads=True,
        headless=headless,
        proxy=playwright_proxy,  # type: ignore
        viewport=viewport,
        user_agent=user_agent,
    )
async def launch_browser_with_cdp(
    self,
    playwright: Playwright,
    playwright_proxy: Optional[Dict],
    user_agent: Optional[str],
    headless: bool = True,
) -> BrowserContext:
    """
    Launch the browser in CDP mode, falling back to standard mode on failure.

    Args:
        playwright: Playwright instance.
        playwright_proxy: Proxy setting passed to the browser.
        user_agent: User agent for the context.
        headless: Whether to launch without a visible window.

    Returns:
        The context from ``CDPBrowserManager.launch_and_connect``, or the
        context from ``launch_browser`` when any exception is raised.
    """
    try:
        manager = CDPBrowserManager()
        self.cdp_manager = manager
        cdp_context = await manager.launch_and_connect(
            playwright=playwright,
            playwright_proxy=playwright_proxy,
            user_agent=user_agent,
            headless=headless,
        )
        # Log the browser information.
        cdp_info = await manager.get_browser_info()
        utils.logger.info(f"[ZhihuCrawler] CDP浏览器信息: {cdp_info}")
        return cdp_context
    except Exception as e:
        utils.logger.error(f"[ZhihuCrawler] CDP模式启动失败,回退到标准模式: {e}")
        # Fall back to standard mode.
        return await self.launch_browser(
            playwright.chromium, playwright_proxy, user_agent, headless
        )
async def close(self):
    """
    Close the browser.

    When a CDP manager is set, its ``cleanup`` is awaited and the manager
    is cleared; otherwise the browser context is closed directly.
    """
    if not self.cdp_manager:
        await self.browser_context.close()
    else:
        # CDP mode is cleaned up through its manager.
        await self.cdp_manager.cleanup()
        self.cdp_manager = None
    utils.logger.info("[ZhihuCrawler.close] Browser context closed ...")
@@ -1,23 +0,0 @@
# 声明:本代码仅供学习和研究目的使用。使用者应遵守以下原则:
# 1. 不得用于任何商业用途。
# 2. 使用时应遵守目标平台的使用条款和robots.txt规则。
# 3. 不得进行大规模爬取或对平台造成运营干扰。
# 4. 应合理控制请求频率,避免给目标平台带来不必要的负担。
# 5. 不得用于任何非法或不当的用途。
#
# 详细许可条款请参阅项目根目录下的LICENSE文件。
# 使用本代码即表示您同意遵守上述原则和LICENSE中的所有条款。
from httpx import RequestError
class DataFetchError(RequestError):
    """Raised when fetching data fails."""
class IPBlockError(RequestError):
    """Raised when the server blocks our IP because requests were sent too fast."""
class ForbiddenError(RequestError):
    """Raised when access is forbidden."""
@@ -1,47 +0,0 @@
# 声明:本代码仅供学习和研究目的使用。使用者应遵守以下原则:
# 1. 不得用于任何商业用途。
# 2. 使用时应遵守目标平台的使用条款和robots.txt规则。
# 3. 不得进行大规模爬取或对平台造成运营干扰。
# 4. 应合理控制请求频率,避免给目标平台带来不必要的负担。
# 5. 不得用于任何非法或不当的用途。
#
# 详细许可条款请参阅项目根目录下的LICENSE文件。
# 使用本代码即表示您同意遵守上述原则和LICENSE中的所有条款。
from enum import Enum
from typing import NamedTuple
from constant import zhihu as zhihu_constant
class SearchTime(Enum):
    """Time-range filter for search results."""

    # No time restriction.
    DEFAULT = ""
    # Within one day.
    ONE_DAY = "a_day"
    # Within one week.
    ONE_WEEK = "a_week"
    # Within one month.
    ONE_MONTH = "a_month"
    # Within three months.
    THREE_MONTH = "three_months"
    # Within half a year.
    HALF_YEAR = "half_a_year"
    # Within one year.
    ONE_YEAR = "a_year"
class SearchType(Enum):
    """Result-type filter for search results."""

    # No type restriction.
    DEFAULT = ""
    # Answers only.
    ANSWER = zhihu_constant.ANSWER_NAME
    # Articles only.
    ARTICLE = zhihu_constant.ARTICLE_NAME
    # Videos only.
    VIDEO = zhihu_constant.VIDEO_NAME
class SearchSort(Enum):
    """Ordering of search results."""

    # Default (comprehensive) ordering.
    DEFAULT = ""
    # Most upvoted first.
    UPVOTED_COUNT = "upvoted_count"
    # Most recently published first.
    CREATE_TIME = "created_time"
@@ -1,467 +0,0 @@
# 声明:本代码仅供学习和研究目的使用。使用者应遵守以下原则:
# 1. 不得用于任何商业用途。
# 2. 使用时应遵守目标平台的使用条款和robots.txt规则。
# 3. 不得进行大规模爬取或对平台造成运营干扰。
# 4. 应合理控制请求频率,避免给目标平台带来不必要的负担。
# 5. 不得用于任何非法或不当的用途。
#
# 详细许可条款请参阅项目根目录下的LICENSE文件。
# 使用本代码即表示您同意遵守上述原则和LICENSE中的所有条款。
# -*- coding: utf-8 -*-
import json
from typing import Dict, List, Optional
from urllib.parse import parse_qs, urlparse
import execjs
from parsel import Selector
from constant import zhihu as zhihu_constant
from model.m_zhihu import ZhihuComment, ZhihuContent, ZhihuCreator
from tools import utils
from tools.crawler_util import extract_text_from_html
# Compiled signing script; filled in on the first call to ``sign``.
ZHIHU_SGIN_JS = None


def sign(url: str, cookies: str) -> Dict:
    """
    Compute the Zhihu request signature.

    The script ``libs/zhihu.js`` is compiled on first use and kept in the
    module-level ``ZHIHU_SGIN_JS`` for later calls.

    Args:
        url: Request URL with its query string.
        cookies: Request cookies, which must contain the ``d_c0`` key.

    Returns:
        The result of the script's ``get_sign`` function.
    """
    global ZHIHU_SGIN_JS
    if not ZHIHU_SGIN_JS:
        with open("libs/zhihu.js", mode="r", encoding="utf-8-sig") as js_file:
            js_source = js_file.read()
        ZHIHU_SGIN_JS = execjs.compile(js_source)
    return ZHIHU_SGIN_JS.call("get_sign", url, cookies)
class ZhihuExtractor:
def __init__(self):
    """Create an extractor; no state is initialised."""
def extract_contents_from_search(self, json_data: Dict) -> List[ZhihuContent]:
    """
    Extract contents from a search response.

    Args:
        json_data: Decoded search response.

    Returns:
        Contents built from the "object" of every entry in "data" whose
        "type" is 'search_result' or 'zvideo'; an empty list for empty input.
    """
    if not json_data:
        return []
    wanted_types = ['search_result', 'zvideo']
    content_objects = []
    for entry in json_data.get("data", []):
        if entry.get("type") not in wanted_types:
            continue
        content_object = entry.get("object")
        # Entries without an object are dropped.
        if content_object:
            content_objects.append(content_object)
    return self._extract_content_list(content_objects)
def _extract_content_list(self, content_list: List[Dict]) -> List[ZhihuContent]:
    """
    Convert raw content dicts into ``ZhihuContent`` objects.

    Args:
        content_list: Raw content dicts; each is dispatched on its "type".

    Returns:
        One object per answer, article or video; other types are skipped.
    """
    if not content_list:
        return []

    # (type name, extractor) pairs, checked in this order.
    extractors = (
        (zhihu_constant.ANSWER_NAME, self._extract_answer_content),
        (zhihu_constant.ARTICLE_NAME, self._extract_article_content),
        (zhihu_constant.VIDEO_NAME, self._extract_zvideo_content),
    )
    extracted: List[ZhihuContent] = []
    for raw_content in content_list:
        raw_type = raw_content.get("type")
        for type_name, extractor in extractors:
            if raw_type == type_name:
                extracted.append(extractor(raw_content))
                break
    return extracted
def _extract_answer_content(self, answer: Dict) -> ZhihuContent:
    """
    Build a ``ZhihuContent`` from a raw answer dict.

    Args:
        answer: Raw answer; its "question" entry must be present because
            the question id is read from it.

    Returns:
        The populated content object.
    """
    content = ZhihuContent()
    content.content_id = answer.get("id")
    content.content_type = answer.get("type")
    content.content_text = extract_text_from_html(answer.get("content", ""))
    content.question_id = answer.get("question").get("id")
    content.content_url = "{}/question/{}/answer/{}".format(
        zhihu_constant.ZHIHU_URL, content.question_id, content.content_id
    )
    content.title = extract_text_from_html(answer.get("title", ""))
    # The description falls back to the excerpt when empty.
    raw_desc = answer.get("description", "") or answer.get("excerpt", "")
    content.desc = extract_text_from_html(raw_desc)
    content.created_time = answer.get("created_time")
    content.updated_time = answer.get("updated_time")
    content.voteup_count = answer.get("voteup_count", 0)
    content.comment_count = answer.get("comment_count", 0)

    # Copy the author fields onto the content.
    author = self._extract_content_or_comment_author(answer.get("author"))
    content.user_id = author.user_id
    content.user_link = author.user_link
    content.user_nickname = author.user_nickname
    content.user_avatar = author.user_avatar
    content.user_url_token = author.url_token
    return content
def _extract_article_content(self, article: Dict) -> ZhihuContent:
    """
    Build a ``ZhihuContent`` from a raw article dict.

    Args:
        article: Raw article.

    Returns:
        The populated content object.
    """
    content = ZhihuContent()
    content.content_id = article.get("id")
    content.content_type = article.get("type")
    content.content_text = extract_text_from_html(article.get("content"))
    content.content_url = "{}/p/{}".format(zhihu_constant.ZHIHU_ZHUANLAN_URL, content.content_id)
    content.title = extract_text_from_html(article.get("title"))
    content.desc = extract_text_from_html(article.get("excerpt"))
    # Timestamps are read from "created_time"/"updated_time", falling back
    # to "created"/"updated".
    content.created_time = article.get("created_time", 0) or article.get("created", 0)
    content.updated_time = article.get("updated_time", 0) or article.get("updated", 0)
    content.voteup_count = article.get("voteup_count", 0)
    content.comment_count = article.get("comment_count", 0)

    # Copy the author fields onto the content.
    author = self._extract_content_or_comment_author(article.get("author"))
    content.user_id = author.user_id
    content.user_link = author.user_link
    content.user_nickname = author.user_nickname
    content.user_avatar = author.user_avatar
    content.user_url_token = author.url_token
    return content
def _extract_zvideo_content(self, zvideo: Dict) -> ZhihuContent:
    """
    Build a ZhihuContent record from a raw Zhihu zvideo dict.

    Args:
        zvideo: raw zvideo payload. When it carries a dict under "video"
            the item is treated as coming from the creator homepage
            video-list API; otherwise the "video_url"/"created_at" keys
            are used.

    Returns:
        A ZhihuContent populated from the payload, with the user_* fields
        copied from the payload's "author" entry.
    """
    res = ZhihuContent()
    # The id must be assigned before the branch below: the creator-page
    # URL is built from res.content_id, which previously was still unset
    # at that point, producing a URL without the video id.
    res.content_id = zvideo.get("id")
    res.content_type = zvideo.get("type")

    if "video" in zvideo and isinstance(zvideo.get("video"), dict):
        # Item came from the creator homepage video-list API.
        res.content_url = f"{zhihu_constant.ZHIHU_URL}/zvideo/{res.content_id}"
        res.created_time = zvideo.get("published_at")
        res.updated_time = zvideo.get("updated_at")
    else:
        res.content_url = zvideo.get("video_url")
        res.created_time = zvideo.get("created_at")

    res.title = extract_text_from_html(zvideo.get("title"))
    res.desc = extract_text_from_html(zvideo.get("description"))
    res.voteup_count = zvideo.get("voteup_count")
    res.comment_count = zvideo.get("comment_count")

    # extract author info
    author_info = self._extract_content_or_comment_author(zvideo.get("author"))
    res.user_id = author_info.user_id
    res.user_link = author_info.user_link
    res.user_nickname = author_info.user_nickname
    res.user_avatar = author_info.user_avatar
    res.user_url_token = author_info.url_token
    return res
@staticmethod
def _extract_content_or_comment_author(author: Dict) -> ZhihuCreator:
    """
    Build a ZhihuCreator from the author entry of a content or comment.

    Args:
        author: raw author dict. When it has no truthy "id" the actual
            profile is read from its "member" entry instead.

    Returns:
        A ZhihuCreator. It is left at its defaults when ``author`` is
        empty, and may be only partially filled when reading the profile
        raises (the error is logged as a warning, not propagated).
    """
    creator = ZhihuCreator()
    try:
        if not author:
            return creator
        # Some payloads wrap the real profile under "member".
        profile = author if author.get("id") else author.get("member")
        token = profile.get("url_token")
        creator.user_id = profile.get("id")
        creator.user_link = f"{zhihu_constant.ZHIHU_URL}/people/{token}"
        creator.user_nickname = profile.get("name")
        creator.user_avatar = profile.get("avatar_url")
        creator.url_token = token
    except Exception as e:
        utils.logger.warning(
            f"[ZhihuExtractor._extract_content_or_comment_author] User Maybe Blocked. {e}"
        )
    return creator
def extract_comments(self, page_content: ZhihuContent, comments: List[Dict]) -> List[ZhihuComment]:
    """
    Convert raw Zhihu comment dicts into ZhihuComment records.

    Args:
        page_content: the content object the comments belong to
        comments: raw comment dicts; entries whose "type" is not
            "comment" are skipped

    Returns:
        The converted comments, or an empty list when ``comments`` is empty.
    """
    return [
        self._extract_comment(page_content, raw_comment)
        for raw_comment in (comments or [])
        if raw_comment.get("type") == "comment"
    ]
def _extract_comment(self, page_content: ZhihuContent, comment: Dict) -> ZhihuComment:
    """
    Convert one raw Zhihu comment dict into a ZhihuComment.

    Args:
        page_content: the content object the comment belongs to; its
            content_id and content_type are copied onto the comment
        comment: raw comment dict

    Returns:
        The populated ZhihuComment.
    """
    parsed = ZhihuComment()
    parsed.comment_id = str(comment.get("id", ""))
    parsed.parent_comment_id = comment.get("reply_comment_id")
    parsed.content = extract_text_from_html(comment.get("content"))
    parsed.publish_time = comment.get("created_time")

    tags = comment.get("comment_tag", [])
    parsed.ip_location = self._extract_comment_ip_location(tags)

    parsed.sub_comment_count = comment.get("child_comment_count")
    # Missing or falsy counters are normalised to 0.
    parsed.like_count = comment.get("like_count") or 0
    parsed.dislike_count = comment.get("dislike_count") or 0
    parsed.content_id = page_content.content_id
    parsed.content_type = page_content.content_type

    # Copy the author fields onto the comment record.
    creator = self._extract_content_or_comment_author(comment.get("author"))
    for field_name in ("user_id", "user_link", "user_nickname", "user_avatar"):
        setattr(parsed, field_name, getattr(creator, field_name))
    return parsed
@staticmethod
def _extract_comment_ip_location(comment_tags: List[Dict]) -> str:
"""
extract comment ip location
Args:
comment_tags:
Returns:
"""
if not comment_tags:
return ""
for ct in comment_tags:
if ct.get("type") == "ip_info":
return ct.get("text")
return ""
@staticmethod
def extract_offset(paging_info: Dict) -> str:
"""
extract offset
Args:
paging_info:
Returns:
"""
# https://www.zhihu.com/api/v4/comment_v5/zvideos/1424368906836807681/root_comment?limit=10&offset=456770961_10125996085_0&order_by=score
next_url = paging_info.get("next")
if not next_url:
return ""
parsed_url = urlparse(next_url)
query_params = parse_qs(parsed_url.query)
offset = query_params.get('offset', [""])[0]
return offset
@staticmethod
def _foramt_gender_text(gender: int) -> str:
"""
format gender text
Args:
gender:
Returns:
"""
if gender == 1:
return ""
elif gender == 0:
return ""
else:
return "未知"
def extract_creator(self, user_url_token: str, html_content: str) -> Optional[ZhihuCreator]:
    """
    Parse a creator profile out of a Zhihu creator page.

    The page embeds its state as JSON in the ``js-initialData`` script tag;
    the creator is looked up under initialState.entities.users by url token.

    Args:
        user_url_token: zhihu creator url token
        html_content: zhihu creator html content

    Returns:
        The populated ZhihuCreator, or None when the HTML is empty, the
        script tag is missing/empty, or the token is not among the users.
    """
    if not html_content:
        return None

    script_nodes = Selector(text=html_content).xpath("//script[@id='js-initialData']/text()")
    raw_state = script_nodes.get(default="").strip()
    if not raw_state:
        return None

    state: Dict = json.loads(raw_state)
    users: Dict = state.get("initialState", {}).get("entities", {}).get("users", {})
    if not users:
        return None

    profile: Dict = users.get(user_url_token)
    if not profile:
        return None

    creator = ZhihuCreator()
    creator.user_id = profile.get("id")
    creator.user_link = f"{zhihu_constant.ZHIHU_URL}/people/{user_url_token}"
    creator.user_nickname = profile.get("name")
    creator.user_avatar = profile.get("avatarUrl")
    creator.url_token = profile.get("urlToken") or user_url_token
    creator.gender = self._foramt_gender_text(profile.get("gender"))

    # Remaining attributes are copied one-to-one from the camelCase payload.
    direct_fields = (
        ("ip_location", "ipInfo"),
        ("follows", "followingCount"),
        ("fans", "followerCount"),
        ("anwser_count", "answerCount"),
        ("video_count", "zvideoCount"),
        ("question_count", "questionCount"),
        ("article_count", "articlesCount"),
        ("column_count", "columnsCount"),
        ("get_voteup_count", "voteupCount"),
    )
    for attr_name, payload_key in direct_fields:
        setattr(creator, attr_name, profile.get(payload_key))
    return creator
def extract_content_list_from_creator(self, anwser_list: List[Dict]) -> List[ZhihuContent]:
    """
    Convert the raw content items of a creator into ZhihuContent records.

    Args:
        anwser_list: raw content dicts taken from a creator's listing

    Returns:
        The result of ``_extract_content_list`` for the items, or an empty
        list when ``anwser_list`` is empty.
    """
    return self._extract_content_list(anwser_list) if anwser_list else []
def extract_answer_content_from_html(self, html_content: str) -> Optional[ZhihuContent]:
    """
    Parse an answer out of a Zhihu answer page.

    Args:
        html_content: HTML of the answer page

    Returns:
        The ZhihuContent built from the first entry under
        initialState.entities.answers of the embedded ``js-initialData``
        JSON, or None when the script tag or the answers map is empty.
    """
    script_nodes = Selector(text=html_content).xpath("//script[@id='js-initialData']/text()")
    raw_state: str = script_nodes.get(default="")
    if not raw_state:
        return None

    state: Dict = json.loads(raw_state)
    answers: Dict = state.get("initialState", {}).get("entities", {}).get("answers", {})
    if not answers:
        return None

    # Only the first answer in the map is used.
    first_answer = next(iter(answers.values()))
    return self._extract_answer_content(first_answer)
def extract_article_content_from_html(self, html_content: str) -> Optional[ZhihuContent]:
    """
    Parse an article out of a Zhihu article page.

    Args:
        html_content: HTML of the article page

    Returns:
        The ZhihuContent built from the first entry under
        initialState.entities.articles of the embedded ``js-initialData``
        JSON, or None when the script tag or the articles map is empty.
    """
    script_nodes = Selector(text=html_content).xpath("//script[@id='js-initialData']/text()")
    raw_state: str = script_nodes.get(default="")
    if not raw_state:
        return None

    state: Dict = json.loads(raw_state)
    articles: Dict = state.get("initialState", {}).get("entities", {}).get("articles", {})
    if not articles:
        return None

    # Only the first article in the map is used.
    first_article = next(iter(articles.values()))
    return self._extract_article_content(first_article)
def extract_zvideo_content_from_html(self, html_content: str) -> Optional[ZhihuContent]:
    """
    Parse a zvideo out of a Zhihu zvideo page.

    Args:
        html_content: HTML of the zvideo page

    Returns:
        The ZhihuContent built from the first entry under
        initialState.entities.zvideos of the embedded ``js-initialData``
        JSON, or None when the script tag, the zvideos map, or that first
        entry is empty.
    """
    script_nodes = Selector(text=html_content).xpath("//script[@id='js-initialData']/text()")
    raw_state: str = script_nodes.get(default="")
    if not raw_state:
        return None

    state: Dict = json.loads(raw_state)
    entities: Dict = state.get("initialState", {}).get("entities", {})
    zvideos: Dict = entities.get("zvideos", {})
    known_users: Dict = state.get("initialState", {}).get("entities", {}).get("users", {})
    if not zvideos:
        return None

    # Only the first video in the map is used.
    video: Dict = next(iter(zvideos.values()))
    if not video:
        return None

    # When the author is given as a string, it is used as a key into the
    # users map and replaced by the matching user entry.
    author_ref = video.get("author")
    if isinstance(author_ref, str):
        video["author"] = known_users.get(author_ref)
    return self._extract_zvideo_content(video)
def judge_zhihu_url(note_detail_url: str) -> str:
    """
    Classify a Zhihu detail URL by the path marker it contains.

    Args:
        note_detail_url: detail page URL, for example
            https://www.zhihu.com/question/123456789/answer/123456789  (answer)
            https://www.zhihu.com/p/123456789  (article)
            https://www.zhihu.com/zvideo/123456789  (zvideo)

    Returns:
        The answer, article or video name constant from ``zhihu_constant``
        (checked in that order), or an empty string when no marker matches.
    """
    if "/answer/" in note_detail_url:
        return zhihu_constant.ANSWER_NAME
    if "/p/" in note_detail_url:
        return zhihu_constant.ARTICLE_NAME
    if "/zvideo/" in note_detail_url:
        return zhihu_constant.VIDEO_NAME
    return ""
@@ -1,115 +0,0 @@
# 声明:本代码仅供学习和研究目的使用。使用者应遵守以下原则:
# 1. 不得用于任何商业用途。
# 2. 使用时应遵守目标平台的使用条款和robots.txt规则。
# 3. 不得进行大规模爬取或对平台造成运营干扰。
# 4. 应合理控制请求频率,避免给目标平台带来不必要的负担。
# 5. 不得用于任何非法或不当的用途。
#
# 详细许可条款请参阅项目根目录下的LICENSE文件。
# 使用本代码即表示您同意遵守上述原则和LICENSE中的所有条款。
# -*- coding: utf-8 -*-
import asyncio
import functools
import sys
from typing import Optional
from playwright.async_api import BrowserContext, Page
from tenacity import (RetryError, retry, retry_if_result, stop_after_attempt,
wait_fixed)
import config
from base.base_crawler import AbstractLogin
from tools import utils
class ZhiHuLogin(AbstractLogin):
    """Log a Playwright browser context into Zhihu via QR code or cookies."""

    # Number of login-state polls made while waiting for the QR code to be
    # scanned. Polls are spaced by a fixed 1-second wait, so this is also
    # the approximate wait time in seconds reported to the user.
    _LOGIN_CHECK_ATTEMPTS = 600

    def __init__(self,
                 login_type: str,
                 browser_context: BrowserContext,
                 context_page: Page,
                 login_phone: Optional[str] = "",
                 cookie_str: str = ""
                 ):
        """
        Args:
            login_type: "qrcode", "phone" or "cookie"; also written to
                ``config.LOGIN_TYPE``, which ``begin`` dispatches on
            browser_context: Playwright browser context to log in
            context_page: page on which the login QR code is looked up
            login_phone: phone number for the (unimplemented) phone login
            cookie_str: cookie string used by the cookie login
        """
        config.LOGIN_TYPE = login_type
        self.browser_context = browser_context
        self.context_page = context_page
        self.login_phone = login_phone
        self.cookie_str = cookie_str

    @retry(stop=stop_after_attempt(_LOGIN_CHECK_ATTEMPTS), wait=wait_fixed(1), retry=retry_if_result(lambda value: value is False))
    async def check_login_state(self) -> bool:
        """
        Report whether the browser context currently holds a "z_c0" cookie.

        The call is retried while it returns False; once the attempts are
        exhausted tenacity raises RetryError.

        Returns:
            True when a non-empty "z_c0" cookie is present, False otherwise.
        """
        current_cookie = await self.browser_context.cookies()
        _, cookie_dict = utils.convert_cookies(current_cookie)
        # Must be exactly True/False: the retry predicate tests `is False`.
        return bool(cookie_dict.get("z_c0"))

    async def begin(self):
        """
        Start the Zhihu login using the method named by ``config.LOGIN_TYPE``.

        Raises:
            ValueError: when the login type is not qrcode, phone or cookie.
        """
        utils.logger.info("[ZhiHu.begin] Begin login zhihu ...")
        if config.LOGIN_TYPE == "qrcode":
            await self.login_by_qrcode()
        elif config.LOGIN_TYPE == "phone":
            await self.login_by_mobile()
        elif config.LOGIN_TYPE == "cookie":
            await self.login_by_cookies()
        else:
            raise ValueError("[ZhiHu.begin] Invalid Login Type. Currently only supported qrcode or phone or cookie ...")

    async def login_by_mobile(self):
        """Login zhihu by mobile (not implemented; performs no login)."""
        # todo implement login by mobile
        utils.logger.warning("[ZhiHu.login_by_mobile] Login by mobile is not implemented, no login performed ...")

    async def login_by_qrcode(self):
        """
        Log in by QR code and wait until the login cookie appears.

        Exits the process via ``sys.exit()`` when the QR code cannot be
        found on the page or the login is not completed within the retry
        budget of ``check_login_state``.
        """
        utils.logger.info("[ZhiHu.login_by_qrcode] Begin login zhihu by qrcode ...")
        qrcode_img_selector = "canvas.Qrcode-qrcode"
        # find login qrcode
        base64_qrcode_img = await utils.find_qrcode_img_from_canvas(
            self.context_page,
            canvas_selector=qrcode_img_selector
        )
        if not base64_qrcode_img:
            utils.logger.info("[ZhiHu.login_by_qrcode] login failed , have not found qrcode please check ....")
            sys.exit()

        # show login qrcode
        # fix issue #12
        # we need to use partial function to call show_qrcode function and run in executor
        # then current asyncio event loop will not be blocked
        partial_show_qrcode = functools.partial(utils.show_qrcode, base64_qrcode_img)
        asyncio.get_running_loop().run_in_executor(executor=None, func=partial_show_qrcode)

        utils.logger.info(
            f"[ZhiHu.login_by_qrcode] waiting for scan code login, remaining time is {self._LOGIN_CHECK_ATTEMPTS}s")
        try:
            await self.check_login_state()
        except RetryError:
            utils.logger.info("[ZhiHu.login_by_qrcode] Login zhihu failed by qrcode login method ...")
            sys.exit()

        wait_redirect_seconds = 5
        utils.logger.info(
            f"[ZhiHu.login_by_qrcode] Login successful then wait for {wait_redirect_seconds} seconds redirect ...")
        await asyncio.sleep(wait_redirect_seconds)

    async def login_by_cookies(self):
        """Log in by installing the cookies from ``cookie_str`` for ".zhihu.com"."""
        utils.logger.info("[ZhiHu.login_by_cookies] Begin login zhihu by cookie ...")
        cookies = [
            {
                'name': key,
                'value': value,
                'domain': ".zhihu.com",
                'path': "/",
            }
            for key, value in utils.convert_str_cookie_to_dict(self.cookie_str).items()
        ]
        # add_cookies takes a list, so install everything in one call; skip
        # the call entirely when the cookie string yielded nothing.
        if cookies:
            await self.browser_context.add_cookies(cookies)