"""Small Flask service that lists, starts, stops and monitors task scripts.

Scripts live in the ``tasks`` directory below the working directory.  Only
``.bat`` and ``.py`` files are runnable, and only when an entry for them exists
in the JSON config file.  Output of each run is kept in memory, trimmed to the
most recent ``LOG_LINES`` lines.
"""
import json
import os
import subprocess
import threading
import time
from urllib.parse import unquote

from flask import Flask, jsonify, render_template, request

app = Flask(__name__)

# Configuration
TASKS_DIR = os.path.join(os.getcwd(), 'tasks')
CONFIG_FILE = 'config.json'
LOG_LINES = 200

# Global state
SCRIPT_CONFIGS = {}
running_processes = {}   # script name -> subprocess.Popen of the active run
script_outputs = {}      # script name -> list of timestamped output lines
# Guards check-then-register / check-then-remove on running_processes, which
# is touched by request handlers and by the reader threads.
_state_lock = threading.Lock()

os.makedirs(TASKS_DIR, exist_ok=True)


def load_configs():
    """Load SCRIPT_CONFIGS from CONFIG_FILE.

    A missing file leaves the current configuration untouched.  An unreadable
    file, invalid JSON, or a JSON document that is not an object resets the
    configuration to an empty dict.
    """
    global SCRIPT_CONFIGS
    if not os.path.exists(CONFIG_FILE):
        return
    try:
        with open(CONFIG_FILE, 'r', encoding='utf-8') as f:
            loaded = json.load(f)
    except (OSError, ValueError):
        # ValueError covers json.JSONDecodeError and UnicodeDecodeError.
        loaded = None
    SCRIPT_CONFIGS = loaded if isinstance(loaded, dict) else {}


def save_configs():
    """Write SCRIPT_CONFIGS to CONFIG_FILE as indented UTF-8 JSON."""
    with open(CONFIG_FILE, 'w', encoding='utf-8') as f:
        json.dump(SCRIPT_CONFIGS, f, indent=2, ensure_ascii=False)


def get_script_path(script_name):
    """Return the path of *script_name* inside TASKS_DIR."""
    return os.path.join(TASKS_DIR, script_name)


def _is_safe_script_name(script_name):
    """Return True when *script_name* resolves to a file directly in TASKS_DIR.

    Rejects empty names, ``.``/``..``, names containing directory separators
    and absolute or drive-qualified names, all of which would make
    ``get_script_path`` point outside the tasks directory.
    """
    if not script_name:
        return False
    tasks_root = os.path.normcase(os.path.abspath(TASKS_DIR))
    candidate = os.path.normcase(os.path.abspath(get_script_path(script_name)))
    return os.path.dirname(candidate) == tasks_root


def _forget_process(script_name, proc):
    """Unregister *proc* unless a newer run already replaced it."""
    with _state_lock:
        if running_processes.get(script_name) is proc:
            del running_processes[script_name]


def _pump_output(script_name, proc, log):
    """Copy the child's merged stdout/stderr into *log* until EOF.

    Runs in a daemon thread.  Each line is stripped and prefixed with the
    current wall-clock time; *log* is trimmed in place to LOG_LINES entries.
    """
    try:
        # Popen as a context manager closes the pipes and waits for exit.
        with proc:
            # Iterating to EOF (instead of stopping once poll() reports exit)
            # keeps the lines still buffered in the pipe when the child ends.
            for line in proc.stdout:
                log.append(f"[{time.strftime('%H:%M:%S')}] {line.strip()}")
                # In-place trim so other holders of this list stay in sync.
                del log[:-LOG_LINES]
    except Exception as e:
        # Thread boundary: surface the failure in the script's own log.
        log.append(f"❌ 执行错误: {e}")
    finally:
        _forget_process(script_name, proc)


@app.route('/')
def index():
    """Serve the single-page UI."""
    return render_template('index.html')


@app.route('/api/scripts')
def api_scripts():
    """Return the sorted names of the .bat/.py files found in TASKS_DIR."""
    if not os.path.exists(TASKS_DIR):
        return jsonify([])
    names = [
        entry
        for entry in os.listdir(TASKS_DIR)
        if entry.endswith(('.bat', '.py'))
        and os.path.isfile(os.path.join(TASKS_DIR, entry))
    ]
    return jsonify(sorted(names))


@app.route('/api/config')
def api_get_config():
    """Return the whole in-memory script configuration."""
    return jsonify(SCRIPT_CONFIGS)


