Files
bettafish-company/app.py
T
2025-04-02 20:32:57 +08:00

194 lines
6.3 KiB
Python

import os
import getpass
import pymysql
import subprocess
from flask import Flask, session, request, redirect
from apscheduler.schedulers.background import BackgroundScheduler
try:
from zoneinfo import ZoneInfo # Python 3.9+
except ImportError:
from backports.zoneinfo import ZoneInfo # Python < 3.9
from datetime import datetime, timedelta
import secrets
from dotenv import load_dotenv
from utils.logger import app_logger as logging
from utils.db_pool import DatabasePool
from utils.error_handlers import register_error_handlers
from middleware.security import set_secure_headers, log_request_info, require_https
# 加载环境变量
load_dotenv()
def get_db_connection_interactive():
"""
通过终端交互获取数据库连接参数,若按回车则使用默认值。
返回一个连接对象。
"""
print("请依次输入数据库连接信息(直接按回车使用默认值):")
host = input(" 1. 主机 (默认: localhost): ") or os.getenv('DB_HOST', 'localhost')
port_str = input(" 2. 端口 (默认: 3306): ") or os.getenv('DB_PORT', '3306')
try:
port = int(port_str)
except ValueError:
logging.warning("端口号无效,使用默认端口 3306。")
port = 3306
user = input(" 3. 用户名 (默认: root): ") or os.getenv('DB_USER', 'root')
password = getpass.getpass(" 4. 密码: ") or os.getenv('DB_PASSWORD', '')
db_name = input(" 5. 数据库名 (默认: Weibo_PublicOpinion_AnalysisSystem): ") or os.getenv('DB_NAME', 'Weibo_PublicOpinion_AnalysisSystem')
logging.info(f"尝试连接到数据库: {user}@{host}:{port}/{db_name}")
try:
connection = pymysql.connect(
host=host,
port=port,
user=user,
password=password,
database=db_name,
charset='utf8mb4',
cursorclass=pymysql.cursors.DictCursor,
ssl={'ssl': {'ca': os.getenv('DB_SSL_CA')}} if os.getenv('DB_SSL_CA') else None
)
logging.info("数据库连接成功。")
return connection
except pymysql.MySQLError as e:
logging.error(f"数据库连接失败: {e}")
raise
# 初始化 Flask 应用
app = Flask(__name__)
app.secret_key = os.getenv('FLASK_SECRET_KEY', secrets.token_hex(32))
app.config['SESSION_COOKIE_SECURE'] = True
app.config['SESSION_COOKIE_HTTPONLY'] = True
app.config['SESSION_COOKIE_SAMESITE'] = 'Lax'
app.config['PERMANENT_SESSION_LIFETIME'] = timedelta(hours=2)
# 导入蓝图
from views.page import page
from views.user import user
from views.spider_control import spider_bp
from views.workflow_api import workflow_bp, workflow_api_bp
app.register_blueprint(page.pb)
app.register_blueprint(user.ub)
app.register_blueprint(spider_bp)
app.register_blueprint(workflow_bp)
app.register_blueprint(workflow_api_bp)
# 注册错误处理器
register_error_handlers(app)
# 首页路由
@app.route('/')
@require_https()
def hello_world():
session.clear()
return redirect('/user/login')
# 请求前中间件
@app.before_request
def before_request():
# 记录请求信息
log_request_info()
# 如果请求的是静态文件路径,允许访问
if request.path.startswith('/static'):
return
# 如果请求的是登录或注册页面,不需要会话验证
if request.path in ['/user/login', '/user/register']:
return
# 验证会话
if not session.get('username'):
return redirect('/user/login')
# 验证会话完整性
if 'client_info' not in session:
session.clear()
return redirect('/user/login')
# 验证客户端信息
current_client = {
'ip': request.remote_addr,
'user_agent': str(request.user_agent)
}
stored_client = session.get('client_info', {})
if (current_client['ip'] != stored_client.get('ip') or
current_client['user_agent'] != stored_client.get('user_agent')):
session.clear()
return redirect('/user/login')
# 响应后中间件
@app.after_request
def after_request(response):
return set_secure_headers(response)
# 数据库配置
DB_CONFIG = {
'host': os.getenv('DB_HOST', 'localhost'),
'user': os.getenv('DB_USER', 'root'),
'password': os.getenv('DB_PASSWORD', ''),
'database': os.getenv('DB_NAME', 'Weibo_PublicOpinion_AnalysisSystem'),
'port': int(os.getenv('DB_PORT', '3306')),
'charset': 'utf8mb4',
'ssl': {'ca': os.getenv('DB_SSL_CA')} if os.getenv('DB_SSL_CA') else None
}
if __name__ == '__main__':
# 检测是否需要初始化数据库
try:
if os.getenv('INITIALIZE_DB', 'false').lower() == 'true':
connection = get_db_connection_interactive()
sql_file = os.path.join(os.path.dirname(os.path.abspath(__file__)), 'createTables.sql')
initialize_database(connection, sql_file)
connection.close()
logging.info("数据库初始化完成。")
except Exception as e:
logging.error(f"数据库初始化失败: {e}")
exit(1)
# 初始化数据库连接池
try:
DatabasePool.initialize(DB_CONFIG)
except Exception as e:
logging.error(f"数据库连接池初始化失败: {e}")
exit(1)
# 设置定时任务
try:
scheduler = BackgroundScheduler(timezone=ZoneInfo("UTC"))
scheduler.start()
if check_database_empty():
logging.info("数据库为空。立即开始初始爬取。")
dynamic_crawl()
else:
logging.info("数据库已有数据。安排首次爬取。")
base_interval = int(os.getenv('CRAWL_INTERVAL', '18000')) # 默认5小时
scheduler.add_job(
dynamic_crawl,
'date',
run_date=datetime.now() + timedelta(seconds=base_interval),
id='dynamic_crawl'
)
# 启动应用
app.run(
host=os.getenv('FLASK_HOST', '127.0.0.1'),
port=int(os.getenv('FLASK_PORT', '5000')),
ssl_context='adhoc' if os.getenv('ENABLE_HTTPS', 'false').lower() == 'true' else None
)
except Exception as e:
logging.error(f"应用启动失败: {e}")
if 'scheduler' in locals():
scheduler.shutdown()
DatabasePool.close()
exit(1)
finally:
if 'scheduler' in locals():
scheduler.shutdown()
DatabasePool.close()