# File: saas/back_ground_module/data_monitor.py (Python, 661 lines, 24 KiB)
"""字段监控(多平台适配版)"""
import sys
from pathlib import Path
import pandas as pd
import zipfile
from datetime import datetime, timezone, timedelta, date
import requests
from typing import Optional, List, Dict, Any
from decimal import Decimal
import time
import numpy as np
import json
import os
# Import-time side effect: switch the process working directory to the folder
# that contains this file.
os.chdir(Path(__file__).parent)
def replace_decimals(obj):
    """Return *obj* with every ``Decimal`` converted to ``float``.

    Dicts and lists are rebuilt recursively; any other value (including
    tuples and their contents) is returned unchanged.
    """
    if isinstance(obj, Decimal):
        return float(obj)
    if isinstance(obj, list):
        return [replace_decimals(element) for element in obj]
    if isinstance(obj, dict):
        return {key: replace_decimals(value) for key, value in obj.items()}
    return obj
class NpEncoder(json.JSONEncoder):
    """JSON encoder that converts NumPy scalars and arrays to native types."""

    def default(self, obj):
        """Convert NumPy values the stock encoder rejects.

        Handles ``np.bool_``, NumPy integers, NumPy floats and ``ndarray``;
        anything else is delegated to ``JSONEncoder.default``, which raises
        ``TypeError``.
        """
        # np.bool_ is not a subclass of np.integer, so it needs its own branch.
        if isinstance(obj, np.bool_):
            return bool(obj)
        if isinstance(obj, np.integer):
            return int(obj)
        if isinstance(obj, np.floating):
            return float(obj)
        if isinstance(obj, np.ndarray):
            return obj.tolist()
        return super().default(obj)
# ---------------------------- Configuration (cross-platform) ---------------------------
class Config:
    """Static settings: output paths, API credentials and form identifiers."""

    # Base paths (Path objects handle platform-specific separators).
    BASE_DIR = Path(__file__).parent.absolute()
    OUTPUT_DIR = BASE_DIR / "output"
    # Data directories (English names avoid filename-encoding problems).
    DATA_DIR = OUTPUT_DIR / "data_snapshots"
    ARCHIVE_DIR = OUTPUT_DIR / "archives"
    # API credentials: the full Authorization header value.
    # SECURITY: the literal below is a secret committed to source control.
    # Set the JIANDAOYUN_API_TOKEN environment variable (including the
    # "Bearer " prefix) to override it; the literal is only a fallback kept
    # for backward compatibility and should be rotated and removed.
    JIANDAOYUN_API_TOKEN = os.environ.get(
        "JIANDAOYUN_API_TOKEN",
        "Bearer qygHulymo1fekJk4CIZyNKjyQAzG8CFN",
    )
    # Runtime parameters.
    RETAIN_DAYS = 7
    COMPRESS_FORMAT = "zip"
    MAX_RETRIES = 3
    RETRY_DELAY = 0.5  # seconds
    # Identifiers of the monitored app and its forms.
    MONITOR_APP_ID = "6694d3c4fcb69ca9a111a6c4"
    MONITOR_ENTRY_ID = "6850c044f17c934b3ec01fea"
    CHANGES_ENTRY_ID = "6863a402a77925690a470cc5"
    STATUS_ENTRY_ID = "67ede908eb9c22261016466e"

    @classmethod
    def get_log_file(cls):
        """Return the path of the log file inside ``OUTPUT_DIR``."""
        return cls.OUTPUT_DIR / "data_monitor.log"

    @classmethod
    def get_changes_file(cls):
        """Return the path of the change-summary CSV inside ``OUTPUT_DIR``."""
        return cls.OUTPUT_DIR / "changes_summary.csv"
