"""字段监控(多平台适配版)""" import sys import platform from datetime import datetime, timedelta, timezone from pathlib import Path import pandas as pd import zipfile import json import requests import time from api import API from log_config import configure_task_logger, configure_error_task_logger from back_ground_module import CommonModule # 初始化日志记录器 logger = configure_task_logger() error_task_logger = configure_error_task_logger() common_tools = CommonModule() # ---------------------------- 配置项(多平台适配)--------------------------- class Config: # 基础路径(使用Path对象自动处理跨平台路径) BASE_DIR = Path(__file__).parent.absolute() OUTPUT_DIR = BASE_DIR / "output" # 数据存储目录(英文命名避免编码问题) DATA_DIR = OUTPUT_DIR / "data_snapshots" ARCHIVE_DIR = OUTPUT_DIR / "archives" # 运行参数 RETAIN_DAYS = 7 COMPRESS_FORMAT = "zip" MAX_RETRIES = 3 RETRY_DELAY = 0.5 # 秒 @classmethod def get_log_file(cls): """获取跨平台兼容的日志文件路径""" return cls.OUTPUT_DIR / "data_monitor.log" @classmethod def get_changes_file(cls): """获取跨平台兼容的变更记录文件路径""" return cls.OUTPUT_DIR / "changes_summary.csv" # ---------------------- 工具函数(多平台兼容)----------------------- class Utils: @staticmethod def ensure_dir(path): """确保目录存在(自动处理Path对象或字符串)""" path = Path(path) if not isinstance(path, Path) else path try: path.mkdir(parents=True, exist_ok=True) logger.debug(f"Directory ensured: {path}") return True except Exception as e: error_task_logger.error(f"Failed to create directory {path}: {str(e)}") return False @staticmethod def get_iso_time(): """获取ISO格式的UTC时间""" return datetime.now(timezone.utc).isoformat(timespec='milliseconds') + "Z" @staticmethod def is_first_run_today(): """检查是否是今天的第一次运行""" today = datetime.now(timezone.utc).strftime("%Y-%m-%d") snapshot_file = Config.DATA_DIR / f"snapshot_{today}.csv" widget_file = Config.DATA_DIR / f"all_widgets_{today}.csv" return not (snapshot_file.exists() and widget_file.exists()) @staticmethod def safe_csv_write(df, file_path): """安全的CSV写入(带临时文件和错误处理)""" file_path = Path(file_path) temp_file = file_path.parent / (file_path.name + '.tmp') try: df.to_csv(temp_file, index=False, encoding='utf-8-sig') if file_path.exists(): file_path.unlink() temp_file.rename(file_path) return True except Exception as e: error_task_logger.error(f"Failed to write {file_path}: {str(e)}") if temp_file.exists(): temp_file.unlink() return False # ---------------------- API客户端(多平台兼容)----------------------- class APIClient: def __init__(self): self.headers = { 'Authorization': 'Bearer qygHulymo1fekJk4CIZyNKjyQAzG8CFN', 'Content-Type': 'application/json' } self.api = API() def request(self, url, payload, method='POST'): """带重试机制的API请求""" for retry in range(Config.MAX_RETRIES + 1): try: response = requests.request( method, url, headers=self.headers, data=payload, timeout=30 ) response.raise_for_status() return response except requests.exceptions.RequestException as e: if retry == Config.MAX_RETRIES: raise time.sleep(Config.RETRY_DELAY) logger.warning(f"Request failed (attempt {retry + 1}/{Config.MAX_RETRIES}): {str(e)}") # ---------------------- 数据处理基类 ----------------------- class DataHandler: def __init__(self): self.execution_time = Utils.get_iso_time() self.today = datetime.now(timezone.utc).strftime("%Y-%m-%d") self.setup_dirs() self.api = APIClient() def setup_dirs(self): """初始化所有需要的目录""" self.data_dir = Config.DATA_DIR self.archive_dir = Config.ARCHIVE_DIR Utils.ensure_dir(self.data_dir) Utils.ensure_dir(self.archive_dir) # 初始化数据文件路径 self.last_data_file = self.data_dir / "last_data.csv" self.last_widget_file = self.data_dir / "last_widget_data.csv" def load_last_data(self): """加载上次运行的数据""" try: last_data = pd.read_csv(self.last_data_file) if self.last_data_file.exists() else None last_widget = pd.read_csv(self.last_widget_file) if self.last_widget_file.exists() else None return last_data, last_widget except Exception as e: error_task_logger.error(f"Failed to load last data: {str(e)}") return None, None def save_last_data(self, data, widget_data): """保存当前数据供下次比较""" try: success = Utils.safe_csv_write(data, self.last_data_file) success &= Utils.safe_csv_write(widget_data, self.last_widget_file) return success except Exception as e: error_task_logger.error(f"Failed to save current data: {str(e)}") return False # ---------------------- 数据监控主类 ----------------------- class DataMonitor(DataHandler): def __init__(self): super().__init__() self.last_data, self.last_widget = self.load_last_data() def fetch_apps(self): """获取所有应用列表""" url = "https://api.jiandaoyun.com/api/v5/app/list" payload = json.dumps({"skip": 0, "limit": 100}) response = self.api.request(url, payload) return pd.DataFrame(response.json().get("apps", [])) def fetch_entries(self, app_df): """获取所有表单条目""" url = "https://api.jiandaoyun.com/api/v5/app/entry/list" all_entries = [] for _, app in app_df.iterrows(): payload = json.dumps({"app_id": app['app_id']}) response = self.api.request(url, payload) entries = response.json().get("forms", []) if entries: entry_df = pd.DataFrame(entries) entry_df['app_id'] = app['app_id'] all_entries.append(entry_df) return pd.concat(all_entries, ignore_index=True) if all_entries else None def fetch_widgets(self, entry_df): """获取所有字段组件""" url = "https://api.jiandaoyun.com/api/v5/app/entry/widget/list" all_widgets = [] for _, entry in entry_df.iterrows(): payload = json.dumps({ "app_id": entry['app_id'], "entry_id": entry['entry_id'] }) response = self.api.request(url, payload) widgets = response.json().get('widgets', []) if widgets: widget_df = pd.DataFrame(widgets) widget_df['app_id'] = entry['app_id'] widget_df['entry_id'] = entry['entry_id'] all_widgets.append(widget_df) return pd.concat(all_widgets, ignore_index=True) if all_widgets else None def fetch_monitor_data(self): """获取监控数据""" payload = { "api_key": "6694d3c4fcb69ca9a111a6c4", "entry_id": "6850c044f17c934b3ec01fea" } data = self.api.api.entry_data_list(payload).get("data") data_list = pd.DataFrame(data) # 处理复杂数据类型 for col in data_list.columns: if data_list[col].apply(lambda x: isinstance(x, (dict, list))).any(): data_list[col] = data_list[col].astype(str) return data_list.drop_duplicates() def match_widgets(self, data_list, widget_list): """匹配数据列表和组件列表""" if '_widget_1750122565203' not in data_list.columns: raise ValueError("Missing required column '_widget_1750122565203'") return widget_list[widget_list['entry_id'].isin(data_list['_widget_1750122565203'])] def archive_old_data(self): """归档旧数据""" keep_dates = [ (datetime.now(timezone.utc) - timedelta(days=i)).strftime("%Y-%m-%d") for i in range(Config.RETAIN_DAYS) ] # 查找需要归档的文件 files_to_archive = [ f for f in self.data_dir.iterdir() if f.is_file() and (f.name.startswith("snapshot_") or f.name.startswith("all_widgets_")) and f.suffix == '.csv' ] for file_path in files_to_archive: date_str = file_path.stem[9:] if file_path.name.startswith("snapshot_") else file_path.stem[12:] if date_str not in keep_dates: year_month = date_str[:7] archive_path = self.archive_dir / f"snapshots_{year_month}.{Config.COMPRESS_FORMAT}" try: with zipfile.ZipFile(archive_path, 'a', zipfile.ZIP_DEFLATED) as zipf: zipf.write(file_path, arcname=file_path.name) file_path.unlink() logger.debug(f"Archived {file_path.name} to {archive_path}") except Exception as e: error_task_logger.error(f"Failed to archive {file_path}: {str(e)}") def compare_data(self, current_data): """比较新旧数据差异""" if not self.last_data_file.exists(): return None last_data = pd.read_csv(self.last_data_file) last_data['unique_id'] = last_data['name'].astype(str) + last_data['app_id'].astype(str) current_data['unique_id'] = current_data['name'].astype(str) + current_data['app_id'].astype(str) merged = pd.merge( last_data, current_data, on=['unique_id'], how='outer', suffixes=('_last', '_current'), indicator=True ) changes = { 'added': merged[merged['_merge'] == 'right_only'], 'deleted': merged[merged['_merge'] == 'left_only'], 'modified': pd.DataFrame() } for col in ['label', 'type']: last_col = f"{col}_last" current_col = f"{col}_current" if last_col in merged.columns and current_col in merged.columns: mask = (merged['_merge'] == 'both') & (merged[last_col] != merged[current_col]) mask = mask & ~merged[last_col].isna() & ~merged[current_col].isna() if mask.any(): modified = merged.loc[mask].copy() modified['changed_field'] = col modified['old_value'] = modified[last_col] modified['new_value'] = modified[current_col] modified['change_status'] = 'update' changes['modified'] = pd.concat([changes['modified'], modified]) return changes def save_changes(self, changes, apps, entries): """保存变更记录""" result_rows = [] for change_type in ['added', 'deleted', 'modified']: suffix = 'current' if change_type in ['added', 'modified'] else 'last' for _, row in changes[change_type].iterrows(): app_id = row[f'app_id_{suffix}'] entry_id = row[f'entry_id_{suffix}'] app_name = apps.loc[apps['app_id'] == app_id, 'name'].values[0] if not apps[ apps['app_id'] == app_id].empty else 'Unknown App' entry_name = entries.loc[ (entries['app_id'] == app_id) & (entries['entry_id'] == entry_id), 'name' ].values[0] if not entries[ (entries['app_id'] == app_id) & (entries['entry_id'] == entry_id) ].empty else 'Unknown Entry' if change_type == 'added': content = f"Added field: {row['label_current']}" elif change_type == 'deleted': content = f"Deleted field: {row['label_last']}" else: content = f"Changed from \"{row['old_value']}\" to \"{row['new_value']}\"" result_rows.append({ 'timestamp': self.execution_time, 'unique_id': row['unique_id'], 'app_id': app_id, 'app_name': app_name, 'entry_id': entry_id, 'entry_name': entry_name, 'change_type': change_type, 'details': content }) if result_rows: result_df = pd.DataFrame(result_rows) changes_file = Config.get_changes_file() try: # 追加模式写入,保留历史记录 result_df.to_csv( changes_file, mode='a', header=not changes_file.exists(), index=False, encoding='utf-8-sig' ) self.add_to_jiandaoyun(result_df) return True except Exception as e: error_task_logger.error(f"Failed to save changes: {str(e)}") return False return False def add_to_jiandaoyun(self, result_df): """将变更记录写入简道云""" all_data = [{ "_widget_1751446961315": {"value": row["app_name"]}, "_widget_1751446961316": {"value": row["entry_name"]}, "_widget_1751446961317": {"value": row["change_type"]}, "_widget_1751446961318": {"value": row["details"]}, "_widget_1751446961319": {"value": row["timestamp"]}, } for _, row in result_df.iterrows()] payload = { "api_key": "6694d3c4fcb69ca9a111a6c4", "entry_id": "6863a402a77925690a470cc5", "data_list": all_data } response = self.api.api.entry_data_batch_create(payload) if isinstance(response, list): logger.info(f"Successfully wrote {len(response)} records to Jiandaoyun") return True else: error_task_logger.error(f"Failed to write to Jiandaoyun: {response.get('message', 'Unknown error')}") return False def run_daily_snapshot(self): """执行每日快照任务""" logger.info("=== Starting daily snapshot task ===") try: apps = self.fetch_apps() entries = self.fetch_entries(apps) widgets = self.fetch_widgets(entries) monitor_data = self.fetch_monitor_data() matched_data = self.match_widgets(monitor_data, widgets) # 保存数据 today_file = self.data_dir / f"snapshot_{self.today}.csv" widget_file = self.data_dir / f"all_widgets_{self.today}.csv" if not Utils.safe_csv_write(matched_data, today_file): raise Exception("Failed to save snapshot data") if not Utils.safe_csv_write(widgets, widget_file): raise Exception("Failed to save widget data") self.archive_old_data() self.save_last_data(matched_data, widgets) logger.info("=== Daily snapshot task completed successfully ===") return True except Exception as e: error_task_logger.error(f"Daily snapshot task failed: {str(e)}") return False def run_hourly_check(self): """执行每小时检查任务""" logger.info("=== Starting hourly check task ===") try: apps = self.fetch_apps() entries = self.fetch_entries(apps) widgets = self.fetch_widgets(entries) monitor_data = self.fetch_monitor_data() current_data = self.match_widgets(monitor_data, widgets) changes = self.compare_data(current_data) if changes and any(len(v) > 0 for v in changes.values()): self.save_changes(changes, apps, entries) self.save_last_data(current_data, widgets) logger.info("=== Hourly check task completed successfully ===") return True except Exception as e: error_task_logger.error(f"Hourly check task failed: {str(e)}") return False def main(self): """主运行逻辑""" task_start_time = datetime.now().strftime("%Y-%m-%d %H:%M:%S") try: logger.info(f"=== Starting data monitoring task ({self.execution_time}) ===") if Utils.is_first_run_today(): success = self.run_daily_snapshot() else: success = self.run_hourly_check() common_tools.send_task_status(task_start_time, "字段监控") logger.info("=== Data monitoring task completed ===") return success except Exception as e: error_task_logger.error(f"Data monitoring task failed: {e}") common_tools.send_task_error(task_start_time, "字段监控", str(e)) return False if __name__ == "__main__": # 确保输出目录存在 Utils.ensure_dir(Config.OUTPUT_DIR) # 运行监控任务 monitor = DataMonitor() if not monitor.main(): sys.exit(1)