Files
bettafish-company/app.py
T
alchemin f1794d4da9 chore: add configurable HOST and PORT via .env file
支持通过.env配置服务器HOST和PORT
2025-11-06 15:24:24 +08:00

1038 lines
36 KiB
Python

"""
Flask主应用 - 统一管理三个Streamlit应用
"""
import os
import sys
import subprocess
import time
import threading
from datetime import datetime
from queue import Queue
from flask import Flask, render_template, request, jsonify, Response
from flask_socketio import SocketIO, emit
import atexit
import requests
from loguru import logger
import importlib
from pathlib import Path
# 导入ReportEngine
try:
from ReportEngine.flask_interface import report_bp, initialize_report_engine
REPORT_ENGINE_AVAILABLE = True
except ImportError as e:
logger.error(f"ReportEngine导入失败: {e}")
REPORT_ENGINE_AVAILABLE = False
app = Flask(__name__)
app.config['SECRET_KEY'] = 'Dedicated-to-creating-a-concise-and-versatile-public-opinion-analysis-platform'
socketio = SocketIO(app, cors_allowed_origins="*")
# 注册ReportEngine Blueprint
if REPORT_ENGINE_AVAILABLE:
app.register_blueprint(report_bp, url_prefix='/api/report')
logger.info("ReportEngine接口已注册")
else:
logger.info("ReportEngine不可用,跳过接口注册")
# 设置UTF-8编码环境
os.environ['PYTHONIOENCODING'] = 'utf-8'
os.environ['PYTHONUTF8'] = '1'
# 创建日志目录
LOG_DIR = Path('logs')
LOG_DIR.mkdir(exist_ok=True)
CONFIG_MODULE_NAME = 'config'
CONFIG_FILE_PATH = Path(__file__).resolve().parent / 'config.py'
CONFIG_KEYS = [
'HOST',
'PORT',
'DB_DIALECT',
'DB_HOST',
'DB_PORT',
'DB_USER',
'DB_PASSWORD',
'DB_NAME',
'DB_CHARSET',
'INSIGHT_ENGINE_API_KEY',
'INSIGHT_ENGINE_BASE_URL',
'INSIGHT_ENGINE_MODEL_NAME',
'MEDIA_ENGINE_API_KEY',
'MEDIA_ENGINE_BASE_URL',
'MEDIA_ENGINE_MODEL_NAME',
'QUERY_ENGINE_API_KEY',
'QUERY_ENGINE_BASE_URL',
'QUERY_ENGINE_MODEL_NAME',
'REPORT_ENGINE_API_KEY',
'REPORT_ENGINE_BASE_URL',
'REPORT_ENGINE_MODEL_NAME',
'FORUM_HOST_API_KEY',
'FORUM_HOST_BASE_URL',
'FORUM_HOST_MODEL_NAME',
'KEYWORD_OPTIMIZER_API_KEY',
'KEYWORD_OPTIMIZER_BASE_URL',
'KEYWORD_OPTIMIZER_MODEL_NAME',
'TAVILY_API_KEY',
'BOCHA_WEB_SEARCH_API_KEY'
]
def _load_config_module():
"""Load or reload the config module to ensure latest values are available."""
importlib.invalidate_caches()
module = sys.modules.get(CONFIG_MODULE_NAME)
try:
if module is None:
module = importlib.import_module(CONFIG_MODULE_NAME)
else:
module = importlib.reload(module)
except ModuleNotFoundError:
return None
return module
def read_config_values():
"""Return the current configuration values that are exposed to the frontend."""
try:
# 重新加载配置以获取最新的 Settings 实例
from config import reload_settings, settings
reload_settings()
values = {}
for key in CONFIG_KEYS:
# 从 Pydantic Settings 实例读取值
value = getattr(settings, key, None)
# Convert to string for uniform handling on the frontend.
if value is None:
values[key] = ''
else:
values[key] = str(value)
return values
except Exception as exc:
logger.exception(f"读取配置失败: {exc}")
return {}
def _serialize_config_value(value):
"""Serialize Python values back to a config.py assignment-friendly string."""
if isinstance(value, bool):
return 'True' if value else 'False'
if isinstance(value, (int, float)):
return str(value)
if value is None:
return 'None'
value_str = str(value)
escaped = value_str.replace('\\', '\\\\').replace('"', '\\"')
return f'"{escaped}"'
def write_config_values(updates):
"""Persist configuration updates to .env file (Pydantic Settings source)."""
from pathlib import Path
# 确定 .env 文件路径(与 config.py 中的逻辑一致)
project_root = Path(__file__).resolve().parent
cwd_env = Path.cwd() / ".env"
env_file_path = cwd_env if cwd_env.exists() else (project_root / ".env")
# 读取现有的 .env 文件内容
env_lines = []
env_key_indices = {} # 记录每个键在文件中的索引位置
if env_file_path.exists():
env_lines = env_file_path.read_text(encoding='utf-8').splitlines()
# 提取已存在的键及其索引
for i, line in enumerate(env_lines):
line_stripped = line.strip()
if line_stripped and not line_stripped.startswith('#'):
if '=' in line_stripped:
key = line_stripped.split('=')[0].strip()
env_key_indices[key] = i
# 更新或添加配置项
for key, raw_value in updates.items():
# 格式化值用于 .env 文件(不需要引号,除非是字符串且包含空格)
if raw_value is None or raw_value == '':
env_value = ''
elif isinstance(raw_value, (int, float)):
env_value = str(raw_value)
elif isinstance(raw_value, bool):
env_value = 'True' if raw_value else 'False'
else:
value_str = str(raw_value)
# 如果包含空格或特殊字符,需要引号
if ' ' in value_str or '\n' in value_str or '#' in value_str:
escaped = value_str.replace('\\', '\\\\').replace('"', '\\"')
env_value = f'"{escaped}"'
else:
env_value = value_str
# 更新或添加配置项
if key in env_key_indices:
# 更新现有行
env_lines[env_key_indices[key]] = f'{key}={env_value}'
else:
# 添加新行到文件末尾
env_lines.append(f'{key}={env_value}')
# 写入 .env 文件
env_file_path.parent.mkdir(parents=True, exist_ok=True)
env_file_path.write_text('\n'.join(env_lines) + '\n', encoding='utf-8')
# 重新加载配置模块(这会重新读取 .env 文件并创建新的 Settings 实例)
_load_config_module()
system_state_lock = threading.Lock()
system_state = {
'started': False,
'starting': False
}
def _set_system_state(*, started=None, starting=None):
"""Safely update the cached system state flags."""
with system_state_lock:
if started is not None:
system_state['started'] = started
if starting is not None:
system_state['starting'] = starting
def _get_system_state():
"""Return a shallow copy of the system state flags."""
with system_state_lock:
return system_state.copy()
def _prepare_system_start():
"""Mark the system as starting if it is not already running or starting."""
with system_state_lock:
if system_state['started']:
return False, '系统已启动'
if system_state['starting']:
return False, '系统正在启动'
system_state['starting'] = True
return True, None
def initialize_system_components():
"""启动所有依赖组件(Streamlit 子应用、ForumEngine、ReportEngine)。"""
logs = []
errors = []
try:
stop_forum_engine()
logs.append("已停止 ForumEngine 监控器以避免文件冲突")
except Exception as exc: # pragma: no cover - 安全捕获
message = f"停止 ForumEngine 时发生异常: {exc}"
logs.append(message)
logger.exception(message)
processes['forum']['status'] = 'stopped'
for app_name, script_path in STREAMLIT_SCRIPTS.items():
logs.append(f"检查文件: {script_path}")
if os.path.exists(script_path):
success, message = start_streamlit_app(app_name, script_path, processes[app_name]['port'])
logs.append(f"{app_name}: {message}")
if success:
startup_success, startup_message = wait_for_app_startup(app_name, 30)
logs.append(f"{app_name} 启动检查: {startup_message}")
if not startup_success:
errors.append(f"{app_name} 启动失败: {startup_message}")
else:
errors.append(f"{app_name} 启动失败: {message}")
else:
msg = f"文件不存在: {script_path}"
logs.append(f"错误: {msg}")
errors.append(f"{app_name}: {msg}")
forum_started = False
try:
start_forum_engine()
processes['forum']['status'] = 'running'
logs.append("ForumEngine 启动完成")
forum_started = True
except Exception as exc: # pragma: no cover - 保底捕获
error_msg = f"ForumEngine 启动失败: {exc}"
logs.append(error_msg)
errors.append(error_msg)
if REPORT_ENGINE_AVAILABLE:
try:
if initialize_report_engine():
logs.append("ReportEngine 初始化成功")
else:
msg = "ReportEngine 初始化失败"
logs.append(msg)
errors.append(msg)
except Exception as exc: # pragma: no cover
msg = f"ReportEngine 初始化异常: {exc}"
logs.append(msg)
errors.append(msg)
if errors:
cleanup_processes()
processes['forum']['status'] = 'stopped'
if forum_started:
try:
stop_forum_engine()
except Exception: # pragma: no cover
logger.exception("停止ForumEngine失败")
return False, logs, errors
return True, logs, []
# 初始化ForumEngine的forum.log文件
def init_forum_log():
"""初始化forum.log文件"""
try:
forum_log_file = LOG_DIR / "forum.log"
# 检查文件不存在则创建并且写一个开始,存在就清空写一个开始
if not forum_log_file.exists():
with open(forum_log_file, 'w', encoding='utf-8') as f:
start_time = datetime.now().strftime('%Y-%m-%d %H:%M:%S')
f.write(f"=== ForumEngine 系统初始化 - {start_time} ===\n")
logger.info(f"ForumEngine: forum.log 已初始化")
else:
with open(forum_log_file, 'w', encoding='utf-8') as f:
start_time = datetime.now().strftime('%Y-%m-%d %H:%M:%S')
f.write(f"=== ForumEngine 系统初始化 - {start_time} ===\n")
logger.info(f"ForumEngine: forum.log 已初始化")
except Exception as e:
logger.exception(f"ForumEngine: 初始化forum.log失败: {e}")
# 初始化forum.log
init_forum_log()
# 启动ForumEngine智能监控
def start_forum_engine():
"""启动ForumEngine论坛"""
try:
from ForumEngine.monitor import start_forum_monitoring
logger.info("ForumEngine: 启动论坛...")
success = start_forum_monitoring()
if not success:
logger.info("ForumEngine: 论坛启动失败")
except Exception as e:
logger.exception(f"ForumEngine: 启动论坛失败: {e}")
# 停止ForumEngine智能监控
def stop_forum_engine():
"""停止ForumEngine论坛"""
try:
from ForumEngine.monitor import stop_forum_monitoring
logger.info("ForumEngine: 停止论坛...")
stop_forum_monitoring()
logger.info("ForumEngine: 论坛已停止")
except Exception as e:
logger.exception(f"ForumEngine: 停止论坛失败: {e}")
def parse_forum_log_line(line):
"""解析forum.log行内容,提取对话信息"""
import re
# 匹配格式: [时间] [来源] 内容
pattern = r'\[(\d{2}:\d{2}:\d{2})\]\s*\[([A-Z]+)\]\s*(.*)'
match = re.match(pattern, line)
if match:
timestamp, source, content = match.groups()
# 过滤掉系统消息和空内容
if source == 'SYSTEM' or not content.strip():
return None
# 只处理三个Engine的消息
if source not in ['QUERY', 'INSIGHT', 'MEDIA']:
return None
# 根据来源确定消息类型和发送者
message_type = 'agent'
sender = f'{source} Engine'
return {
'type': message_type,
'sender': sender,
'content': content.strip(),
'timestamp': timestamp,
'source': source
}
return None
# Forum日志监听器
def monitor_forum_log():
"""监听forum.log文件变化并推送到前端"""
import time
from pathlib import Path
forum_log_file = LOG_DIR / "forum.log"
last_position = 0
processed_lines = set() # 用于跟踪已处理的行,避免重复
# 如果文件存在,获取初始位置
if forum_log_file.exists():
with open(forum_log_file, 'r', encoding='utf-8', errors='ignore') as f:
# 初始化时读取所有现有行,避免重复处理
existing_lines = f.readlines()
for line in existing_lines:
line_hash = hash(line.strip())
processed_lines.add(line_hash)
last_position = f.tell()
while True:
try:
if forum_log_file.exists():
with open(forum_log_file, 'r', encoding='utf-8', errors='ignore') as f:
f.seek(last_position)
new_lines = f.readlines()
if new_lines:
for line in new_lines:
line = line.rstrip('\n\r')
if line.strip():
line_hash = hash(line.strip())
# 避免重复处理同一行
if line_hash in processed_lines:
continue
processed_lines.add(line_hash)
# 解析日志行并发送forum消息
parsed_message = parse_forum_log_line(line)
if parsed_message:
socketio.emit('forum_message', parsed_message)
# 只有在控制台显示forum时才发送控制台消息
timestamp = datetime.now().strftime('%H:%M:%S')
formatted_line = f"[{timestamp}] {line}"
socketio.emit('console_output', {
'app': 'forum',
'line': formatted_line
})
last_position = f.tell()
# 清理processed_lines集合,避免内存泄漏(保留最近1000行的哈希)
if len(processed_lines) > 1000:
processed_lines.clear()
time.sleep(1) # 每秒检查一次
except Exception as e:
logger.error(f"Forum日志监听错误: {e}")
time.sleep(5)
# 启动Forum日志监听线程
forum_monitor_thread = threading.Thread(target=monitor_forum_log, daemon=True)
forum_monitor_thread.start()
# 全局变量存储进程信息
processes = {
'insight': {'process': None, 'port': 8501, 'status': 'stopped', 'output': [], 'log_file': None},
'media': {'process': None, 'port': 8502, 'status': 'stopped', 'output': [], 'log_file': None},
'query': {'process': None, 'port': 8503, 'status': 'stopped', 'output': [], 'log_file': None},
'forum': {'process': None, 'port': None, 'status': 'stopped', 'output': [], 'log_file': None} # 启动后标记为 running
}
STREAMLIT_SCRIPTS = {
'insight': 'SingleEngineApp/insight_engine_streamlit_app.py',
'media': 'SingleEngineApp/media_engine_streamlit_app.py',
'query': 'SingleEngineApp/query_engine_streamlit_app.py'
}
# 输出队列
output_queues = {
'insight': Queue(),
'media': Queue(),
'query': Queue(),
'forum': Queue()
}
def write_log_to_file(app_name, line):
"""将日志写入文件"""
try:
log_file_path = LOG_DIR / f"{app_name}.log"
with open(log_file_path, 'a', encoding='utf-8') as f:
f.write(line + '\n')
f.flush()
except Exception as e:
logger.error(f"Error writing log for {app_name}: {e}")
def read_log_from_file(app_name, tail_lines=None):
"""从文件读取日志"""
try:
log_file_path = LOG_DIR / f"{app_name}.log"
if not log_file_path.exists():
return []
with open(log_file_path, 'r', encoding='utf-8') as f:
lines = f.readlines()
lines = [line.rstrip('\n\r') for line in lines if line.strip()]
if tail_lines:
return lines[-tail_lines:]
return lines
except Exception as e:
logger.exception(f"Error reading log for {app_name}: {e}")
return []
def read_process_output(process, app_name):
"""读取进程输出并写入文件"""
import select
import sys
while True:
try:
if process.poll() is not None:
# 进程结束,读取剩余输出
remaining_output = process.stdout.read()
if remaining_output:
lines = remaining_output.decode('utf-8', errors='replace').split('\n')
for line in lines:
line = line.strip()
if line:
timestamp = datetime.now().strftime('%H:%M:%S')
formatted_line = f"[{timestamp}] {line}"
write_log_to_file(app_name, formatted_line)
socketio.emit('console_output', {
'app': app_name,
'line': formatted_line
})
break
# 使用非阻塞读取
if sys.platform == 'win32':
# Windows下使用不同的方法
output = process.stdout.readline()
if output:
line = output.decode('utf-8', errors='replace').strip()
if line:
timestamp = datetime.now().strftime('%H:%M:%S')
formatted_line = f"[{timestamp}] {line}"
# 写入日志文件
write_log_to_file(app_name, formatted_line)
# 发送到前端
socketio.emit('console_output', {
'app': app_name,
'line': formatted_line
})
else:
# 没有输出时短暂休眠
time.sleep(0.1)
else:
# Unix系统使用select
ready, _, _ = select.select([process.stdout], [], [], 0.1)
if ready:
output = process.stdout.readline()
if output:
line = output.decode('utf-8', errors='replace').strip()
if line:
timestamp = datetime.now().strftime('%H:%M:%S')
formatted_line = f"[{timestamp}] {line}"
# 写入日志文件
write_log_to_file(app_name, formatted_line)
# 发送到前端
socketio.emit('console_output', {
'app': app_name,
'line': formatted_line
})
except Exception as e:
logger.exception(f"Error reading output for {app_name}: {e}")
write_log_to_file(app_name, f"[{datetime.now().strftime('%H:%M:%S')}] {error_msg}")
break
def start_streamlit_app(app_name, script_path, port):
"""启动Streamlit应用"""
try:
if processes[app_name]['process'] is not None:
return False, "应用已经在运行"
# 检查文件是否存在
if not os.path.exists(script_path):
return False, f"文件不存在: {script_path}"
# 清空之前的日志文件
log_file_path = LOG_DIR / f"{app_name}.log"
if log_file_path.exists():
log_file_path.unlink()
# 创建启动日志
start_msg = f"[{datetime.now().strftime('%H:%M:%S')}] 启动 {app_name} 应用..."
write_log_to_file(app_name, start_msg)
cmd = [
sys.executable, '-m', 'streamlit', 'run',
script_path,
'--server.port', str(port),
'--server.headless', 'true',
'--browser.gatherUsageStats', 'false',
# '--logger.level', 'debug', # 增加日志详细程度
'--logger.level', 'info',
'--server.enableCORS', 'false'
]
# 设置环境变量确保UTF-8编码和减少缓冲
env = os.environ.copy()
env.update({
'PYTHONIOENCODING': 'utf-8',
'PYTHONUTF8': '1',
'LANG': 'en_US.UTF-8',
'LC_ALL': 'en_US.UTF-8',
'PYTHONUNBUFFERED': '1', # 禁用Python缓冲
'STREAMLIT_BROWSER_GATHER_USAGE_STATS': 'false'
})
# 使用当前工作目录而不是脚本目录
process = subprocess.Popen(
cmd,
stdout=subprocess.PIPE,
stderr=subprocess.STDOUT,
bufsize=0, # 无缓冲
universal_newlines=False,
cwd=os.getcwd(),
env=env,
encoding=None, # 让我们手动处理编码
creationflags=subprocess.CREATE_NO_WINDOW if sys.platform == 'win32' else 0
)
processes[app_name]['process'] = process
processes[app_name]['status'] = 'starting'
processes[app_name]['output'] = []
# 启动输出读取线程
output_thread = threading.Thread(
target=read_process_output,
args=(process, app_name),
daemon=True
)
output_thread.start()
return True, f"{app_name} 应用启动中..."
except Exception as e:
error_msg = f"启动失败: {str(e)}"
write_log_to_file(app_name, f"[{datetime.now().strftime('%H:%M:%S')}] {error_msg}")
return False, error_msg
def stop_streamlit_app(app_name):
"""停止Streamlit应用"""
try:
if processes[app_name]['process'] is None:
return False, "应用未运行"
process = processes[app_name]['process']
process.terminate()
# 等待进程结束
try:
process.wait(timeout=5)
except subprocess.TimeoutExpired:
process.kill()
process.wait()
processes[app_name]['process'] = None
processes[app_name]['status'] = 'stopped'
return True, f"{app_name} 应用已停止"
except Exception as e:
return False, f"停止失败: {str(e)}"
def check_app_status():
"""检查应用状态"""
for app_name, info in processes.items():
if info['process'] is not None:
if info['process'].poll() is None:
# 进程仍在运行,检查端口是否可访问
try:
response = requests.get(f"http://localhost:{info['port']}", timeout=2)
if response.status_code == 200:
info['status'] = 'running'
else:
info['status'] = 'starting'
except requests.exceptions.RequestException:
info['status'] = 'starting'
except Exception:
info['status'] = 'starting'
else:
# 进程已结束
info['process'] = None
info['status'] = 'stopped'
def wait_for_app_startup(app_name, max_wait_time=30):
"""等待应用启动完成"""
import time
start_time = time.time()
while time.time() - start_time < max_wait_time:
info = processes[app_name]
if info['process'] is None:
return False, "进程已停止"
if info['process'].poll() is not None:
return False, "进程启动失败"
try:
response = requests.get(f"http://localhost:{info['port']}", timeout=2)
if response.status_code == 200:
info['status'] = 'running'
return True, "启动成功"
except:
pass
time.sleep(1)
return False, "启动超时"
def cleanup_processes():
"""清理所有进程"""
for app_name in STREAMLIT_SCRIPTS:
stop_streamlit_app(app_name)
processes['forum']['status'] = 'stopped'
try:
stop_forum_engine()
except Exception: # pragma: no cover
logger.exception("停止ForumEngine失败")
_set_system_state(started=False, starting=False)
# 注册清理函数
atexit.register(cleanup_processes)
@app.route('/')
def index():
"""主页"""
return render_template('index.html')
@app.route('/api/status')
def get_status():
"""获取所有应用状态"""
check_app_status()
return jsonify({
app_name: {
'status': info['status'],
'port': info['port'],
'output_lines': len(info['output'])
}
for app_name, info in processes.items()
})
@app.route('/api/start/<app_name>')
def start_app(app_name):
"""启动指定应用"""
if app_name not in processes:
return jsonify({'success': False, 'message': '未知应用'})
if app_name == 'forum':
try:
start_forum_engine()
processes['forum']['status'] = 'running'
return jsonify({'success': True, 'message': 'ForumEngine已启动'})
except Exception as exc: # pragma: no cover
logger.exception("手动启动ForumEngine失败")
return jsonify({'success': False, 'message': f'ForumEngine启动失败: {exc}'})
script_path = STREAMLIT_SCRIPTS.get(app_name)
if not script_path:
return jsonify({'success': False, 'message': '该应用不支持启动操作'})
success, message = start_streamlit_app(
app_name,
script_path,
processes[app_name]['port']
)
if success:
# 等待应用启动
startup_success, startup_message = wait_for_app_startup(app_name, 15)
if not startup_success:
message += f" 但启动检查失败: {startup_message}"
return jsonify({'success': success, 'message': message})
@app.route('/api/stop/<app_name>')
def stop_app(app_name):
"""停止指定应用"""
if app_name not in processes:
return jsonify({'success': False, 'message': '未知应用'})
if app_name == 'forum':
try:
stop_forum_engine()
processes['forum']['status'] = 'stopped'
return jsonify({'success': True, 'message': 'ForumEngine已停止'})
except Exception as exc: # pragma: no cover
logger.exception("手动停止ForumEngine失败")
return jsonify({'success': False, 'message': f'ForumEngine停止失败: {exc}'})
success, message = stop_streamlit_app(app_name)
return jsonify({'success': success, 'message': message})
@app.route('/api/output/<app_name>')
def get_output(app_name):
"""获取应用输出"""
if app_name not in processes:
return jsonify({'success': False, 'message': '未知应用'})
# 特殊处理Forum Engine
if app_name == 'forum':
try:
forum_log_content = read_log_from_file('forum')
return jsonify({
'success': True,
'output': forum_log_content,
'total_lines': len(forum_log_content)
})
except Exception as e:
return jsonify({'success': False, 'message': f'读取forum日志失败: {str(e)}'})
# 从文件读取完整日志
output_lines = read_log_from_file(app_name)
return jsonify({
'success': True,
'output': output_lines
})
@app.route('/api/test_log/<app_name>')
def test_log(app_name):
"""测试日志写入功能"""
if app_name not in processes:
return jsonify({'success': False, 'message': '未知应用'})
# 写入测试消息
test_msg = f"[{datetime.now().strftime('%H:%M:%S')}] 测试日志消息 - {datetime.now()}"
write_log_to_file(app_name, test_msg)
# 通过Socket.IO发送
socketio.emit('console_output', {
'app': app_name,
'line': test_msg
})
return jsonify({
'success': True,
'message': f'测试消息已写入 {app_name} 日志'
})
@app.route('/api/forum/start')
def start_forum_monitoring_api():
"""手动启动ForumEngine论坛"""
try:
from ForumEngine.monitor import start_forum_monitoring
success = start_forum_monitoring()
if success:
return jsonify({'success': True, 'message': 'ForumEngine论坛已启动'})
else:
return jsonify({'success': False, 'message': 'ForumEngine论坛启动失败'})
except Exception as e:
return jsonify({'success': False, 'message': f'启动论坛失败: {str(e)}'})
@app.route('/api/forum/stop')
def stop_forum_monitoring_api():
"""手动停止ForumEngine论坛"""
try:
from ForumEngine.monitor import stop_forum_monitoring
stop_forum_monitoring()
return jsonify({'success': True, 'message': 'ForumEngine论坛已停止'})
except Exception as e:
return jsonify({'success': False, 'message': f'停止论坛失败: {str(e)}'})
@app.route('/api/forum/log')
def get_forum_log():
"""获取ForumEngine的forum.log内容"""
try:
forum_log_file = LOG_DIR / "forum.log"
if not forum_log_file.exists():
return jsonify({
'success': True,
'log_lines': [],
'parsed_messages': [],
'total_lines': 0
})
with open(forum_log_file, 'r', encoding='utf-8', errors='ignore') as f:
lines = f.readlines()
lines = [line.rstrip('\n\r') for line in lines if line.strip()]
# 解析每一行日志并提取对话信息
parsed_messages = []
for line in lines:
parsed_message = parse_forum_log_line(line)
if parsed_message:
parsed_messages.append(parsed_message)
return jsonify({
'success': True,
'log_lines': lines,
'parsed_messages': parsed_messages,
'total_lines': len(lines)
})
except Exception as e:
return jsonify({'success': False, 'message': f'读取forum.log失败: {str(e)}'})
@app.route('/api/search', methods=['POST'])
def search():
"""统一搜索接口"""
data = request.get_json()
query = data.get('query', '').strip()
if not query:
return jsonify({'success': False, 'message': '搜索查询不能为空'})
# ForumEngine论坛已经在后台运行,会自动检测搜索活动
# logger.info("ForumEngine: 搜索请求已收到,论坛将自动检测日志变化")
# 检查哪些应用正在运行
check_app_status()
running_apps = [name for name, info in processes.items() if info['status'] == 'running']
if not running_apps:
return jsonify({'success': False, 'message': '没有运行中的应用'})
# 向运行中的应用发送搜索请求
results = {}
api_ports = {'insight': 8601, 'media': 8602, 'query': 8603}
for app_name in running_apps:
try:
api_port = api_ports[app_name]
# 调用Streamlit应用的API端点
response = requests.post(
f"http://localhost:{api_port}/api/search",
json={'query': query},
timeout=10
)
if response.status_code == 200:
results[app_name] = response.json()
else:
results[app_name] = {'success': False, 'message': 'API调用失败'}
except Exception as e:
results[app_name] = {'success': False, 'message': str(e)}
# 搜索完成后可以选择停止监控,或者让它继续运行以捕获后续的处理日志
# 这里我们让监控继续运行,用户可以通过其他接口手动停止
return jsonify({
'success': True,
'query': query,
'results': results
})
@app.route('/api/config', methods=['GET'])
def get_config():
"""Expose selected configuration values to the frontend."""
try:
config_values = read_config_values()
return jsonify({'success': True, 'config': config_values})
except Exception as exc:
logger.exception("读取配置失败")
return jsonify({'success': False, 'message': f'读取配置失败: {exc}'}), 500
@app.route('/api/config', methods=['POST'])
def update_config():
"""Update configuration values and persist them to config.py."""
payload = request.get_json(silent=True) or {}
if not isinstance(payload, dict) or not payload:
return jsonify({'success': False, 'message': '请求体不能为空'}), 400
updates = {}
for key, value in payload.items():
if key in CONFIG_KEYS:
updates[key] = value if value is not None else ''
if not updates:
return jsonify({'success': False, 'message': '没有可更新的配置项'}), 400
try:
write_config_values(updates)
updated_config = read_config_values()
return jsonify({'success': True, 'config': updated_config})
except Exception as exc:
logger.exception("更新配置失败")
return jsonify({'success': False, 'message': f'更新配置失败: {exc}'}), 500
@app.route('/api/system/status')
def get_system_status():
"""返回系统启动状态。"""
state = _get_system_state()
return jsonify({
'success': True,
'started': state['started'],
'starting': state['starting']
})
@app.route('/api/system/start', methods=['POST'])
def start_system():
"""在接收到请求后启动完整系统。"""
allowed, message = _prepare_system_start()
if not allowed:
return jsonify({'success': False, 'message': message}), 400
try:
success, logs, errors = initialize_system_components()
if success:
_set_system_state(started=True)
return jsonify({'success': True, 'message': '系统启动成功', 'logs': logs})
_set_system_state(started=False)
return jsonify({
'success': False,
'message': '系统启动失败',
'logs': logs,
'errors': errors
}), 500
except Exception as exc: # pragma: no cover - 保底捕获
logger.exception("系统启动过程中出现异常")
_set_system_state(started=False)
return jsonify({'success': False, 'message': f'系统启动异常: {exc}'}), 500
finally:
_set_system_state(starting=False)
@socketio.on('connect')
def handle_connect():
"""客户端连接"""
emit('status', 'Connected to Flask server')
@socketio.on('request_status')
def handle_status_request():
"""请求状态更新"""
check_app_status()
emit('status_update', {
app_name: {
'status': info['status'],
'port': info['port']
}
for app_name, info in processes.items()
})
if __name__ == '__main__':
# 从配置文件读取 HOST 和 PORT
from config import settings
HOST = settings.HOST
PORT = settings.PORT
logger.info("等待配置确认,系统将在前端指令后启动组件...")
logger.info(f"Flask服务器已启动,访问地址: http://{HOST}:{PORT}")
try:
socketio.run(app, host=HOST, port=PORT, debug=False)
except KeyboardInterrupt:
logger.info("\n正在关闭应用...")
cleanup_processes()