# ---------------------- Utility helpers (cross-platform) -----------------------
class Utils:
    """Stateless helpers for directories, timestamps and CSV output."""

    @staticmethod
    def ensure_dir(path):
        """Create *path* (and parents) if missing.

        Accepts a ``Path`` or a string. Returns True on success, False if the
        directory could not be created (the error is printed).
        """
        path = Path(path)
        try:
            path.mkdir(parents=True, exist_ok=True)
            return True
        except Exception as e:
            print(f"创建目录失败: {e}")
            return False

    @staticmethod
    def get_iso_time():
        """Return the current UTC time as ``YYYY-MM-DDTHH:MM:SS.mmmZ``."""
        # isoformat() on an aware UTC datetime already ends in "+00:00";
        # appending "Z" to that produced a malformed "...+00:00Z" stamp, so
        # the offset is swapped for the "Z" designator instead.
        now = datetime.now(timezone.utc)
        return now.isoformat(timespec='milliseconds').replace("+00:00", "Z")

    @staticmethod
    def is_first_run_today():
        """Return True unless both of today's (UTC) snapshot files exist."""
        today = datetime.now(timezone.utc).strftime("%Y-%m-%d")
        snapshot_file = Config.DATA_DIR / f"snapshot_{today}.csv"
        widget_file = Config.DATA_DIR / f"all_widgets_{today}.csv"
        return not (snapshot_file.exists() and widget_file.exists())

    @staticmethod
    def safe_csv_write(df, file_path):
        """Write *df* to *file_path* via a temporary sibling file.

        The data is written to ``<name>.tmp`` first and then moved over the
        target, so a failed write never leaves a truncated target and never
        removes the previous version. Returns True on success, False on any
        error (the error is printed and the temp file is cleaned up).
        """
        file_path = Path(file_path)
        temp_file = file_path.parent / (file_path.name + '.tmp')
        try:
            df.to_csv(temp_file, index=False, encoding='utf-8-sig')
            # Path.replace overwrites an existing target in one step on all
            # platforms; unlink-then-rename would lose the old file if the
            # rename failed.
            temp_file.replace(file_path)
            return True
        except Exception as e:
            print(f"CSV写入失败: {e}")
            if temp_file.exists():
                temp_file.unlink()
            return False
