659 lines
24 KiB
Python
659 lines
24 KiB
Python
"""字段监控(多平台适配版)"""
|
|
import json
import os
import sys
import time
import zipfile
from datetime import datetime, timezone, timedelta, date
from decimal import Decimal
from pathlib import Path
from typing import Optional, List, Dict, Any

import numpy as np
import pandas as pd
import requests
|
|
|
|
|
|
def replace_decimals(obj):
    """Recursively convert every ``Decimal`` found in *obj* to ``float``.

    Dicts, lists and tuples are rebuilt with their contents converted
    (dict keys are left untouched). Any other value is returned unchanged.

    Args:
        obj: An arbitrary, possibly nested, Python value.

    Returns:
        A structure of the same shape in which no ``Decimal`` remains inside
        dict values, list items or tuple items.
    """
    if isinstance(obj, Decimal):
        return float(obj)
    if isinstance(obj, dict):
        return {key: replace_decimals(value) for key, value in obj.items()}
    if isinstance(obj, list):
        return [replace_decimals(item) for item in obj]
    if isinstance(obj, tuple):
        # Tuples used to be returned untouched, which left Decimals inside
        # them and made a later json.dumps() fail.
        converted = [replace_decimals(item) for item in obj]
        if hasattr(obj, "_fields"):
            # Keep namedtuple instances as their own type.
            return type(obj)._make(converted)
        return tuple(converted)
    return obj
|
|
|
|
|
|
class NpEncoder(json.JSONEncoder):
    """JSON encoder that serialises NumPy scalars and arrays.

    NumPy booleans, integers and floats are converted to the matching Python
    builtin, and arrays to nested lists. Every other type is delegated to
    ``json.JSONEncoder.default``, which raises ``TypeError``.
    """

    def default(self, obj):
        """Return a JSON-serialisable equivalent of *obj*."""
        # np.bool_ is neither np.integer nor np.floating, so it needs its
        # own branch; without it json.dumps() raises TypeError.
        if isinstance(obj, np.bool_):
            return bool(obj)
        if isinstance(obj, np.integer):
            return int(obj)
        if isinstance(obj, np.floating):
            return float(obj)
        if isinstance(obj, np.ndarray):
            return obj.tolist()
        return super().default(obj)
|
|
|
|
|
|
# ---------------------------- 配置项(多平台适配)---------------------------
|
|
class Config:
    """Static configuration for the field monitor (paths, API ids, tuning)."""

    # Base paths; Path objects keep them portable across platforms.
    BASE_DIR = Path(__file__).parent.absolute()
    OUTPUT_DIR = BASE_DIR / "output"

    # Data storage directories (English names to avoid encoding problems).
    DATA_DIR = OUTPUT_DIR / "data_snapshots"
    ARCHIVE_DIR = OUTPUT_DIR / "archives"

    # API configuration.
    # SECURITY: a bearer token committed to source control should be treated
    # as leaked. The JIANDAOYUN_API_TOKEN environment variable now takes
    # precedence; the literal below is kept only so existing deployments keep
    # working and should be rotated and removed.
    JIANDAOYUN_API_TOKEN = (
        os.environ.get("JIANDAOYUN_API_TOKEN")
        or "Bearer qygHulymo1fekJk4CIZyNKjyQAzG8CFN"
    )

    # Runtime parameters.
    RETAIN_DAYS = 7          # days of daily snapshot files kept un-archived
    COMPRESS_FORMAT = "zip"  # file extension used for archive files
    MAX_RETRIES = 3          # extra attempts made by APIClient.request
    RETRY_DELAY = 0.5        # seconds slept between those attempts

    # Ids of the Jiandaoyun app and forms used by the monitor.
    MONITOR_APP_ID = "6694d3c4fcb69ca9a111a6c4"
    MONITOR_ENTRY_ID = "6850c044f17c934b3ec01fea"
    CHANGES_ENTRY_ID = "6863a402a77925690a470cc5"
    STATUS_ENTRY_ID = "67ede908eb9c22261016466e"

    @classmethod
    def get_log_file(cls):
        """Return the path of the log file inside ``OUTPUT_DIR``."""
        return cls.OUTPUT_DIR / "data_monitor.log"

    @classmethod
    def get_changes_file(cls):
        """Return the path of the change-summary CSV inside ``OUTPUT_DIR``."""
        return cls.OUTPUT_DIR / "changes_summary.csv"
