Files
bat_manage/app.py
T

717 lines
28 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
from flask import Flask, render_template, request, jsonify
from flask_cors import CORS # 处理跨域
import os
import subprocess
import json
import threading
import time
import schedule # 定时任务库
from urllib.parse import unquote
from datetime import datetime
# ====================== 基础配置 ======================
app = Flask(__name__)
# 允许所有跨域请求(外网访问必备)
CORS(app, resources=r"/*")
# 目录配置(自动创建不存在的目录)
BASE_DIR = os.getcwd()
TASKS_DIR = os.path.join(BASE_DIR, "tasks") # 脚本存储目录
LOG_DIR = os.path.join(BASE_DIR, "logs") # 日志存储目录
CONFIG_FILE = os.path.join(BASE_DIR, "config.json") # 配置文件
PROCESSES_FILE = os.path.join(BASE_DIR, "processes.json") # 进程持久化文件
# 常量配置
LOG_LINES = 200 # 内存中保留的日志行数(防止内存溢出)
MONITOR_INTERVAL = 10 # 长期脚本监控间隔(秒)
SCHEDULER_INTERVAL = 1 # 定时任务调度间隔(秒)
# ====================== 全局变量 ======================
SCRIPT_CONFIGS = {} # 脚本配置:{脚本名: {mode, interval, unit...}}
running_processes = {} # 长期运行的进程:{脚本名: subprocess.Popen对象}
script_outputs = {} # 内存日志缓存:{脚本名: [日志行1, 日志行2...]}
scheduled_jobs = {} # 定时任务:{脚本名: schedule.Job对象}
scheduler_running = True # 定时调度器运行标志
monitor_running = True # 长期脚本监控运行标志
# ====================== 初始化函数 ======================
def init_dirs():
"""初始化必要目录(tasks/logs"""
for dir_path in [TASKS_DIR, LOG_DIR]:
if not os.path.exists(dir_path):
os.makedirs(dir_path, exist_ok=True)
app.logger.info(f"创建目录:{dir_path}")
def load_configs():
"""加载脚本配置(从config.json"""
global SCRIPT_CONFIGS
try:
if os.path.exists(CONFIG_FILE):
with open(CONFIG_FILE, "r", encoding="utf-8") as f:
SCRIPT_CONFIGS = json.load(f)
# 校验配置格式,确保每个脚本有基础配置
for script_name, config in SCRIPT_CONFIGS.items():
if "mode" not in config:
config["mode"] = "long-running" # 默认长期运行
if config["mode"] == "interval" and ("interval" not in config or "unit" not in config):
config["interval"] = 1
config["unit"] = "hours" # 默认每小时执行
else:
SCRIPT_CONFIGS = {}
app.logger.info("配置文件不存在,初始化空配置")
except Exception as e:
SCRIPT_CONFIGS = {}
app.logger.error(f"加载配置失败:{str(e)},使用空配置")
def save_configs():
"""保存脚本配置到config.json"""
try:
with open(CONFIG_FILE, "w", encoding="utf-8") as f:
json.dump(SCRIPT_CONFIGS, f, indent=2, ensure_ascii=False)
app.logger.info("配置保存成功")
except Exception as e:
app.logger.error(f"保存配置失败:{str(e)}")
raise # 抛出异常让接口返回错误
def save_processes():
"""保存当前运行的长期任务进程信息到文件"""
processes_data = {}
for script_name, proc in running_processes.items():
# 仅保存长期运行模式的进程
if SCRIPT_CONFIGS.get(script_name, {}).get("mode") == "long-running":
processes_data[script_name] = {
"pid": proc.pid,
"script_name": script_name,
"start_time": datetime.now().strftime("%Y-%m-%d %H:%M:%S"),
"cmd": proc.args # 记录启动命令(用于校验身份)
}
try:
with open(PROCESSES_FILE, "w", encoding="utf-8") as f:
json.dump(processes_data, f, indent=2, ensure_ascii=False)
app.logger.info("进程信息已持久化")
except Exception as e:
app.logger.error(f"保存进程信息失败:{str(e)}")
def is_process_alive(pid, expected_cmd):
"""验证进程是否存活且命令行匹配"""
try:
# Windows系统通过tasklist获取进程信息
if os.name == "nt":
# 执行tasklist命令获取进程详情
result = subprocess.run(
["tasklist", "/fi", f"PID eq {pid}", "/fo", "csv", "/v"],
stdout=subprocess.PIPE,
stderr=subprocess.STDOUT,
text=True,
encoding="utf-8"
)
output = result.stdout
# 检查PID是否存在
if f"{pid}" not in output:
return False
# 检查命令行是否包含目标脚本路径
script_path = get_script_path(expected_cmd[-1]) # 提取脚本路径
return script_path in output
else:
# Linux/macOS系统
result = subprocess.run(
["ps", "-p", str(pid), "-o", "cmd="],
stdout=subprocess.PIPE,
stderr=subprocess.STDOUT,
text=True
)
return result.returncode == 0 and expected_cmd[-1] in result.stdout
except Exception as e:
app.logger.error(f"验证进程失败:{str(e)}")
return False
def load_processes():
"""程序启动时加载并验证持久化的进程信息"""
global running_processes
if not os.path.exists(PROCESSES_FILE):
return
try:
with open(PROCESSES_FILE, "r", encoding="utf-8") as f:
processes_data = json.load(f)
for script_name, data in processes_data.items():
pid = data["pid"]
cmd = data["cmd"]
# 验证进程是否存活且匹配脚本
if is_process_alive(pid, cmd):
# 构造伪Popen对象(仅保留必要属性用于管理)
proc = subprocess.Popen()
proc.pid = pid
proc.args = cmd
running_processes[script_name] = proc
app.logger.info(f"成功接管进程:{script_name}PID: {pid}")
else:
app.logger.warning(f"进程已结束或不匹配:{script_name}PID: {pid}")
except Exception as e:
app.logger.error(f"加载进程信息失败:{str(e)}")
# ====================== 日志处理函数 ======================
def get_log_path(script_name):
"""获取脚本对应的日志文件路径"""
return os.path.join(LOG_DIR, f"{os.path.splitext(script_name)[0]}.log")
def load_script_log(script_name):
"""加载脚本的历史日志(从日志文件)"""
log_path = get_log_path(script_name)
logs = []
if os.path.exists(log_path):
try:
with open(log_path, "r", encoding="utf-8", errors="ignore") as f:
# 读取最后LOG_LINES行,避免大文件加载缓慢
lines = f.readlines()[-LOG_LINES:]
logs = [line.strip() for line in lines if line.strip()]
except Exception as e:
app.logger.error(f"加载{script_name}日志失败:{str(e)}")
logs = [f"⚠️ 加载历史日志失败:{str(e)}"]
return logs
def write_script_log(script_name, content):
"""写入日志到文件和内存缓存"""
# 格式化日志(带时间戳)
timestamp = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
log_line = f"[{timestamp}] {content}"
# 1. 写入内存缓存(保留最后LOG_LINES行)
if script_name not in script_outputs:
script_outputs[script_name] = []
script_outputs[script_name].append(log_line)
script_outputs[script_name] = script_outputs[script_name][-LOG_LINES:]
# 2. 写入日志文件
log_path = get_log_path(script_name)
try:
with open(log_path, "a", encoding="utf-8", errors="ignore") as f:
f.write(log_line + "\n")
except Exception as e:
app.logger.error(f"写入{script_name}日志失败:{str(e)}")
# 同时记录日志写入错误到内存
error_line = f"[{timestamp}] ⚠️ 日志写入失败:{str(e)}"
script_outputs[script_name].append(error_line)
script_outputs[script_name] = script_outputs[script_name][-LOG_LINES:]
# ====================== 定时任务处理 ======================
def add_scheduled_task(script_name):
"""添加定时任务(基于配置的interval和unit"""
global scheduled_jobs
try:
# 获取脚本配置
config = SCRIPT_CONFIGS.get(script_name)
if not config or config["mode"] != "interval":
app.logger.error(f"{script_name}配置无效,无法添加定时任务")
return False
interval = int(config.get("interval", 1))
unit = config.get("unit", "hours")
# 定义定时任务执行函数
def run_scheduled_script():
script_path = get_script_path(script_name)
if not os.path.exists(script_path):
write_script_log(script_name, f"❌ 脚本不存在:{script_path}")
return
# 构建执行命令
if script_name.endswith(".bat"):
cmd = ["cmd", "/c", script_path]
elif script_name.endswith(".py"):
cmd = ["python", script_path]
else:
write_script_log(script_name, "❌ 不支持的脚本类型")
return
# 执行脚本(单次)
try:
write_script_log(script_name, "✅ 定时任务开始执行")
result = subprocess.run(
cmd,
stdout=subprocess.PIPE,
stderr=subprocess.STDOUT,
text=True,
encoding="utf-8",
cwd=TASKS_DIR,
timeout=3600 # 超时1小时(可根据需求调整)
)
# 记录执行结果
if result.returncode == 0:
write_script_log(script_name, "✅ 定时任务执行成功")
if result.stdout:
write_script_log(script_name, f"📝 输出:{result.stdout.strip()}")
else:
write_script_log(script_name, f"❌ 定时任务执行失败(返回码:{result.returncode}")
if result.stdout:
write_script_log(script_name, f"📝 错误输出:{result.stdout.strip()}")
except Exception as e:
write_script_log(script_name, f"❌ 定时任务执行异常:{str(e)}")
# 根据unit设置定时频率
job = None
if unit == "minutes":
job = schedule.every(interval).minutes.do(run_scheduled_script)
elif unit == "hours":
job = schedule.every(interval).hours.do(run_scheduled_script)
elif unit == "days":
job = schedule.every(interval).days.do(run_scheduled_script)
else:
write_script_log(script_name, f"❌ 不支持的时间单位:{unit}")
return False
# 保存定时任务
scheduled_jobs[script_name] = job
write_script_log(script_name, f"⏰ 定时任务已添加(每{interval}{unit}执行一次)")
app.logger.info(f"{script_name}定时任务添加成功(每{interval}{unit}")
return True
except Exception as e:
write_script_log(script_name, f"❌ 添加定时任务失败:{str(e)}")
app.logger.error(f"{script_name}添加定时任务失败:{str(e)}")
return False
def remove_scheduled_task(script_name):
"""移除定时任务"""
global scheduled_jobs
if script_name in scheduled_jobs:
schedule.cancel_job(scheduled_jobs[script_name])
del scheduled_jobs[script_name]
write_script_log(script_name, "⏹️ 定时任务已移除")
app.logger.info(f"{script_name}定时任务移除成功")
return True
return False
def run_scheduler():
"""定时任务调度线程(循环检查任务)"""
global scheduler_running
app.logger.info("定时任务调度线程已启动")
while scheduler_running:
schedule.run_pending()
time.sleep(SCHEDULER_INTERVAL)
app.logger.info("定时任务调度线程已停止")
# ====================== 长期运行脚本处理 ======================
def get_script_path(script_name):
"""获取脚本的完整路径"""
return os.path.join(TASKS_DIR, script_name)
def start_long_running_script(script_name):
"""启动长期运行的脚本(带输出捕获)"""
script_path = get_script_path(script_name)
if not os.path.exists(script_path):
write_script_log(script_name, f"❌ 脚本不存在:{script_path}")
return False
# 构建执行命令
if script_name.endswith(".bat"):
cmd = ["cmd", "/c", script_path]
elif script_name.endswith(".py"):
cmd = ["python", script_path]
else:
write_script_log(script_name, "❌ 不支持的脚本类型")
return False
# 定义子线程执行函数
def target():
try:
proc = subprocess.Popen(
cmd,
stdout=subprocess.PIPE,
stderr=subprocess.STDOUT,
stdin=subprocess.PIPE,
text=True,
encoding="utf-8",
errors="ignore",
cwd=TASKS_DIR
)
# 记录运行中的进程
running_processes[script_name] = proc
write_script_log(script_name, "✅ 长期运行脚本已启动")
# 保存进程信息
save_processes()
# 实时捕获输出
while proc.poll() is None:
line = proc.stdout.readline()
if line:
write_script_log(script_name, line.strip())
time.sleep(0.1) # 减少CPU占用
# 进程退出处理
exit_code = proc.returncode
if exit_code == 0:
write_script_log(script_name, f"️ 脚本正常退出(返回码:{exit_code}")
else:
write_script_log(script_name, f"❌ 脚本异常退出(返回码:{exit_code}")
except Exception as e:
write_script_log(script_name, f"❌ 脚本执行异常:{str(e)}")
finally:
# 移除进程记录(监控线程会重启)
running_processes.pop(script_name, None)
save_processes() # 更新进程信息
app.logger.info(f"{script_name}长期运行进程已退出")
# 启动子线程
thread = threading.Thread(target=target, daemon=True)
thread.start()
return True
def monitor_long_running_scripts():
"""监控长期运行的脚本(仅重启长期模式脚本,排除单次/定时)"""
global monitor_running
app.logger.info("长期脚本监控线程已启动")
while monitor_running:
# 遍历所有配置,仅处理「长期运行」模式的脚本
for script_name, config in SCRIPT_CONFIGS.items():
if config.get("mode") == "long-running": # 仅长期模式需要监控重启
# 检查脚本是否在运行(进程存在且未退出)
is_running = script_name in running_processes and running_processes[script_name].poll() is None
if not is_running:
app.logger.info(f"{script_name}长期脚本未运行,尝试重启")
write_script_log(script_name, "⚠️ 脚本未运行,尝试重启...")
start_long_running_script(script_name)
time.sleep(MONITOR_INTERVAL) # 每10秒检查一次
app.logger.info("长期脚本监控线程已停止")
# ====================== 单次运行脚本处理 ======================
def start_single_run_script(script_name):
"""启动单次运行的脚本(执行一次后停止)"""
script_path = get_script_path(script_name)
if not os.path.exists(script_path):
write_script_log(script_name, f"❌ 脚本不存在:{script_path}")
return False
# 构建执行命令
if script_name.endswith(".bat"):
cmd = ["cmd", "/c", script_path]
elif script_name.endswith(".py"):
cmd = ["python", script_path]
else:
write_script_log(script_name, "❌ 不支持的脚本类型")
return False
# 定义子线程执行函数
def target():
try:
write_script_log(script_name, "✅ 单次运行脚本已启动")
# 执行脚本(带超时控制)
proc = subprocess.Popen(
cmd,
stdout=subprocess.PIPE,
stderr=subprocess.STDOUT,
text=True,
encoding="utf-8",
errors="ignore",
cwd=TASKS_DIR
)
# 记录进程(用于停止功能)
running_processes[script_name] = proc
# 实时捕获输出
while proc.poll() is None:
line = proc.stdout.readline()
if line:
write_script_log(script_name, line.strip())
time.sleep(0.1)
# 记录执行结果
exit_code = proc.returncode
if exit_code == 0:
write_script_log(script_name, f"✅ 单次运行脚本执行成功(返回码:{exit_code}")
else:
write_script_log(script_name, f"❌ 单次运行脚本执行失败(返回码:{exit_code}")
except Exception as e:
write_script_log(script_name, f"❌ 单次运行脚本异常:{str(e)}")
finally:
# 移除进程记录(执行完成/异常终止)
running_processes.pop(script_name, None)
app.logger.info(f"{script_name}单次运行脚本已结束")
# 启动子线程
thread = threading.Thread(target=target, daemon=True)
thread.start()
return True
# ====================== Flask接口 ======================
@app.route("/")
def index():
"""前端页面入口"""
return render_template("index.html")
@app.route("/api/scripts")
def api_scripts():
"""获取脚本列表(含修改时间,支持前端排序)"""
try:
scripts = []
if os.path.exists(TASKS_DIR):
for f in os.listdir(TASKS_DIR):
f_path = os.path.join(TASKS_DIR, f)
# 仅保留.bat/.py文件
if os.path.isfile(f_path) and f.endswith((".bat", ".py")):
# 获取文件修改时间(时间戳,单位:秒)
modify_time = os.path.getmtime(f_path)
# 格式化时间为可读格式(备用,前端也可自行格式化)
modify_time_str = datetime.fromtimestamp(modify_time).strftime("%Y-%m-%d %H:%M:%S")
scripts.append({
"name": f,
"modify_time": modify_time, # 时间戳(用于排序)
"modify_time_str": modify_time_str # 格式化时间(用于显示)
})
# 默认按修改时间降序排序(后端先排序,减少前端计算)
scripts_sorted = sorted(scripts, key=lambda x: x["modify_time"], reverse=True)
return jsonify(scripts_sorted), 200
except Exception as e:
app.logger.error(f"获取脚本列表失败:{str(e)}")
return jsonify({"error": str(e)}), 500
@app.route("/api/scripts/search")
def api_script_search():
"""搜索脚本(支持模糊匹配名称)"""
try:
# 获取前端传入的搜索关键词
keyword = request.args.get("keyword", "").strip().lower()
scripts = []
if os.path.exists(TASKS_DIR):
for f in os.listdir(TASKS_DIR):
f_path = os.path.join(TASKS_DIR, f)
if os.path.isfile(f_path) and f.endswith((".bat", ".py")):
# 模糊匹配(不区分大小写)
if keyword in f.lower():
modify_time = os.path.getmtime(f_path)
modify_time_str = datetime.fromtimestamp(modify_time).strftime("%Y-%m-%d %H:%M:%S")
scripts.append({
"name": f,
"modify_time": modify_time,
"modify_time_str": modify_time_str
})
# 按修改时间降序排序
scripts_sorted = sorted(scripts, key=lambda x: x["modify_time"], reverse=True)
return jsonify(scripts_sorted), 200
except Exception as e:
app.logger.error(f"搜索脚本失败:{str(e)}")
return jsonify({"error": str(e)}), 500
@app.route("/api/config")
def api_get_config():
"""获取所有脚本配置"""
return jsonify(SCRIPT_CONFIGS), 200
@app.route("/api/config", methods=["POST"])
def api_save_config():
"""保存脚本配置(支持三种模式)"""
try:
data = request.get_json()
if not isinstance(data, dict):
return jsonify({"error": "无效的配置格式(需为JSON对象)"}), 400
# 更新配置并保存
for script_name, config in data.items():
# 校验配置必填项
if "mode" not in config:
return jsonify({"error": f"{script_name}缺少'mode'配置(支持:long-running/interval/single-run"}), 400
# 仅定时模式需要校验interval和unit,单次/长期模式无需
if config["mode"] == "interval" and ("interval" not in config or "unit" not in config):
return jsonify({"error": f"{script_name}定时模式需配置'interval''unit'"}), 400
# 保存配置
SCRIPT_CONFIGS[script_name] = config
save_configs()
return jsonify({"msg": "配置已保存"}), 200
except Exception as e:
app.logger.error(f"保存配置失败:{str(e)}")
return jsonify({"error": str(e)}), 500
@app.route("/api/start/<script_name>")
def api_start_script(script_name):
"""启动脚本(支持三种模式:长期/定时/单次)"""
script_name = unquote(script_name) # 解码URL编码的脚本名
# 1. 校验前置条件
if script_name not in SCRIPT_CONFIGS:
return jsonify({"error": f"未找到{script_name}的配置,请先保存配置"}), 404
config = SCRIPT_CONFIGS[script_name]
mode = config.get("mode", "long-running") # 默认长期运行
# 2. 根据模式处理
if mode == "long-running":
# 检查是否已在运行
if script_name in running_processes and running_processes[script_name].poll() is None:
return jsonify({"error": f"{script_name}已在运行"}), 400
# 启动长期脚本
success = start_long_running_script(script_name)
if success:
return jsonify({"msg": f"{script_name}(长期运行模式)已启动"}), 200
else:
return jsonify({"error": f"{script_name}启动失败,请查看日志"}), 500
elif mode == "interval":
# 检查是否已添加定时任务
if script_name in scheduled_jobs:
return jsonify({"error": f"{script_name}定时任务已存在"}), 400
# 添加定时任务
success = add_scheduled_task(script_name)
if success:
return jsonify({"msg": f"{script_name}(定时模式)已调度"}), 200
else:
return jsonify({"error": f"{script_name}定时任务添加失败"}), 500
elif mode == "single-run":
# 检查是否已在运行
if script_name in running_processes and running_processes[script_name].poll() is None:
return jsonify({"error": f"{script_name}已在运行"}), 400
# 启动单次脚本
success = start_single_run_script(script_name)
if success:
return jsonify({"msg": f"{script_name}(单次模式)已启动"}), 200
else:
return jsonify({"error": f"{script_name}启动失败,请查看日志"}), 500
else:
return jsonify({"error": f"不支持的运行模式:{mode}"}), 400
@app.route("/api/stop/<script_name>")
def api_stop_script(script_name):
"""停止脚本(根据模式停止对应进程或任务)"""
script_name = unquote(script_name)
config = SCRIPT_CONFIGS.get(script_name)
if not config:
return jsonify({"error": f"未找到{script_name}的配置"}), 404
mode = config.get("mode", "long-running")
stopped = False
# 处理长期运行模式
if mode == "long-running":
if script_name in running_processes:
proc = running_processes[script_name]
try:
# 终止进程
proc.terminate()
# 等待进程退出
time.sleep(1)
if proc.poll() is None:
proc.kill() # 强制终止
stopped = True
running_processes.pop(script_name, None)
save_processes() # 更新进程信息
write_script_log(script_name, "⏹️ 长期运行脚本已停止")
except Exception as e:
app.logger.error(f"停止{script_name}失败:{str(e)}")
return jsonify({"error": f"停止失败:{str(e)}"}), 500
# 处理定时模式
elif mode == "interval":
stopped = remove_scheduled_task(script_name)
# 处理单次运行模式
elif mode == "single-run":
if script_name in running_processes:
proc = running_processes[script_name]
try:
proc.terminate()
time.sleep(1)
if proc.poll() is None:
proc.kill()
stopped = True
running_processes.pop(script_name, None)
write_script_log(script_name, "⏹️ 单次运行脚本已停止")
except Exception as e:
app.logger.error(f"停止{script_name}失败:{str(e)}")
return jsonify({"error": f"停止失败:{str(e)}"}), 500
if stopped:
return jsonify({"msg": f"{script_name}已停止"}), 200
else:
return jsonify({"error": f"{script_name}未在运行或无法停止"}), 400
@app.route("/api/status/<script_name>")
def api_get_status(script_name):
"""获取脚本运行状态和输出日志"""
script_name = unquote(script_name)
config = SCRIPT_CONFIGS.get(script_name, {})
mode = config.get("mode", "long-running")
# 1. 检查运行状态
status = "stopped"
running = False
scheduled = False
if mode == "long-running":
running = script_name in running_processes and running_processes[script_name].poll() is None
status = "running" if running else "stopped"
elif mode == "interval":
scheduled = script_name in scheduled_jobs
status = "scheduled" if scheduled else "stopped"
elif mode == "single-run":
running = script_name in running_processes and running_processes[script_name].poll() is None
status = "running" if running else "stopped"
# 2. 获取调度信息(仅定时模式)
schedule_info = ""
if mode == "interval" and scheduled:
interval = config.get("interval", 1)
unit = config.get("unit", "hours")
schedule_info = f"{interval}{unit}执行一次"
# 3. 获取输出日志(内存缓存 + 历史日志)
logs = load_script_log(script_name) # 加载文件日志
# 合并内存缓存(内存日志更新更快)
if script_name in script_outputs:
logs = logs + script_outputs[script_name][len(logs):]
return jsonify({
"status": status,
"running": running,
"scheduled": scheduled,
"mode": mode,
"schedule_info": schedule_info,
"output": logs
}), 200
# ====================== 程序入口 ======================
def init_app():
"""应用初始化总函数"""
init_dirs()
load_configs()
load_processes() # 加载并接管进程
# 启动监控线程和调度线程
threading.Thread(target=monitor_long_running_scripts, daemon=True).start()
threading.Thread(target=run_scheduler, daemon=True).start()
import atexit
# 注册退出钩子,确保进程信息最终被保存
atexit.register(save_processes)
if __name__ == "__main__":
init_app()
# 启动Flask服务(debug模式便于开发,生产环境需关闭)
app.run(host="0.0.0.0", port=5000, debug=True)