# File: saas/back_ground_module/data_monitor.py
# (file-viewer listing metadata: 461 lines, 17 KiB, Python)

"""字段监控(多平台适配版)"""
import sys
import platform
from datetime import datetime, timedelta, timezone
from pathlib import Path
import pandas as pd
import zipfile
import json
import requests
import time
from api import API
from log_config import configure_task_logger, configure_error_task_logger
from back_ground_module import CommonModule
# 初始化日志记录器
logger = configure_task_logger()
error_task_logger = configure_error_task_logger()
common_tools = CommonModule()
# ---------------------------- 配置项(多平台适配)---------------------------
class Config:
    """Static configuration for the field monitor: paths and run parameters.

    All paths are ``pathlib.Path`` objects derived from the directory that
    contains this module, so separators are handled per platform.
    """

    # Base paths (Path objects take care of cross-platform separators)
    BASE_DIR = Path(__file__).parent.absolute()
    OUTPUT_DIR = BASE_DIR / "output"
    # Data storage directories (ASCII names avoid encoding problems)
    DATA_DIR = OUTPUT_DIR / "data_snapshots"
    ARCHIVE_DIR = OUTPUT_DIR / "archives"
    # Runtime parameters
    RETAIN_DAYS = 7  # number of most recent UTC dates whose files stay un-archived
    COMPRESS_FORMAT = "zip"  # used as the archive file suffix
    MAX_RETRIES = 3  # retries after the first failed HTTP attempt
    RETRY_DELAY = 0.5  # seconds to wait between HTTP attempts

    @classmethod
    def get_log_file(cls):
        """Return the path of the monitor log file inside ``OUTPUT_DIR``."""
        return cls.OUTPUT_DIR / "data_monitor.log"

    @classmethod
    def get_changes_file(cls):
        """Return the path of the cumulative change-record CSV inside ``OUTPUT_DIR``."""
        return cls.OUTPUT_DIR / "changes_summary.csv"
# ---------------------- 工具函数(多平台兼容)-----------------------
class Utils:
    """Stateless filesystem and time helpers used by the monitor classes."""

    @staticmethod
    def ensure_dir(path):
        """Create ``path`` (including parents) if it does not exist yet.

        Args:
            path: Directory to create, as a ``Path`` or a string.

        Returns:
            True when the directory exists afterwards, False when creation
            failed (the failure is written to the error log, not raised).
        """
        path = Path(path)
        try:
            path.mkdir(parents=True, exist_ok=True)
            logger.debug(f"Directory ensured: {path}")
            return True
        except Exception as e:
            error_task_logger.error(f"Failed to create directory {path}: {str(e)}")
            return False

    @staticmethod
    def get_iso_time():
        """Return the current UTC time as ISO 8601 with millisecond precision.

        The result looks like ``2024-01-31T08:15:30.123Z``. An aware UTC
        datetime already renders its offset as ``+00:00``; that offset is
        swapped for the ``Z`` designator instead of appending ``Z`` after it,
        which would yield the malformed ``...+00:00Z``.
        """
        stamp = datetime.now(timezone.utc).isoformat(timespec='milliseconds')
        return stamp.replace("+00:00", "Z")

    @staticmethod
    def is_first_run_today():
        """Return True unless both of today's (UTC) snapshot files already exist."""
        today = datetime.now(timezone.utc).strftime("%Y-%m-%d")
        snapshot_file = Config.DATA_DIR / f"snapshot_{today}.csv"
        widget_file = Config.DATA_DIR / f"all_widgets_{today}.csv"
        return not (snapshot_file.exists() and widget_file.exists())

    @staticmethod
    def safe_csv_write(df, file_path):
        """Write ``df`` to ``file_path`` as CSV via a temporary sibling file.

        The data is first written to ``<name>.tmp`` next to the target and
        then moved over the target in a single ``Path.replace`` call, so an
        existing target is never removed before its replacement is in place.

        Args:
            df: DataFrame to serialise (written without the index, UTF-8 with BOM).
            file_path: Destination path, as a ``Path`` or a string.

        Returns:
            True on success; False when writing failed (the error is logged
            and the temporary file is removed if it was created).
        """
        file_path = Path(file_path)
        temp_file = file_path.parent / (file_path.name + '.tmp')
        try:
            df.to_csv(temp_file, index=False, encoding='utf-8-sig')
            # replace() overwrites an existing target, unlike rename() on Windows
            temp_file.replace(file_path)
            return True
        except Exception as e:
            error_task_logger.error(f"Failed to write {file_path}: {str(e)}")
            if temp_file.exists():
                temp_file.unlink()
            return False