|
|
|
|
|
|
# ---------------------- 工具函数(多平台兼容)-----------------------
|
|
class Utils:
    """Filesystem and time helpers shared by the monitor classes."""

    @staticmethod
    def ensure_dir(path):
        """Create *path* (and any missing parents) if it does not exist.

        Args:
            path: Directory as a ``Path`` or a string.

        Returns:
            True when the directory exists afterwards, False when creating it
            failed (the error is printed, not raised).
        """
        path = Path(path)
        try:
            path.mkdir(parents=True, exist_ok=True)
        except OSError as e:
            print(f"创建目录失败: {e}")
            return False
        return True

    @staticmethod
    def get_iso_time():
        """Return the current UTC time as ``YYYY-MM-DDTHH:MM:SS.mmmZ``."""
        # isoformat() on an aware datetime already ends in "+00:00"; simply
        # appending "Z" produced the invalid "...+00:00Z". Swap the offset
        # for the "Z" designator instead.
        stamp = datetime.now(timezone.utc).isoformat(timespec='milliseconds')
        return stamp.replace("+00:00", "Z")

    @staticmethod
    def is_first_run_today():
        """Return True unless both of today's (UTC) snapshot files exist."""
        today = datetime.now(timezone.utc).strftime("%Y-%m-%d")
        snapshot_file = Config.DATA_DIR / f"snapshot_{today}.csv"
        widget_file = Config.DATA_DIR / f"all_widgets_{today}.csv"
        return not (snapshot_file.exists() and widget_file.exists())

    @staticmethod
    def safe_csv_write(df, file_path):
        """Write *df* to *file_path* as CSV via a temporary sibling file.

        The frame is first written to ``<name>.tmp`` and then moved over the
        target, so a failed write never leaves a truncated target file.

        Args:
            df: DataFrame to write (no index, ``utf-8-sig`` encoding).
            file_path: Destination as a ``Path`` or a string.

        Returns:
            True on success, False on any failure (the error is printed).
        """
        file_path = Path(file_path)
        temp_file = file_path.with_name(file_path.name + '.tmp')

        try:
            df.to_csv(temp_file, index=False, encoding='utf-8-sig')
            # Path.replace overwrites an existing target on every platform,
            # so there is no window in which the old file has been deleted
            # but the new one is not yet in place (unlike unlink + rename).
            temp_file.replace(file_path)
            return True
        except Exception as e:
            print(f"CSV写入失败: {e}")
            try:
                if temp_file.exists():
                    temp_file.unlink()
            except OSError:
                # Best-effort cleanup; the write failure was already reported.
                pass
            return False