# ---------------------- API client (cross-platform) -----------------------
class APIClient:
    """Thin wrapper around the Jiandaoyun v5 HTTP endpoints used by this module."""

    def __init__(self):
        """Prepare the request headers from ``Config``."""
        self.headers = {
            'Authorization': Config.JIANDAOYUN_API_TOKEN,
            # Was 'application/json.json', which is not a valid media type.
            'Content-Type': 'application/json'
        }

    def request(self, url, payload, method='POST'):
        """Send *payload* to *url*, retrying on request errors.

        Makes up to ``Config.MAX_RETRIES + 1`` attempts, sleeping
        ``Config.RETRY_DELAY`` seconds between them. Returns the response of
        the first attempt that passes ``raise_for_status``; re-raises the
        ``requests`` exception of the final failed attempt.
        """
        for retry in range(Config.MAX_RETRIES + 1):
            try:
                response = requests.request(
                    method, url,
                    headers=self.headers,
                    data=payload,
                    timeout=30
                )
                response.raise_for_status()
                return response
            except requests.exceptions.RequestException:
                if retry == Config.MAX_RETRIES:
                    raise
                time.sleep(Config.RETRY_DELAY)

    def data_batch_create(self, data: dict, max_retries: int = 20) -> Optional[Dict]:
        """Create a single form record (despite the "batch" in the name).

        *data* must contain ``api_key`` (sent as ``app_id``), ``entry_id`` and
        ``data``; ``is_start_workflow``, ``is_start_trigger`` and
        ``transaction_id`` are optional. Returns the decoded JSON response,
        or None after ``max_retries + 1`` failed attempts (3 s apart).
        """
        url = 'https://api.jiandaoyun.com/api/v5/app/entry/data/create'
        payload = json.dumps({
            "app_id": data['api_key'],
            "entry_id": data['entry_id'],
            "data": data['data'],
            "is_start_workflow": data.get('is_start_workflow', "false"),
            "is_start_trigger": data.get('is_start_trigger', "false"),
            "transaction_id": data.get('transaction_id', "")
        })
        for retry in range(max_retries + 1):
            try:
                response = requests.post(url=url, data=payload, headers=self.headers, timeout=10)
                response.raise_for_status()
                return response.json()
            except requests.exceptions.RequestException as e:
                if retry == max_retries:
                    print(f"创建数据失败: {e}")
                    return None
                time.sleep(3)

    def entry_data_list(self, data: dict, max_retries: int = 20) -> Dict:
        """Fetch all records of a form, paging 100 at a time.

        *data* must contain ``api_key`` (sent as ``app_id``) and ``entry_id``.
        Each page is requested with the ``_id`` of the last record already
        received as ``data_id``. Returns ``{"data": [...]}`` with everything
        collected so far; if a page still fails after ``max_retries + 1``
        attempts the partial result is returned.
        """
        url = 'https://api.jiandaoyun.com/api/v5/app/entry/data/list'
        page_size = 100
        all_data_batches = []
        last_data_id = None
        while True:
            payload = json.dumps({
                "app_id": data['api_key'],
                "entry_id": data['entry_id'],
                "limit": page_size,
                "data_id": last_data_id
            })
            for retry in range(max_retries + 1):
                try:
                    response = requests.post(url=url, data=payload, headers=self.headers, timeout=10)
                    response.raise_for_status()
                    data_get = response.json()
                    if data_get.get("data"):
                        all_data_batches.extend(data_get['data'])
                        last_data_id = data_get['data'][-1].get('_id')
                        break
                    else:
                        # Empty page: nothing more to fetch.
                        return {"data": all_data_batches}
                except requests.exceptions.RequestException as e:
                    if retry == max_retries:
                        print(f"获取数据列表失败: {e}")
                        return {"data": all_data_batches}
                    time.sleep(0.1)
            # A short page means this was the last one.
            if not data_get.get("data") or len(data_get['data']) < page_size:
                break
        return {"data": all_data_batches}

    def entry_data_batch_create(self, data: dict, chunk_size: int = 90, max_retries: int = 20) -> List[Optional[Dict]]:
        """Create many records, sending ``data['data_list']`` in chunks.

        *data* must contain ``api_key`` (sent as ``app_id``), ``entry_id`` and
        ``data_list``. Decimals are converted to floats and NumPy values are
        serialised through ``NpEncoder``. Returns one item per chunk, in
        order: the decoded JSON response when its ``status`` is "success",
        otherwise None once ``max_retries + 1`` attempts are exhausted.
        """
        data = replace_decimals(data)
        url = 'https://api.jiandaoyun.com/api/v5/app/entry/data/batch_create'
        data_get_list = []
        total_length = len(data['data_list'])
        num_chunks = (total_length + chunk_size - 1) // chunk_size
        for i in range(num_chunks):
            start_index = i * chunk_size
            end_index = min(start_index + chunk_size, total_length)
            payload = json.dumps({
                "app_id": data['api_key'],
                "entry_id": data['entry_id'],
                "data_list": data['data_list'][start_index:end_index],
                "is_start_workflow": data.get('is_start_workflow', "false"),
                "is_start_trigger": data.get('is_start_trigger', "false"),
            }, cls=NpEncoder)
            for retry in range(max_retries + 1):
                try:
                    response = requests.post(url=url, data=payload, headers=self.headers, timeout=10)
                    response.raise_for_status()
                    data_get = response.json()
                    if data_get.get("status") == "success":
                        data_get_list.append(data_get)
                        break
                    # HTTP succeeded but the API did not report success;
                    # treat it like any other failed attempt.
                    failure = f"unexpected response: {data_get}"
                except requests.exceptions.RequestException as e:
                    failure = e
                if retry == max_retries:
                    # Record the failed chunk so the result list stays aligned
                    # with the chunks (previously a non-"success" reply was
                    # dropped without a trace).
                    print(f"批量创建数据失败: {failure}")
                    data_get_list.append(None)
                else:
                    time.sleep(0.1)
        return data_get_list