# ---------------------- API客户端(多平台兼容)-----------------------
class APIClient:
    """HTTP client with Bearer-token headers plus an instance of the project ``API`` wrapper.

    Attributes:
        headers: Headers sent with every ``request`` call.
        api: Project-level ``API`` object used for the data-list / batch-create calls.
    """

    # SECURITY(review): this credential is committed to source control. It is
    # kept as the default so existing callers keep working, but it should be
    # rotated and supplied from a secret store or environment instead.
    DEFAULT_TOKEN = 'qygHulymo1fekJk4CIZyNKjyQAzG8CFN'

    def __init__(self, token=None):
        """Build the request headers.

        Args:
            token: Optional Bearer token; falls back to ``DEFAULT_TOKEN``.
        """
        self.headers = {
            'Authorization': f'Bearer {token or self.DEFAULT_TOKEN}',
            'Content-Type': 'application/json'
        }
        self.api = API()

    def request(self, url, payload, method='POST'):
        """Send an HTTP request, retrying on any ``requests`` failure.

        One initial attempt is made plus up to ``Config.MAX_RETRIES`` retries,
        waiting ``Config.RETRY_DELAY`` seconds between attempts. Non-2xx
        responses count as failures (``raise_for_status``).

        Args:
            url: Target URL.
            payload: Request body, passed through as ``data`` (callers pass a JSON string).
            method: HTTP verb, ``'POST'`` by default.

        Returns:
            The successful ``requests.Response``.

        Raises:
            requests.exceptions.RequestException: when the final attempt fails.
        """
        total_attempts = Config.MAX_RETRIES + 1
        for attempt in range(1, total_attempts + 1):
            try:
                response = requests.request(
                    method, url,
                    headers=self.headers,
                    data=payload,
                    timeout=30
                )
                response.raise_for_status()
                return response
            except requests.exceptions.RequestException as e:
                if attempt == total_attempts:
                    raise
                # Log before sleeping so the failure is visible immediately
                logger.warning(f"Request failed (attempt {attempt}/{total_attempts}): {str(e)}")
                time.sleep(Config.RETRY_DELAY)
# ---------------------- 数据处理基类 -----------------------
class DataHandler:
    """Base class holding run metadata, storage paths and the API client.

    Attributes:
        execution_time: ISO timestamp taken when the instance was created.
        today: Current UTC date as ``YYYY-MM-DD``.
        data_dir / archive_dir: Snapshot and archive directories from ``Config``.
        last_data_file / last_widget_file: CSV files holding the previous run's
            data, used as the comparison baseline for the next run.
        api: ``APIClient`` instance.
    """

    def __init__(self):
        self.execution_time = Utils.get_iso_time()
        self.today = datetime.now(timezone.utc).strftime("%Y-%m-%d")
        self.setup_dirs()
        self.api = APIClient()

    def setup_dirs(self):
        """Create the data/archive directories and set the baseline file paths."""
        self.data_dir = Config.DATA_DIR
        self.archive_dir = Config.ARCHIVE_DIR
        Utils.ensure_dir(self.data_dir)
        Utils.ensure_dir(self.archive_dir)
        # Paths of the files that carry state from one run to the next
        self.last_data_file = self.data_dir / "last_data.csv"
        self.last_widget_file = self.data_dir / "last_widget_data.csv"

    def load_last_data(self):
        """Load the data saved by the previous run.

        Returns:
            A ``(last_data, last_widget)`` tuple of DataFrames; an element is
            None when its file does not exist. If reading either file fails,
            the error is logged and ``(None, None)`` is returned.
        """
        try:
            last_data = pd.read_csv(self.last_data_file) if self.last_data_file.exists() else None
            last_widget = pd.read_csv(self.last_widget_file) if self.last_widget_file.exists() else None
            return last_data, last_widget
        except Exception as e:
            error_task_logger.error(f"Failed to load last data: {str(e)}")
            return None, None

    def save_last_data(self, data, widget_data):
        """Persist the current data as the baseline for the next run.

        Both files are always attempted, even if the first write fails.

        Args:
            data: DataFrame written to ``last_data_file``.
            widget_data: DataFrame written to ``last_widget_file``.

        Returns:
            True only when both files were written successfully.
        """
        try:
            data_ok = Utils.safe_csv_write(data, self.last_data_file)
            widget_ok = Utils.safe_csv_write(widget_data, self.last_widget_file)
            return data_ok and widget_ok
        except Exception as e:
            error_task_logger.error(f"Failed to save current data: {str(e)}")
            return False
