More convenient project initialization, with database initialization added to app.py.

This commit is contained in:
戒酒的李白
2025-01-09 23:30:15 +08:00
parent a30773715e
commit e58f105761
5 changed files with 186 additions and 91 deletions
+2
View File
@@ -0,0 +1,2 @@
2025-01-09 23:29:06,246 [INFO] 尝试连接到数据库: root@localhost:3306/Weibo_PublicOpinion_AnalysisSystem
2025-01-09 23:29:06,346 [ERROR] 数据库连接失败: (1045, "Access denied for user 'root'@'localhost' (using password: YES)")
+115 -22
View File
@@ -1,10 +1,104 @@
from flask import Flask, session, request, redirect, render_template
import re
from apscheduler.schedulers.background import BackgroundScheduler
import subprocess
import os
from pytz import utc
import re
import logging
import getpass
import pymysql
import subprocess
from flask import Flask, session, request, redirect, render_template
from apscheduler.schedulers.background import BackgroundScheduler
from pytz import utc
# 初始化日志记录
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s [%(levelname)s] %(message)s',
handlers=[
logging.FileHandler("app.log"),
logging.StreamHandler()
]
)
def get_db_connection_interactive():
"""
通过终端交互获取数据库连接参数,若按回车则使用默认值。
返回一个连接对象。
"""
print("请依次输入数据库连接信息(直接按回车使用默认值):")
host = input(" 1. 主机 (默认: localhost): ") or "localhost"
port_str = input(" 2. 端口 (默认: 3306): ") or "3306"
try:
port = int(port_str)
except ValueError:
logging.warning("端口号无效,使用默认端口 3306。")
port = 3306
user = input(" 3. 用户名 (默认: root): ") or "root"
password = getpass.getpass(" 4. 密码 (默认: 12345678): ") or "12345678"
db_name = input(" 5. 数据库名 (默认: Weibo_PublicOpinion_AnalysisSystem): ") or "Weibo_PublicOpinion_AnalysisSystem"
logging.info(f"尝试连接到数据库: {user}@{host}:{port}/{db_name}")
try:
connection = pymysql.connect(
host=host,
port=port,
user=user,
password=password,
database=db_name,
charset='utf8mb4',
cursorclass=pymysql.cursors.DictCursor # 返回字典格式
)
logging.info("数据库连接成功。")
return connection
except pymysql.MySQLError as e:
logging.error(f"数据库连接失败: {e}")
exit(1)
def initialize_database(connection, sql_file_path):
"""
执行 SQL 文件中的语句以初始化数据库。
:param connection: 已建立的数据库连接
:param sql_file_path: SQL 文件的路径
"""
try:
with open(sql_file_path, 'r', encoding='utf8') as file:
sql_commands = file.read()
with connection.cursor() as cursor:
for statement in sql_commands.split(';'):
statement = statement.strip()
if statement:
cursor.execute(statement)
connection.commit()
logging.info("数据库初始化成功。")
except FileNotFoundError:
logging.error(f"SQL 文件未找到: {sql_file_path}")
exit(1)
except pymysql.MySQLError as e:
logging.error(f"执行 SQL 时出错: {e}")
connection.rollback()
exit(1)
except Exception as e:
logging.error(f"初始化数据库时出错: {e}")
connection.rollback()
exit(1)
def prompt_first_run():
"""
询问用户是否首次运行,需要初始化数据库。
:return: BooleanTrue 表示需要初始化数据库
"""
while True:
choice = input("是否首次运行该项目,需要初始化数据库?(Y/n): ").strip().lower()
if choice in ['y', 'yes', '']:
return True
elif choice in ['n', 'no']:
return False
else:
print("请输入 Y 或 N。")
# 初始化 Flask 应用
app = Flask(__name__)
@@ -14,25 +108,13 @@ app.secret_key = 'this is secret_key you know ?' # 设置 Flask 的密钥,用
from views.page import page
from views.user import user
app.register_blueprint(page.pb) # 注册页面蓝图
app.register_blueprint(user.ub) # 注册用户蓝图
app.register_blueprint(user.ub) # 注册用户蓝图
# 首页路由,清空 session
@app.route('/')
def hello_world():
return session.clear() # 清空 session,用户退出登录
"""
@app.before_request
def before_reuqest():
pat = re.compile(r'^/static') # 正则匹配静态文件路径
if re.search(pat, request.path): # 如果是静态文件,直接返回
return
elif request.path == '/user/login' or request.path == '/user/register': # 登录或注册页面无需验证
return
elif session.get('username'): # 如果 session 中有用户名,则允许继续
return
return redirect('/user/login') # 否则重定向到登录页面
"""
session.clear() # 清空 session,用户退出登录
return "Session Cleared"
# 中间件:处理请求前的逻辑
@app.before_request
@@ -79,6 +161,19 @@ def run_script():
# 主程序入口
if __name__ == '__main__':
# 检测是否需要初始化数据库
if prompt_first_run():
# 获取数据库连接
connection = get_db_connection_interactive()
# 执行数据库初始化
sql_file = os.path.join(os.path.dirname(os.path.abspath(__file__)), 'createTables.sql')
initialize_database(connection, sql_file)
# 关闭数据库连接
connection.close()
logging.info("数据库连接已关闭。")
# 设置定时任务,定期执行爬虫脚本
scheduler = BackgroundScheduler(timezone=utc) # 创建后台任务调度器
scheduler.add_job(run_script, 'interval', hours=5) # 每5小时执行一次爬虫脚本
@@ -90,8 +185,6 @@ if __name__ == '__main__':
scheduler.shutdown() # 确保在应用关闭时关闭调度器
# 设置日志记录,捕获应用的请求信息
logging.basicConfig(level=logging.INFO) # 配置日志记录,设置日志级别为 INFO
@app.before_request
def log_request_info():
# 记录每次请求的信息,便于调试和监控
View File
+36 -36
View File
@@ -4,7 +4,7 @@ from sqlalchemy import create_engine
from getpass import getpass
import logging
# 配置日志
# 配置日志
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s [%(levelname)s] %(message)s',
@@ -14,95 +14,95 @@ logging.basicConfig(
]
)
# 假设 articleAddr 和 commentsAddr 是绝对路径或相对于脚本的路径
# 假设 articleAddr 和 commentsAddr 是绝对路径或相对于脚本的路径
from spiderDataPackage.settings import articleAddr, commentsAddr
def get_db_connection_interactive():
"""
通过终端交互获取数据库连接参数,若按回车则使用默认值。
返回 SQLAlchemy 的数据库引擎。
通过终端交互获取数据库连接参数,若按回车则使用默认值。
返回 SQLAlchemy 的数据库引擎。
"""
print("请依次输入数据库连接信息(直接按回车使用默认值):")
print("请依次输入数据库连接信息(直接按回车使用默认值):")
host = input(" 1. 主机 (默认: localhost): ") or "localhost"
port_str = input(" 2. 端口 (默认: 3306): ") or "3306"
host = input(" 1. 主机 (默认: localhost): ") or "localhost"
port_str = input(" 2. 端口 (默认: 3306): ") or "3306"
try:
port = int(port_str)
except ValueError:
logging.warning("端口号无效,使用默认端口 3306。")
logging.warning("端口号无效,使用默认端口 3306。")
port = 3306
user = input(" 3. 用户名 (默认: root): ") or "root"
password = getpass(" 4. 密码 (默认: 12345678): ") or "12345678"
db_name = input(" 5. 数据库名 (默认: Weibo_PublicOpinion_AnalysisSystem): ") or "Weibo_PublicOpinion_AnalysisSystem"
user = input(" 3. 用户名 (默认: root): ") or "root"
password = getpass(" 4. 密码 (默认: 12345678): ") or "12345678"
db_name = input(" 5. 数据库名 (默认: Weibo_PublicOpinion_AnalysisSystem): ") or "Weibo_PublicOpinion_AnalysisSystem"
# 构建数据库连接字符串
# 构建数据库连接字符串
connection_str = f"mysql+pymysql://{user}:{password}@{host}:{port}/{db_name}?charset=utf8mb4"
try:
engine = create_engine(connection_str)
# 测试连接
# 测试连接
with engine.connect() as connection:
logging.info(f"成功连接到数据库: {user}@{host}:{port}/{db_name}")
logging.info(f"成功连接到数据库: {user}@{host}:{port}/{db_name}")
return engine
except Exception as e:
logging.error(f"无法连接到数据库: {e}")
logging.error(f"无法连接到数据库: {e}")
exit(1)
def saveData(engine):
"""
从数据库和CSV文件读取数据,合并后去重并保存回数据库。
最后删除CSV文件。
从数据库和CSV文件读取数据,合并后去重并保存回数据库。
最后删除CSV文件。
"""
try:
# 读取旧数据
# 读取旧数据
oldArticle = pd.read_sql('SELECT * FROM article', engine)
oldComment = pd.read_sql('SELECT * FROM comments', engine)
logging.info("成功从数据库读取旧的文章和评论数据。")
logging.info("成功从数据库读取旧的文章和评论数据。")
# 读取新数据
# 读取新数据
newArticle = pd.read_csv(articleAddr)
newComment = pd.read_csv(commentsAddr)
logging.info("成功从CSV文件读取新的文章和评论数据。")
logging.info("成功从CSV文件读取新的文章和评论数据。")
# 合并数据
# 合并数据
mergeArticle = pd.concat([newArticle, oldArticle], ignore_index=True, sort=False)
mergeComment = pd.concat([newComment, oldComment], ignore_index=True, sort=False)
logging.info("成功合并新旧文章和评论数据。")
logging.info("成功合并新旧文章和评论数据。")
# 去重
# 去重
mergeArticle.drop_duplicates(subset='id', keep='last', inplace=True)
mergeComment.drop_duplicates(subset='content', keep='last', inplace=True)
logging.info("成功去除重复的文章和评论数据。")
logging.info("成功去除重复的文章和评论数据。")
# 保存回数据库
# 保存回数据库
mergeArticle.to_sql('article', con=engine, if_exists='replace', index=False)
mergeComment.to_sql('comments', con=engine, if_exists='replace', index=False)
logging.info("成功将合并后的数据保存回数据库。")
logging.info("成功将合并后的数据保存回数据库。")
except pd.errors.EmptyDataError as e:
logging.error(f"读取CSV文件时出错: {e}")
logging.error(f"读取CSV文件时出错: {e}")
except Exception as e:
logging.error(f"保存数据时出错: {e}")
logging.error(f"保存数据时出错: {e}")
else:
# 删除CSV文件
# 删除CSV文件
try:
os.remove(articleAddr)
os.remove(commentsAddr)
logging.info("成功删除CSV文件。")
logging.info("成功删除CSV文件。")
except Exception as e:
logging.warning(f"删除CSV文件时出错: {e}")
logging.warning(f"删除CSV文件时出错: {e}")
def main():
# 获取数据库连接
# 获取数据库连接
engine = get_db_connection_interactive()
# 保存数据
# 保存数据
saveData(engine)
# 关闭引擎(可选,因为SQLAlchemy引擎会自动管理连接池)
# 关闭引擎(可选,因为SQLAlchemy引擎会自动管理连接池)
engine.dispose()
logging.info("数据库连接已关闭。")
logging.info("数据库连接已关闭。")
if __name__ == '__main__':
main()
+33 -33
View File
@@ -2,7 +2,7 @@ import getpass
import pymysql
import logging
# 配置日志
# 配置日志
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s [%(levelname)s] %(message)s',
@@ -14,24 +14,24 @@ logging.basicConfig(
def get_db_connection_interactive():
"""
通过终端交互获取数据库连接参数,若按回车则使用默认值。
返回一个连接对象。
通过终端交互获取数据库连接参数,若按回车则使用默认值。
返回一个连接对象。
"""
print("请依次输入数据库连接信息(直接按回车使用默认值):")
print("请依次输入数据库连接信息(直接按回车使用默认值):")
host = input(" 1. 主机 (默认: localhost): ") or "localhost"
port_str = input(" 2. 端口 (默认: 3306): ") or "3306"
host = input(" 1. 主机 (默认: localhost): ") or "localhost"
port_str = input(" 2. 端口 (默认: 3306): ") or "3306"
try:
port = int(port_str)
except ValueError:
logging.warning("端口号无效,使用默认端口 3306。")
logging.warning("端口号无效,使用默认端口 3306。")
port = 3306
user = input(" 3. 用户名 (默认: root): ") or "root"
password = getpass.getpass(" 4. 密码 (默认: 312517): ") or "312517"
db_name = input(" 5. 数据库名 (默认: Weibo_PublicOpinion_AnalysisSystem): ") or "Weibo_PublicOpinion_AnalysisSystem"
user = input(" 3. 用户名 (默认: root): ") or "root"
password = getpass.getpass(" 4. 密码 (默认: 12345678): ") or "12345678"
db_name = input(" 5. 数据库名 (默认: Weibo_PublicOpinion_AnalysisSystem): ") or "Weibo_PublicOpinion_AnalysisSystem"
logging.info(f"尝试连接到数据库: {user}@{host}:{port}/{db_name}")
logging.info(f"尝试连接到数据库: {user}@{host}:{port}/{db_name}")
try:
connection = pymysql.connect(
@@ -41,29 +41,29 @@ def get_db_connection_interactive():
password=password,
database=db_name,
charset='utf8mb4',
cursorclass=pymysql.cursors.DictCursor # 返回字典格式
cursorclass=pymysql.cursors.DictCursor # 返回字典格式
)
logging.info("数据库连接成功。")
logging.info("数据库连接成功。")
return connection
except pymysql.MySQLError as e:
logging.error(f"数据库连接失败: {e}")
logging.error(f"数据库连接失败: {e}")
exit(1)
# 获取数据库连接
# 获取数据库连接
conn = get_db_connection_interactive()
# 获取游标
# 获取游标
cursor = conn.cursor()
def query(sql, params=None, query_type="no_select"):
"""
执行SQL查询或操作。
执行SQL查询或操作。
:param sql: SQL语句
:param params: SQL参数(可选)
:param query_type: 查询类型,默认为 "no_select"
如果不是 "no_select",则执行 fetch 操作
:return: 如果是查询操作,返回数据列表;否则返回 None
:param sql: SQL语句
:param params: SQL参数(可选)
:param query_type: 查询类型,默认为 "no_select"
如果不是 "no_select",则执行 fetch 操作
:return: 如果是查询操作,返回数据列表;否则返回 None
"""
try:
if params:
@@ -72,43 +72,43 @@ def query(sql, params=None, query_type="no_select"):
else:
cursor.execute(sql)
# 确保连接保持活跃
# 确保连接保持活跃
conn.ping(reconnect=True)
if query_type != "no_select":
data_list = cursor.fetchall()
conn.commit()
logging.info("查询成功,已获取数据。")
logging.info("查询成功,已获取数据。")
return data_list
else:
conn.commit()
logging.info("操作成功,已提交事务。")
logging.info("操作成功,已提交事务。")
except pymysql.MySQLError as e:
logging.error(f"执行SQL时出错: {e}")
logging.error(f"执行SQL时出错: {e}")
conn.rollback()
return None
def main():
# 示例用法
# 示例用法
# 执行查询操作
# 执行查询操作
select_sql = "SELECT * FROM article LIMIT 5"
articles = query(select_sql, query_type="select")
if articles:
for article in articles:
print(article)
# 执行插入操作(根据实际表结构修改)
# 执行插入操作(根据实际表结构修改)
insert_sql = "INSERT INTO article (id, content) VALUES (%s, %s)"
new_article = (12345, "这是一条新的文章内容。")
new_article = (12345, "这是一条新的文章内容。")
result = query(insert_sql, params=new_article, query_type="no_select")
if result is None:
logging.info("插入操作完成。")
logging.info("插入操作完成。")
# 关闭游标和连接
# 关闭游标和连接
cursor.close()
conn.close()
logging.info("数据库连接已关闭。")
logging.info("数据库连接已关闭。")
if __name__ == '__main__':
main()