|
|
|
|
|
|
# ---------------------- API客户端(多平台兼容)-----------------------
|
|
class APIClient:
    """Small client for the Jiandaoyun v5 HTTP API with simple retry loops."""

    def __init__(self):
        # The same headers are sent with every request.
        self.headers = {
            'Authorization': Config.JIANDAOYUN_API_TOKEN,
            'Content-Type': 'application/json',
        }

    def request(self, url, payload, method='POST'):
        """Send one request, retrying on ``requests`` errors.

        Args:
            url: Full endpoint URL.
            payload: Request body, already serialised (passed as ``data=``).
            method: HTTP method name.

        Returns:
            The ``requests.Response`` of the first attempt that succeeded.

        Raises:
            requests.exceptions.RequestException: the error of the final
                attempt, after ``Config.MAX_RETRIES`` retries.
        """
        for attempt in range(Config.MAX_RETRIES + 1):
            try:
                response = requests.request(
                    method, url,
                    headers=self.headers,
                    data=payload,
                    timeout=30,
                )
                response.raise_for_status()
                return response
            except requests.exceptions.RequestException:
                if attempt == Config.MAX_RETRIES:
                    raise
                time.sleep(Config.RETRY_DELAY)

    def data_batch_create(self, data: dict, max_retries: int = 20) -> Optional[Dict]:
        """Create a single form record.

        Args:
            data: Dict with ``api_key`` (sent as ``app_id``), ``entry_id`` and
                ``data``; optional ``is_start_workflow``, ``is_start_trigger``
                and ``transaction_id``.
            max_retries: Number of retries after the first attempt.

        Returns:
            The decoded JSON response, or None when every attempt failed.
        """
        url = 'https://api.jiandaoyun.com/api/v5/app/entry/data/create'

        payload = json.dumps({
            "app_id": data['api_key'],
            "entry_id": data['entry_id'],
            "data": data['data'],
            "is_start_workflow": data.get('is_start_workflow', "false"),
            "is_start_trigger": data.get('is_start_trigger', "false"),
            "transaction_id": data.get('transaction_id', ""),
        })

        for attempt in range(max_retries + 1):
            try:
                response = requests.post(url=url, data=payload, headers=self.headers, timeout=10)
                response.raise_for_status()
                return response.json()
            # ValueError covers a body that is not valid JSON on requests
            # versions whose JSON error is not a RequestException.
            except (requests.exceptions.RequestException, ValueError) as e:
                if attempt == max_retries:
                    print(f"创建数据失败: {e}")
                    return None
                time.sleep(3)
        return None

    def entry_data_list(self, data: dict, max_retries: int = 20) -> Dict:
        """Fetch all records of a form, 100 per page.

        Pages are requested with the ``_id`` of the last record already
        received as ``data_id`` until a page comes back with fewer than 100
        records.

        Args:
            data: Dict with ``api_key`` (sent as ``app_id``) and ``entry_id``.
            max_retries: Number of retries per page after the first attempt.

        Returns:
            ``{"data": [...]}`` with every record fetched. If a page keeps
            failing, the records collected so far are returned.
        """
        url = 'https://api.jiandaoyun.com/api/v5/app/entry/data/list'
        page_size = 100
        records = []
        last_data_id = None

        while True:
            payload = json.dumps({
                "app_id": data['api_key'],
                "entry_id": data['entry_id'],
                "limit": page_size,
                "data_id": last_data_id,
            })

            page = None
            for attempt in range(max_retries + 1):
                try:
                    response = requests.post(url=url, data=payload, headers=self.headers, timeout=10)
                    response.raise_for_status()
                    page = response.json().get("data") or []
                    break
                except (requests.exceptions.RequestException, ValueError) as e:
                    if attempt == max_retries:
                        print(f"获取数据列表失败: {e}")
                        return {"data": records}
                    time.sleep(0.1)

            if not page:
                # Empty page (or no attempt was made): nothing more to read.
                break

            records.extend(page)
            if len(page) < page_size:
                break
            last_data_id = page[-1].get('_id')

        return {"data": records}

    def entry_data_batch_create(self, data: dict, chunk_size: int = 90, max_retries: int = 20) -> List[Optional[Dict]]:
        """Create many records, sending them in chunks.

        Args:
            data: Dict with ``api_key`` (sent as ``app_id``), ``entry_id`` and
                ``data_list``; optional ``is_start_workflow`` and
                ``is_start_trigger``. Decimals are converted to floats first.
            chunk_size: Maximum number of records per request.
            max_retries: Number of retries per chunk after the first attempt.

        Returns:
            One item per chunk, in order: the decoded JSON response of the
            attempt whose ``status`` was ``"success"``, or None when the
            chunk never succeeded.
        """
        data = replace_decimals(data)
        url = 'https://api.jiandaoyun.com/api/v5/app/entry/data/batch_create'
        results = []
        rows = data['data_list']

        for start in range(0, len(rows), chunk_size):
            payload = json.dumps({
                "app_id": data['api_key'],
                "entry_id": data['entry_id'],
                "data_list": rows[start:start + chunk_size],
                "is_start_workflow": data.get('is_start_workflow', "false"),
                "is_start_trigger": data.get('is_start_trigger', "false"),
            }, cls=NpEncoder)

            result = None
            last_error = None
            for attempt in range(max_retries + 1):
                try:
                    response = requests.post(url=url, data=payload, headers=self.headers, timeout=10)
                    response.raise_for_status()
                    body = response.json()
                    if body.get("status") == "success":
                        result = body
                        break
                    # A non-success reply used to be retried with no delay
                    # and the chunk was silently dropped when retries ran
                    # out; it is now handled like any other failure.
                    last_error = f"unexpected status: {body.get('status')!r}"
                except (requests.exceptions.RequestException, ValueError) as e:
                    last_error = e
                if attempt < max_retries:
                    time.sleep(0.1)

            if result is None:
                print(f"批量创建数据失败: {last_error}")
            results.append(result)

        return results