# ---------------------- 数据监控主类 -----------------------
class DataMonitor(DataHandler):
    """Detect added, deleted and modified form fields (widgets) between runs.

    The first run of a UTC day writes dated snapshot files and archives old
    ones; later runs on the same day compare the current widgets against the
    previously saved baseline and record any differences.
    """

    # SECURITY(review): these identifiers/credentials are committed to source
    # control; consider moving them to configuration or a secret store.
    API_KEY = "6694d3c4fcb69ca9a111a6c4"
    MONITOR_ENTRY_ID = "6850c044f17c934b3ec01fea"      # form listing the entries to monitor
    CHANGE_LOG_ENTRY_ID = "6863a402a77925690a470cc5"   # form receiving the change records

    def __init__(self):
        super().__init__()
        self.last_data, self.last_widget = self.load_last_data()

    def fetch_apps(self):
        """Return the application list as a DataFrame, one row per app.

        NOTE(review): only one page (skip=0, limit=100) is requested, so apps
        beyond the first 100 would not be monitored -- confirm whether
        pagination is needed.
        """
        url = "https://api.jiandaoyun.com/api/v5/app/list"
        payload = json.dumps({"skip": 0, "limit": 100})
        response = self.api.request(url, payload)
        return pd.DataFrame(response.json().get("apps", []))

    def fetch_entries(self, app_df):
        """Return all form entries of the given apps, tagged with their ``app_id``.

        Args:
            app_df: DataFrame with an ``app_id`` column.

        Returns:
            A concatenated DataFrame, or None when no app returned any entry.
        """
        url = "https://api.jiandaoyun.com/api/v5/app/entry/list"
        all_entries = []
        for _, app in app_df.iterrows():
            payload = json.dumps({"app_id": app['app_id']})
            response = self.api.request(url, payload)
            entries = response.json().get("forms", [])
            if entries:
                entry_df = pd.DataFrame(entries)
                entry_df['app_id'] = app['app_id']
                all_entries.append(entry_df)
        return pd.concat(all_entries, ignore_index=True) if all_entries else None

    def fetch_widgets(self, entry_df):
        """Return all widgets of the given entries, tagged with ``app_id``/``entry_id``.

        Args:
            entry_df: DataFrame with ``app_id`` and ``entry_id`` columns, or
                None (the "no entries" result of ``fetch_entries``).

        Returns:
            A concatenated DataFrame, or None when there is nothing to fetch
            or no entry returned any widget.
        """
        if entry_df is None:
            # fetch_entries() found nothing; avoid an AttributeError on None
            return None
        url = "https://api.jiandaoyun.com/api/v5/app/entry/widget/list"
        all_widgets = []
        for _, entry in entry_df.iterrows():
            payload = json.dumps({
                "app_id": entry['app_id'],
                "entry_id": entry['entry_id']
            })
            response = self.api.request(url, payload)
            widgets = response.json().get('widgets', [])
            if widgets:
                widget_df = pd.DataFrame(widgets)
                widget_df['app_id'] = entry['app_id']
                widget_df['entry_id'] = entry['entry_id']
                all_widgets.append(widget_df)
        return pd.concat(all_widgets, ignore_index=True) if all_widgets else None

    def fetch_monitor_data(self):
        """Return the rows of the monitoring form as a de-duplicated DataFrame.

        Columns containing dicts or lists are converted to strings first so
        that ``drop_duplicates`` can hash every cell.
        """
        payload = {
            "api_key": self.API_KEY,
            "entry_id": self.MONITOR_ENTRY_ID
        }
        data = self.api.api.entry_data_list(payload).get("data")
        data_list = pd.DataFrame(data)
        for col in data_list.columns:
            if data_list[col].apply(lambda x: isinstance(x, (dict, list))).any():
                data_list[col] = data_list[col].astype(str)
        return data_list.drop_duplicates()

    def match_widgets(self, data_list, widget_list):
        """Keep only the widgets whose ``entry_id`` is listed in the monitor data.

        Args:
            data_list: Monitor rows; column ``_widget_1750122565203`` holds the entry ids.
            widget_list: DataFrame of all widgets with an ``entry_id`` column.

        Raises:
            ValueError: when the entry-id column is missing from ``data_list``.
        """
        if '_widget_1750122565203' not in data_list.columns:
            raise ValueError("Missing required column '_widget_1750122565203'")
        return widget_list[widget_list['entry_id'].isin(data_list['_widget_1750122565203'])]

    def archive_old_data(self):
        """Move dated snapshot CSVs outside the retention window into monthly zips.

        Files named ``snapshot_<date>.csv`` / ``all_widgets_<date>.csv`` whose
        date is not one of the last ``Config.RETAIN_DAYS`` UTC dates are
        appended to ``snapshots_<YYYY-MM>.<fmt>`` and then deleted. Failures
        are logged per file and do not stop the loop.
        """
        keep_dates = {
            (datetime.now(timezone.utc) - timedelta(days=i)).strftime("%Y-%m-%d")
            for i in range(Config.RETAIN_DAYS)
        }
        prefixes = ("snapshot_", "all_widgets_")
        # Materialise the listing first: files are deleted while looping
        for file_path in list(self.data_dir.iterdir()):
            if not file_path.is_file() or file_path.suffix != '.csv':
                continue
            prefix = next((p for p in prefixes if file_path.name.startswith(p)), None)
            if prefix is None:
                continue
            date_str = file_path.stem[len(prefix):]
            if date_str in keep_dates:
                continue
            year_month = date_str[:7]
            archive_path = self.archive_dir / f"snapshots_{year_month}.{Config.COMPRESS_FORMAT}"
            try:
                with zipfile.ZipFile(archive_path, 'a', zipfile.ZIP_DEFLATED) as zipf:
                    zipf.write(file_path, arcname=file_path.name)
                # Delete the source only after the archive was closed successfully
                file_path.unlink()
                logger.debug(f"Archived {file_path.name} to {archive_path}")
            except Exception as e:
                error_task_logger.error(f"Failed to archive {file_path}: {str(e)}")

    @staticmethod
    def _build_unique_id(df):
        """Return a per-widget key built from name, app id and entry id.

        The entry id is part of the key because widget names are only known to
        be distinct within one form; two forms of the same app sharing a
        widget name would otherwise be merged many-to-many.
        """
        return df['name'].astype(str) + df['app_id'].astype(str) + df['entry_id'].astype(str)

    def compare_data(self, current_data):
        """Diff the current widgets against the saved baseline.

        Args:
            current_data: DataFrame of the currently matched widgets. It is
                not modified; the comparison works on a copy.

        Returns:
            None when no baseline file exists, otherwise a dict with the keys
            ``added``, ``deleted`` and ``modified`` (DataFrames of merged
            rows). ``modified`` has one row per changed column (``label`` or
            ``type``) with ``changed_field``, ``old_value``, ``new_value``
            and ``change_status`` filled in.
        """
        if not self.last_data_file.exists():
            return None
        # Read everything as text so ids/labels that look numeric are not
        # re-typed by the CSV parser and then compared against API strings.
        last_data = pd.read_csv(self.last_data_file, dtype=str)
        current = current_data.copy()
        last_data['unique_id'] = self._build_unique_id(last_data)
        current['unique_id'] = self._build_unique_id(current)
        merged = pd.merge(
            last_data, current,
            on=['unique_id'],
            how='outer',
            suffixes=('_last', '_current'),
            indicator=True
        )
        in_both = merged['_merge'] == 'both'
        modified_frames = []
        for col in ['label', 'type']:
            last_col = f"{col}_last"
            current_col = f"{col}_current"
            if last_col not in merged.columns or current_col not in merged.columns:
                continue
            old, new = merged[last_col], merged[current_col]
            # Rows with a missing value on either side are not reported as modified
            mask = in_both & old.notna() & new.notna() & (old.astype(str) != new.astype(str))
            if mask.any():
                modified = merged.loc[mask].copy()
                modified['changed_field'] = col
                modified['old_value'] = modified[last_col]
                modified['new_value'] = modified[current_col]
                modified['change_status'] = 'update'
                modified_frames.append(modified)
        return {
            'added': merged[merged['_merge'] == 'right_only'],
            'deleted': merged[merged['_merge'] == 'left_only'],
            'modified': pd.concat(modified_frames) if modified_frames else pd.DataFrame()
        }

    def save_changes(self, changes, apps, entries):
        """Append the detected changes to the CSV log and upload them.

        Args:
            changes: Dict as returned by ``compare_data``.
            apps: DataFrame with ``app_id`` and ``name`` columns.
            entries: DataFrame with ``app_id``, ``entry_id`` and ``name`` columns.

        Returns:
            True when the records were written to the CSV and the upload
            reported success; False when there was nothing to record, the
            write failed, or the upload failed.
        """
        # First-match lookup tables instead of filtering the frames per row
        app_names = {}
        for app_id, name in zip(apps['app_id'], apps['name']):
            app_names.setdefault(app_id, name)
        entry_names = {}
        for app_id, entry_id, name in zip(entries['app_id'], entries['entry_id'], entries['name']):
            entry_names.setdefault((app_id, entry_id), name)

        result_rows = []
        for change_type in ['added', 'deleted', 'modified']:
            # Deleted rows only have values on the baseline ("last") side
            suffix = 'current' if change_type in ['added', 'modified'] else 'last'
            for _, row in changes[change_type].iterrows():
                app_id = row[f'app_id_{suffix}']
                entry_id = row[f'entry_id_{suffix}']
                app_name = app_names.get(app_id, 'Unknown App')
                entry_name = entry_names.get((app_id, entry_id), 'Unknown Entry')
                if change_type == 'added':
                    content = f"Added field: {row['label_current']}"
                elif change_type == 'deleted':
                    content = f"Deleted field: {row['label_last']}"
                else:
                    content = f"Changed from \"{row['old_value']}\" to \"{row['new_value']}\""
                result_rows.append({
                    'timestamp': self.execution_time,
                    'unique_id': row['unique_id'],
                    'app_id': app_id,
                    'app_name': app_name,
                    'entry_id': entry_id,
                    'entry_name': entry_name,
                    'change_type': change_type,
                    'details': content
                })
        if not result_rows:
            return False

        result_df = pd.DataFrame(result_rows)
        changes_file = Config.get_changes_file()
        try:
            file_exists = changes_file.exists()
            # Append to keep the history. The BOM of 'utf-8-sig' must only be
            # written when the file is created, otherwise every append would
            # insert a stray BOM in the middle of the file.
            result_df.to_csv(
                changes_file,
                mode='a',
                header=not file_exists,
                index=False,
                encoding='utf-8' if file_exists else 'utf-8-sig'
            )
            return bool(self.add_to_jiandaoyun(result_df))
        except Exception as e:
            error_task_logger.error(f"Failed to save changes: {str(e)}")
            return False

    def add_to_jiandaoyun(self, result_df):
        """Upload the change records through the project API's batch-create call.

        Args:
            result_df: DataFrame with ``app_name``, ``entry_name``,
                ``change_type``, ``details`` and ``timestamp`` columns.

        Returns:
            True when the API wrapper returned a list (treated as the created
            records), False otherwise (the response message is logged).
        """
        all_data = [{
            "_widget_1751446961315": {"value": row["app_name"]},
            "_widget_1751446961316": {"value": row["entry_name"]},
            "_widget_1751446961317": {"value": row["change_type"]},
            "_widget_1751446961318": {"value": row["details"]},
            "_widget_1751446961319": {"value": row["timestamp"]},
        } for _, row in result_df.iterrows()]
        payload = {
            "api_key": self.API_KEY,
            "entry_id": self.CHANGE_LOG_ENTRY_ID,
            "data_list": all_data
        }
        response = self.api.api.entry_data_batch_create(payload)
        if isinstance(response, list):
            logger.info(f"Successfully wrote {len(response)} records to Jiandaoyun")
            return True
        # The error payload is expected to be a dict; fall back to its repr otherwise
        message = response.get('message', 'Unknown error') if isinstance(response, dict) else repr(response)
        error_task_logger.error(f"Failed to write to Jiandaoyun: {message}")
        return False

    def _collect_current_state(self):
        """Fetch apps, entries and widgets and select the monitored widgets.

        Returns:
            ``(apps, entries, widgets, matched)`` DataFrames.

        Raises:
            ValueError: when no widgets were returned or the monitor data
                lacks its entry-id column.
        """
        apps = self.fetch_apps()
        entries = self.fetch_entries(apps)
        widgets = self.fetch_widgets(entries)
        if widgets is None:
            raise ValueError("No widgets returned by the API; nothing to monitor")
        monitor_data = self.fetch_monitor_data()
        matched = self.match_widgets(monitor_data, widgets)
        return apps, entries, widgets, matched

    def run_daily_snapshot(self):
        """Write today's snapshot files, archive old ones and reset the baseline.

        NOTE(review): this path does not call ``compare_data``, so differences
        between the previous baseline and the first run of the day are not
        recorded -- confirm this is intended.

        Returns:
            True on success, False when any step raised (the error is logged).
        """
        logger.info("=== Starting daily snapshot task ===")
        try:
            _, _, widgets, matched_data = self._collect_current_state()
            today_file = self.data_dir / f"snapshot_{self.today}.csv"
            widget_file = self.data_dir / f"all_widgets_{self.today}.csv"
            if not Utils.safe_csv_write(matched_data, today_file):
                raise Exception("Failed to save snapshot data")
            if not Utils.safe_csv_write(widgets, widget_file):
                raise Exception("Failed to save widget data")
            self.archive_old_data()
            self.save_last_data(matched_data, widgets)
            logger.info("=== Daily snapshot task completed successfully ===")
            return True
        except Exception as e:
            error_task_logger.error(f"Daily snapshot task failed: {str(e)}")
            return False

    def run_hourly_check(self):
        """Compare the current widgets with the baseline and record changes.

        Returns:
            True on success, False when any step raised (the error is logged).
        """
        logger.info("=== Starting hourly check task ===")
        try:
            apps, entries, widgets, current_data = self._collect_current_state()
            changes = self.compare_data(current_data)
            if changes and any(len(v) > 0 for v in changes.values()):
                self.save_changes(changes, apps, entries)
            self.save_last_data(current_data, widgets)
            logger.info("=== Hourly check task completed successfully ===")
            return True
        except Exception as e:
            error_task_logger.error(f"Hourly check task failed: {str(e)}")
            return False

    def main(self):
        """Run the daily snapshot or the hourly check and report the task status.

        NOTE(review): ``send_task_status`` is called even when the selected
        task returned False -- confirm whether a failed run should be
        reported through ``send_task_error`` instead.

        Returns:
            The boolean result of the selected task, or False when an
            unexpected exception was raised here.
        """
        task_start_time = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
        try:
            logger.info(f"=== Starting data monitoring task ({self.execution_time}) ===")
            if Utils.is_first_run_today():
                success = self.run_daily_snapshot()
            else:
                success = self.run_hourly_check()
            common_tools.send_task_status(task_start_time, "字段监控")
            logger.info("=== Data monitoring task completed ===")
            return success
        except Exception as e:
            error_task_logger.error(f"Data monitoring task failed: {e}")
            common_tools.send_task_error(task_start_time, "字段监控", str(e))
            return False
if __name__ == "__main__":
    # Make sure the output directory exists before anything writes into it
    Utils.ensure_dir(Config.OUTPUT_DIR)
    # Run the monitoring task; a failed run becomes a non-zero exit status
    monitor = DataMonitor()
    if not monitor.main():
        sys.exit(1)