@app.route('/api/config', methods=['POST'])
def api_save_config():
    """Merge the posted JSON object into the configuration and persist it.

    Responds 400 when the body is not a JSON object and 500 when the
    configuration cannot be written.
    """
    # silent=True turns a malformed body / wrong content type into None so it
    # is reported as a client error rather than as a server failure.
    data = request.get_json(silent=True)
    if not isinstance(data, dict):
        return jsonify({"error": "Invalid data format"}), 400
    try:
        SCRIPT_CONFIGS.update(data)
        save_configs()
    except (OSError, TypeError, ValueError) as e:
        return jsonify({"error": str(e)}), 500
    return jsonify({"msg": "配置已保存"})


# The views below take a ``script_name`` argument, so each rule must declare
# the matching URL variable.
@app.route('/api/start/<script_name>')
def api_start_script(script_name):
    """Start *script_name* and stream its output into the in-memory log.

    Responds 400 for an invalid name, an unsupported file type, or a script
    that is already running; 404 when the script has no configuration or does
    not exist; 500 when the process cannot be spawned.
    """
    # NOTE(review): the routing layer already percent-decodes the path; the
    # extra unquote is kept in case clients double-encode names - confirm.
    script_name = unquote(script_name)
    # The name is untrusted and ends up in a command line: keep it inside
    # TASKS_DIR.
    if not _is_safe_script_name(script_name):
        return jsonify({"error": "非法的脚本名称"}), 400
    if script_name not in SCRIPT_CONFIGS:
        return jsonify({"error": "未找到配置"}), 404
    if script_name in running_processes:
        return jsonify({"error": "已在运行"}), 400

    script_path = get_script_path(script_name)
    if not os.path.exists(script_path):
        return jsonify({"error": f"脚本不存在: {script_path}"}), 404

    if script_name.endswith('.bat'):
        cmd = ['cmd', '/c', script_path]
    elif script_name.endswith('.py'):
        cmd = ['python', script_path]
    else:
        return jsonify({"error": "不支持的类型"}), 400

    # Spawn and register under the lock so that two concurrent requests cannot
    # both start the script, and so status reports "running" immediately.
    with _state_lock:
        if script_name in running_processes:
            return jsonify({"error": "已在运行"}), 400
        try:
            proc = subprocess.Popen(
                cmd,
                stdout=subprocess.PIPE,
                stderr=subprocess.STDOUT,
                stdin=subprocess.PIPE,
                text=True,
                encoding='utf-8',
                # Output that is not valid UTF-8 must not kill the reader.
                errors='replace',
                cwd=TASKS_DIR,
            )
        except (OSError, ValueError) as e:
            script_outputs.setdefault(script_name, []).append(f"❌ 执行错误: {e}")
            return jsonify({"error": str(e)}), 500
        log = []
        running_processes[script_name] = proc
        script_outputs[script_name] = log

    threading.Thread(
        target=_pump_output,
        args=(script_name, proc, log),
        daemon=True,
    ).start()
    return jsonify({"msg": f"✅ 开始执行: {script_name}"})


@app.route('/api/stop/<script_name>')
def api_stop_script(script_name):
    """Terminate *script_name*, killing it if it has not exited after 5 s."""
    script_name = unquote(script_name)
    proc = running_processes.get(script_name)
    if proc is None:
        return jsonify({"msg": "未在运行"})

    proc.terminate()
    try:
        proc.wait(timeout=5)
    except subprocess.TimeoutExpired:
        proc.kill()

    _forget_process(script_name, proc)
    script_outputs.setdefault(script_name, []).append("⏹️ 已停止")
    return jsonify({"msg": f"⏹️ 已停止: {script_name}"})


@app.route('/api/status/<script_name>')
def api_status(script_name):
    """Report whether *script_name* is running together with its log lines."""
    script_name = unquote(script_name)
    # Copy the list: a reader thread may be appending to it concurrently.
    output = list(script_outputs.get(script_name, []))
    is_running = script_name in running_processes
    return jsonify({
        "running": is_running,
        "output": output
    })


@app.route('/api/send-input/<script_name>', methods=['POST'])
def api_send_input(script_name):
    """Write the request body plus a newline to the script's stdin.

    Responds 400 when the script is not running and 500 when the write fails.
    """
    script_name = unquote(script_name)
    proc = running_processes.get(script_name)
    if proc is None:
        return jsonify({"error": "脚本未运行"}), 400

    # The pipe was opened in text mode, so the body must be a str, not bytes.
    text = request.get_data(as_text=True)
    try:
        proc.stdin.write(text + "\n")
        proc.stdin.flush()
    except (OSError, ValueError) as e:
        # OSError: broken pipe; ValueError: stdin already closed.
        return jsonify({"error": str(e)}), 500
    return jsonify({"msg": "输入已发送"})


if __name__ == '__main__':
    load_configs()
    # SECURITY NOTE(review): binding to 0.0.0.0 exposes unauthenticated script
    # execution to every host that can reach this port - confirm this is
    # intended before deploying outside a trusted network.
    app.run(host='0.0.0.0', port=5000, debug=False)