|
|
|
|
|
|
# ---------------------- 数据处理基类 -----------------------
|
|
class DataHandler:
    """Base class holding the working directories, baseline files and API client."""

    def __init__(self):
        # Timestamp and UTC date are fixed once per run.
        self.execution_time = Utils.get_iso_time()
        self.today = datetime.now(timezone.utc).strftime("%Y-%m-%d")
        self.setup_dirs()
        self.api = APIClient()

    def setup_dirs(self):
        """Create the data/archive directories and set the baseline file paths."""
        self.data_dir = Config.DATA_DIR
        self.archive_dir = Config.ARCHIVE_DIR
        for directory in (self.data_dir, self.archive_dir):
            Utils.ensure_dir(directory)

        # Files holding the data of the previous run.
        self.last_data_file = self.data_dir / "last_data.csv"
        self.last_widget_file = self.data_dir / "last_widget_data.csv"

    def load_last_data(self):
        """Load the previous run's data.

        Returns:
            A ``(last_data, last_widget)`` tuple; an element is None when its
            file does not exist, and both are None when reading fails.
        """
        try:
            loaded = tuple(
                pd.read_csv(csv_path) if csv_path.exists() else None
                for csv_path in (self.last_data_file, self.last_widget_file)
            )
        except Exception as e:
            print(f"加载上次数据失败: {e}")
            return None, None
        return loaded

    def save_last_data(self, data, widget_data):
        """Store the current data for the next comparison.

        Both files are always written; the result is True only when both
        writes succeeded.
        """
        try:
            data_ok = Utils.safe_csv_write(data, self.last_data_file)
            widget_ok = Utils.safe_csv_write(widget_data, self.last_widget_file)
        except Exception as e:
            print(f"保存数据失败: {e}")
            return False
        return data_ok and widget_ok

    def field_replacement(self, data, final_data):
        """Placeholder for replacing field ids with labels.

        No replacement rule is implemented; ``final_data`` is returned as is
        and ``data`` is not used.
        """
        return final_data
