# File-viewer listing: saas/test/字段监控.py
# 2025-08-12 13:43:10 +08:00 · 634 lines · 26 KiB · Python

import sys
from datetime import datetime, timedelta, timezone
import pandas as pd
import zipfile
import logging
from pathlib import Path
import json
import requests
from api import API
import time
import os
# ---------------------------- Configuration ----------------------------
# Root directory for everything this script writes; created at import time
# when it does not exist yet.
output_dir = "output"
os.makedirs(output_dir, exist_ok=True)

# Sub-directories of ``output_dir``.
DATA_DIR = "数据快照存储"  # daily snapshot CSV files
ARCHIVE_DIR = "压缩包存储"  # compressed archives of old snapshots

# Archiving.
RETAIN_DAYS = 7  # number of most recent days whose snapshots stay unarchived
COMPRESS_FORMAT = "zip"  # archive file extension

# Output files.
LOG_FILE = "data_monitor.log"  # log file path
CHANGES_FILE = "changes_summary.csv"  # change-summary file name

# Retry policy.
MAX_RETRIES = 3  # maximum number of retries
RETRY_DELAY = 0.5  # delay between retries, in seconds
# ---------------------- Logging setup -----------------------
def setup_logging():
    """Configure logging to the console and to ``LOG_FILE``.

    The file handler is opened as UTF-8 explicitly: most messages logged by
    this module are Chinese, and the platform default encoding (for example a
    legacy code page on Windows) cannot always represent them.

    Returns:
        logging.Logger: the logger named after this module.
    """
    logging.basicConfig(
        level=logging.INFO,
        format='%(asctime)s - %(levelname)s - %(message)s',
        handlers=[
            logging.StreamHandler(),
            logging.FileHandler(LOG_FILE, encoding='utf-8'),
        ],
    )
    return logging.getLogger(__name__)


logger = setup_logging()
# ---------------------- Utility functions -----------------------
def get_system_agnostic_path(*path_parts):
    """Join ``path_parts`` with the current platform's separator.

    Args:
        *path_parts: path segments, in order.

    Returns:
        str: the joined path as a plain string.
    """
    joined = Path(*path_parts)
    return os.fspath(joined)
def ensure_directory(path):
    """Create ``path`` and any missing parents; do nothing if it already exists.

    Args:
        path: directory to create.
    """
    target = Path(path)
    target.mkdir(parents=True, exist_ok=True)
    logger.debug(f"确保目录存在: {path}")
def get_iso8601_time():
    """Return the current UTC time as ``YYYY-MM-DDTHH:MM:SS.mmmZ``.

    The fractional part is the microsecond count truncated (not rounded) to
    milliseconds.
    """
    now = datetime.now(timezone.utc)
    millis = now.microsecond // 1000
    return f"{now:%Y-%m-%dT%H:%M:%S}.{millis:03d}Z"
def is_first_run_today():
    """Report whether today's daily snapshot still has to be produced.

    "Today" is the current UTC date. The run counts as already done only when
    both the snapshot CSV and the full-widget CSV for that date exist under
    ``output_dir/DATA_DIR``.

    Returns:
        bool: False when both of today's files exist, True otherwise.
    """
    today = datetime.now(timezone.utc).strftime("%Y-%m-%d")
    snapshot_file = get_system_agnostic_path(output_dir, DATA_DIR, f"snapshot_{today}.csv")
    widget_file = get_system_agnostic_path(output_dir, DATA_DIR, f"all_widgets_{today}.csv")
    if os.path.exists(snapshot_file) and os.path.exists(widget_file):
        # Keep a separator between the two paths so the log line is readable.
        logger.info(f"检测到今日文件已存在: {snapshot_file}, {widget_file}")
        return False
    return True