# ---------------------- Data handling base class -----------------------
class DataHandler:
    """Owns the on-disk state shared by monitor runs (paths, last-run CSVs)."""

    def __init__(self):
        """Stamp the run time, prepare directories and build the API client."""
        self.execution_time = Utils.get_iso_time()
        self.today = datetime.now(timezone.utc).strftime("%Y-%m-%d")
        self.setup_dirs()
        self.api = APIClient()

    def setup_dirs(self):
        """Create the data/archive directories and set the last-run file paths."""
        self.data_dir = Config.DATA_DIR
        self.archive_dir = Config.ARCHIVE_DIR
        for directory in (self.data_dir, self.archive_dir):
            Utils.ensure_dir(directory)
        # Files holding the previous run's data for comparison.
        self.last_data_file = self.data_dir / "last_data.csv"
        self.last_widget_file = self.data_dir / "last_widget_data.csv"

    def load_last_data(self):
        """Load the previous run's CSVs.

        Returns ``(last_data, last_widget)``; an entry is None when its file
        does not exist, and both are None if reading raises.
        """
        try:
            last_data = None
            if self.last_data_file.exists():
                last_data = pd.read_csv(self.last_data_file)
            last_widget = None
            if self.last_widget_file.exists():
                last_widget = pd.read_csv(self.last_widget_file)
        except Exception as e:
            print(f"加载上次数据失败: {e}")
            return None, None
        return last_data, last_widget

    def save_last_data(self, data, widget_data):
        """Persist the current data for the next comparison.

        Both files are always attempted; returns True only if both writes
        succeed.
        """
        try:
            data_saved = Utils.safe_csv_write(data, self.last_data_file)
            widgets_saved = Utils.safe_csv_write(widget_data, self.last_widget_file)
        except Exception as e:
            print(f"保存数据失败: {e}")
            return False
        return data_saved & widgets_saved

    def field_replacement(self, data, final_data):
        """Placeholder for replacing field ids with labels.

        No replacement rules are implemented; *final_data* is returned as is.
        """
        return final_data