|
|
|
|
|
|
# ---------------------- 数据监控主类 -----------------------
|
|
class DataMonitor(DataHandler):
    """Snapshot form widgets from Jiandaoyun, diff them and report changes."""

    def __init__(self):
        super().__init__()
        self.last_data, self.last_widget = self.load_last_data()

    def fetch_apps(self):
        """Fetch the app list.

        Returns:
            DataFrame built from the response's ``apps`` list, or an empty
            DataFrame when the request fails.
        """
        url = "https://api.jiandaoyun.com/api/v5/app/list"
        # NOTE(review): a single page of at most 100 apps is requested; apps
        # beyond that would be missed — confirm whether pagination is needed.
        payload = json.dumps({"skip": 0, "limit": 100})
        try:
            response = self.api.request(url, payload)
            return pd.DataFrame(response.json().get("apps", []))
        except Exception as e:
            print(f"获取应用列表失败: {e}")
            return pd.DataFrame()

    def fetch_entries(self, app_df):
        """Fetch the forms of every app in *app_df*.

        Args:
            app_df: DataFrame with an ``app_id`` column.

        Returns:
            One DataFrame with all forms plus an ``app_id`` column; empty when
            nothing was fetched. Apps whose request fails are skipped.
        """
        url = "https://api.jiandaoyun.com/api/v5/app/entry/list"
        all_entries = []

        for _, app in app_df.iterrows():
            payload = json.dumps({"app_id": app['app_id']})
            try:
                response = self.api.request(url, payload)
                entries = response.json().get("forms", [])
                if entries:
                    entry_df = pd.DataFrame(entries)
                    entry_df['app_id'] = app['app_id']
                    all_entries.append(entry_df)
            except Exception as e:
                print(f"获取表单条目失败: {e}")
                continue

        return pd.concat(all_entries, ignore_index=True) if all_entries else pd.DataFrame()

    def fetch_widgets(self, entry_df):
        """Fetch the widgets (fields) of every form in *entry_df*.

        Args:
            entry_df: DataFrame with ``app_id`` and ``entry_id`` columns.

        Returns:
            One DataFrame with all widgets plus ``app_id`` and ``entry_id``
            columns; empty when nothing was fetched. Forms whose request
            fails are skipped.
        """
        url = "https://api.jiandaoyun.com/api/v5/app/entry/widget/list"
        all_widgets = []

        for _, entry in entry_df.iterrows():
            payload = json.dumps({
                "app_id": entry['app_id'],
                "entry_id": entry['entry_id'],
            })
            try:
                response = self.api.request(url, payload)
                widgets = response.json().get('widgets', [])
                if widgets:
                    widget_df = pd.DataFrame(widgets)
                    widget_df['app_id'] = entry['app_id']
                    widget_df['entry_id'] = entry['entry_id']
                    all_widgets.append(widget_df)
            except Exception as e:
                print(f"获取字段组件失败: {e}")
                continue

        return pd.concat(all_widgets, ignore_index=True) if all_widgets else pd.DataFrame()

    def fetch_monitor_data(self):
        """Fetch the records of the monitor form.

        Returns:
            De-duplicated DataFrame of the records, or an empty DataFrame on
            failure.
        """
        payload = {
            "api_key": Config.MONITOR_APP_ID,
            "entry_id": Config.MONITOR_ENTRY_ID,
        }
        try:
            data = self.api.entry_data_list(payload).get("data", [])
            data_list = pd.DataFrame(data)

            # dict/list cells are unhashable and would break
            # drop_duplicates(); stringify any column that contains them.
            for col in data_list.columns:
                if data_list[col].apply(lambda x: isinstance(x, (dict, list))).any():
                    data_list[col] = data_list[col].astype(str)

            return data_list.drop_duplicates()
        except Exception as e:
            print(f"获取监控数据失败: {e}")
            return pd.DataFrame()

    def match_widgets(self, data_list, widget_list):
        """Keep the widgets whose form is listed in the monitor data.

        Args:
            data_list: Monitor records; column ``_widget_1750122565203`` is
                matched against the widgets' ``entry_id``.
            widget_list: All widgets, with an ``entry_id`` column.

        Returns:
            The matching rows of *widget_list*, or an empty DataFrame when
            the column is missing or there are no widgets.
        """
        entry_id_column = '_widget_1750122565203'
        if entry_id_column not in data_list.columns:
            print("缺少必需的列 '_widget_1750122565203'")
            return pd.DataFrame()

        if widget_list.empty:
            return pd.DataFrame()

        return widget_list[widget_list['entry_id'].isin(data_list[entry_id_column])]

    def archive_old_data(self):
        """Move dated snapshot CSVs outside the retention window into zips.

        Files named ``snapshot_<date>.csv`` or ``all_widgets_<date>.csv``
        whose date is not one of the last ``Config.RETAIN_DAYS`` UTC days are
        appended to ``snapshots_<YYYY-MM>.<ext>`` in the archive directory
        and then deleted.
        """
        prefixes = ("snapshot_", "all_widgets_")
        now_utc = datetime.now(timezone.utc)
        keep_dates = {
            (now_utc - timedelta(days=offset)).strftime("%Y-%m-%d")
            for offset in range(Config.RETAIN_DAYS)
        }

        # Materialise the listing first: files are deleted inside the loop.
        files_to_archive = [
            f for f in self.data_dir.iterdir()
            if f.is_file() and f.name.startswith(prefixes) and f.suffix == '.csv'
        ]

        for file_path in files_to_archive:
            prefix = prefixes[0] if file_path.name.startswith(prefixes[0]) else prefixes[1]
            date_str = file_path.stem[len(prefix):]
            if date_str in keep_dates:
                continue

            year_month = date_str[:7]
            archive_path = self.archive_dir / f"snapshots_{year_month}.{Config.COMPRESS_FORMAT}"

            try:
                with zipfile.ZipFile(archive_path, 'a', zipfile.ZIP_DEFLATED) as zipf:
                    zipf.write(file_path, arcname=file_path.name)
                # Delete only after the archive was written and closed.
                file_path.unlink()
            except Exception as e:
                print(f"归档文件失败: {e}")

    def compare_data(self, current_data):
        """Diff *current_data* against the stored baseline.

        Args:
            current_data: Current widgets, with ``name`` and ``app_id``
                columns. The frame is not modified.

        Returns:
            Dict with DataFrames under ``added``, ``deleted`` and
            ``modified`` (changes of ``label`` or ``type``), or None when
            there is no baseline, no current data, or the comparison failed.
        """
        if not self.last_data_file.exists() or current_data.empty:
            return None

        try:
            last_data = pd.read_csv(self.last_data_file)
            if last_data.empty:
                return None

            # Work on a copy: the caller's frame is saved as the next
            # baseline and must not gain a helper column (it may also be a
            # slice of another frame).
            current = current_data.copy()

            # NOTE(review): the key is name + app_id and ignores entry_id;
            # if two forms of one app can share a widget name the merge
            # would pair unrelated rows — confirm.
            last_data['unique_id'] = last_data['name'].astype(str) + last_data['app_id'].astype(str)
            current['unique_id'] = current['name'].astype(str) + current['app_id'].astype(str)

            merged = pd.merge(
                last_data, current,
                on=['unique_id'],
                how='outer',
                suffixes=('_last', '_current'),
                indicator=True,
            )

            changes = {
                'added': merged[merged['_merge'] == 'right_only'],
                'deleted': merged[merged['_merge'] == 'left_only'],
                'modified': pd.DataFrame(),
            }

            for col in ['label', 'type']:
                last_col = f"{col}_last"
                current_col = f"{col}_current"
                if last_col not in merged.columns or current_col not in merged.columns:
                    continue

                # Rows present on both sides whose value differs, ignoring
                # rows where either side is missing.
                mask = (merged['_merge'] == 'both') & (merged[last_col] != merged[current_col])
                mask = mask & ~merged[last_col].isna() & ~merged[current_col].isna()

                if mask.any():
                    modified = merged.loc[mask].copy()
                    modified['changed_field'] = col
                    modified['old_value'] = modified[last_col]
                    modified['new_value'] = modified[current_col]
                    modified['change_status'] = 'update'
                    changes['modified'] = pd.concat([changes['modified'], modified])

            return changes
        except Exception as e:
            print(f"比较数据失败: {e}")
            return None

    def save_changes(self, changes, apps, entries):
        """Append the detected changes to the CSV log and push them online.

        Args:
            changes: Result of ``compare_data``.
            apps: App DataFrame (``app_id``, ``name``) used for app names.
            entries: Form DataFrame (``app_id``, ``entry_id``, ``name``) used
                for form names.

        Returns:
            True when rows were written, False when there was nothing to
            write or writing failed.
        """
        if not changes or all(len(v) == 0 for v in changes.values()):
            return False

        result_rows = []

        for change_type in ['added', 'deleted', 'modified']:
            df = changes[change_type]
            if df.empty:
                continue

            # Deleted rows only carry values on the baseline ("last") side.
            suffix = 'current' if change_type in ['added', 'modified'] else 'last'

            for _, row in df.iterrows():
                app_id = row.get(f'app_id_{suffix}')
                entry_id = row.get(f'entry_id_{suffix}')
                if not app_id or not entry_id:
                    continue

                app_name = 'Unknown App'
                entry_name = 'Unknown Entry'

                if not apps.empty:
                    app_match = apps[apps['app_id'] == app_id]
                    if not app_match.empty:
                        app_name = app_match['name'].values[0]

                if not entries.empty:
                    entry_match = entries[(entries['app_id'] == app_id) & (entries['entry_id'] == entry_id)]
                    if not entry_match.empty:
                        entry_name = entry_match['name'].values[0]

                if change_type == 'added':
                    content = f"Added field: {row.get('label_current', 'Unknown')}"
                elif change_type == 'deleted':
                    content = f"Deleted field: {row.get('label_last', 'Unknown')}"
                else:
                    old_value = row.get('old_value', 'Unknown')
                    new_value = row.get('new_value', 'Unknown')
                    content = f'Changed from "{old_value}" to "{new_value}"'

                result_rows.append({
                    'timestamp': self.execution_time,
                    'unique_id': row.get('unique_id', ''),
                    'app_id': app_id,
                    'app_name': app_name,
                    'entry_id': entry_id,
                    'entry_name': entry_name,
                    'change_type': change_type,
                    'details': content,
                })

        if not result_rows:
            return False

        result_df = pd.DataFrame(result_rows)
        changes_file = Config.get_changes_file()
        try:
            result_df.to_csv(
                changes_file,
                mode='a',
                # Write the header only when the file is being created.
                header=not changes_file.exists(),
                index=False,
                encoding='utf-8-sig',
            )
            self.add_to_jiandaoyun(result_df)
            return True
        except Exception as e:
            print(f"保存变更记录失败: {e}")
            return False

    def add_to_jiandaoyun(self, result_df):
        """Write the change rows to the Jiandaoyun changes form.

        Args:
            result_df: DataFrame with ``app_name``, ``entry_name``,
                ``change_type``, ``details`` and ``timestamp`` columns.

        Returns:
            True when at least one chunk was created successfully.
        """
        all_data = [{
            "_widget_1751446961315": {"value": row["app_name"]},
            "_widget_1751446961316": {"value": row["entry_name"]},
            "_widget_1751446961317": {"value": row["change_type"]},
            "_widget_1751446961318": {"value": row["details"]},
            "_widget_1751446961319": {"value": row["timestamp"]},
        } for _, row in result_df.iterrows()]

        payload = {
            "api_key": Config.MONITOR_APP_ID,
            "entry_id": Config.CHANGES_ENTRY_ID,
            "data_list": all_data,
        }

        try:
            response = self.api.entry_data_batch_create(payload)
            # Failed chunks come back as None; a list made only of None
            # entries means nothing was written and is not a success.
            return isinstance(response, list) and any(item is not None for item in response)
        except Exception as e:
            print(f"写入简道云失败: {e}")
            return False

    def send_task_status(self, task_start_time: str, task_name: str) -> None:
        """Report the run's start, end and duration to the status form.

        Args:
            task_start_time: Start time as ``YYYY-MM-DD HH:MM:SS`` in the
                host's local time (``main`` builds it with
                ``datetime.now()``).
            task_name: Name recorded for the task.
        """
        try:
            end_time_utc = datetime.now(timezone.utc)
            task_start_naive = datetime.strptime(task_start_time, "%Y-%m-%d %H:%M:%S")
            # astimezone() on a naive datetime treats it as system local
            # time, matching how main() produced the string. The previous
            # hard-coded "minus 8 hours" was only right on UTC+8 hosts.
            task_start_utc = task_start_naive.astimezone(timezone.utc)

            run_time_sec = int((end_time_utc - task_start_utc).total_seconds())

            today_utc = end_time_utc.strftime("%Y-%m-%d")
            task_end_iso = end_time_utc.strftime("%Y-%m-%dT%H:%M:%SZ")
            task_start_iso = task_start_utc.strftime("%Y-%m-%dT%H:%M:%SZ")

            payload = {
                "api_key": Config.MONITOR_APP_ID,
                "entry_id": Config.STATUS_ENTRY_ID,
                "data": {
                    "_widget_1744873387500": {"value": today_utc},
                    "_widget_1743644977694": {"value": task_name},
                    "_widget_1744873387501": {"value": task_start_iso},
                    "_widget_1744873387502": {"value": task_end_iso},
                    "_widget_1744873387504": {"value": run_time_sec},
                },
            }

            self.api.data_batch_create(payload)
        except Exception as e:
            print(f"发送任务状态失败: {e}")

    def run_daily_snapshot(self):
        """Take today's snapshot, archive old ones and reset the baseline.

        Returns:
            True on success, False when nothing matched or a write failed.
        """
        try:
            apps = self.fetch_apps()
            entries = self.fetch_entries(apps)
            widgets = self.fetch_widgets(entries)
            monitor_data = self.fetch_monitor_data()
            matched_data = self.match_widgets(monitor_data, widgets)

            if matched_data.empty:
                print("没有匹配到数据")
                return False

            today_file = self.data_dir / f"snapshot_{self.today}.csv"
            widget_file = self.data_dir / f"all_widgets_{self.today}.csv"

            if not Utils.safe_csv_write(matched_data, today_file):
                print("保存快照数据失败")
                return False
            if not Utils.safe_csv_write(widgets, widget_file):
                print("保存组件数据失败")
                return False

            self.archive_old_data()
            # NOTE(review): the baseline is replaced without comparing, so
            # changes made since the previous run are not reported by this
            # path — confirm this is intended.
            self.save_last_data(matched_data, widgets)

            return True
        except Exception as e:
            print(f"每日快照任务失败: {e}")
            return False

    def run_hourly_check(self):
        """Compare the current widgets with the baseline and record changes.

        Returns:
            True on success, False when no data matched or the check failed.
        """
        try:
            apps = self.fetch_apps()
            entries = self.fetch_entries(apps)
            widgets = self.fetch_widgets(entries)
            monitor_data = self.fetch_monitor_data()
            current_data = self.match_widgets(monitor_data, widgets)

            if current_data.empty:
                # The fetch helpers return empty frames on API errors.
                # Saving that as the baseline would wipe the good one and
                # hide real changes on the next run, so stop here (same
                # guard and message as the daily snapshot).
                print("没有匹配到数据")
                return False

            changes = self.compare_data(current_data)
            if changes and any(len(v) > 0 for v in changes.values()):
                self.save_changes(changes, apps, entries)

            self.save_last_data(current_data, widgets)

            return True
        except Exception as e:
            print(f"每小时检查任务失败: {e}")
            return False

    def main(self):
        """Run the daily snapshot or the hourly check, then report status.

        Returns:
            The result of the task that ran, or False on an unexpected error.
        """
        # Local wall-clock time; send_task_status converts it to UTC.
        task_start_time = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
        try:
            if Utils.is_first_run_today():
                success = self.run_daily_snapshot()
            else:
                success = self.run_hourly_check()

            self.send_task_status(task_start_time, "字段监控")
            return success
        except Exception as e:
            print(f"主运行逻辑失败: {e}")
            return False
|
|
|
|
|
|
if __name__ == "__main__":
    # The output directory must exist before anything is written into it.
    Utils.ensure_dir(Config.OUTPUT_DIR)

    # Run the monitor; a falsy result becomes exit status 1.
    succeeded = DataMonitor().main()
    if not succeeded:
        sys.exit(1)