from flask import Flask, render_template, request, jsonify from flask_cors import CORS import os import subprocess import json import threading import time import schedule from urllib.parse import unquote from datetime import datetime import shutil import psutil from functools import wraps # 基础配置 app = Flask(__name__) CORS(app, resources=r"/*") # 目录配置 BASE_DIR = os.getcwd() TASKS_DIR = os.path.join(BASE_DIR, "tasks") LOG_DIR = os.path.join(BASE_DIR, "logs") CONFIG_FILE = os.path.join(BASE_DIR, "config.json") PROCESSES_FILE = os.path.join(BASE_DIR, "processes.json") # 常量配置 LOG_LINES = 200 MONITOR_INTERVAL = 10 SCHEDULER_INTERVAL = 1 # 全局变量 SCRIPT_CONFIGS = {} running_processes = {} script_outputs = {} scheduled_jobs = {} scheduler_running = True monitor_running = True manual_stopped_scripts = set() # 记录手动停止的脚本 # 初始化函数 def init_dirs(): for dir_path in [TASKS_DIR, LOG_DIR]: if not os.path.exists(dir_path): os.makedirs(dir_path, exist_ok=True) app.logger.info(f"创建目录:{dir_path}") def is_script_running(script_name): """检查系统中是否已有该脚本的进程在运行""" script_path = get_script_path(script_name) if not os.path.exists(script_path): return False, None # 标准化路径(处理大小写和符号链接) normalized_path = os.path.normcase(os.path.realpath(script_path)) # 遍历所有进程检查命令行 for proc in psutil.process_iter(['pid', 'cmdline', 'name']): try: cmdline = proc.info['cmdline'] if not cmdline: continue # 检查命令行中是否包含脚本路径 cmd_str = ' '.join(cmdline).lower() if normalized_path.lower() in cmd_str: return True, proc.info['pid'] except (psutil.NoSuchProcess, psutil.AccessDenied, psutil.ZombieProcess): continue return False, None def load_configs(): global SCRIPT_CONFIGS try: if os.path.exists(CONFIG_FILE): with open(CONFIG_FILE, "r", encoding="utf-8") as f: loaded_configs = json.load(f) # 标准化所有键的路径分隔符 normalized_configs = {} for script_name, config in loaded_configs.items(): normalized_name = normalize_script_name(script_name) normalized_configs[normalized_name] = config SCRIPT_CONFIGS = normalized_configs # 补全缺失的配置项 for script_name, config in SCRIPT_CONFIGS.items(): if "mode" not in config: config["mode"] = "long-running" if config["mode"] == "interval" and ("interval" not in config or "unit" not in config): config["interval"] = 1 config["unit"] = "hours" else: SCRIPT_CONFIGS = {} app.logger.info("配置文件不存在,初始化空配置") except Exception as e: SCRIPT_CONFIGS = {} app.logger.error(f"加载配置失败:{str(e)},使用空配置") def save_configs(): try: with open(CONFIG_FILE, "w", encoding="utf-8") as f: json.dump(SCRIPT_CONFIGS, f, indent=2, ensure_ascii=False) app.logger.info("配置保存成功") except Exception as e: app.logger.error(f"保存配置失败:{str(e)}") raise def load_processes(): """只加载进程信息数据,不恢复进程对象""" try: if os.path.exists(PROCESSES_FILE): with open(PROCESSES_FILE, "r", encoding="utf-8") as f: processes_data = json.load(f) app.logger.info(f"加载了{len(processes_data)}条进程历史记录") return processes_data except Exception as e: app.logger.error(f"加载进程信息失败:{str(e)}") return {} def save_processes(): try: processes_data = {} for name, proc in running_processes.items(): # 安全获取命令行参数 cmd = getattr(proc, 'args', []) if isinstance(cmd, str): cmd = cmd.split() processes_data[name] = { "pid": proc.pid, "script_name": name, "start_time": datetime.now().strftime("%Y-%m-%d %H:%M:%S"), "cmd": cmd } with open(PROCESSES_FILE, "w", encoding="utf-8") as f: json.dump(processes_data, f, indent=2, ensure_ascii=False) except Exception as e: app.logger.error(f"保存进程信息失败:{str(e)}") # 日志处理函数 def get_log_path(script_name): """获取日志文件路径,并确保父目录存在""" safe_script_name = script_name.replace('/', os.sep).replace('\\', os.sep) log_filename = f"{os.path.splitext(safe_script_name)[0]}.log" log_path = os.path.join(LOG_DIR, log_filename) log_dir = os.path.dirname(log_path) if not os.path.exists(log_dir): os.makedirs(log_dir, exist_ok=True) app.logger.info(f"创建日志目录:{log_dir}") return log_path def normalize_script_name(script_name): """标准化脚本名称,统一路径分隔符为正斜杠""" return script_name.replace("\\", "/") def load_script_log(script_name): log_path = get_log_path(script_name) logs = [] if os.path.exists(log_path): try: with open(log_path, "r", encoding="utf-8", errors="ignore") as f: lines = f.readlines()[-LOG_LINES:] logs = [line.strip() for line in lines if line.strip()] except Exception as e: app.logger.error(f"加载{script_name}日志失败:{str(e)}") logs = [f"⚠️ 加载历史日志失败:{str(e)}"] return logs def write_script_log(script_name, content): timestamp = datetime.now().strftime("%Y-%m-%d %H:%M:%S") log_line = f"[{timestamp}] {content}" if script_name not in script_outputs: script_outputs[script_name] = [] script_outputs[script_name].append(log_line) script_outputs[script_name] = script_outputs[script_name][-LOG_LINES:] log_path = get_log_path(script_name) try: with open(log_path, "a", encoding="utf-8", errors="ignore") as f: f.write(log_line + "\n") except Exception as e: app.logger.error(f"写入{script_name}日志失败:{str(e)}") error_line = f"[{timestamp}] ⚠️ 日志写入失败:{str(e)}" script_outputs[script_name].append(error_line) script_outputs[script_name] = script_outputs[script_name][-LOG_LINES:] # 定时任务处理 def add_scheduled_task(script_name): global scheduled_jobs try: removed = remove_scheduled_task(script_name) app.logger.info(f"移除{script_name}旧任务:{'成功' if removed else '无旧任务'}") config = SCRIPT_CONFIGS.get(script_name) if not config or config["mode"] != "interval": app.logger.error(f"{script_name}配置无效,无法添加定时任务") return False interval = int(config.get("interval", 1)) unit = config.get("unit", "hours") app.logger.info(f"准备添加定时任务:{script_name}(每{interval}{unit})") def run_scheduled_script(): write_script_log(script_name, f"⏰ 定时任务触发(当前时间:{datetime.now().strftime('%Y-%m-%d %H:%M:%S')})") script_path = get_script_path(script_name) write_script_log(script_name, f"📌 尝试执行脚本路径:{script_path}") if not os.path.exists(script_path): write_script_log(script_name, f"❌ 脚本路径不存在!(检查路径是否正确)") return if script_name.endswith(".bat"): cmd = ["cmd", "/c", script_path] write_script_log(script_name, f"📝 执行BAT命令:{cmd}") elif script_name.endswith(".py"): python_path = config.get("python_path", "python") cmd = [python_path, script_path] write_script_log(script_name, f"📝 执行Python命令:{cmd}(Python路径:{python_path})") else: write_script_log(script_name, "❌ 不支持的脚本类型(仅支持.bat/.py)") return try: write_script_log(script_name, "▶️ 开始执行定时任务...") result = subprocess.run( cmd, stdout=subprocess.PIPE, stderr=subprocess.STDOUT, text=True, encoding="utf-8", cwd=TASKS_DIR, timeout=60 ) write_script_log(script_name, f"📊 任务执行完成(返回码:{result.returncode})") if result.stdout: for line in result.stdout.splitlines(): write_script_log(script_name, f"📝 脚本输出:{line.strip()}") if result.returncode == 0: write_script_log(script_name, "✅ 定时任务执行成功") else: write_script_log(script_name, f"❌ 定时任务执行失败(返回码非0)") except subprocess.TimeoutExpired: write_script_log(script_name, "❌ 定时任务执行超时(超过60秒)") except Exception as e: write_script_log(script_name, f"❌ 任务执行异常:{str(e)}(检查脚本是否可手动运行)") job = None if unit == "minutes": job = schedule.every(interval).minutes.do(run_scheduled_script) elif unit == "hours": job = schedule.every(interval).hours.do(run_scheduled_script) elif unit == "days": job = schedule.every(interval).days.do(run_scheduled_script) else: write_script_log(script_name, f"❌ 不支持的时间单位:{unit}") return False job.ident = script_name scheduled_jobs[script_name] = job write_script_log(script_name, f"✅ 定时任务已添加(每{interval}{unit}执行一次)") return True except Exception as e: write_script_log(script_name, f"❌ 添加定时任务失败:{str(e)}") app.logger.error(f"{script_name}添加定时任务失败:{str(e)}") return False def remove_scheduled_task(script_name): global scheduled_jobs if script_name in scheduled_jobs: schedule.cancel_job(scheduled_jobs[script_name]) del scheduled_jobs[script_name] write_script_log(script_name, "⏹️ 定时任务已移除") app.logger.info(f"{script_name}定时任务移除成功") return True return False def run_scheduler(): global scheduler_running app.logger.info("定时任务调度线程已启动") while scheduler_running: pending_jobs = len(schedule.get_jobs()) app.logger.debug(f"调度器循环 - 待执行任务数: {pending_jobs}") schedule.run_pending() time.sleep(SCHEDULER_INTERVAL) app.logger.info("定时任务调度线程已停止") # 长期运行脚本处理 def get_script_path(script_name): """获取脚本的完整路径""" safe_script_name = script_name.replace('/', os.sep).replace('\\', os.sep) script_path = os.path.join(TASKS_DIR, safe_script_name) app.logger.debug(f"解析脚本路径:{script_name} -> {script_path}(是否存在:{os.path.exists(script_path)})") return script_path def start_long_running_script(script_name): # 检查是否已在系统中运行 is_running, pid = is_script_running(script_name) if is_running: write_script_log(script_name, f"⚠️ 检测到脚本已在运行(PID: {pid}),无需重复启动") running_processes[script_name] = psutil.Process(pid) save_processes() return True script_path = get_script_path(script_name) if not os.path.exists(script_path): write_script_log(script_name, f"❌ 脚本不存在:{script_path}") return False config = SCRIPT_CONFIGS.get(script_name, {}) if script_name.endswith(".bat"): cmd = ["cmd", "/c", "start", "/b", script_path] elif script_name.endswith(".py"): python_path = config.get("python_path", "python") cmd = [python_path, script_path] else: write_script_log(script_name, "❌ 不支持的脚本类型") return False def target(): try: proc = subprocess.Popen( cmd, stdout=subprocess.PIPE, stderr=subprocess.STDOUT, stdin=subprocess.PIPE, text=True, encoding="utf-8", errors="ignore", cwd=TASKS_DIR ) running_processes[script_name] = proc save_processes() write_script_log(script_name, "✅ 长期运行脚本已启动") while proc.poll() is None: line = proc.stdout.readline() if line: write_script_log(script_name, line.strip()) time.sleep(0.1) exit_code = proc.returncode if exit_code == 0: write_script_log(script_name, f"ℹ️ 脚本正常退出(返回码:{exit_code})") else: write_script_log(script_name, f"❌ 脚本异常退出(返回码:{exit_code})") except Exception as e: write_script_log(script_name, f"❌ 脚本执行异常:{str(e)}") finally: running_processes.pop(script_name, None) save_processes() app.logger.info(f"{script_name}长期运行进程已退出") thread = threading.Thread(target=target, daemon=True) thread.start() return True @app.route("/api/scheduled-jobs") def api_scheduled_jobs(): """查看当前所有定时任务""" jobs = [] for job in schedule.get_jobs(): jobs.append({ "script_name": next((name for name, j in scheduled_jobs.items() if j == job), "未知"), "interval": str(job.interval), "next_run": job.next_run.strftime("%Y-%m-%d %H:%M:%S") if job.next_run else "立即执行" }) return jsonify(jobs), 200 def monitor_long_running_scripts(): global monitor_running app.logger.info("长期脚本监控线程已启动") while monitor_running: for script_name, config in SCRIPT_CONFIGS.items(): if config.get("mode") == "long-running" and script_name not in manual_stopped_scripts: is_system_running, _ = is_script_running(script_name) is_memory_running = False if script_name in running_processes: try: proc = psutil.Process(running_processes[script_name].pid) is_memory_running = proc.is_running() except (psutil.NoSuchProcess, psutil.AccessDenied): is_memory_running = False if not is_system_running and not is_memory_running: app.logger.info(f"{script_name}长期脚本未运行,尝试重启") write_script_log(script_name, "⚠️ 脚本未运行,尝试重启...") start_long_running_script(script_name) time.sleep(MONITOR_INTERVAL) # 单次运行脚本处理 def start_single_run_script(script_name): # 检查是否已在运行 if script_name in running_processes: proc = running_processes[script_name] if proc.poll() is None: write_script_log(script_name, "⚠️ 单次任务已在运行中,无需重复启动") return False is_running, pid = is_script_running(script_name) if is_running: write_script_log(script_name, f"⚠️ 检测到脚本已在运行(PID: {pid}),无需重复启动") return False script_path = get_script_path(script_name) if not os.path.exists(script_path): write_script_log(script_name, f"❌ 脚本不存在:{script_path}") return False config = SCRIPT_CONFIGS.get(script_name, {}) if script_name.endswith(".bat"): cmd = ["cmd", "/c", script_path] elif script_name.endswith(".py"): python_path = config.get("python_path", "python") cmd = [python_path, script_path] else: write_script_log(script_name, "❌ 不支持的脚本类型") return False def target(): try: write_script_log(script_name, "✅ 单次运行脚本已启动") proc = subprocess.Popen( cmd, stdout=subprocess.PIPE, stderr=subprocess.STDOUT, text=True, encoding="utf-8", errors="ignore", cwd=TASKS_DIR ) running_processes[script_name] = proc save_processes() while proc.poll() is None: line = proc.stdout.readline() if line: write_script_log(script_name, line.strip()) time.sleep(0.1) exit_code = proc.returncode if exit_code == 0: write_script_log(script_name, f"✅ 单次运行脚本执行成功(返回码:{exit_code})") else: write_script_log(script_name, f"❌ 单次运行脚本执行失败(返回码:{exit_code})") except Exception as e: write_script_log(script_name, f"❌ 单次运行脚本异常:{str(e)}") finally: running_processes.pop(script_name, None) save_processes() thread = threading.Thread(target=target, daemon=True) thread.start() return True # API接口 @app.route("/") def index(): return render_template("index.html") @app.route("/api/scripts") def api_get_scripts(): try: # 扫描所有脚本 scripts = [] for root, dirs, files in os.walk(TASKS_DIR): for file in files: if file.endswith((".py", ".bat")): # 计算相对路径作为脚本名 rel_path = os.path.relpath(os.path.join(root, file), TASKS_DIR) script_name = normalize_script_name(rel_path) # 获取修改时间 modify_time = os.path.getmtime(os.path.join(root, file)) scripts.append({ "name": script_name, "modify_time": modify_time, "modify_time_str": datetime.fromtimestamp(modify_time).strftime("%Y-%m-%d %H:%M") }) # 只保留有配置的脚本 configured_scripts = [s for s in scripts if s["name"] in SCRIPT_CONFIGS] return jsonify(configured_scripts), 200 except Exception as e: app.logger.error(f"获取脚本列表失败:{str(e)}") return jsonify({"error": str(e)}), 500 @app.route("/api/scripts/status") def api_get_scripts_status(): try: result = {} # 遍历所有有配置的脚本 for script_name in SCRIPT_CONFIGS.keys(): # 获取状态 is_running, pid = is_script_running(script_name) status = "running" if is_running else "scheduled" if script_name in scheduled_jobs else "stopped" # 获取模式和调度信息 mode = SCRIPT_CONFIGS[script_name].get("mode", "single-run") schedule_info = "" if mode == "interval" and script_name in scheduled_jobs: job = scheduled_jobs[script_name] schedule_info = f"下次执行: {job.next_run.strftime('%Y-%m-%d %H:%M')}" result[script_name] = { "status": status, "running": is_running, "scheduled": script_name in scheduled_jobs, "mode": mode, "schedule_info": schedule_info } return jsonify(result), 200 except Exception as e: app.logger.error(f"批量获取脚本状态失败:{str(e)}") return jsonify({"error": str(e)}), 500 @app.route("/api/status/") def api_get_status(script_name): try: script_name = normalize_script_name(unquote(script_name)) is_running, pid = is_script_running(script_name) is_scheduled = script_name in scheduled_jobs status = "stopped" if is_running: status = "running" elif is_scheduled: status = "scheduled" # 获取输出日志 output = load_script_log(script_name) return jsonify({ "status": status, "running": is_running, "scheduled": is_scheduled, "pid": pid, "output": output }), 200 except Exception as e: app.logger.error(f"获取{script_name}状态失败:{str(e)}") return jsonify({"error": str(e)}), 500 @app.route("/api/start/") def api_start_script(script_name): try: script_name = normalize_script_name(unquote(script_name)) if script_name not in SCRIPT_CONFIGS: return jsonify({"error": "脚本未配置"}), 400 config = SCRIPT_CONFIGS[script_name] mode = config.get("mode", "single-run") # 清除手动停止标记 manual_stopped_scripts.discard(script_name) if mode == "long-running": success = start_long_running_script(script_name) msg = "启动成功" if success else "启动失败" elif mode == "interval": success = add_scheduled_task(script_name) msg = "定时任务启动成功" if success else "定时任务启动失败" else: # single-run success = start_single_run_script(script_name) msg = "启动成功" if success else "启动失败" return jsonify({"msg": msg, "success": success}), 200 except Exception as e: app.logger.error(f"启动{script_name}失败:{str(e)}") return jsonify({"error": str(e)}), 500 @app.route("/api/stop/") def api_stop_script(script_name): try: script_name = normalize_script_name(unquote(script_name)) if script_name not in SCRIPT_CONFIGS: return jsonify({"error": "脚本未配置"}), 400 # 标记为手动停止 manual_stopped_scripts.add(script_name) # 停止定时任务 remove_scheduled_task(script_name) # 停止进程 is_running, pid = is_script_running(script_name) if is_running and pid: try: proc = psutil.Process(pid) proc.terminate() time.sleep(1) if proc.is_running(): proc.kill() write_script_log(script_name, f"✅ 进程已终止(PID: {pid})") except Exception as e: app.logger.error(f"终止{script_name}进程失败:{str(e)}") return jsonify({"error": f"终止进程失败:{str(e)}"}), 500 # 清除内存记录 running_processes.pop(script_name, None) save_processes() return jsonify({"msg": "已停止"}), 200 except Exception as e: app.logger.error(f"停止{script_name}失败:{str(e)}") return jsonify({"error": str(e)}), 500 @app.route("/api/config", methods=["GET", "POST"]) def api_config(): if request.method == "POST": try: new_configs = request.get_json() if not isinstance(new_configs, dict): return jsonify({"error": "配置格式错误"}), 400 # 更新配置 for script_name, config in new_configs.items(): normalized_name = normalize_script_name(script_name) SCRIPT_CONFIGS[normalized_name] = config # 重新加载定时任务 if config.get("mode") == "interval": add_scheduled_task(normalized_name) else: remove_scheduled_task(normalized_name) save_configs() return jsonify({"msg": "配置保存成功"}), 200 except Exception as e: app.logger.error(f"保存配置失败:{str(e)}") return jsonify({"error": str(e)}), 500 else: # 获取当前配置 return jsonify(SCRIPT_CONFIGS), 200 @app.route("/api/directory") def api_get_directory(): try: path = request.args.get("path", "") full_path = os.path.join(TASKS_DIR, path.replace('/', os.sep)) if not os.path.exists(full_path) or not os.path.isdir(full_path): return jsonify({"error": "目录不存在"}), 404 directories = [] files = [] for item in os.listdir(full_path): item_path = os.path.join(full_path, item) rel_path = os.path.relpath(item_path, TASKS_DIR) normalized_path = normalize_script_name(rel_path) if os.path.isdir(item_path): directories.append({ "name": item, "path": normalized_path }) else: files.append({ "name": item, "path": normalized_path }) # 排序:目录在前,按名称排序 directories.sort(key=lambda x: x["name"].lower()) files.sort(key=lambda x: x["name"].lower()) return jsonify({ "directories": directories, "files": files }), 200 except Exception as e: app.logger.error(f"获取目录内容失败:{str(e)}") return jsonify({"error": str(e)}), 500 # 启动与初始化 def main(): init_dirs() load_configs() load_processes() # 启动调度器线程 scheduler_thread = threading.Thread(target=run_scheduler, daemon=True) scheduler_thread.start() # 启动监控线程 monitor_thread = threading.Thread(target=monitor_long_running_scripts, daemon=True) monitor_thread.start() # 启动定时任务 for script_name, config in SCRIPT_CONFIGS.items(): if config.get("mode") == "interval": add_scheduled_task(script_name) elif config.get("mode") == "long-running": # 启动长期运行脚本 start_long_running_script(script_name) app.run(host="0.0.0.0", port=5078, debug=False) if __name__ == "__main__": main()