diff --git a/app.py b/app.py index 4ca7add..3481ac4 100644 --- a/app.py +++ b/app.py @@ -1,63 +1,95 @@ from flask import Flask, render_template, request, jsonify -from flask_cors import CORS # 处理跨域 +from flask_cors import CORS import os import subprocess import json import threading import time -import schedule # 定时任务库 +import schedule from urllib.parse import unquote from datetime import datetime +import shutil +import psutil +from functools import wraps -# ====================== 基础配置 ====================== +# 基础配置 app = Flask(__name__) -# 允许所有跨域请求(外网访问必备) CORS(app, resources=r"/*") -# 目录配置(自动创建不存在的目录) +# 目录配置 BASE_DIR = os.getcwd() -TASKS_DIR = os.path.join(BASE_DIR, "tasks") # 脚本存储目录 -LOG_DIR = os.path.join(BASE_DIR, "logs") # 日志存储目录 -CONFIG_FILE = os.path.join(BASE_DIR, "config.json") # 配置文件 -PROCESSES_FILE = os.path.join(BASE_DIR, "processes.json") # 进程持久化文件 +TASKS_DIR = os.path.join(BASE_DIR, "tasks") +LOG_DIR = os.path.join(BASE_DIR, "logs") +CONFIG_FILE = os.path.join(BASE_DIR, "config.json") +PROCESSES_FILE = os.path.join(BASE_DIR, "processes.json") # 常量配置 -LOG_LINES = 200 # 内存中保留的日志行数(防止内存溢出) -MONITOR_INTERVAL = 10 # 长期脚本监控间隔(秒) -SCHEDULER_INTERVAL = 1 # 定时任务调度间隔(秒) +LOG_LINES = 200 +MONITOR_INTERVAL = 10 +SCHEDULER_INTERVAL = 1 -# ====================== 全局变量 ====================== -SCRIPT_CONFIGS = {} # 脚本配置:{脚本名: {mode, interval, unit...}} -running_processes = {} # 长期运行的进程:{脚本名: subprocess.Popen对象} -script_outputs = {} # 内存日志缓存:{脚本名: [日志行1, 日志行2...]} -scheduled_jobs = {} # 定时任务:{脚本名: schedule.Job对象} -scheduler_running = True # 定时调度器运行标志 -monitor_running = True # 长期脚本监控运行标志 +# 全局变量 +SCRIPT_CONFIGS = {} +running_processes = {} +script_outputs = {} +scheduled_jobs = {} +scheduler_running = True +monitor_running = True +manual_stopped_scripts = set() # 记录手动停止的脚本 -# ====================== 初始化函数 ====================== +# 初始化函数 def init_dirs(): - """初始化必要目录(tasks/logs)""" for dir_path in [TASKS_DIR, LOG_DIR]: if not os.path.exists(dir_path): os.makedirs(dir_path, exist_ok=True) app.logger.info(f"创建目录:{dir_path}") +def is_script_running(script_name): + """检查系统中是否已有该脚本的进程在运行""" + script_path = get_script_path(script_name) + if not os.path.exists(script_path): + return False, None + + # 标准化路径(处理大小写和符号链接) + normalized_path = os.path.normcase(os.path.realpath(script_path)) + + # 遍历所有进程检查命令行 + for proc in psutil.process_iter(['pid', 'cmdline', 'name']): + try: + cmdline = proc.info['cmdline'] + if not cmdline: + continue + + # 检查命令行中是否包含脚本路径 + cmd_str = ' '.join(cmdline).lower() + if normalized_path.lower() in cmd_str: + return True, proc.info['pid'] + except (psutil.NoSuchProcess, psutil.AccessDenied, psutil.ZombieProcess): + continue + return False, None + + def load_configs(): - """加载脚本配置(从config.json)""" global SCRIPT_CONFIGS try: if os.path.exists(CONFIG_FILE): with open(CONFIG_FILE, "r", encoding="utf-8") as f: - SCRIPT_CONFIGS = json.load(f) - # 校验配置格式,确保每个脚本有基础配置 + loaded_configs = json.load(f) + # 标准化所有键的路径分隔符 + normalized_configs = {} + for script_name, config in loaded_configs.items(): + normalized_name = normalize_script_name(script_name) + normalized_configs[normalized_name] = config + SCRIPT_CONFIGS = normalized_configs + # 补全缺失的配置项 for script_name, config in SCRIPT_CONFIGS.items(): if "mode" not in config: - config["mode"] = "long-running" # 默认长期运行 + config["mode"] = "long-running" if config["mode"] == "interval" and ("interval" not in config or "unit" not in config): config["interval"] = 1 - config["unit"] = "hours" # 默认每小时执行 + config["unit"] = "hours" else: SCRIPT_CONFIGS = {} app.logger.info("配置文件不存在,初始化空配置") @@ -67,111 +99,75 @@ def load_configs(): def save_configs(): - """保存脚本配置到config.json""" try: with open(CONFIG_FILE, "w", encoding="utf-8") as f: json.dump(SCRIPT_CONFIGS, f, indent=2, ensure_ascii=False) app.logger.info("配置保存成功") except Exception as e: app.logger.error(f"保存配置失败:{str(e)}") - raise # 抛出异常让接口返回错误 + raise + + +def load_processes(): + """只加载进程信息数据,不恢复进程对象""" + try: + if os.path.exists(PROCESSES_FILE): + with open(PROCESSES_FILE, "r", encoding="utf-8") as f: + processes_data = json.load(f) + app.logger.info(f"加载了{len(processes_data)}条进程历史记录") + return processes_data + except Exception as e: + app.logger.error(f"加载进程信息失败:{str(e)}") + return {} def save_processes(): - """保存当前运行的长期任务进程信息到文件""" - processes_data = {} - for script_name, proc in running_processes.items(): - # 仅保存长期运行模式的进程 - if SCRIPT_CONFIGS.get(script_name, {}).get("mode") == "long-running": - processes_data[script_name] = { - "pid": proc.pid, - "script_name": script_name, - "start_time": datetime.now().strftime("%Y-%m-%d %H:%M:%S"), - "cmd": proc.args # 记录启动命令(用于校验身份) - } try: + processes_data = {} + for name, proc in running_processes.items(): + # 安全获取命令行参数 + cmd = getattr(proc, 'args', []) + if isinstance(cmd, str): + cmd = cmd.split() + + processes_data[name] = { + "pid": proc.pid, + "script_name": name, + "start_time": datetime.now().strftime("%Y-%m-%d %H:%M:%S"), + "cmd": cmd + } with open(PROCESSES_FILE, "w", encoding="utf-8") as f: json.dump(processes_data, f, indent=2, ensure_ascii=False) - app.logger.info("进程信息已持久化") except Exception as e: app.logger.error(f"保存进程信息失败:{str(e)}") -def is_process_alive(pid, expected_cmd): - """验证进程是否存活且命令行匹配""" - try: - # Windows系统通过tasklist获取进程信息 - if os.name == "nt": - # 执行tasklist命令获取进程详情 - result = subprocess.run( - ["tasklist", "/fi", f"PID eq {pid}", "/fo", "csv", "/v"], - stdout=subprocess.PIPE, - stderr=subprocess.STDOUT, - text=True, - encoding="utf-8" - ) - output = result.stdout - # 检查PID是否存在 - if f"{pid}" not in output: - return False - # 检查命令行是否包含目标脚本路径 - script_path = get_script_path(expected_cmd[-1]) # 提取脚本路径 - return script_path in output - else: - # Linux/macOS系统 - result = subprocess.run( - ["ps", "-p", str(pid), "-o", "cmd="], - stdout=subprocess.PIPE, - stderr=subprocess.STDOUT, - text=True - ) - return result.returncode == 0 and expected_cmd[-1] in result.stdout - except Exception as e: - app.logger.error(f"验证进程失败:{str(e)}") - return False - - -def load_processes(): - """程序启动时加载并验证持久化的进程信息""" - global running_processes - if not os.path.exists(PROCESSES_FILE): - return - - try: - with open(PROCESSES_FILE, "r", encoding="utf-8") as f: - processes_data = json.load(f) - - for script_name, data in processes_data.items(): - pid = data["pid"] - cmd = data["cmd"] - # 验证进程是否存活且匹配脚本 - if is_process_alive(pid, cmd): - # 构造伪Popen对象(仅保留必要属性用于管理) - proc = subprocess.Popen() - proc.pid = pid - proc.args = cmd - running_processes[script_name] = proc - app.logger.info(f"成功接管进程:{script_name}(PID: {pid})") - else: - app.logger.warning(f"进程已结束或不匹配:{script_name}(PID: {pid})") - except Exception as e: - app.logger.error(f"加载进程信息失败:{str(e)}") - - -# ====================== 日志处理函数 ====================== +# 日志处理函数 def get_log_path(script_name): - """获取脚本对应的日志文件路径""" - return os.path.join(LOG_DIR, f"{os.path.splitext(script_name)[0]}.log") + """获取日志文件路径,并确保父目录存在""" + safe_script_name = script_name.replace('/', os.sep).replace('\\', os.sep) + log_filename = f"{os.path.splitext(safe_script_name)[0]}.log" + log_path = os.path.join(LOG_DIR, log_filename) + + log_dir = os.path.dirname(log_path) + if not os.path.exists(log_dir): + os.makedirs(log_dir, exist_ok=True) + app.logger.info(f"创建日志目录:{log_dir}") + + return log_path + + +def normalize_script_name(script_name): + """标准化脚本名称,统一路径分隔符为正斜杠""" + return script_name.replace("\\", "/") def load_script_log(script_name): - """加载脚本的历史日志(从日志文件)""" log_path = get_log_path(script_name) logs = [] if os.path.exists(log_path): try: with open(log_path, "r", encoding="utf-8", errors="ignore") as f: - # 读取最后LOG_LINES行,避免大文件加载缓慢 lines = f.readlines()[-LOG_LINES:] logs = [line.strip() for line in lines if line.strip()] except Exception as e: @@ -181,36 +177,32 @@ def load_script_log(script_name): def write_script_log(script_name, content): - """写入日志到文件和内存缓存""" - # 格式化日志(带时间戳) timestamp = datetime.now().strftime("%Y-%m-%d %H:%M:%S") log_line = f"[{timestamp}] {content}" - # 1. 写入内存缓存(保留最后LOG_LINES行) if script_name not in script_outputs: script_outputs[script_name] = [] script_outputs[script_name].append(log_line) script_outputs[script_name] = script_outputs[script_name][-LOG_LINES:] - # 2. 写入日志文件 log_path = get_log_path(script_name) try: with open(log_path, "a", encoding="utf-8", errors="ignore") as f: f.write(log_line + "\n") except Exception as e: app.logger.error(f"写入{script_name}日志失败:{str(e)}") - # 同时记录日志写入错误到内存 error_line = f"[{timestamp}] ⚠️ 日志写入失败:{str(e)}" script_outputs[script_name].append(error_line) script_outputs[script_name] = script_outputs[script_name][-LOG_LINES:] -# ====================== 定时任务处理 ====================== +# 定时任务处理 def add_scheduled_task(script_name): - """添加定时任务(基于配置的interval和unit)""" global scheduled_jobs try: - # 获取脚本配置 + removed = remove_scheduled_task(script_name) + app.logger.info(f"移除{script_name}旧任务:{'成功' if removed else '无旧任务'}") + config = SCRIPT_CONFIGS.get(script_name) if not config or config["mode"] != "interval": app.logger.error(f"{script_name}配置无效,无法添加定时任务") @@ -218,26 +210,30 @@ def add_scheduled_task(script_name): interval = int(config.get("interval", 1)) unit = config.get("unit", "hours") + app.logger.info(f"准备添加定时任务:{script_name}(每{interval}{unit})") - # 定义定时任务执行函数 def run_scheduled_script(): + write_script_log(script_name, f"⏰ 定时任务触发(当前时间:{datetime.now().strftime('%Y-%m-%d %H:%M:%S')})") script_path = get_script_path(script_name) + write_script_log(script_name, f"📌 尝试执行脚本路径:{script_path}") + if not os.path.exists(script_path): - write_script_log(script_name, f"❌ 脚本不存在:{script_path}") + write_script_log(script_name, f"❌ 脚本路径不存在!(检查路径是否正确)") return - # 构建执行命令 if script_name.endswith(".bat"): cmd = ["cmd", "/c", script_path] + write_script_log(script_name, f"📝 执行BAT命令:{cmd}") elif script_name.endswith(".py"): - cmd = ["python", script_path] + python_path = config.get("python_path", "python") + cmd = [python_path, script_path] + write_script_log(script_name, f"📝 执行Python命令:{cmd}(Python路径:{python_path})") else: - write_script_log(script_name, "❌ 不支持的脚本类型") + write_script_log(script_name, "❌ 不支持的脚本类型(仅支持.bat/.py)") return - # 执行脚本(单次) try: - write_script_log(script_name, "✅ 定时任务开始执行") + write_script_log(script_name, "▶️ 开始执行定时任务...") result = subprocess.run( cmd, stdout=subprocess.PIPE, @@ -245,21 +241,21 @@ def add_scheduled_task(script_name): text=True, encoding="utf-8", cwd=TASKS_DIR, - timeout=3600 # 超时1小时(可根据需求调整) + timeout=60 ) - # 记录执行结果 + write_script_log(script_name, f"📊 任务执行完成(返回码:{result.returncode})") + if result.stdout: + for line in result.stdout.splitlines(): + write_script_log(script_name, f"📝 脚本输出:{line.strip()}") if result.returncode == 0: write_script_log(script_name, "✅ 定时任务执行成功") - if result.stdout: - write_script_log(script_name, f"📝 输出:{result.stdout.strip()}") else: - write_script_log(script_name, f"❌ 定时任务执行失败(返回码:{result.returncode})") - if result.stdout: - write_script_log(script_name, f"📝 错误输出:{result.stdout.strip()}") + write_script_log(script_name, f"❌ 定时任务执行失败(返回码非0)") + except subprocess.TimeoutExpired: + write_script_log(script_name, "❌ 定时任务执行超时(超过60秒)") except Exception as e: - write_script_log(script_name, f"❌ 定时任务执行异常:{str(e)}") + write_script_log(script_name, f"❌ 任务执行异常:{str(e)}(检查脚本是否可手动运行)") - # 根据unit设置定时频率 job = None if unit == "minutes": job = schedule.every(interval).minutes.do(run_scheduled_script) @@ -271,10 +267,9 @@ def add_scheduled_task(script_name): write_script_log(script_name, f"❌ 不支持的时间单位:{unit}") return False - # 保存定时任务 + job.ident = script_name scheduled_jobs[script_name] = job - write_script_log(script_name, f"⏰ 定时任务已添加(每{interval}{unit}执行一次)") - app.logger.info(f"{script_name}定时任务添加成功(每{interval}{unit})") + write_script_log(script_name, f"✅ 定时任务已添加(每{interval}{unit}执行一次)") return True except Exception as e: write_script_log(script_name, f"❌ 添加定时任务失败:{str(e)}") @@ -283,7 +278,6 @@ def add_scheduled_task(script_name): def remove_scheduled_task(script_name): - """移除定时任务""" global scheduled_jobs if script_name in scheduled_jobs: schedule.cancel_job(scheduled_jobs[script_name]) @@ -295,38 +289,50 @@ def remove_scheduled_task(script_name): def run_scheduler(): - """定时任务调度线程(循环检查任务)""" global scheduler_running app.logger.info("定时任务调度线程已启动") while scheduler_running: + pending_jobs = len(schedule.get_jobs()) + app.logger.debug(f"调度器循环 - 待执行任务数: {pending_jobs}") schedule.run_pending() time.sleep(SCHEDULER_INTERVAL) app.logger.info("定时任务调度线程已停止") -# ====================== 长期运行脚本处理 ====================== +# 长期运行脚本处理 def get_script_path(script_name): """获取脚本的完整路径""" - return os.path.join(TASKS_DIR, script_name) + safe_script_name = script_name.replace('/', os.sep).replace('\\', os.sep) + script_path = os.path.join(TASKS_DIR, safe_script_name) + app.logger.debug(f"解析脚本路径:{script_name} -> {script_path}(是否存在:{os.path.exists(script_path)})") + return script_path def start_long_running_script(script_name): - """启动长期运行的脚本(带输出捕获)""" + # 检查是否已在系统中运行 + is_running, pid = is_script_running(script_name) + if is_running: + write_script_log(script_name, f"⚠️ 检测到脚本已在运行(PID: {pid}),无需重复启动") + running_processes[script_name] = psutil.Process(pid) + save_processes() + return True + script_path = get_script_path(script_name) if not os.path.exists(script_path): write_script_log(script_name, f"❌ 脚本不存在:{script_path}") return False - # 构建执行命令 + config = SCRIPT_CONFIGS.get(script_name, {}) + if script_name.endswith(".bat"): - cmd = ["cmd", "/c", script_path] + cmd = ["cmd", "/c", "start", "/b", script_path] elif script_name.endswith(".py"): - cmd = ["python", script_path] + python_path = config.get("python_path", "python") + cmd = [python_path, script_path] else: write_script_log(script_name, "❌ 不支持的脚本类型") return False - # 定义子线程执行函数 def target(): try: proc = subprocess.Popen( @@ -339,20 +345,16 @@ def start_long_running_script(script_name): errors="ignore", cwd=TASKS_DIR ) - # 记录运行中的进程 running_processes[script_name] = proc - write_script_log(script_name, "✅ 长期运行脚本已启动") - # 保存进程信息 save_processes() + write_script_log(script_name, "✅ 长期运行脚本已启动") - # 实时捕获输出 while proc.poll() is None: line = proc.stdout.readline() if line: write_script_log(script_name, line.strip()) - time.sleep(0.1) # 减少CPU占用 + time.sleep(0.1) - # 进程退出处理 exit_code = proc.returncode if exit_code == 0: write_script_log(script_name, f"ℹ️ 脚本正常退出(返回码:{exit_code})") @@ -361,57 +363,83 @@ def start_long_running_script(script_name): except Exception as e: write_script_log(script_name, f"❌ 脚本执行异常:{str(e)}") finally: - # 移除进程记录(监控线程会重启) running_processes.pop(script_name, None) - save_processes() # 更新进程信息 + save_processes() app.logger.info(f"{script_name}长期运行进程已退出") - # 启动子线程 thread = threading.Thread(target=target, daemon=True) thread.start() return True +@app.route("/api/scheduled-jobs") +def api_scheduled_jobs(): + """查看当前所有定时任务""" + jobs = [] + for job in schedule.get_jobs(): + jobs.append({ + "script_name": next((name for name, j in scheduled_jobs.items() if j == job), "未知"), + "interval": str(job.interval), + "next_run": job.next_run.strftime("%Y-%m-%d %H:%M:%S") if job.next_run else "立即执行" + }) + return jsonify(jobs), 200 + + def monitor_long_running_scripts(): - """监控长期运行的脚本(仅重启长期模式脚本,排除单次/定时)""" global monitor_running app.logger.info("长期脚本监控线程已启动") while monitor_running: - # 遍历所有配置,仅处理「长期运行」模式的脚本 for script_name, config in SCRIPT_CONFIGS.items(): - if config.get("mode") == "long-running": # 仅长期模式需要监控重启 - # 检查脚本是否在运行(进程存在且未退出) - is_running = script_name in running_processes and running_processes[script_name].poll() is None - if not is_running: + if config.get("mode") == "long-running" and script_name not in manual_stopped_scripts: + is_system_running, _ = is_script_running(script_name) + is_memory_running = False + if script_name in running_processes: + try: + proc = psutil.Process(running_processes[script_name].pid) + is_memory_running = proc.is_running() + except (psutil.NoSuchProcess, psutil.AccessDenied): + is_memory_running = False + + if not is_system_running and not is_memory_running: app.logger.info(f"{script_name}长期脚本未运行,尝试重启") write_script_log(script_name, "⚠️ 脚本未运行,尝试重启...") start_long_running_script(script_name) - time.sleep(MONITOR_INTERVAL) # 每10秒检查一次 - app.logger.info("长期脚本监控线程已停止") + time.sleep(MONITOR_INTERVAL) -# ====================== 单次运行脚本处理 ====================== +# 单次运行脚本处理 def start_single_run_script(script_name): - """启动单次运行的脚本(执行一次后停止)""" + # 检查是否已在运行 + if script_name in running_processes: + proc = running_processes[script_name] + if proc.poll() is None: + write_script_log(script_name, "⚠️ 单次任务已在运行中,无需重复启动") + return False + + is_running, pid = is_script_running(script_name) + if is_running: + write_script_log(script_name, f"⚠️ 检测到脚本已在运行(PID: {pid}),无需重复启动") + return False + script_path = get_script_path(script_name) if not os.path.exists(script_path): write_script_log(script_name, f"❌ 脚本不存在:{script_path}") return False - # 构建执行命令 + config = SCRIPT_CONFIGS.get(script_name, {}) + if script_name.endswith(".bat"): cmd = ["cmd", "/c", script_path] elif script_name.endswith(".py"): - cmd = ["python", script_path] + python_path = config.get("python_path", "python") + cmd = [python_path, script_path] else: write_script_log(script_name, "❌ 不支持的脚本类型") return False - # 定义子线程执行函数 def target(): try: write_script_log(script_name, "✅ 单次运行脚本已启动") - # 执行脚本(带超时控制) proc = subprocess.Popen( cmd, stdout=subprocess.PIPE, @@ -421,17 +449,15 @@ def start_single_run_script(script_name): errors="ignore", cwd=TASKS_DIR ) - # 记录进程(用于停止功能) running_processes[script_name] = proc + save_processes() - # 实时捕获输出 while proc.poll() is None: line = proc.stdout.readline() if line: write_script_log(script_name, line.strip()) time.sleep(0.1) - # 记录执行结果 exit_code = proc.returncode if exit_code == 0: write_script_log(script_name, f"✅ 单次运行脚本执行成功(返回码:{exit_code})") @@ -440,278 +466,263 @@ def start_single_run_script(script_name): except Exception as e: write_script_log(script_name, f"❌ 单次运行脚本异常:{str(e)}") finally: - # 移除进程记录(执行完成/异常终止) running_processes.pop(script_name, None) - app.logger.info(f"{script_name}单次运行脚本已结束") + save_processes() - # 启动子线程 thread = threading.Thread(target=target, daemon=True) thread.start() return True -# ====================== Flask接口 ====================== +# API接口 @app.route("/") def index(): - """前端页面入口""" return render_template("index.html") @app.route("/api/scripts") -def api_scripts(): - """获取脚本列表(含修改时间,支持前端排序)""" +def api_get_scripts(): try: + # 扫描所有脚本 scripts = [] - if os.path.exists(TASKS_DIR): - for f in os.listdir(TASKS_DIR): - f_path = os.path.join(TASKS_DIR, f) - # 仅保留.bat/.py文件 - if os.path.isfile(f_path) and f.endswith((".bat", ".py")): - # 获取文件修改时间(时间戳,单位:秒) - modify_time = os.path.getmtime(f_path) - # 格式化时间为可读格式(备用,前端也可自行格式化) - modify_time_str = datetime.fromtimestamp(modify_time).strftime("%Y-%m-%d %H:%M:%S") + for root, dirs, files in os.walk(TASKS_DIR): + for file in files: + if file.endswith((".py", ".bat")): + # 计算相对路径作为脚本名 + rel_path = os.path.relpath(os.path.join(root, file), TASKS_DIR) + script_name = normalize_script_name(rel_path) + # 获取修改时间 + modify_time = os.path.getmtime(os.path.join(root, file)) scripts.append({ - "name": f, - "modify_time": modify_time, # 时间戳(用于排序) - "modify_time_str": modify_time_str # 格式化时间(用于显示) + "name": script_name, + "modify_time": modify_time, + "modify_time_str": datetime.fromtimestamp(modify_time).strftime("%Y-%m-%d %H:%M") }) - # 默认按修改时间降序排序(后端先排序,减少前端计算) - scripts_sorted = sorted(scripts, key=lambda x: x["modify_time"], reverse=True) - return jsonify(scripts_sorted), 200 + + # 只保留有配置的脚本 + configured_scripts = [s for s in scripts if s["name"] in SCRIPT_CONFIGS] + return jsonify(configured_scripts), 200 except Exception as e: app.logger.error(f"获取脚本列表失败:{str(e)}") return jsonify({"error": str(e)}), 500 -@app.route("/api/scripts/search") -def api_script_search(): - """搜索脚本(支持模糊匹配名称)""" +@app.route("/api/scripts/status") +def api_get_scripts_status(): try: - # 获取前端传入的搜索关键词 - keyword = request.args.get("keyword", "").strip().lower() - scripts = [] - if os.path.exists(TASKS_DIR): - for f in os.listdir(TASKS_DIR): - f_path = os.path.join(TASKS_DIR, f) - if os.path.isfile(f_path) and f.endswith((".bat", ".py")): - # 模糊匹配(不区分大小写) - if keyword in f.lower(): - modify_time = os.path.getmtime(f_path) - modify_time_str = datetime.fromtimestamp(modify_time).strftime("%Y-%m-%d %H:%M:%S") - scripts.append({ - "name": f, - "modify_time": modify_time, - "modify_time_str": modify_time_str - }) - # 按修改时间降序排序 - scripts_sorted = sorted(scripts, key=lambda x: x["modify_time"], reverse=True) - return jsonify(scripts_sorted), 200 + result = {} + # 遍历所有有配置的脚本 + for script_name in SCRIPT_CONFIGS.keys(): + # 获取状态 + is_running, pid = is_script_running(script_name) + status = "running" if is_running else "scheduled" if script_name in scheduled_jobs else "stopped" + + # 获取模式和调度信息 + mode = SCRIPT_CONFIGS[script_name].get("mode", "single-run") + schedule_info = "" + if mode == "interval" and script_name in scheduled_jobs: + job = scheduled_jobs[script_name] + schedule_info = f"下次执行: {job.next_run.strftime('%Y-%m-%d %H:%M')}" + + result[script_name] = { + "status": status, + "running": is_running, + "scheduled": script_name in scheduled_jobs, + "mode": mode, + "schedule_info": schedule_info + } + return jsonify(result), 200 except Exception as e: - app.logger.error(f"搜索脚本失败:{str(e)}") + app.logger.error(f"批量获取脚本状态失败:{str(e)}") return jsonify({"error": str(e)}), 500 -@app.route("/api/config") -def api_get_config(): - """获取所有脚本配置""" - return jsonify(SCRIPT_CONFIGS), 200 - - -@app.route("/api/config", methods=["POST"]) -def api_save_config(): - """保存脚本配置(支持三种模式)""" - try: - data = request.get_json() - if not isinstance(data, dict): - return jsonify({"error": "无效的配置格式(需为JSON对象)"}), 400 - - # 更新配置并保存 - for script_name, config in data.items(): - # 校验配置必填项 - if "mode" not in config: - return jsonify({"error": f"{script_name}缺少'mode'配置(支持:long-running/interval/single-run)"}), 400 - # 仅定时模式需要校验interval和unit,单次/长期模式无需 - if config["mode"] == "interval" and ("interval" not in config or "unit" not in config): - return jsonify({"error": f"{script_name}定时模式需配置'interval'和'unit'"}), 400 - # 保存配置 - SCRIPT_CONFIGS[script_name] = config - - save_configs() - return jsonify({"msg": "配置已保存"}), 200 - except Exception as e: - app.logger.error(f"保存配置失败:{str(e)}") - return jsonify({"error": str(e)}), 500 - - -@app.route("/api/start/") -def api_start_script(script_name): - """启动脚本(支持三种模式:长期/定时/单次)""" - script_name = unquote(script_name) # 解码URL编码的脚本名 - - # 1. 校验前置条件 - if script_name not in SCRIPT_CONFIGS: - return jsonify({"error": f"未找到{script_name}的配置,请先保存配置"}), 404 - - config = SCRIPT_CONFIGS[script_name] - mode = config.get("mode", "long-running") # 默认长期运行 - - # 2. 根据模式处理 - if mode == "long-running": - # 检查是否已在运行 - if script_name in running_processes and running_processes[script_name].poll() is None: - return jsonify({"error": f"{script_name}已在运行"}), 400 - # 启动长期脚本 - success = start_long_running_script(script_name) - if success: - return jsonify({"msg": f"{script_name}(长期运行模式)已启动"}), 200 - else: - return jsonify({"error": f"{script_name}启动失败,请查看日志"}), 500 - - elif mode == "interval": - # 检查是否已添加定时任务 - if script_name in scheduled_jobs: - return jsonify({"error": f"{script_name}定时任务已存在"}), 400 - # 添加定时任务 - success = add_scheduled_task(script_name) - if success: - return jsonify({"msg": f"{script_name}(定时模式)已调度"}), 200 - else: - return jsonify({"error": f"{script_name}定时任务添加失败"}), 500 - - elif mode == "single-run": - # 检查是否已在运行 - if script_name in running_processes and running_processes[script_name].poll() is None: - return jsonify({"error": f"{script_name}已在运行"}), 400 - # 启动单次脚本 - success = start_single_run_script(script_name) - if success: - return jsonify({"msg": f"{script_name}(单次模式)已启动"}), 200 - else: - return jsonify({"error": f"{script_name}启动失败,请查看日志"}), 500 - - else: - return jsonify({"error": f"不支持的运行模式:{mode}"}), 400 - - -@app.route("/api/stop/") -def api_stop_script(script_name): - """停止脚本(根据模式停止对应进程或任务)""" - script_name = unquote(script_name) - config = SCRIPT_CONFIGS.get(script_name) - if not config: - return jsonify({"error": f"未找到{script_name}的配置"}), 404 - - mode = config.get("mode", "long-running") - stopped = False - - # 处理长期运行模式 - if mode == "long-running": - if script_name in running_processes: - proc = running_processes[script_name] - try: - # 终止进程 - proc.terminate() - # 等待进程退出 - time.sleep(1) - if proc.poll() is None: - proc.kill() # 强制终止 - stopped = True - running_processes.pop(script_name, None) - save_processes() # 更新进程信息 - write_script_log(script_name, "⏹️ 长期运行脚本已停止") - except Exception as e: - app.logger.error(f"停止{script_name}失败:{str(e)}") - return jsonify({"error": f"停止失败:{str(e)}"}), 500 - - # 处理定时模式 - elif mode == "interval": - stopped = remove_scheduled_task(script_name) - - # 处理单次运行模式 - elif mode == "single-run": - if script_name in running_processes: - proc = running_processes[script_name] - try: - proc.terminate() - time.sleep(1) - if proc.poll() is None: - proc.kill() - stopped = True - running_processes.pop(script_name, None) - write_script_log(script_name, "⏹️ 单次运行脚本已停止") - except Exception as e: - app.logger.error(f"停止{script_name}失败:{str(e)}") - return jsonify({"error": f"停止失败:{str(e)}"}), 500 - - if stopped: - return jsonify({"msg": f"{script_name}已停止"}), 200 - else: - return jsonify({"error": f"{script_name}未在运行或无法停止"}), 400 - - -@app.route("/api/status/") +@app.route("/api/status/") def api_get_status(script_name): - """获取脚本运行状态和输出日志""" - script_name = unquote(script_name) - config = SCRIPT_CONFIGS.get(script_name, {}) - mode = config.get("mode", "long-running") + try: + script_name = normalize_script_name(unquote(script_name)) + is_running, pid = is_script_running(script_name) + is_scheduled = script_name in scheduled_jobs - # 1. 检查运行状态 - status = "stopped" - running = False - scheduled = False + status = "stopped" + if is_running: + status = "running" + elif is_scheduled: + status = "scheduled" - if mode == "long-running": - running = script_name in running_processes and running_processes[script_name].poll() is None - status = "running" if running else "stopped" - elif mode == "interval": - scheduled = script_name in scheduled_jobs - status = "scheduled" if scheduled else "stopped" - elif mode == "single-run": - running = script_name in running_processes and running_processes[script_name].poll() is None - status = "running" if running else "stopped" + # 获取输出日志 + output = load_script_log(script_name) - # 2. 获取调度信息(仅定时模式) - schedule_info = "" - if mode == "interval" and scheduled: - interval = config.get("interval", 1) - unit = config.get("unit", "hours") - schedule_info = f"每{interval}{unit}执行一次" - - # 3. 获取输出日志(内存缓存 + 历史日志) - logs = load_script_log(script_name) # 加载文件日志 - # 合并内存缓存(内存日志更新更快) - if script_name in script_outputs: - logs = logs + script_outputs[script_name][len(logs):] - - return jsonify({ - "status": status, - "running": running, - "scheduled": scheduled, - "mode": mode, - "schedule_info": schedule_info, - "output": logs - }), 200 + return jsonify({ + "status": status, + "running": is_running, + "scheduled": is_scheduled, + "pid": pid, + "output": output + }), 200 + except Exception as e: + app.logger.error(f"获取{script_name}状态失败:{str(e)}") + return jsonify({"error": str(e)}), 500 -# ====================== 程序入口 ====================== -def init_app(): - """应用初始化总函数""" +@app.route("/api/start/") +def api_start_script(script_name): + try: + script_name = normalize_script_name(unquote(script_name)) + if script_name not in SCRIPT_CONFIGS: + return jsonify({"error": "脚本未配置"}), 400 + + config = SCRIPT_CONFIGS[script_name] + mode = config.get("mode", "single-run") + + # 清除手动停止标记 + manual_stopped_scripts.discard(script_name) + + if mode == "long-running": + success = start_long_running_script(script_name) + msg = "启动成功" if success else "启动失败" + elif mode == "interval": + success = add_scheduled_task(script_name) + msg = "定时任务启动成功" if success else "定时任务启动失败" + else: # single-run + success = start_single_run_script(script_name) + msg = "启动成功" if success else "启动失败" + + return jsonify({"msg": msg, "success": success}), 200 + except Exception as e: + app.logger.error(f"启动{script_name}失败:{str(e)}") + return jsonify({"error": str(e)}), 500 + + +@app.route("/api/stop/") +def api_stop_script(script_name): + try: + script_name = normalize_script_name(unquote(script_name)) + if script_name not in SCRIPT_CONFIGS: + return jsonify({"error": "脚本未配置"}), 400 + + # 标记为手动停止 + manual_stopped_scripts.add(script_name) + + # 停止定时任务 + remove_scheduled_task(script_name) + + # 停止进程 + is_running, pid = is_script_running(script_name) + if is_running and pid: + try: + proc = psutil.Process(pid) + proc.terminate() + time.sleep(1) + if proc.is_running(): + proc.kill() + write_script_log(script_name, f"✅ 进程已终止(PID: {pid})") + except Exception as e: + app.logger.error(f"终止{script_name}进程失败:{str(e)}") + return jsonify({"error": f"终止进程失败:{str(e)}"}), 500 + + # 清除内存记录 + running_processes.pop(script_name, None) + save_processes() + + return jsonify({"msg": "已停止"}), 200 + except Exception as e: + app.logger.error(f"停止{script_name}失败:{str(e)}") + return jsonify({"error": str(e)}), 500 + + +@app.route("/api/config", methods=["GET", "POST"]) +def api_config(): + if request.method == "POST": + try: + new_configs = request.get_json() + if not isinstance(new_configs, dict): + return jsonify({"error": "配置格式错误"}), 400 + + # 更新配置 + for script_name, config in new_configs.items(): + normalized_name = normalize_script_name(script_name) + SCRIPT_CONFIGS[normalized_name] = config + # 重新加载定时任务 + if config.get("mode") == "interval": + add_scheduled_task(normalized_name) + else: + remove_scheduled_task(normalized_name) + + save_configs() + return jsonify({"msg": "配置保存成功"}), 200 + except Exception as e: + app.logger.error(f"保存配置失败:{str(e)}") + return jsonify({"error": str(e)}), 500 + else: + # 获取当前配置 + return jsonify(SCRIPT_CONFIGS), 200 + + +@app.route("/api/directory") +def api_get_directory(): + try: + path = request.args.get("path", "") + full_path = os.path.join(TASKS_DIR, path.replace('/', os.sep)) + + if not os.path.exists(full_path) or not os.path.isdir(full_path): + return jsonify({"error": "目录不存在"}), 404 + + directories = [] + files = [] + for item in os.listdir(full_path): + item_path = os.path.join(full_path, item) + rel_path = os.path.relpath(item_path, TASKS_DIR) + normalized_path = normalize_script_name(rel_path) + + if os.path.isdir(item_path): + directories.append({ + "name": item, + "path": normalized_path + }) + else: + files.append({ + "name": item, + "path": normalized_path + }) + + # 排序:目录在前,按名称排序 + directories.sort(key=lambda x: x["name"].lower()) + files.sort(key=lambda x: x["name"].lower()) + + return jsonify({ + "directories": directories, + "files": files + }), 200 + except Exception as e: + app.logger.error(f"获取目录内容失败:{str(e)}") + return jsonify({"error": str(e)}), 500 + + +# 启动与初始化 +def main(): init_dirs() load_configs() - load_processes() # 加载并接管进程 - # 启动监控线程和调度线程 - threading.Thread(target=monitor_long_running_scripts, daemon=True).start() - threading.Thread(target=run_scheduler, daemon=True).start() + load_processes() + # 启动调度器线程 + scheduler_thread = threading.Thread(target=run_scheduler, daemon=True) + scheduler_thread.start() -import atexit + # 启动监控线程 + monitor_thread = threading.Thread(target=monitor_long_running_scripts, daemon=True) + monitor_thread.start() + + # 启动定时任务 + for script_name, config in SCRIPT_CONFIGS.items(): + if config.get("mode") == "interval": + add_scheduled_task(script_name) + elif config.get("mode") == "long-running": + # 启动长期运行脚本 + start_long_running_script(script_name) + + app.run(host="0.0.0.0", port=5078, debug=False) -# 注册退出钩子,确保进程信息最终被保存 -atexit.register(save_processes) if __name__ == "__main__": - init_app() - # 启动Flask服务(debug模式便于开发,生产环境需关闭) - app.run(host="0.0.0.0", port=5000, debug=True) \ No newline at end of file + main() \ No newline at end of file diff --git a/config.json b/config.json deleted file mode 100644 index bff5545..0000000 --- a/config.json +++ /dev/null @@ -1,18 +0,0 @@ -{ - "3.bat": { - "mode": "interval", - "interval": 1, - "unit": "minutes" - }, - "2.bat": { - "mode": "long-running" - }, - "1.bat": { - "mode": "single-run" - }, - "[object Object]": { - "mode": "interval", - "interval": 1, - "unit": "minutes" - } -} \ No newline at end of file diff --git a/tasks/1.bat b/tasks/1.bat index f627729..24e9bd4 100644 --- a/tasks/1.bat +++ b/tasks/1.bat @@ -22,4 +22,3 @@ echo. echo ============================= echo 报告生成于 %date% %time% echo ============================= -pause \ No newline at end of file diff --git a/tasks/3.bat b/tasks/3.bat index 716e496..203cb4a 100644 --- a/tasks/3.bat +++ b/tasks/3.bat @@ -28,5 +28,4 @@ echo. echo ============================= echo 测试完成。 echo 诊断时间: %time% -echo ============================= -pause \ No newline at end of file +echo ============================= \ No newline at end of file diff --git a/tasks/文件编码检测.py b/tasks/文件编码检测.py new file mode 100644 index 0000000..3744d04 --- /dev/null +++ b/tasks/文件编码检测.py @@ -0,0 +1,12 @@ +import chardet + +def detect_file_encoding(file_path): + with open(file_path, 'rb') as file: + raw_data = file.read() + result = chardet.detect(raw_data) + encoding = result['encoding'] + confidence = result['confidence'] + print(f"Detected encoding: {encoding}, Confidence: {confidence}") + +file_path = r"C:\Users\zy187\Desktop\销售明细.csv" +detect_file_encoding(file_path) \ No newline at end of file diff --git a/templates/index.html b/templates/index.html index 7640d3e..16b6482 100644 --- a/templates/index.html +++ b/templates/index.html @@ -4,11 +4,9 @@ 脚本管理平台 - -