# ---------------------- Data monitor main class -----------------------
class DataMonitor(DataHandler):
    """Fetches form widgets, snapshots them and reports changes between runs."""

    def __init__(self):
        """Initialise the base handler and load the previous run's data."""
        super().__init__()
        self.last_data, self.last_widget = self.load_last_data()

    def fetch_apps(self):
        """Return a DataFrame of apps, or an empty one on failure.

        NOTE(review): only one request with skip=0/limit=100 is made, so at
        most 100 apps are returned — confirm whether paging is needed.
        """
        url = "https://api.jiandaoyun.com/api/v5/app/list"
        payload = json.dumps({"skip": 0, "limit": 100})
        try:
            response = self.api.request(url, payload)
            return pd.DataFrame(response.json().get("apps", []))
        except Exception as e:
            print(f"获取应用列表失败: {e}")
            return pd.DataFrame()

    def fetch_entries(self, app_df):
        """Return the forms of every app in *app_df* as one DataFrame.

        Each row carries the owning ``app_id``. Apps whose request fails are
        skipped (the error is printed). Empty DataFrame if nothing was found.
        """
        url = "https://api.jiandaoyun.com/api/v5/app/entry/list"
        all_entries = []
        for _, app in app_df.iterrows():
            payload = json.dumps({"app_id": app['app_id']})
            try:
                response = self.api.request(url, payload)
                entries = response.json().get("forms", [])
                if entries:
                    entry_df = pd.DataFrame(entries)
                    entry_df['app_id'] = app['app_id']
                    all_entries.append(entry_df)
            except Exception as e:
                print(f"获取表单条目失败: {e}")
        return pd.concat(all_entries, ignore_index=True) if all_entries else pd.DataFrame()

    def fetch_widgets(self, entry_df):
        """Return the widgets of every form in *entry_df* as one DataFrame.

        Each row carries the owning ``app_id`` and ``entry_id``. Forms whose
        request fails are skipped (the error is printed).
        """
        url = "https://api.jiandaoyun.com/api/v5/app/entry/widget/list"
        all_widgets = []
        for _, entry in entry_df.iterrows():
            payload = json.dumps({
                "app_id": entry['app_id'],
                "entry_id": entry['entry_id']
            })
            try:
                response = self.api.request(url, payload)
                widgets = response.json().get('widgets', [])
                if widgets:
                    widget_df = pd.DataFrame(widgets)
                    widget_df['app_id'] = entry['app_id']
                    widget_df['entry_id'] = entry['entry_id']
                    all_widgets.append(widget_df)
            except Exception as e:
                print(f"获取字段组件失败: {e}")
        return pd.concat(all_widgets, ignore_index=True) if all_widgets else pd.DataFrame()

    def fetch_monitor_data(self):
        """Return the records of the monitor form, de-duplicated.

        Columns holding dicts or lists are converted to strings first so that
        ``drop_duplicates`` can hash them. Empty DataFrame on failure.
        """
        payload = {
            "api_key": Config.MONITOR_APP_ID,
            "entry_id": Config.MONITOR_ENTRY_ID
        }
        try:
            data = self.api.entry_data_list(payload).get("data", [])
            data_list = pd.DataFrame(data)
            # Stringify unhashable cell values.
            for col in data_list.columns:
                if data_list[col].apply(lambda x: isinstance(x, (dict, list))).any():
                    data_list[col] = data_list[col].astype(str)
            return data_list.drop_duplicates()
        except Exception as e:
            print(f"获取监控数据失败: {e}")
            return pd.DataFrame()

    def match_widgets(self, data_list, widget_list):
        """Return the widgets whose ``entry_id`` is listed in the monitor data.

        The monitored entry ids are read from column ``_widget_1750122565203``
        of *data_list*. Returns an empty DataFrame when that column is missing
        or *widget_list* is empty.
        """
        if '_widget_1750122565203' not in data_list.columns:
            print("缺少必需的列 '_widget_1750122565203'")
            return pd.DataFrame()
        if widget_list.empty:
            return pd.DataFrame()
        selected = widget_list['entry_id'].isin(data_list['_widget_1750122565203'])
        # Copy so callers get an independent frame rather than a slice.
        return widget_list[selected].copy()

    def archive_old_data(self):
        """Move dated snapshot CSVs outside the retention window into zips.

        Files named ``snapshot_<date>.csv`` / ``all_widgets_<date>.csv`` whose
        date is not one of the last ``Config.RETAIN_DAYS`` UTC days are added
        to ``snapshots_<YYYY-MM>.<ext>`` in the archive directory and removed.
        """
        now = datetime.now(timezone.utc)
        keep_dates = {
            (now - timedelta(days=i)).strftime("%Y-%m-%d")
            for i in range(Config.RETAIN_DAYS)
        }
        files_to_archive = [
            f for f in self.data_dir.iterdir()
            if f.is_file() and
            (f.name.startswith("snapshot_") or f.name.startswith("all_widgets_")) and
            f.suffix == '.csv'
        ]
        for file_path in files_to_archive:
            # Strip the prefix ("snapshot_" is 9 chars, "all_widgets_" is 12).
            date_str = file_path.stem[9:] if file_path.name.startswith("snapshot_") else file_path.stem[12:]
            if date_str not in keep_dates:
                year_month = date_str[:7]
                archive_path = self.archive_dir / f"snapshots_{year_month}.{Config.COMPRESS_FORMAT}"
                try:
                    with zipfile.ZipFile(archive_path, 'a', zipfile.ZIP_DEFLATED) as zipf:
                        zipf.write(file_path, arcname=file_path.name)
                    # Delete the source only after the archive is closed.
                    file_path.unlink()
                except Exception as e:
                    print(f"归档文件失败: {e}")

    def compare_data(self, current_data):
        """Diff *current_data* against the saved last-run data.

        Rows are keyed by ``name`` + ``app_id``. Returns a dict with
        DataFrames under ``'added'``, ``'deleted'`` and ``'modified'``
        (``label`` or ``type`` differs, both non-null), or None when there is
        no usable baseline, *current_data* is empty, or an error occurs.
        *current_data* itself is not modified.
        """
        if not self.last_data_file.exists() or current_data.empty:
            return None
        try:
            last_data = pd.read_csv(self.last_data_file)
            if last_data.empty:
                return None
            # Work on a copy: adding the key column in place would leak it
            # into the caller's frame (and into whatever it saves later).
            current_data = current_data.copy()
            last_data['unique_id'] = last_data['name'].astype(str) + last_data['app_id'].astype(str)
            current_data['unique_id'] = current_data['name'].astype(str) + current_data['app_id'].astype(str)
            merged = pd.merge(
                last_data, current_data,
                on=['unique_id'],
                how='outer',
                suffixes=('_last', '_current'),
                indicator=True
            )
            changes = {
                'added': merged[merged['_merge'] == 'right_only'],
                'deleted': merged[merged['_merge'] == 'left_only'],
                'modified': pd.DataFrame()
            }
            for col in ['label', 'type']:
                last_col = f"{col}_last"
                current_col = f"{col}_current"
                if last_col in merged.columns and current_col in merged.columns:
                    mask = (merged['_merge'] == 'both') & (merged[last_col] != merged[current_col])
                    # NaN != NaN, so exclude rows where either side is missing.
                    mask = mask & ~merged[last_col].isna() & ~merged[current_col].isna()
                    if mask.any():
                        modified = merged.loc[mask].copy()
                        modified['changed_field'] = col
                        modified['old_value'] = modified[last_col]
                        modified['new_value'] = modified[current_col]
                        modified['change_status'] = 'update'
                        changes['modified'] = pd.concat([changes['modified'], modified])
            return changes
        except Exception as e:
            print(f"比较数据失败: {e}")
            return None

    def save_changes(self, changes, apps, entries):
        """Append the changes to the summary CSV and push them to Jiandaoyun.

        *apps* and *entries* are used to resolve display names. Returns True
        when at least one row was written to the CSV, False otherwise.
        """
        if not changes or all(len(v) == 0 for v in changes.values()):
            return False
        result_rows = []
        for change_type in ['added', 'deleted', 'modified']:
            df = changes[change_type]
            if df.empty:
                continue
            # Deleted rows only have values on the "last" side of the merge.
            suffix = 'current' if change_type in ['added', 'modified'] else 'last'
            for _, row in df.iterrows():
                app_id = row.get(f'app_id_{suffix}')
                entry_id = row.get(f'entry_id_{suffix}')
                if not app_id or not entry_id:
                    continue
                app_name = 'Unknown App'
                entry_name = 'Unknown Entry'
                if not apps.empty:
                    app_match = apps[apps['app_id'] == app_id]
                    if not app_match.empty:
                        app_name = app_match['name'].values[0]
                if not entries.empty:
                    entry_match = entries[(entries['app_id'] == app_id) & (entries['entry_id'] == entry_id)]
                    if not entry_match.empty:
                        entry_name = entry_match['name'].values[0]
                if change_type == 'added':
                    content = f"Added field: {row.get('label_current', 'Unknown')}"
                elif change_type == 'deleted':
                    content = f"Deleted field: {row.get('label_last', 'Unknown')}"
                else:
                    content = f"Changed from \"{row.get('old_value', 'Unknown')}\" to \"{row.get('new_value', 'Unknown')}\""
                result_rows.append({
                    'timestamp': self.execution_time,
                    'unique_id': row.get('unique_id', ''),
                    'app_id': app_id,
                    'app_name': app_name,
                    'entry_id': entry_id,
                    'entry_name': entry_name,
                    'change_type': change_type,
                    'details': content
                })
        if result_rows:
            result_df = pd.DataFrame(result_rows)
            changes_file = Config.get_changes_file()
            try:
                result_df.to_csv(
                    changes_file,
                    mode='a',
                    # Write the header only when creating the file.
                    header=not changes_file.exists(),
                    index=False,
                    encoding='utf-8-sig'
                )
                self.add_to_jiandaoyun(result_df)
                return True
            except Exception as e:
                print(f"保存变更记录失败: {e}")
                return False
        return False

    def add_to_jiandaoyun(self, result_df):
        """Upload the change rows to the Jiandaoyun changes form.

        Returns True only if at least one chunk was sent and every chunk got
        a non-None result from ``entry_data_batch_create``.
        """
        all_data = [{
            "_widget_1751446961315": {"value": row["app_name"]},
            "_widget_1751446961316": {"value": row["entry_name"]},
            "_widget_1751446961317": {"value": row["change_type"]},
            "_widget_1751446961318": {"value": row["details"]},
            "_widget_1751446961319": {"value": row["timestamp"]},
        } for _, row in result_df.iterrows()]
        payload = {
            "api_key": Config.MONITOR_APP_ID,
            "entry_id": Config.CHANGES_ENTRY_ID,
            "data_list": all_data
        }
        try:
            response = self.api.entry_data_batch_create(payload)
            # A None entry marks a chunk that failed after all retries.
            return (
                isinstance(response, list)
                and len(response) > 0
                and all(item is not None for item in response)
            )
        except Exception as e:
            print(f"写入简道云失败: {e}")
            return False

    def send_task_status(self, task_start_time: str, task_name: str) -> None:
        """Report the task's start/end time and duration to Jiandaoyun.

        *task_start_time* is a naive ``"%Y-%m-%d %H:%M:%S"`` string in the
        machine's local time (as produced by ``main``). Errors are printed,
        never raised.
        """
        try:
            end_time_utc = datetime.now(timezone.utc)
            task_start_naive = datetime.strptime(task_start_time, "%Y-%m-%d %H:%M:%S")
            # astimezone() on a naive datetime assumes the system timezone,
            # which is correct on any host; the previous fixed "-8 hours"
            # was only right on UTC+8 machines.
            task_start_utc = task_start_naive.astimezone(timezone.utc)
            run_time = end_time_utc - task_start_utc
            run_time_sec = int(run_time.total_seconds())
            today_utc = end_time_utc.strftime("%Y-%m-%d")
            task_end_iso = end_time_utc.strftime("%Y-%m-%dT%H:%M:%SZ")
            task_start_iso = task_start_utc.strftime("%Y-%m-%dT%H:%M:%SZ")
            payload = {
                "api_key": Config.MONITOR_APP_ID,
                "entry_id": Config.STATUS_ENTRY_ID,
                "data": {
                    "_widget_1744873387500": {"value": today_utc},
                    "_widget_1743644977694": {"value": task_name},
                    "_widget_1744873387501": {"value": task_start_iso},
                    "_widget_1744873387502": {"value": task_end_iso},
                    "_widget_1744873387504": {"value": run_time_sec},
                }
            }
            self.api.data_batch_create(payload)
        except Exception as e:
            print(f"发送任务状态失败: {e}")

    def run_daily_snapshot(self):
        """Take today's snapshot, archive old ones and refresh the baseline.

        Returns True on success; False when nothing matched, a snapshot file
        could not be written, or an error occurred.
        """
        try:
            apps = self.fetch_apps()
            entries = self.fetch_entries(apps)
            widgets = self.fetch_widgets(entries)
            monitor_data = self.fetch_monitor_data()
            matched_data = self.match_widgets(monitor_data, widgets)
            if matched_data.empty:
                print("没有匹配到数据")
                return False
            today_file = self.data_dir / f"snapshot_{self.today}.csv"
            widget_file = self.data_dir / f"all_widgets_{self.today}.csv"
            if not Utils.safe_csv_write(matched_data, today_file):
                print("保存快照数据失败")
                return False
            if not Utils.safe_csv_write(widgets, widget_file):
                print("保存组件数据失败")
                return False
            self.archive_old_data()
            self.save_last_data(matched_data, widgets)
            return True
        except Exception as e:
            print(f"每日快照任务失败: {e}")
            return False

    def run_hourly_check(self):
        """Compare current widgets with the baseline and record any changes.

        Returns True on success; False when nothing matched or an error
        occurred. The baseline is only replaced when current data exists.
        """
        try:
            apps = self.fetch_apps()
            entries = self.fetch_entries(apps)
            widgets = self.fetch_widgets(entries)
            monitor_data = self.fetch_monitor_data()
            current_data = self.match_widgets(monitor_data, widgets)
            if current_data.empty:
                # Same handling as the daily snapshot: a failed fetch must not
                # overwrite the baseline with empty files.
                print("没有匹配到数据")
                return False
            changes = self.compare_data(current_data)
            if changes and any(len(v) > 0 for v in changes.values()):
                self.save_changes(changes, apps, entries)
            self.save_last_data(current_data, widgets)
            return True
        except Exception as e:
            print(f"每小时检查任务失败: {e}")
            return False

    def main(self):
        """Run the daily snapshot or the hourly check, then report status.

        The snapshot runs when today's snapshot files are missing; otherwise
        the hourly check runs. Returns the task's success flag, or False if
        an unexpected error occurs.
        """
        task_start_time = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
        try:
            if Utils.is_first_run_today():
                success = self.run_daily_snapshot()
            else:
                success = self.run_hourly_check()
            self.send_task_status(task_start_time, "字段监控")
            return success
        except Exception as e:
            print(f"主运行逻辑失败: {e}")
            return False
if __name__ == "__main__":
    # The output directory must exist before any file is written.
    Utils.ensure_dir(Config.OUTPUT_DIR)
    # Run the monitor; a falsy result becomes exit status 1.
    succeeded = DataMonitor().main()
    if not succeeded:
        sys.exit(1)