# ---------------------- Data monitor class -----------------------
class DataMonitor:
    """Track field (widget) changes of the monitored Jiandaoyun forms.

    The first run of a UTC day stores a snapshot of the monitored widgets and
    makes it the comparison baseline; later runs on the same day compare the
    live widget list against that baseline and append any differences to
    ``CHANGES_FILE``.

    NOTE(review): the daily run replaces the baseline without comparing it to
    the previous one, so changes made between the last check of one day and
    the first run of the next are not written to the change log. Confirm
    whether that is intended.
    """

    # Column of the monitoring form whose values are matched against the
    # ``entry_id`` of each widget.
    _MONITORED_ENTRY_COLUMN = '_widget_1750122565203'
    # Query handed to the project API client to read the monitoring form.
    _MONITOR_QUERY = {"api_key": "6694d3c4fcb69ca9a111a6c4", "entry_id": "6850c044f17c934b3ec01fea"}
    # Widget attributes whose change counts as a modification.
    _COMPARE_FIELDS = ('label', 'type')

    def __init__(self):
        """Prepare directories, file paths, the API client and request headers.

        Also loads the previous baseline from disk when it exists.
        """
        self.execution_time = get_iso8601_time()
        self.today = datetime.now(timezone.utc).strftime("%Y-%m-%d")
        # Working directories.
        self.data_dir = get_system_agnostic_path(output_dir, DATA_DIR)
        self.archive_dir = get_system_agnostic_path(output_dir, ARCHIVE_DIR)
        ensure_directory(self.data_dir)
        ensure_directory(self.archive_dir)
        # Baseline files written by the previous successful run.
        self.last_data_file = get_system_agnostic_path(self.data_dir, "last_data.csv")
        self.last_widget_data_file = get_system_agnostic_path(self.data_dir, "last_widget_data.csv")
        self.api_instance = API()
        # SECURITY: a bearer token is embedded in the source. It is kept as the
        # fallback so existing deployments keep working, but JDY_API_TOKEN from
        # the environment takes precedence; the token should be rotated and
        # removed from the code.
        token = os.environ.get('JDY_API_TOKEN') or 'qygHulymo1fekJk4CIZyNKjyQAzG8CFN'
        self.headers = {
            'Authorization': f'Bearer {token}',
            'Content-Type': 'application/json'
        }
        self._load_last_data()

    # ------------------------------------------------------------------
    # Small reusable helpers
    # ------------------------------------------------------------------
    @staticmethod
    def _write_csv_atomically(frame, filename):
        """Write ``frame`` to ``filename`` through a temporary sibling file.

        ``os.replace`` overwrites the destination in a single step, so an
        interrupted run never leaves the destination missing.
        """
        temp_file = filename + '.tmp'
        frame.to_csv(temp_file, index=False)
        os.replace(temp_file, filename)

    @staticmethod
    def _with_unique_id(frame):
        """Return a copy of ``frame`` that has a ``unique_id`` column.

        An existing ``unique_id`` column is kept; otherwise it is built as
        ``name`` followed by ``app_id``.

        NOTE(review): ``entry_id`` is not part of the key; if two forms of one
        app can share a widget name the key is ambiguous -- confirm.
        """
        frame = frame.copy()
        if 'unique_id' not in frame.columns:
            frame['unique_id'] = frame['name'].astype(str) + frame['app_id'].astype(str)
        return frame

    @staticmethod
    def _as_text(series):
        """Return ``series`` as strings, with missing values as ``''``."""
        return series.astype(object).where(series.notna(), '').astype(str)

    @staticmethod
    def _is_snapshot_date(text):
        """Return True when ``text`` is a valid ``YYYY-MM-DD`` date."""
        if len(text) != 10:
            return False
        try:
            datetime.strptime(text, "%Y-%m-%d")
        except ValueError:
            return False
        return True

    @staticmethod
    def _app_name(all_app_id, app_id):
        """Look up an app's name by id; '未知应用' when it is not listed."""
        hits = all_app_id.loc[all_app_id['app_id'] == app_id, 'name']
        return hits.values[0] if not hits.empty else '未知应用'

    @staticmethod
    def _entry_name(all_entries, app_id, entry_id):
        """Look up a form's name by app and entry id; '未知表单' when not listed."""
        wanted = (all_entries['app_id'] == app_id) & (all_entries['entry_id'] == entry_id)
        hits = all_entries.loc[wanted, 'name']
        return hits.values[0] if not hits.empty else '未知表单'

    def _run_with_retries(self, operation, *args):
        """Call ``operation(*args)``, retrying up to ``MAX_RETRIES`` times.

        Sleeps ``RETRY_DELAY`` seconds between attempts and re-raises the last
        error once the retries are used up.
        """
        for attempt in range(MAX_RETRIES + 1):
            try:
                return operation(*args)
            except Exception:
                if attempt >= MAX_RETRIES:
                    raise
                time.sleep(RETRY_DELAY)
        return None

    def _post_json(self, url, body):
        """POST ``body`` as JSON to ``url`` and return the decoded reply."""
        response = self.make_api_request(url, json.dumps(body))
        return response.json()

    # ------------------------------------------------------------------
    # Baseline persistence
    # ------------------------------------------------------------------
    def _load_last_data(self):
        """Load the previous baseline into ``last_data`` / ``last_widget_data``.

        A missing file leaves the matching attribute as None; any read error
        resets both attributes to None.
        """
        try:
            if os.path.exists(self.last_data_file):
                self.last_data = pd.read_csv(self.last_data_file)
                logger.info(f"从文件加载上次数据: {self.last_data_file}")
            else:
                logger.info("没有找到上次数据文件")
                self.last_data = None
            if os.path.exists(self.last_widget_data_file):
                self.last_widget_data = pd.read_csv(self.last_widget_data_file)
                logger.info(f"从文件加载上次字段数据: {self.last_widget_data_file}")
            else:
                logger.info("没有找到上次字段数据文件")
                self.last_widget_data = None
        except Exception as e:
            logger.error(f"加载上次数据失败: {str(e)}")
            self.last_data = None
            self.last_widget_data = None

    def _save_last_data(self, data, widget_data):
        """Store ``data`` and ``widget_data`` as the new comparison baseline.

        Returns:
            bool: True when both files were written, False on any error.
        """
        try:
            self._write_csv_atomically(data, self.last_data_file)
            self._write_csv_atomically(widget_data, self.last_widget_data_file)
            logger.info(f"成功保存当前数据到文件: {self.last_data_file}, {self.last_widget_data_file}")
            return True
        except Exception as e:
            logger.error(f"保存当前数据失败: {str(e)}")
            return False

    # ------------------------------------------------------------------
    # Remote data access
    # ------------------------------------------------------------------
    def make_api_request(self, url, payload, method='POST'):
        """Send an HTTP request, retrying on ``requests`` errors.

        Args:
            url: endpoint to call.
            payload: request body, already serialized.
            method: HTTP method, ``'POST'`` by default.

        Returns:
            requests.Response: the first response with a non-error status.

        Raises:
            requests.exceptions.RequestException: when the initial attempt and
                all ``MAX_RETRIES`` retries failed.
        """
        for attempt in range(1, MAX_RETRIES + 2):
            try:
                response = requests.request(
                    method,
                    url,
                    headers=self.headers,
                    data=payload,
                    timeout=30
                )
                response.raise_for_status()
                return response
            except requests.exceptions.RequestException as e:
                if attempt > MAX_RETRIES:
                    logger.error(f"请求失败,已达到最大重试次数 {MAX_RETRIES}")
                    raise
                logger.warning(f"请求失败 (尝试 {attempt}/{MAX_RETRIES}): {str(e)}")
                time.sleep(RETRY_DELAY)
        return None

    def fetch_app_data(self):
        """Fetch the app list.

        NOTE(review): a single request with ``skip=0, limit=100`` is sent, so
        anything beyond the first page is not fetched -- confirm this is enough.

        Returns:
            pandas.DataFrame: one row per entry of the reply's ``apps`` list.

        Raises:
            Exception: whatever the request or JSON decoding raised.
        """
        url = "https://api.jiandaoyun.com/api/v5/app/list"
        try:
            apps = self._post_json(url, {"skip": 0, "limit": 100}).get("apps", [])
            return pd.DataFrame(apps)
        except Exception as e:
            logger.error(f"获取应用数据失败: {str(e)}")
            raise

    def fetch_entry_data(self, app_df):
        """Fetch the forms of every app in ``app_df``.

        An app whose forms cannot be fetched is logged and skipped.

        NOTE(review): each attempt here goes through ``make_api_request``,
        which retries on its own, so the attempts multiply.

        Args:
            app_df: frame with an ``app_id`` column.

        Returns:
            pandas.DataFrame | None: all forms with their ``app_id``, or None
            when no app returned any form.
        """
        all_entries = []
        url = "https://api.jiandaoyun.com/api/v5/app/entry/list"
        for _, app_row in app_df.iterrows():
            app_id = app_row['app_id']
            try:
                reply = self._run_with_retries(self._post_json, url, {"app_id": app_id})
                entries = reply.get("forms", [])
                if entries:
                    entry_df = pd.DataFrame(entries)
                    entry_df['app_id'] = app_id
                    all_entries.append(entry_df)
            except Exception as e:
                logger.error(f"获取应用 {app_id} 的表单数据失败: {str(e)}")
        return pd.concat(all_entries, ignore_index=True) if all_entries else None

    def fetch_widget_data(self, entry_df):
        """Fetch the widgets (fields) of every form in ``entry_df``.

        A form whose widgets cannot be fetched is logged and skipped.

        Args:
            entry_df: frame with ``app_id`` and ``entry_id`` columns.

        Returns:
            pandas.DataFrame | None: all widgets with their ``app_id`` and
            ``entry_id``, or None when no form returned any widget.
        """
        all_widgets = []
        url = "https://api.jiandaoyun.com/api/v5/app/entry/widget/list"
        for _, entry_row in entry_df.iterrows():
            app_id = entry_row['app_id']
            entry_id = entry_row['entry_id']
            try:
                reply = self._run_with_retries(
                    self._post_json, url, {"app_id": app_id, "entry_id": entry_id})
                widgets = reply.get('widgets', [])
                if widgets:
                    widget_df = pd.DataFrame(widgets)
                    widget_df['app_id'] = app_id
                    widget_df['entry_id'] = entry_id
                    all_widgets.append(widget_df)
            except Exception as e:
                logger.error(f"获取表单 {entry_id} 的字段数据失败: {str(e)}")
        return pd.concat(all_widgets, ignore_index=True) if all_widgets else None

    def _read_monitor_form(self):
        """Read the monitoring form once and return it as a de-duplicated frame."""
        # Pass a copy so the shared class-level query cannot be modified.
        data = self.api_instance.entry_data_list(dict(self._MONITOR_QUERY)).get("data")
        data_list = pd.DataFrame(data)
        for col in data_list.columns:
            # dict/list cells are not hashable; stringify such columns so
            # drop_duplicates can process them.
            if data_list[col].apply(lambda x: isinstance(x, (dict, list))).any():
                data_list[col] = data_list[col].astype(str)
        return data_list.drop_duplicates()

    def fetch_monitor_data(self):
        """Fetch the rows of the monitoring form, with retries.

        Returns:
            pandas.DataFrame: the form rows without duplicates.

        Raises:
            Exception: the last error once all retries failed.
        """
        try:
            return self._run_with_retries(self._read_monitor_form)
        except Exception as e:
            logger.error(f"获取待监控表单数据失败: {str(e)}")
            raise

    def match_widget_data(self, data_list, widget_list):
        """Keep the widgets whose ``entry_id`` is listed in the monitoring form.

        Args:
            data_list: monitoring-form rows; must contain the
                ``_widget_1750122565203`` column.
            widget_list: all fetched widgets, with an ``entry_id`` column.

        Returns:
            pandas.DataFrame: an independent copy of the matching rows.

        Raises:
            ValueError: when the required column is missing.
        """
        try:
            column = self._MONITORED_ENTRY_COLUMN
            if column not in data_list.columns:
                raise ValueError(f"数据列表中缺少 '{column}'")
            wanted = widget_list['entry_id'].isin(data_list[column])
            # Copy so later column assignments never touch widget_list.
            matched = widget_list[wanted].copy()
            logger.info(f"匹配到 {len(matched)} 条字段数据")
            return matched
        except Exception as e:
            logger.error(f"字段数据匹配失败: {str(e)}")
            raise

    # ------------------------------------------------------------------
    # Snapshot files
    # ------------------------------------------------------------------
    def save_all_widgets_data(self, widget_data):
        """Write every fetched widget to ``all_widgets_<today>.csv``.

        Returns:
            bool: True on success, False on any error.
        """
        try:
            filename = get_system_agnostic_path(self.data_dir, f"all_widgets_{self.today}.csv")
            self._write_csv_atomically(widget_data, filename)
            logger.info(f"成功保存完整字段数据: {filename}")
            return True
        except Exception as e:
            logger.error(f"保存完整字段数据失败: {str(e)}")
            return False

    def save_daily_snapshot(self, data):
        """Write the monitored widgets to ``snapshot_<today>.csv``.

        A ``unique_id`` column (``name`` followed by ``app_id``) is added to
        the written copy; ``data`` itself is left untouched.

        Returns:
            bool: True on success, False on any error.
        """
        try:
            filename = get_system_agnostic_path(self.data_dir, f"snapshot_{self.today}.csv")
            data = data.copy()
            data['unique_id'] = data['name'].astype(str) + data['app_id'].astype(str)
            self._write_csv_atomically(data, filename)
            logger.info(f"成功保存今日数据快照: {filename}")
            return True
        except Exception as e:
            logger.error(f"保存数据快照失败: {str(e)}")
            return False

    def archive_old_snapshots(self):
        """Move snapshot CSVs older than ``RETAIN_DAYS`` days into archives.

        Both ``snapshot_*.csv`` and ``all_widgets_*.csv`` are handled. Each
        file is appended to ``snapshots_<YYYY-MM>.<COMPRESS_FORMAT>`` in the
        archive directory and then deleted. Files whose date part is not a
        valid ``YYYY-MM-DD`` are left alone.

        Returns:
            bool: True on success, False on any error.
        """
        try:
            now = datetime.now(timezone.utc)
            keep_dates = {(now - timedelta(days=offset)).strftime("%Y-%m-%d")
                          for offset in range(RETAIN_DAYS)}
            names = os.listdir(self.data_dir)
            suffix = ".csv"
            archived_files = 0
            for prefix in ("snapshot_", "all_widgets_"):
                for filename in names:
                    if not (filename.startswith(prefix) and filename.endswith(suffix)):
                        continue
                    date_str = filename[len(prefix):-len(suffix)]
                    if not self._is_snapshot_date(date_str):
                        logger.debug(f"跳过无法识别日期的文件: {filename}")
                        continue
                    if date_str in keep_dates:
                        continue
                    year_month = date_str[:7]
                    archive_name = get_system_agnostic_path(
                        self.archive_dir, f"snapshots_{year_month}.{COMPRESS_FORMAT}")
                    file_path = get_system_agnostic_path(self.data_dir, filename)
                    with zipfile.ZipFile(archive_name, 'a', zipfile.ZIP_DEFLATED) as zipf:
                        zipf.write(file_path, arcname=filename)
                    # Delete only after the archive has been closed successfully.
                    os.remove(file_path)
                    archived_files += 1
                    logger.debug(f"已归档 {filename} -> {archive_name}")
            logger.info(f"归档完成,共处理 {archived_files} 个文件")
            return True
        except Exception as e:
            logger.error(f"归档过程中出错: {str(e)}")
            return False

    # ------------------------------------------------------------------
    # Change detection
    # ------------------------------------------------------------------
    @staticmethod
    def _diff_snapshots(last_data, current_data, compare_fields=('label', 'type')):
        """Diff two widget frames keyed by ``unique_id``.

        Neither input frame is modified.

        Args:
            last_data: baseline widgets.
            current_data: live widgets.
            compare_fields: columns whose change counts as a modification.

        Returns:
            dict: ``added`` (only in current), ``deleted`` (only in baseline)
            and ``modified`` (present in both with a differing field) frames.
            Columns shared by both inputs carry ``_last`` / ``_current``
            suffixes; ``modified`` also has ``changed_field``, ``old_value``,
            ``new_value`` and ``change_status``.
        """
        merged = pd.merge(
            DataMonitor._with_unique_id(last_data),
            DataMonitor._with_unique_id(current_data),
            on=['unique_id'],
            how='outer',
            suffixes=('_last', '_current'),
            indicator=True
        )
        changes = {
            'added': merged[merged['_merge'] == 'right_only'].copy(),
            'deleted': merged[merged['_merge'] == 'left_only'].copy(),
            'modified': pd.DataFrame()
        }
        # Only rows present on both sides can be "modified"; one-sided rows
        # are already reported as added or deleted.
        in_both = merged['_merge'] == 'both'
        modified_parts = []
        for col in compare_fields:
            last_col = f"{col}_last"
            current_col = f"{col}_current"
            if last_col not in merged.columns or current_col not in merged.columns:
                continue
            # Compare as text with missing values as '' so that two empty
            # cells, or a CSV-coerced value and its raw form, count as equal.
            differs = DataMonitor._as_text(merged[last_col]) != DataMonitor._as_text(merged[current_col])
            mask = in_both & differs
            if mask.any():
                modified = merged.loc[mask].copy()
                modified['changed_field'] = col
                modified['old_value'] = modified[last_col]
                modified['new_value'] = modified[current_col]
                modified['change_status'] = 'update'
                modified_parts.append(modified)
        if modified_parts:
            changes['modified'] = pd.concat(modified_parts)
        return changes

    def compare_with_last_run(self, current_data):
        """Compare ``current_data`` with the baseline stored on disk.

        Args:
            current_data: live monitored widgets; not modified.

        Returns:
            dict | None: the ``added`` / ``deleted`` / ``modified`` frames, or
            None when there is no baseline file or the comparison failed.
        """
        if not os.path.exists(self.last_data_file):
            logger.warning("没有找到上次数据文件可供比较")
            return None
        try:
            # Read everything as text so ids and labels are not coerced to
            # numbers or NaN before being compared with the raw API values.
            last_data = pd.read_csv(self.last_data_file, dtype=str, keep_default_na=False)
            changes = self._diff_snapshots(last_data, current_data, self._COMPARE_FIELDS)
            logger.info(
                f"数据比较结果: 新增 {len(changes['added'])} 条, "
                f"删除 {len(changes['deleted'])} 条, "
                f"修改 {len(changes['modified'])} 条"
            )
            return changes
        except Exception as e:
            logger.error(f"数据比较失败: {str(e)}", exc_info=True)
            return None

    def _change_record(self, row, side, change_type, detail, all_app_id, all_entries):
        """Build one change-log row from a merged diff row.

        ``side`` is ``'current'`` or ``'last'`` and selects which suffixed
        ``app_id`` / ``entry_id`` columns identify the widget.
        """
        app_id = row[f'app_id_{side}']
        entry_id = row[f'entry_id_{side}']
        return {
            '程序执行时间': self.execution_time,
            'unique_id': row['unique_id'],
            'app_id': app_id,
            'app_name': self._app_name(all_app_id, app_id),
            'entry_id': entry_id,
            'entry_name': self._entry_name(all_entries, app_id, entry_id),
            'change_type': change_type,
            '具体内容': detail
        }

    def save_changes_to_csv(self, changes, all_app_id, all_entries):
        """Append the detected changes to ``CHANGES_FILE`` in the data directory.

        Args:
            changes: result of ``compare_with_last_run``.
            all_app_id: app frame used to resolve app names.
            all_entries: form frame used to resolve form names.

        Returns:
            bool: True when rows were written; False when there was nothing to
            write or writing failed.
        """
        try:
            result_rows = []
            for _, row in changes['added'].iterrows():
                result_rows.append(self._change_record(
                    row, 'current', '新增', f"新增字段: {row['label_current']}",
                    all_app_id, all_entries))
            for _, row in changes['deleted'].iterrows():
                result_rows.append(self._change_record(
                    row, 'last', '删除', f"删除字段: {row['label_last']}",
                    all_app_id, all_entries))
            modified_df = changes['modified']
            if not modified_df.empty:
                modified_df = modified_df[modified_df['change_status'] == 'update']
                for _, row in modified_df.iterrows():
                    result_rows.append(self._change_record(
                        row, 'current', '修改',
                        f"\"{row['old_value']}\"修改为\"{row['new_value']}\"",
                        all_app_id, all_entries))
            if not result_rows:
                logger.info("没有检测到任何变更,不生成变更文件")
                return False
            result_df = pd.DataFrame(result_rows)
            changes_file = get_system_agnostic_path(self.data_dir, CHANGES_FILE)
            # Write the header only when starting a new (or empty) file.
            has_content = os.path.exists(changes_file) and os.path.getsize(changes_file) > 0
            result_df.to_csv(
                changes_file,
                mode='a' if has_content else 'w',
                header=not has_content,
                index=False,
                encoding='utf-8-sig'
            )
            logger.info(f"变更数据已保存到 {changes_file}")
            return True
        except Exception as e:
            logger.error(f"保存变更数据到CSV失败: {str(e)}", exc_info=True)
            return False

    # ------------------------------------------------------------------
    # Task orchestration
    # ------------------------------------------------------------------
    def _fetch_structure(self):
        """Fetch apps, forms and widgets, logging progress.

        Returns:
            tuple: ``(app_df, entry_df, widget_df)``.

        Raises:
            RuntimeError: when no forms or no widgets were fetched.
        """
        logger.info("获取应用数据...")
        app_df = self.fetch_app_data()
        logger.info(f"获取到 {len(app_df)} 个应用")
        logger.info("获取表单数据...")
        entry_df = self.fetch_entry_data(app_df)
        if entry_df is None:
            raise RuntimeError("没有获取到表单数据")
        logger.info(f"获取到 {len(entry_df)} 个表单")
        logger.info("获取字段数据...")
        widget_df = self.fetch_widget_data(entry_df)
        if widget_df is None:
            raise RuntimeError("没有获取到字段数据")
        logger.info(f"获取到 {len(widget_df)} 个字段")
        return app_df, entry_df, widget_df

    def _fetch_monitored_widgets(self, widget_df):
        """Read the monitoring form and return the widgets it selects."""
        logger.info("获取待监控表单数据...")
        data_list = self.fetch_monitor_data()
        logger.info("待监控数据获取成功")
        logger.info("匹配字段数据...")
        matched = self.match_widget_data(data_list, widget_df)
        logger.info(f"匹配完成,共找到 {len(matched)} 条记录")
        return matched

    def run_daily_snapshot(self):
        """Run the daily task: snapshot, archive old files, refresh the baseline.

        Returns:
            bool: True when every step succeeded, False otherwise.
        """
        logger.info("=== 开始每日数据快照任务 ===")
        try:
            _, _, widget_df = self._fetch_structure()
            logger.info("保存完整字段数据...")
            if not self.save_all_widgets_data(widget_df):
                raise RuntimeError("保存完整字段数据失败")
            matched_data = self._fetch_monitored_widgets(widget_df)
            logger.info("保存今日数据快照...")
            if not self.save_daily_snapshot(matched_data):
                raise RuntimeError("保存今日快照失败")
            logger.info("归档旧数据...")
            if not self.archive_old_snapshots():
                raise RuntimeError("归档旧数据失败")
            # Without a baseline the later checks have nothing to compare
            # against, so a failed write must fail the task.
            if not self._save_last_data(matched_data.copy(), widget_df.copy()):
                raise RuntimeError("保存当前数据失败")
            logger.info("=== 每日数据快照任务成功完成 ===")
            return True
        except Exception as e:
            logger.error(f"每日快照任务执行失败: {str(e)}", exc_info=True)
            return False

    def run_hourly_check(self):
        """Run the recurring task: diff against the baseline and record changes.

        The baseline is replaced only after changes were found and written.

        Returns:
            bool: True when the check completed (with or without changes),
            False when a step failed.
        """
        logger.info("=== 开始每小时数据检查任务 ===")
        try:
            app_df, entry_df, widget_df = self._fetch_structure()
            current_data = self._fetch_monitored_widgets(widget_df)
            logger.info("比较数据变化...")
            changes = self.compare_with_last_run(current_data)
            if changes is None:
                logger.info("没有可比较的数据变更")
                return True
            if not any(len(v) > 0 for v in changes.values()):
                logger.info("没有检测到任何变更")
                return True
            if not self.save_changes_to_csv(changes, app_df, entry_df):
                raise RuntimeError("保存变更数据失败")
            # If the baseline is not advanced, the next check would report the
            # same changes again, so a failed write must fail the task.
            if not self._save_last_data(current_data.copy(), widget_df.copy()):
                raise RuntimeError("保存当前数据失败")
            logger.info("=== 每小时数据检查任务成功完成 ===")
            return True
        except Exception as e:
            logger.error(f"每小时检查任务执行失败: {str(e)}", exc_info=True)
            return False

    def run(self):
        """Run the daily snapshot on the first run of the day, else the check.

        Returns:
            bool: True when the selected task succeeded, False otherwise.
        """
        logger.info(f"=== 开始数据监控任务 ({self.execution_time}) ===")
        if is_first_run_today():
            logger.info("检测到是今天的第一次运行,执行每日数据快照任务")
            success = self.run_daily_snapshot()
        else:
            logger.info("执行每小时数据检查任务")
            success = self.run_hourly_check()
        if not success:
            logger.error("=== 数据监控任务执行失败 ===")
            return False
        logger.info("=== 数据监控任务成功完成 ===")
        return True
if __name__ == "__main__":
    # Build the monitor, run one monitoring pass, and signal failure to the
    # calling process through a non-zero exit status.
    monitor = DataMonitor()
    succeeded = monitor.run()
    if not succeeded:
        sys.exit(1)