1. 同步MediaCrawler为最新版本

2. 修复数据库not null错误
3. 支持PG数据库
4. 规范环境变量及配置使用
5. 规范为uv安装
6. 使用loggru
This commit is contained in:
Doiiars
2025-11-03 22:38:34 +08:00
parent 62fac9ee2e
commit f4fe4141d4
155 changed files with 9414 additions and 6247 deletions
@@ -15,7 +15,7 @@ from typing import List
import config
from base.base_crawler import AbstractStore
from model.m_zhihu import ZhihuComment, ZhihuContent, ZhihuCreator
from store.zhihu.zhihu_store_impl import (ZhihuCsvStoreImplement,
from ._store_impl import (ZhihuCsvStoreImplement,
ZhihuDbStoreImplement,
ZhihuJsonStoreImplement,
ZhihuSqliteStoreImplement)
@@ -28,14 +28,15 @@ class ZhihuStoreFactory:
"csv": ZhihuCsvStoreImplement,
"db": ZhihuDbStoreImplement,
"json": ZhihuJsonStoreImplement,
"sqlite": ZhihuSqliteStoreImplement
"sqlite": ZhihuSqliteStoreImplement,
"postgresql": ZhihuDbStoreImplement,
}
@staticmethod
def create_store() -> AbstractStore:
store_class = ZhihuStoreFactory.STORES.get(config.SAVE_DATA_OPTION)
if not store_class:
raise ValueError("[ZhihuStoreFactory.create_store] Invalid save option only supported csv or db or json or sqlite ...")
raise ValueError("[ZhihuStoreFactory.create_store] Invalid save option only supported csv or db or json or sqlite or postgresql ...")
return store_class()
async def batch_update_zhihu_contents(contents: List[ZhihuContent]):
@@ -0,0 +1,191 @@
# 声明:本代码仅供学习和研究目的使用。使用者应遵守以下原则:
# 1. 不得用于任何商业用途。
# 2. 使用时应遵守目标平台的使用条款和robots.txt规则。
# 3. 不得进行大规模爬取或对平台造成运营干扰。
# 4. 应合理控制请求频率,避免给目标平台带来不必要的负担。
# 5. 不得用于任何非法或不当的用途。
#
# 详细许可条款请参阅项目根目录下的LICENSE文件。
# 使用本代码即表示您同意遵守上述原则和LICENSE中的所有条款。
# -*- coding: utf-8 -*-
# @Author : persist1@126.com
# @Time : 2025/9/5 19:34
# @Desc : 知乎存储实现类
import asyncio
import csv
import json
import os
import pathlib
from typing import Dict
import aiofiles
from sqlalchemy import select
from sqlalchemy.ext.asyncio import AsyncSession
import config
from base.base_crawler import AbstractStore
from database.db_session import get_session
from database.models import ZhihuContent, ZhihuComment, ZhihuCreator
from tools import utils, words
from var import crawler_type_var
from tools.async_file_writer import AsyncFileWriter
def calculate_number_of_files(file_store_path: str) -> int:
"""计算数据保存文件的前部分排序数字,支持每次运行代码不写到同一个文件中
Args:
file_store_path;
Returns:
file nums
"""
if not os.path.exists(file_store_path):
return 1
try:
return max([int(file_name.split("_")[0]) for file_name in os.listdir(file_store_path)]) + 1
except ValueError:
return 1
class ZhihuCsvStoreImplement(AbstractStore):
def __init__(self, **kwargs):
super().__init__(**kwargs)
self.writer = AsyncFileWriter(platform="zhihu", crawler_type=crawler_type_var.get())
async def store_content(self, content_item: Dict):
"""
Zhihu content CSV storage implementation
Args:
content_item: note item dict
Returns:
"""
await self.writer.write_to_csv(item_type="contents", item=content_item)
async def store_comment(self, comment_item: Dict):
"""
Zhihu comment CSV storage implementation
Args:
comment_item: comment item dict
Returns:
"""
await self.writer.write_to_csv(item_type="comments", item=comment_item)
async def store_creator(self, creator: Dict):
"""
Zhihu content CSV storage implementation
Args:
creator: creator dict
Returns:
"""
await self.writer.write_to_csv(item_type="creators", item=creator)
class ZhihuDbStoreImplement(AbstractStore):
async def store_content(self, content_item: Dict):
"""
Zhihu content DB storage implementation
Args:
content_item: content item dict
"""
content_id = content_item.get("content_id")
async with get_session() as session:
stmt = select(ZhihuContent).where(ZhihuContent.content_id == content_id)
result = await session.execute(stmt)
existing_content = result.scalars().first()
if existing_content:
for key, value in content_item.items():
setattr(existing_content, key, value)
else:
new_content = ZhihuContent(**content_item)
session.add(new_content)
await session.commit()
async def store_comment(self, comment_item: Dict):
"""
Zhihu content DB storage implementation
Args:
comment_item: comment item dict
"""
comment_id = comment_item.get("comment_id")
async with get_session() as session:
stmt = select(ZhihuComment).where(ZhihuComment.comment_id == comment_id)
result = await session.execute(stmt)
existing_comment = result.scalars().first()
if existing_comment:
for key, value in comment_item.items():
setattr(existing_comment, key, value)
else:
new_comment = ZhihuComment(**comment_item)
session.add(new_comment)
await session.commit()
async def store_creator(self, creator: Dict):
"""
Zhihu content DB storage implementation
Args:
creator: creator dict
"""
user_id = creator.get("user_id")
async with get_session() as session:
stmt = select(ZhihuCreator).where(ZhihuCreator.user_id == user_id)
result = await session.execute(stmt)
existing_creator = result.scalars().first()
if existing_creator:
for key, value in creator.items():
setattr(existing_creator, key, value)
else:
new_creator = ZhihuCreator(**creator)
session.add(new_creator)
await session.commit()
class ZhihuJsonStoreImplement(AbstractStore):
def __init__(self, **kwargs):
super().__init__(**kwargs)
self.writer = AsyncFileWriter(platform="zhihu", crawler_type=crawler_type_var.get())
async def store_content(self, content_item: Dict):
"""
content JSON storage implementation
Args:
content_item:
Returns:
"""
await self.writer.write_single_item_to_json(item_type="contents", item=content_item)
async def store_comment(self, comment_item: Dict):
"""
comment JSON storage implementation
Args:
comment_item:
Returns:
"""
await self.writer.write_single_item_to_json(item_type="comments", item=comment_item)
async def store_creator(self, creator: Dict):
"""
Zhihu content JSON storage implementation
Args:
creator: creator dict
Returns:
"""
await self.writer.write_single_item_to_json(item_type="creators", item=creator)
class ZhihuSqliteStoreImplement(ZhihuDbStoreImplement):
"""
Zhihu content SQLite storage implementation
"""
pass
@@ -1,318 +0,0 @@
# 声明:本代码仅供学习和研究目的使用。使用者应遵守以下原则:
# 1. 不得用于任何商业用途。
# 2. 使用时应遵守目标平台的使用条款和robots.txt规则。
# 3. 不得进行大规模爬取或对平台造成运营干扰。
# 4. 应合理控制请求频率,避免给目标平台带来不必要的负担。
# 5. 不得用于任何非法或不当的用途。
#
# 详细许可条款请参阅项目根目录下的LICENSE文件。
# 使用本代码即表示您同意遵守上述原则和LICENSE中的所有条款。
# -*- coding: utf-8 -*-
import asyncio
import csv
import json
import os
import pathlib
from typing import Dict
import aiofiles
import config
from base.base_crawler import AbstractStore
from tools import utils, words
from var import crawler_type_var
def calculate_number_of_files(file_store_path: str) -> int:
"""计算数据保存文件的前部分排序数字,支持每次运行代码不写到同一个文件中
Args:
file_store_path;
Returns:
file nums
"""
if not os.path.exists(file_store_path):
return 1
try:
return max([int(file_name.split("_")[0]) for file_name in os.listdir(file_store_path)]) + 1
except ValueError:
return 1
class ZhihuCsvStoreImplement(AbstractStore):
csv_store_path: str = "data/zhihu"
file_count: int = calculate_number_of_files(csv_store_path)
def make_save_file_name(self, store_type: str) -> str:
"""
make save file name by store type
Args:
store_type: contents or comments
Returns: eg: data/zhihu/search_comments_20240114.csv ...
"""
return f"{self.csv_store_path}/{self.file_count}_{crawler_type_var.get()}_{store_type}_{utils.get_current_date()}.csv"
async def save_data_to_csv(self, save_item: Dict, store_type: str):
"""
Below is a simple way to save it in CSV format.
Args:
save_item: save content dict info
store_type: Save type contains content and commentscontents | comments
Returns: no returns
"""
pathlib.Path(self.csv_store_path).mkdir(parents=True, exist_ok=True)
save_file_name = self.make_save_file_name(store_type=store_type)
async with aiofiles.open(save_file_name, mode='a+', encoding="utf-8-sig", newline="") as f:
f.fileno()
writer = csv.writer(f)
if await f.tell() == 0:
await writer.writerow(save_item.keys())
await writer.writerow(save_item.values())
async def store_content(self, content_item: Dict):
"""
Zhihu content CSV storage implementation
Args:
content_item: note item dict
Returns:
"""
await self.save_data_to_csv(save_item=content_item, store_type="contents")
async def store_comment(self, comment_item: Dict):
"""
Zhihu comment CSV storage implementation
Args:
comment_item: comment item dict
Returns:
"""
await self.save_data_to_csv(save_item=comment_item, store_type="comments")
async def store_creator(self, creator: Dict):
"""
Zhihu content CSV storage implementation
Args:
creator: creator dict
Returns:
"""
await self.save_data_to_csv(save_item=creator, store_type="creator")
class ZhihuDbStoreImplement(AbstractStore):
async def store_content(self, content_item: Dict):
"""
Zhihu content DB storage implementation
Args:
content_item: content item dict
Returns:
"""
from .zhihu_store_sql import (add_new_content,
query_content_by_content_id,
update_content_by_content_id)
note_id = content_item.get("note_id")
note_detail: Dict = await query_content_by_content_id(content_id=note_id)
if not note_detail:
content_item["add_ts"] = utils.get_current_timestamp()
await add_new_content(content_item)
else:
await update_content_by_content_id(note_id, content_item=content_item)
async def store_comment(self, comment_item: Dict):
"""
Zhihu content DB storage implementation
Args:
comment_item: comment item dict
Returns:
"""
from .zhihu_store_sql import (add_new_comment,
query_comment_by_comment_id,
update_comment_by_comment_id)
comment_id = comment_item.get("comment_id")
comment_detail: Dict = await query_comment_by_comment_id(comment_id=comment_id)
if not comment_detail:
comment_item["add_ts"] = utils.get_current_timestamp()
await add_new_comment(comment_item)
else:
await update_comment_by_comment_id(comment_id, comment_item=comment_item)
async def store_creator(self, creator: Dict):
"""
Zhihu content DB storage implementation
Args:
creator: creator dict
Returns:
"""
from .zhihu_store_sql import (add_new_creator,
query_creator_by_user_id,
update_creator_by_user_id)
user_id = creator.get("user_id")
user_detail: Dict = await query_creator_by_user_id(user_id)
if not user_detail:
creator["add_ts"] = utils.get_current_timestamp()
await add_new_creator(creator)
else:
await update_creator_by_user_id(user_id, creator)
class ZhihuJsonStoreImplement(AbstractStore):
json_store_path: str = "data/zhihu/json"
words_store_path: str = "data/zhihu/words"
lock = asyncio.Lock()
file_count: int = calculate_number_of_files(json_store_path)
WordCloud = words.AsyncWordCloudGenerator()
def make_save_file_name(self, store_type: str) -> (str, str):
"""
make save file name by store type
Args:
store_type: Save type contains content and commentscontents | comments
Returns:
"""
return (
f"{self.json_store_path}/{crawler_type_var.get()}_{store_type}_{utils.get_current_date()}.json",
f"{self.words_store_path}/{crawler_type_var.get()}_{store_type}_{utils.get_current_date()}"
)
async def save_data_to_json(self, save_item: Dict, store_type: str):
"""
Below is a simple way to save it in json format.
Args:
save_item: save content dict info
store_type: Save type contains content and commentscontents | comments
Returns:
"""
pathlib.Path(self.json_store_path).mkdir(parents=True, exist_ok=True)
pathlib.Path(self.words_store_path).mkdir(parents=True, exist_ok=True)
save_file_name, words_file_name_prefix = self.make_save_file_name(store_type=store_type)
save_data = []
async with self.lock:
if os.path.exists(save_file_name):
async with aiofiles.open(save_file_name, 'r', encoding='utf-8') as file:
save_data = json.loads(await file.read())
save_data.append(save_item)
async with aiofiles.open(save_file_name, 'w', encoding='utf-8') as file:
await file.write(json.dumps(save_data, ensure_ascii=False, indent=4))
if config.ENABLE_GET_COMMENTS and config.ENABLE_GET_WORDCLOUD:
try:
await self.WordCloud.generate_word_frequency_and_cloud(save_data, words_file_name_prefix)
except:
pass
async def store_content(self, content_item: Dict):
"""
content JSON storage implementation
Args:
content_item:
Returns:
"""
await self.save_data_to_json(content_item, "contents")
async def store_comment(self, comment_item: Dict):
"""
comment JSON storage implementation
Args:
comment_item:
Returns:
"""
await self.save_data_to_json(comment_item, "comments")
async def store_creator(self, creator: Dict):
"""
Zhihu content JSON storage implementation
Args:
creator: creator dict
Returns:
"""
await self.save_data_to_json(creator, "creator")
class ZhihuSqliteStoreImplement(AbstractStore):
async def store_content(self, content_item: Dict):
"""
Zhihu content SQLite storage implementation
Args:
content_item: content item dict
Returns:
"""
from .zhihu_store_sql import (add_new_content,
query_content_by_content_id,
update_content_by_content_id)
note_id = content_item.get("note_id")
note_detail: Dict = await query_content_by_content_id(content_id=note_id)
if not note_detail:
content_item["add_ts"] = utils.get_current_timestamp()
await add_new_content(content_item)
else:
await update_content_by_content_id(note_id, content_item=content_item)
async def store_comment(self, comment_item: Dict):
"""
Zhihu comment SQLite storage implementation
Args:
comment_item: comment item dict
Returns:
"""
from .zhihu_store_sql import (add_new_comment,
query_comment_by_comment_id,
update_comment_by_comment_id)
comment_id = comment_item.get("comment_id")
comment_detail: Dict = await query_comment_by_comment_id(comment_id=comment_id)
if not comment_detail:
comment_item["add_ts"] = utils.get_current_timestamp()
await add_new_comment(comment_item)
else:
await update_comment_by_comment_id(comment_id, comment_item=comment_item)
async def store_creator(self, creator: Dict):
"""
Zhihu creator SQLite storage implementation
Args:
creator: creator dict
Returns:
"""
from .zhihu_store_sql import (add_new_creator,
query_creator_by_user_id,
update_creator_by_user_id)
user_id = creator.get("user_id")
user_detail: Dict = await query_creator_by_user_id(user_id)
if not user_detail:
creator["add_ts"] = utils.get_current_timestamp()
await add_new_creator(creator)
else:
await update_creator_by_user_id(user_id, creator)
@@ -1,156 +0,0 @@
# 声明:本代码仅供学习和研究目的使用。使用者应遵守以下原则:
# 1. 不得用于任何商业用途。
# 2. 使用时应遵守目标平台的使用条款和robots.txt规则。
# 3. 不得进行大规模爬取或对平台造成运营干扰。
# 4. 应合理控制请求频率,避免给目标平台带来不必要的负担。
# 5. 不得用于任何非法或不当的用途。
#
# 详细许可条款请参阅项目根目录下的LICENSE文件。
# 使用本代码即表示您同意遵守上述原则和LICENSE中的所有条款。
# -*- coding: utf-8 -*-
from typing import Dict, List, Union
from async_db import AsyncMysqlDB
from async_sqlite_db import AsyncSqliteDB
from var import media_crawler_db_var
async def query_content_by_content_id(content_id: str) -> Dict:
"""
查询一条内容记录(zhihu的帖子 | 抖音的视频 | 微博 | 快手视频 ...)
Args:
content_id:
Returns:
"""
async_db_conn: Union[AsyncMysqlDB, AsyncSqliteDB] = media_crawler_db_var.get()
sql: str = f"select * from zhihu_content where content_id = '{content_id}'"
rows: List[Dict] = await async_db_conn.query(sql)
if len(rows) > 0:
return rows[0]
return dict()
async def add_new_content(content_item: Dict) -> int:
"""
新增一条内容记录(zhihu的帖子 | 抖音的视频 | 微博 | 快手视频 ...)
Args:
content_item:
Returns:
"""
async_db_conn: Union[AsyncMysqlDB, AsyncSqliteDB] = media_crawler_db_var.get()
last_row_id: int = await async_db_conn.item_to_table("zhihu_content", content_item)
return last_row_id
async def update_content_by_content_id(content_id: str, content_item: Dict) -> int:
"""
更新一条记录(zhihu的帖子 | 抖音的视频 | 微博 | 快手视频 ...)
Args:
content_id:
content_item:
Returns:
"""
async_db_conn: Union[AsyncMysqlDB, AsyncSqliteDB] = media_crawler_db_var.get()
effect_row: int = await async_db_conn.update_table("zhihu_content", content_item, "content_id", content_id)
return effect_row
async def query_comment_by_comment_id(comment_id: str) -> Dict:
"""
查询一条评论内容
Args:
comment_id:
Returns:
"""
async_db_conn: Union[AsyncMysqlDB, AsyncSqliteDB] = media_crawler_db_var.get()
sql: str = f"select * from zhihu_comment where comment_id = '{comment_id}'"
rows: List[Dict] = await async_db_conn.query(sql)
if len(rows) > 0:
return rows[0]
return dict()
async def add_new_comment(comment_item: Dict) -> int:
"""
新增一条评论记录
Args:
comment_item:
Returns:
"""
async_db_conn: Union[AsyncMysqlDB, AsyncSqliteDB] = media_crawler_db_var.get()
last_row_id: int = await async_db_conn.item_to_table("zhihu_comment", comment_item)
return last_row_id
async def update_comment_by_comment_id(comment_id: str, comment_item: Dict) -> int:
"""
更新增一条评论记录
Args:
comment_id:
comment_item:
Returns:
"""
async_db_conn: Union[AsyncMysqlDB, AsyncSqliteDB] = media_crawler_db_var.get()
effect_row: int = await async_db_conn.update_table("zhihu_comment", comment_item, "comment_id", comment_id)
return effect_row
async def query_creator_by_user_id(user_id: str) -> Dict:
"""
查询一条创作者记录
Args:
user_id:
Returns:
"""
async_db_conn: Union[AsyncMysqlDB, AsyncSqliteDB] = media_crawler_db_var.get()
sql: str = f"select * from zhihu_creator where user_id = '{user_id}'"
rows: List[Dict] = await async_db_conn.query(sql)
if len(rows) > 0:
return rows[0]
return dict()
async def add_new_creator(creator_item: Dict) -> int:
"""
新增一条创作者信息
Args:
creator_item:
Returns:
"""
async_db_conn: Union[AsyncMysqlDB, AsyncSqliteDB] = media_crawler_db_var.get()
last_row_id: int = await async_db_conn.item_to_table("zhihu_creator", creator_item)
return last_row_id
async def update_creator_by_user_id(user_id: str, creator_item: Dict) -> int:
"""
更新一条创作者信息
Args:
user_id:
creator_item:
Returns:
"""
async_db_conn: Union[AsyncMysqlDB, AsyncSqliteDB] = media_crawler_db_var.get()
effect_row: int = await async_db_conn.update_table("zhihu_creator", creator_item, "user_id", user_id)
return effect_row