脚本管理平台 系统运行中

-
-
-
@@ -252,7 +235,6 @@
-
脚本列表
@@ -267,14 +249,10 @@
-
-
-
-
-
脚本配置
-
- +
+ + +
+
+ + -
-
- + + + \ No newline at end of file diff --git a/项目说明.md b/项目说明.md new file mode 100644 index 0000000..8481f20 --- /dev/null +++ b/项目说明.md @@ -0,0 +1,78 @@ +# batManage项目说明文档 + +## 项目概述 +batManage是一个脚本管理与执行工具,主要用于对批处理脚本(.bat)和Python脚本(.py)进行统一管理、运行监控和日志记录,支持单次脚本运行模式,具备进程状态检测、重复启动防护等功能。 + +## 核心功能 +1. **脚本运行管理** + - 支持.bat和.py两种类型脚本的启动执行 + - 防止脚本重复运行(通过内存记录和系统进程检测双重校验) + - 实时捕获脚本输出并记录日志 + +2. **进程监控** + - 维护运行中脚本的进程列表 + - 实时检测进程状态,处理进程结束后的资源清理 + - 异常处理与日志记录 + +3. **日志管理** + - 记录脚本启动、运行状态、结束信息 + - 捕获脚本输出内容并写入日志 + - 记录错误信息与异常情况 + +## 关键模块说明 + +### 脚本启动模块(start_single_run_script函数) +该函数是项目核心功能实现,主要流程包括: +1. **运行状态检查** + - 检查内存中是否已有记录的运行进程 + - 检测系统中是否存在对应脚本的运行进程 + +2. **脚本合法性验证** + - 检查脚本文件是否存在 + - 验证脚本类型是否支持(仅支持.bat和.py) + +3. **命令构建** + - 对.bat脚本:使用`cmd /c`命令执行 + - 对.py脚本:使用配置的Python路径执行(默认使用"python") + +4. **异步执行** + - 通过线程异步执行脚本,避免阻塞主程序 + - 实时读取脚本输出并记录 + - 处理脚本执行完成后的状态记录与资源清理 + +## 技术实现细节 +- 使用`subprocess.Popen`创建子进程执行脚本 +- 通过`threading.Thread`实现异步执行 +- 使用进程返回码判断脚本执行结果(0为成功,非0为失败) +- 采用字典`running_processes`维护内存中的进程记录 +- 提供`save_processes`方法持久化进程信息(具体实现未展示) + +## 项目结构 +``` +batManage/ +├── app.py # 核心功能实现 +├── tasks/ # 脚本存放目录 +│ ├── 1.bat +│ ├── 3.bat +│ └── 文件编码检测.py +├── templates/ # 前端模板目录 +│ └── index.html +├── .idea/ # 项目配置目录 +└── config.json # 配置文件 +``` + +## 使用说明 +1. 将需要管理的脚本放入tasks目录 +2. 通过调用`start_single_run_script(script_name)`启动脚本 +3. 系统会自动处理脚本运行过程中的监控与日志记录 + +## 注意事项 +- 确保脚本路径正确,且具有可执行权限 +- 对于Python脚本,可在配置中指定特定的Python解释器路径 +- 系统会自动处理进程异常退出的情况,并清理相关记录 + +## 扩展建议 +1. 增加定时任务功能,支持脚本周期性执行 +2. 完善前端界面,提供可视化的脚本管理与监控 +3. 增加脚本执行权限管理,限制特定脚本的运行权限 +4. 实现脚本执行历史记录查询与统计分析功能 \ No newline at end of file