"""字段监控(多平台适配版)""" import sys from pathlib import Path import pandas as pd import zipfile from datetime import datetime, timezone, timedelta, date import requests from typing import Optional, List, Dict, Any from decimal import Decimal import time import numpy as np import json def replace_decimals(obj): """替换Decimal类型为float""" if isinstance(obj, dict): return {k: replace_decimals(v) for k, v in obj.items()} elif isinstance(obj, list): return [replace_decimals(item) for item in obj] elif isinstance(obj, Decimal): return float(obj) return obj class NpEncoder(json.JSONEncoder): """NumPy类型JSON编码器""" def default(self, obj): if isinstance(obj, np.integer): return int(obj) elif isinstance(obj, np.floating): return float(obj) elif isinstance(obj, np.ndarray): return obj.tolist() else: return super(NpEncoder, self).default(obj) # ---------------------------- 配置项(多平台适配)--------------------------- class Config: # 基础路径(使用Path对象自动处理跨平台路径) BASE_DIR = Path(__file__).parent.absolute() OUTPUT_DIR = BASE_DIR / "output" # 数据存储目录(英文命名避免编码问题) DATA_DIR = OUTPUT_DIR / "data_snapshots" ARCHIVE_DIR = OUTPUT_DIR / "archives" # API配置 JIANDAOYUN_API_TOKEN = "Bearer qygHulymo1fekJk4CIZyNKjyQAzG8CFN" # 运行参数 RETAIN_DAYS = 7 COMPRESS_FORMAT = "zip" MAX_RETRIES = 3 RETRY_DELAY = 0.5 # 秒 # 监控表单配置 MONITOR_APP_ID = "6694d3c4fcb69ca9a111a6c4" MONITOR_ENTRY_ID = "6850c044f17c934b3ec01fea" CHANGES_ENTRY_ID = "6863a402a77925690a470cc5" STATUS_ENTRY_ID = "67ede908eb9c22261016466e" @classmethod def get_log_file(cls): """获取跨平台兼容的日志文件路径""" return cls.OUTPUT_DIR / "data_monitor.log" @classmethod def get_changes_file(cls): """获取跨平台兼容的变更记录文件路径""" return cls.OUTPUT_DIR / "changes_summary.csv" # ---------------------- 工具函数(多平台兼容)----------------------- class Utils: @staticmethod def ensure_dir(path): """确保目录存在(自动处理Path对象或字符串)""" path = Path(path) if not isinstance(path, Path) else path try: path.mkdir(parents=True, exist_ok=True) return True except Exception as e: print(f"创建目录失败: {e}") return False @staticmethod def get_iso_time(): """获取ISO格式的UTC时间""" return datetime.now(timezone.utc).isoformat(timespec='milliseconds') + "Z" @staticmethod def is_first_run_today(): """检查是否是今天的第一次运行""" today = datetime.now(timezone.utc).strftime("%Y-%m-%d") snapshot_file = Config.DATA_DIR / f"snapshot_{today}.csv" widget_file = Config.DATA_DIR / f"all_widgets_{today}.csv" return not (snapshot_file.exists() and widget_file.exists()) @staticmethod def safe_csv_write(df, file_path): """安全的CSV写入(带临时文件和错误处理)""" file_path = Path(file_path) temp_file = file_path.parent / (file_path.name + '.tmp') try: df.to_csv(temp_file, index=False, encoding='utf-8-sig') if file_path.exists(): file_path.unlink() temp_file.rename(file_path) return True except Exception as e: print(f"CSV写入失败: {e}") if temp_file.exists(): temp_file.unlink() return False # ---------------------- API客户端(多平台兼容)----------------------- class APIClient: def __init__(self): self.headers = { 'Authorization': Config.JIANDAOYUN_API_TOKEN, 'Content-Type': 'application/json.json' } def request(self, url, payload, method='POST'): """带重试机制的API请求""" for retry in range(Config.MAX_RETRIES + 1): try: response = requests.request( method, url, headers=self.headers, data=payload, timeout=30 ) response.raise_for_status() return response except requests.exceptions.RequestException as e: if retry == Config.MAX_RETRIES: raise time.sleep(Config.RETRY_DELAY) def data_batch_create(self, data: dict, max_retries: int = 20) -> Optional[Dict]: """新建单条表单数据""" url = 'https://api.jiandaoyun.com/api/v5/app/entry/data/create' payload = json.dumps({ "app_id": data['api_key'], "entry_id": data['entry_id'], "data": data['data'], "is_start_workflow": data.get('is_start_workflow', "false"), "is_start_trigger": data.get('is_start_trigger', "false"), "transaction_id": data.get('transaction_id', "") }) for retry in range(max_retries + 1): try: response = requests.post(url=url, data=payload, headers=self.headers, timeout=10) response.raise_for_status() return response.json() except requests.exceptions.RequestException as e: if retry == max_retries: print(f"创建数据失败: {e}") return None time.sleep(3) def entry_data_list(self, data: dict, max_retries: int = 20) -> Dict: """获取多条表单数据""" url = 'https://api.jiandaoyun.com/api/v5/app/entry/data/list' all_data_batches = [] last_data_id = None while True: payload = json.dumps({ "app_id": data['api_key'], "entry_id": data['entry_id'], "limit": 100, "data_id": last_data_id }) for retry in range(max_retries + 1): try: response = requests.post(url=url, data=payload, headers=self.headers, timeout=10) response.raise_for_status() data_get = response.json() if data_get.get("data"): all_data_batches.extend(data_get['data']) last_data_id = data_get['data'][-1].get('_id') break else: return {"data": all_data_batches} except requests.exceptions.RequestException as e: if retry == max_retries: print(f"获取数据列表失败: {e}") return {"data": all_data_batches} time.sleep(0.1) if not data_get.get("data") or len(data_get['data']) < 100: break return {"data": all_data_batches} def entry_data_batch_create(self, data: dict, chunk_size: int = 90, max_retries: int = 20) -> List[Optional[Dict]]: """新建多条数据""" data = replace_decimals(data) url = 'https://api.jiandaoyun.com/api/v5/app/entry/data/batch_create' data_get_list = [] total_length = len(data['data_list']) num_chunks = (total_length + chunk_size - 1) // chunk_size for i in range(num_chunks): start_index = i * chunk_size end_index = min(start_index + chunk_size, total_length) payload = json.dumps({ "app_id": data['api_key'], "entry_id": data['entry_id'], "data_list": data['data_list'][start_index:end_index], "is_start_workflow": data.get('is_start_workflow', "false"), "is_start_trigger": data.get('is_start_trigger', "false"), }, cls=NpEncoder) for retry in range(max_retries + 1): try: response = requests.post(url=url, data=payload, headers=self.headers, timeout=10) response.raise_for_status() data_get = response.json() if data_get.get("status") == "success": data_get_list.append(data_get) break except requests.exceptions.RequestException as e: if retry == max_retries: print(f"批量创建数据失败: {e}") data_get_list.append(None) time.sleep(0.1) return data_get_list # ---------------------- 数据处理基类 ----------------------- class DataHandler: def __init__(self): self.execution_time = Utils.get_iso_time() self.today = datetime.now(timezone.utc).strftime("%Y-%m-%d") self.setup_dirs() self.api = APIClient() def setup_dirs(self): """初始化所有需要的目录""" self.data_dir = Config.DATA_DIR self.archive_dir = Config.ARCHIVE_DIR Utils.ensure_dir(self.data_dir) Utils.ensure_dir(self.archive_dir) # 初始化数据文件路径 self.last_data_file = self.data_dir / "last_data.csv" self.last_widget_file = self.data_dir / "last_widget_data.csv" def load_last_data(self): """加载上次运行的数据""" try: last_data = pd.read_csv(self.last_data_file) if self.last_data_file.exists() else None last_widget = pd.read_csv(self.last_widget_file) if self.last_widget_file.exists() else None return last_data, last_widget except Exception as e: print(f"加载上次数据失败: {e}") return None, None def save_last_data(self, data, widget_data): """保存当前数据供下次比较""" try: success = Utils.safe_csv_write(data, self.last_data_file) success &= Utils.safe_csv_write(widget_data, self.last_widget_file) return success except Exception as e: print(f"保存数据失败: {e}") return False def field_replacement(self, data, final_data): """字段替换方法(由id替换为标签名)""" # 这里实现具体的字段替换逻辑 # 由于具体替换规则未知,返回原始数据 return final_data # ---------------------- 数据监控主类 ----------------------- class DataMonitor(DataHandler): def __init__(self): super().__init__() self.last_data, self.last_widget = self.load_last_data() def fetch_apps(self): """获取所有应用列表""" url = "https://api.jiandaoyun.com/api/v5/app/list" payload = json.dumps({"skip": 0, "limit": 100}) try: response = self.api.request(url, payload) return pd.DataFrame(response.json().get("apps", [])) except Exception as e: print(f"获取应用列表失败: {e}") return pd.DataFrame() def fetch_entries(self, app_df): """获取所有表单条目""" url = "https://api.jiandaoyun.com/api/v5/app/entry/list" all_entries = [] for _, app in app_df.iterrows(): payload = json.dumps({"app_id": app['app_id']}) try: response = self.api.request(url, payload) entries = response.json().get("forms", []) if entries: entry_df = pd.DataFrame(entries) entry_df['app_id'] = app['app_id'] all_entries.append(entry_df) except Exception as e: print(f"获取表单条目失败: {e}") continue return pd.concat(all_entries, ignore_index=True) if all_entries else pd.DataFrame() def fetch_widgets(self, entry_df): """获取所有字段组件""" url = "https://api.jiandaoyun.com/api/v5/app/entry/widget/list" all_widgets = [] for _, entry in entry_df.iterrows(): payload = json.dumps({ "app_id": entry['app_id'], "entry_id": entry['entry_id'] }) try: response = self.api.request(url, payload) widgets = response.json().get('widgets', []) if widgets: widget_df = pd.DataFrame(widgets) widget_df['app_id'] = entry['app_id'] widget_df['entry_id'] = entry['entry_id'] all_widgets.append(widget_df) except Exception as e: print(f"获取字段组件失败: {e}") continue return pd.concat(all_widgets, ignore_index=True) if all_widgets else pd.DataFrame() def fetch_monitor_data(self): """获取监控数据""" payload = { "api_key": Config.MONITOR_APP_ID, "entry_id": Config.MONITOR_ENTRY_ID } try: data = self.api.entry_data_list(payload).get("data", []) data_list = pd.DataFrame(data) # 处理复杂数据类型 for col in data_list.columns: if data_list[col].apply(lambda x: isinstance(x, (dict, list))).any(): data_list[col] = data_list[col].astype(str) return data_list.drop_duplicates() except Exception as e: print(f"获取监控数据失败: {e}") return pd.DataFrame() def match_widgets(self, data_list, widget_list): """匹配数据列表和组件列表""" if '_widget_1750122565203' not in data_list.columns: print("缺少必需的列 '_widget_1750122565203'") return pd.DataFrame() if widget_list.empty: return pd.DataFrame() return widget_list[widget_list['entry_id'].isin(data_list['_widget_1750122565203'])] def archive_old_data(self): """归档旧数据""" keep_dates = [ (datetime.now(timezone.utc) - timedelta(days=i)).strftime("%Y-%m-%d") for i in range(Config.RETAIN_DAYS) ] files_to_archive = [ f for f in self.data_dir.iterdir() if f.is_file() and (f.name.startswith("snapshot_") or f.name.startswith("all_widgets_")) and f.suffix == '.csv' ] for file_path in files_to_archive: date_str = file_path.stem[9:] if file_path.name.startswith("snapshot_") else file_path.stem[12:] if date_str not in keep_dates: year_month = date_str[:7] archive_path = self.archive_dir / f"snapshots_{year_month}.{Config.COMPRESS_FORMAT}" try: with zipfile.ZipFile(archive_path, 'a', zipfile.ZIP_DEFLATED) as zipf: zipf.write(file_path, arcname=file_path.name) file_path.unlink() except Exception as e: print(f"归档文件失败: {e}") def compare_data(self, current_data): """比较新旧数据差异""" if not self.last_data_file.exists() or current_data.empty: return None try: last_data = pd.read_csv(self.last_data_file) if last_data.empty: return None last_data['unique_id'] = last_data['name'].astype(str) + last_data['app_id'].astype(str) current_data['unique_id'] = current_data['name'].astype(str) + current_data['app_id'].astype(str) merged = pd.merge( last_data, current_data, on=['unique_id'], how='outer', suffixes=('_last', '_current'), indicator=True ) changes = { 'added': merged[merged['_merge'] == 'right_only'], 'deleted': merged[merged['_merge'] == 'left_only'], 'modified': pd.DataFrame() } for col in ['label', 'type']: last_col = f"{col}_last" current_col = f"{col}_current" if last_col in merged.columns and current_col in merged.columns: mask = (merged['_merge'] == 'both') & (merged[last_col] != merged[current_col]) mask = mask & ~merged[last_col].isna() & ~merged[current_col].isna() if mask.any(): modified = merged.loc[mask].copy() modified['changed_field'] = col modified['old_value'] = modified[last_col] modified['new_value'] = modified[current_col] modified['change_status'] = 'update' changes['modified'] = pd.concat([changes['modified'], modified]) return changes except Exception as e: print(f"比较数据失败: {e}") return None def save_changes(self, changes, apps, entries): """保存变更记录""" if not changes or all(len(v) == 0 for v in changes.values()): return False result_rows = [] for change_type in ['added', 'deleted', 'modified']: df = changes[change_type] if df.empty: continue suffix = 'current' if change_type in ['added', 'modified'] else 'last' for _, row in df.iterrows(): app_id = row.get(f'app_id_{suffix}') entry_id = row.get(f'entry_id_{suffix}') if not app_id or not entry_id: continue app_name = 'Unknown App' entry_name = 'Unknown Entry' if not apps.empty: app_match = apps[apps['app_id'] == app_id] if not app_match.empty: app_name = app_match['name'].values[0] if not entries.empty: entry_match = entries[(entries['app_id'] == app_id) & (entries['entry_id'] == entry_id)] if not entry_match.empty: entry_name = entry_match['name'].values[0] if change_type == 'added': content = f"Added field: {row.get('label_current', 'Unknown')}" elif change_type == 'deleted': content = f"Deleted field: {row.get('label_last', 'Unknown')}" else: content = f"Changed from \"{row.get('old_value', 'Unknown')}\" to \"{row.get('new_value', 'Unknown')}\"" result_rows.append({ 'timestamp': self.execution_time, 'unique_id': row.get('unique_id', ''), 'app_id': app_id, 'app_name': app_name, 'entry_id': entry_id, 'entry_name': entry_name, 'change_type': change_type, 'details': content }) if result_rows: result_df = pd.DataFrame(result_rows) changes_file = Config.get_changes_file() try: result_df.to_csv( changes_file, mode='a', header=not changes_file.exists(), index=False, encoding='utf-8-sig' ) self.add_to_jiandaoyun(result_df) return True except Exception as e: print(f"保存变更记录失败: {e}") return False return False def add_to_jiandaoyun(self, result_df): """将变更记录写入简道云""" all_data = [{ "_widget_1751446961315": {"value": row["app_name"]}, "_widget_1751446961316": {"value": row["entry_name"]}, "_widget_1751446961317": {"value": row["change_type"]}, "_widget_1751446961318": {"value": row["details"]}, "_widget_1751446961319": {"value": row["timestamp"]}, } for _, row in result_df.iterrows()] payload = { "api_key": Config.MONITOR_APP_ID, "entry_id": Config.CHANGES_ENTRY_ID, "data_list": all_data } try: response = self.api.entry_data_batch_create(payload) return isinstance(response, list) and len(response) > 0 except Exception as e: print(f"写入简道云失败: {e}") return False def send_task_status(self, task_start_time: str, task_name: str) -> None: """将任务状态发送到简道云""" try: end_time_utc = datetime.now(timezone.utc) task_start_naive = datetime.strptime(task_start_time, "%Y-%m-%d %H:%M:%S") task_start_utc = task_start_naive - timedelta(hours=8) task_start_utc = task_start_utc.replace(tzinfo=timezone.utc) run_time = end_time_utc - task_start_utc run_time_sec = int(run_time.total_seconds()) today_utc = end_time_utc.strftime("%Y-%m-%d") task_end_iso = end_time_utc.strftime("%Y-%m-%dT%H:%M:%SZ") task_start_iso = task_start_utc.strftime("%Y-%m-%dT%H:%M:%SZ") payload = { "api_key": Config.MONITOR_APP_ID, "entry_id": Config.STATUS_ENTRY_ID, "data": { "_widget_1744873387500": {"value": today_utc}, "_widget_1743644977694": {"value": task_name}, "_widget_1744873387501": {"value": task_start_iso}, "_widget_1744873387502": {"value": task_end_iso}, "_widget_1744873387504": {"value": run_time_sec}, } } self.api.data_batch_create(payload) except Exception as e: print(f"发送任务状态失败: {e}") def run_daily_snapshot(self): """执行每日快照任务""" try: apps = self.fetch_apps() entries = self.fetch_entries(apps) widgets = self.fetch_widgets(entries) monitor_data = self.fetch_monitor_data() matched_data = self.match_widgets(monitor_data, widgets) if matched_data.empty: print("没有匹配到数据") return False today_file = self.data_dir / f"snapshot_{self.today}.csv" widget_file = self.data_dir / f"all_widgets_{self.today}.csv" if not Utils.safe_csv_write(matched_data, today_file): print("保存快照数据失败") return False if not Utils.safe_csv_write(widgets, widget_file): print("保存组件数据失败") return False self.archive_old_data() self.save_last_data(matched_data, widgets) return True except Exception as e: print(f"每日快照任务失败: {e}") return False def run_hourly_check(self): """执行每小时检查任务""" try: apps = self.fetch_apps() entries = self.fetch_entries(apps) widgets = self.fetch_widgets(entries) monitor_data = self.fetch_monitor_data() current_data = self.match_widgets(monitor_data, widgets) changes = self.compare_data(current_data) if changes and any(len(v) > 0 for v in changes.values()): self.save_changes(changes, apps, entries) self.save_last_data(current_data, widgets) return True except Exception as e: print(f"每小时检查任务失败: {e}") return False def main(self): """主运行逻辑""" task_start_time = datetime.now().strftime("%Y-%m-%d %H:%M:%S") try: if Utils.is_first_run_today(): success = self.run_daily_snapshot() else: success = self.run_hourly_check() self.send_task_status(task_start_time, "字段监控") return success except Exception as e: print(f"主运行逻辑失败: {e}") return False if __name__ == "__main__": # 确保输出目录存在 Utils.ensure_dir(Config.OUTPUT_DIR) # 运行监控任务 monitor = DataMonitor() if not monitor.main(): sys.exit(1)