简道云成员id与字段监控分离

This commit is contained in:
z66
2025-08-28 11:03:46 +08:00
parent 5879eb7842
commit 2840d4871a
6 changed files with 951 additions and 169 deletions
+321 -123
View File
@@ -1,21 +1,40 @@
"""字段监控(多平台适配版)"""
import sys
import platform
from datetime import datetime, timedelta, timezone
from pathlib import Path
import pandas as pd
import zipfile
import json
from datetime import datetime, timezone, timedelta, date
import requests
from typing import Optional, List, Dict, Any
from decimal import Decimal
import time
from api import API
from log_config import configure_task_logger, configure_error_task_logger
from back_ground_module import CommonModule
import numpy as np
import json
def replace_decimals(obj):
"""替换Decimal类型为float"""
if isinstance(obj, dict):
return {k: replace_decimals(v) for k, v in obj.items()}
elif isinstance(obj, list):
return [replace_decimals(item) for item in obj]
elif isinstance(obj, Decimal):
return float(obj)
return obj
class NpEncoder(json.JSONEncoder):
"""NumPy类型JSON编码器"""
def default(self, obj):
if isinstance(obj, np.integer):
return int(obj)
elif isinstance(obj, np.floating):
return float(obj)
elif isinstance(obj, np.ndarray):
return obj.tolist()
else:
return super(NpEncoder, self).default(obj)
# 初始化日志记录器
logger = configure_task_logger()
error_task_logger = configure_error_task_logger()
common_tools = CommonModule()
# ---------------------------- 配置项(多平台适配)---------------------------
class Config:
@@ -27,12 +46,21 @@ class Config:
DATA_DIR = OUTPUT_DIR / "data_snapshots"
ARCHIVE_DIR = OUTPUT_DIR / "archives"
# API配置
JIANDAOYUN_API_TOKEN = "Bearer qygHulymo1fekJk4CIZyNKjyQAzG8CFN"
# 运行参数
RETAIN_DAYS = 7
COMPRESS_FORMAT = "zip"
MAX_RETRIES = 3
RETRY_DELAY = 0.5 # 秒
# 监控表单配置
MONITOR_APP_ID = "6694d3c4fcb69ca9a111a6c4"
MONITOR_ENTRY_ID = "6850c044f17c934b3ec01fea"
CHANGES_ENTRY_ID = "6863a402a77925690a470cc5"
STATUS_ENTRY_ID = "67ede908eb9c22261016466e"
@classmethod
def get_log_file(cls):
"""获取跨平台兼容的日志文件路径"""
@@ -43,6 +71,7 @@ class Config:
"""获取跨平台兼容的变更记录文件路径"""
return cls.OUTPUT_DIR / "changes_summary.csv"
# ---------------------- 工具函数(多平台兼容)-----------------------
class Utils:
@staticmethod
@@ -51,10 +80,9 @@ class Utils:
path = Path(path) if not isinstance(path, Path) else path
try:
path.mkdir(parents=True, exist_ok=True)
logger.debug(f"Directory ensured: {path}")
return True
except Exception as e:
error_task_logger.error(f"Failed to create directory {path}: {str(e)}")
print(f"创建目录失败: {e}")
return False
@staticmethod
@@ -83,19 +111,19 @@ class Utils:
temp_file.rename(file_path)
return True
except Exception as e:
error_task_logger.error(f"Failed to write {file_path}: {str(e)}")
print(f"CSV写入失败: {e}")
if temp_file.exists():
temp_file.unlink()
return False
# ---------------------- API客户端(多平台兼容)-----------------------
class APIClient:
def __init__(self):
self.headers = {
'Authorization': 'Bearer qygHulymo1fekJk4CIZyNKjyQAzG8CFN',
'Authorization': Config.JIANDAOYUN_API_TOKEN,
'Content-Type': 'application/json'
}
self.api = API()
def request(self, url, payload, method='POST'):
"""带重试机制的API请求"""
@@ -113,7 +141,106 @@ class APIClient:
if retry == Config.MAX_RETRIES:
raise
time.sleep(Config.RETRY_DELAY)
logger.warning(f"Request failed (attempt {retry + 1}/{Config.MAX_RETRIES}): {str(e)}")
def data_batch_create(self, data: dict, max_retries: int = 20) -> Optional[Dict]:
"""新建单条表单数据"""
url = 'https://api.jiandaoyun.com/api/v5/app/entry/data/create'
payload = json.dumps({
"app_id": data['api_key'],
"entry_id": data['entry_id'],
"data": data['data'],
"is_start_workflow": data.get('is_start_workflow', "false"),
"is_start_trigger": data.get('is_start_trigger', "false"),
"transaction_id": data.get('transaction_id', "")
})
for retry in range(max_retries + 1):
try:
response = requests.post(url=url, data=payload, headers=self.headers, timeout=10)
response.raise_for_status()
return response.json()
except requests.exceptions.RequestException as e:
if retry == max_retries:
print(f"创建数据失败: {e}")
return None
time.sleep(3)
def entry_data_list(self, data: dict, max_retries: int = 20) -> Dict:
"""获取多条表单数据"""
url = 'https://api.jiandaoyun.com/api/v5/app/entry/data/list'
all_data_batches = []
last_data_id = None
while True:
payload = json.dumps({
"app_id": data['api_key'],
"entry_id": data['entry_id'],
"limit": 100,
"data_id": last_data_id
})
for retry in range(max_retries + 1):
try:
response = requests.post(url=url, data=payload, headers=self.headers, timeout=10)
response.raise_for_status()
data_get = response.json()
if data_get.get("data"):
all_data_batches.extend(data_get['data'])
last_data_id = data_get['data'][-1].get('_id')
break
else:
return {"data": all_data_batches}
except requests.exceptions.RequestException as e:
if retry == max_retries:
print(f"获取数据列表失败: {e}")
return {"data": all_data_batches}
time.sleep(0.1)
if not data_get.get("data") or len(data_get['data']) < 100:
break
return {"data": all_data_batches}
def entry_data_batch_create(self, data: dict, chunk_size: int = 90, max_retries: int = 20) -> List[Optional[Dict]]:
"""新建多条数据"""
data = replace_decimals(data)
url = 'https://api.jiandaoyun.com/api/v5/app/entry/data/batch_create'
data_get_list = []
total_length = len(data['data_list'])
num_chunks = (total_length + chunk_size - 1) // chunk_size
for i in range(num_chunks):
start_index = i * chunk_size
end_index = min(start_index + chunk_size, total_length)
payload = json.dumps({
"app_id": data['api_key'],
"entry_id": data['entry_id'],
"data_list": data['data_list'][start_index:end_index],
"is_start_workflow": data.get('is_start_workflow', "false"),
"is_start_trigger": data.get('is_start_trigger', "false"),
}, cls=NpEncoder)
for retry in range(max_retries + 1):
try:
response = requests.post(url=url, data=payload, headers=self.headers, timeout=10)
response.raise_for_status()
data_get = response.json()
if data_get.get("status") == "success":
data_get_list.append(data_get)
break
except requests.exceptions.RequestException as e:
if retry == max_retries:
print(f"批量创建数据失败: {e}")
data_get_list.append(None)
time.sleep(0.1)
return data_get_list
# ---------------------- 数据处理基类 -----------------------
class DataHandler:
@@ -141,7 +268,7 @@ class DataHandler:
last_widget = pd.read_csv(self.last_widget_file) if self.last_widget_file.exists() else None
return last_data, last_widget
except Exception as e:
error_task_logger.error(f"Failed to load last data: {str(e)}")
print(f"加载上次数据失败: {e}")
return None, None
def save_last_data(self, data, widget_data):
@@ -151,9 +278,16 @@ class DataHandler:
success &= Utils.safe_csv_write(widget_data, self.last_widget_file)
return success
except Exception as e:
error_task_logger.error(f"Failed to save current data: {str(e)}")
print(f"保存数据失败: {e}")
return False
def field_replacement(self, data, final_data):
"""字段替换方法(由id替换为标签名)"""
# 这里实现具体的字段替换逻辑
# 由于具体替换规则未知,返回原始数据
return final_data
# ---------------------- 数据监控主类 -----------------------
class DataMonitor(DataHandler):
def __init__(self):
@@ -164,8 +298,12 @@ class DataMonitor(DataHandler):
"""获取所有应用列表"""
url = "https://api.jiandaoyun.com/api/v5/app/list"
payload = json.dumps({"skip": 0, "limit": 100})
response = self.api.request(url, payload)
return pd.DataFrame(response.json().get("apps", []))
try:
response = self.api.request(url, payload)
return pd.DataFrame(response.json().get("apps", []))
except Exception as e:
print(f"获取应用列表失败: {e}")
return pd.DataFrame()
def fetch_entries(self, app_df):
"""获取所有表单条目"""
@@ -174,15 +312,19 @@ class DataMonitor(DataHandler):
for _, app in app_df.iterrows():
payload = json.dumps({"app_id": app['app_id']})
response = self.api.request(url, payload)
entries = response.json().get("forms", [])
try:
response = self.api.request(url, payload)
entries = response.json().get("forms", [])
if entries:
entry_df = pd.DataFrame(entries)
entry_df['app_id'] = app['app_id']
all_entries.append(entry_df)
if entries:
entry_df = pd.DataFrame(entries)
entry_df['app_id'] = app['app_id']
all_entries.append(entry_df)
except Exception as e:
print(f"获取表单条目失败: {e}")
continue
return pd.concat(all_entries, ignore_index=True) if all_entries else None
return pd.concat(all_entries, ignore_index=True) if all_entries else pd.DataFrame()
def fetch_widgets(self, entry_df):
"""获取所有字段组件"""
@@ -194,37 +336,50 @@ class DataMonitor(DataHandler):
"app_id": entry['app_id'],
"entry_id": entry['entry_id']
})
response = self.api.request(url, payload)
widgets = response.json().get('widgets', [])
try:
response = self.api.request(url, payload)
widgets = response.json().get('widgets', [])
if widgets:
widget_df = pd.DataFrame(widgets)
widget_df['app_id'] = entry['app_id']
widget_df['entry_id'] = entry['entry_id']
all_widgets.append(widget_df)
if widgets:
widget_df = pd.DataFrame(widgets)
widget_df['app_id'] = entry['app_id']
widget_df['entry_id'] = entry['entry_id']
all_widgets.append(widget_df)
except Exception as e:
print(f"获取字段组件失败: {e}")
continue
return pd.concat(all_widgets, ignore_index=True) if all_widgets else None
return pd.concat(all_widgets, ignore_index=True) if all_widgets else pd.DataFrame()
def fetch_monitor_data(self):
"""获取监控数据"""
payload = {
"api_key": "6694d3c4fcb69ca9a111a6c4",
"entry_id": "6850c044f17c934b3ec01fea"
"api_key": Config.MONITOR_APP_ID,
"entry_id": Config.MONITOR_ENTRY_ID
}
data = self.api.api.entry_data_list(payload).get("data")
data_list = pd.DataFrame(data)
try:
data = self.api.entry_data_list(payload).get("data", [])
data_list = pd.DataFrame(data)
# 处理复杂数据类型
for col in data_list.columns:
if data_list[col].apply(lambda x: isinstance(x, (dict, list))).any():
data_list[col] = data_list[col].astype(str)
# 处理复杂数据类型
for col in data_list.columns:
if data_list[col].apply(lambda x: isinstance(x, (dict, list))).any():
data_list[col] = data_list[col].astype(str)
return data_list.drop_duplicates()
return data_list.drop_duplicates()
except Exception as e:
print(f"获取监控数据失败: {e}")
return pd.DataFrame()
def match_widgets(self, data_list, widget_list):
"""匹配数据列表和组件列表"""
if '_widget_1750122565203' not in data_list.columns:
raise ValueError("Missing required column '_widget_1750122565203'")
print("缺少必需的列 '_widget_1750122565203'")
return pd.DataFrame()
if widget_list.empty:
return pd.DataFrame()
return widget_list[widget_list['entry_id'].isin(data_list['_widget_1750122565203'])]
def archive_old_data(self):
@@ -234,12 +389,11 @@ class DataMonitor(DataHandler):
for i in range(Config.RETAIN_DAYS)
]
# 查找需要归档的文件
files_to_archive = [
f for f in self.data_dir.iterdir()
if f.is_file() and
(f.name.startswith("snapshot_") or f.name.startswith("all_widgets_")) and
f.suffix == '.csv'
(f.name.startswith("snapshot_") or f.name.startswith("all_widgets_")) and
f.suffix == '.csv'
]
for file_path in files_to_archive:
@@ -253,80 +407,101 @@ class DataMonitor(DataHandler):
with zipfile.ZipFile(archive_path, 'a', zipfile.ZIP_DEFLATED) as zipf:
zipf.write(file_path, arcname=file_path.name)
file_path.unlink()
logger.debug(f"Archived {file_path.name} to {archive_path}")
except Exception as e:
error_task_logger.error(f"Failed to archive {file_path}: {str(e)}")
print(f"归档文件失败: {e}")
def compare_data(self, current_data):
"""比较新旧数据差异"""
if not self.last_data_file.exists():
if not self.last_data_file.exists() or current_data.empty:
return None
last_data = pd.read_csv(self.last_data_file)
last_data['unique_id'] = last_data['name'].astype(str) + last_data['app_id'].astype(str)
current_data['unique_id'] = current_data['name'].astype(str) + current_data['app_id'].astype(str)
try:
last_data = pd.read_csv(self.last_data_file)
if last_data.empty:
return None
merged = pd.merge(
last_data, current_data,
on=['unique_id'],
how='outer',
suffixes=('_last', '_current'),
indicator=True
)
last_data['unique_id'] = last_data['name'].astype(str) + last_data['app_id'].astype(str)
current_data['unique_id'] = current_data['name'].astype(str) + current_data['app_id'].astype(str)
changes = {
'added': merged[merged['_merge'] == 'right_only'],
'deleted': merged[merged['_merge'] == 'left_only'],
'modified': pd.DataFrame()
}
merged = pd.merge(
last_data, current_data,
on=['unique_id'],
how='outer',
suffixes=('_last', '_current'),
indicator=True
)
for col in ['label', 'type']:
last_col = f"{col}_last"
current_col = f"{col}_current"
changes = {
'added': merged[merged['_merge'] == 'right_only'],
'deleted': merged[merged['_merge'] == 'left_only'],
'modified': pd.DataFrame()
}
if last_col in merged.columns and current_col in merged.columns:
mask = (merged['_merge'] == 'both') & (merged[last_col] != merged[current_col])
mask = mask & ~merged[last_col].isna() & ~merged[current_col].isna()
for col in ['label', 'type']:
last_col = f"{col}_last"
current_col = f"{col}_current"
if mask.any():
modified = merged.loc[mask].copy()
modified['changed_field'] = col
modified['old_value'] = modified[last_col]
modified['new_value'] = modified[current_col]
modified['change_status'] = 'update'
changes['modified'] = pd.concat([changes['modified'], modified])
if last_col in merged.columns and current_col in merged.columns:
mask = (merged['_merge'] == 'both') & (merged[last_col] != merged[current_col])
mask = mask & ~merged[last_col].isna() & ~merged[current_col].isna()
return changes
if mask.any():
modified = merged.loc[mask].copy()
modified['changed_field'] = col
modified['old_value'] = modified[last_col]
modified['new_value'] = modified[current_col]
modified['change_status'] = 'update'
changes['modified'] = pd.concat([changes['modified'], modified])
return changes
except Exception as e:
print(f"比较数据失败: {e}")
return None
def save_changes(self, changes, apps, entries):
"""保存变更记录"""
if not changes or all(len(v) == 0 for v in changes.values()):
return False
result_rows = []
for change_type in ['added', 'deleted', 'modified']:
df = changes[change_type]
if df.empty:
continue
suffix = 'current' if change_type in ['added', 'modified'] else 'last'
for _, row in changes[change_type].iterrows():
app_id = row[f'app_id_{suffix}']
entry_id = row[f'entry_id_{suffix}']
for _, row in df.iterrows():
app_id = row.get(f'app_id_{suffix}')
entry_id = row.get(f'entry_id_{suffix}')
app_name = apps.loc[apps['app_id'] == app_id, 'name'].values[0] if not apps[
apps['app_id'] == app_id].empty else 'Unknown App'
entry_name = entries.loc[
(entries['app_id'] == app_id) & (entries['entry_id'] == entry_id), 'name'
].values[0] if not entries[
(entries['app_id'] == app_id) & (entries['entry_id'] == entry_id)
].empty else 'Unknown Entry'
if not app_id or not entry_id:
continue
app_name = 'Unknown App'
entry_name = 'Unknown Entry'
if not apps.empty:
app_match = apps[apps['app_id'] == app_id]
if not app_match.empty:
app_name = app_match['name'].values[0]
if not entries.empty:
entry_match = entries[(entries['app_id'] == app_id) & (entries['entry_id'] == entry_id)]
if not entry_match.empty:
entry_name = entry_match['name'].values[0]
if change_type == 'added':
content = f"Added field: {row['label_current']}"
content = f"Added field: {row.get('label_current', 'Unknown')}"
elif change_type == 'deleted':
content = f"Deleted field: {row['label_last']}"
content = f"Deleted field: {row.get('label_last', 'Unknown')}"
else:
content = f"Changed from \"{row['old_value']}\" to \"{row['new_value']}\""
content = f"Changed from \"{row.get('old_value', 'Unknown')}\" to \"{row.get('new_value', 'Unknown')}\""
result_rows.append({
'timestamp': self.execution_time,
'unique_id': row['unique_id'],
'unique_id': row.get('unique_id', ''),
'app_id': app_id,
'app_name': app_name,
'entry_id': entry_id,
@@ -339,7 +514,6 @@ class DataMonitor(DataHandler):
result_df = pd.DataFrame(result_rows)
changes_file = Config.get_changes_file()
try:
# 追加模式写入,保留历史记录
result_df.to_csv(
changes_file,
mode='a',
@@ -350,7 +524,7 @@ class DataMonitor(DataHandler):
self.add_to_jiandaoyun(result_df)
return True
except Exception as e:
error_task_logger.error(f"Failed to save changes: {str(e)}")
print(f"保存变更记录失败: {e}")
return False
return False
@@ -365,24 +539,51 @@ class DataMonitor(DataHandler):
} for _, row in result_df.iterrows()]
payload = {
"api_key": "6694d3c4fcb69ca9a111a6c4",
"entry_id": "6863a402a77925690a470cc5",
"api_key": Config.MONITOR_APP_ID,
"entry_id": Config.CHANGES_ENTRY_ID,
"data_list": all_data
}
response = self.api.api.entry_data_batch_create(payload)
if isinstance(response, list):
logger.info(f"Successfully wrote {len(response)} records to Jiandaoyun")
return True
else:
error_task_logger.error(f"Failed to write to Jiandaoyun: {response.get('message', 'Unknown error')}")
try:
response = self.api.entry_data_batch_create(payload)
return isinstance(response, list) and len(response) > 0
except Exception as e:
print(f"写入简道云失败: {e}")
return False
def send_task_status(self, task_start_time: str, task_name: str) -> None:
"""将任务状态发送到简道云"""
try:
end_time_utc = datetime.now(timezone.utc)
task_start_naive = datetime.strptime(task_start_time, "%Y-%m-%d %H:%M:%S")
task_start_utc = task_start_naive - timedelta(hours=8)
task_start_utc = task_start_utc.replace(tzinfo=timezone.utc)
run_time = end_time_utc - task_start_utc
run_time_sec = int(run_time.total_seconds())
today_utc = end_time_utc.strftime("%Y-%m-%d")
task_end_iso = end_time_utc.strftime("%Y-%m-%dT%H:%M:%SZ")
task_start_iso = task_start_utc.strftime("%Y-%m-%dT%H:%M:%SZ")
payload = {
"api_key": Config.MONITOR_APP_ID,
"entry_id": Config.STATUS_ENTRY_ID,
"data": {
"_widget_1744873387500": {"value": today_utc},
"_widget_1743644977694": {"value": task_name},
"_widget_1744873387501": {"value": task_start_iso},
"_widget_1744873387502": {"value": task_end_iso},
"_widget_1744873387504": {"value": run_time_sec},
}
}
self.api.data_batch_create(payload)
except Exception as e:
print(f"发送任务状态失败: {e}")
def run_daily_snapshot(self):
"""执行每日快照任务"""
logger.info("=== Starting daily snapshot task ===")
try:
apps = self.fetch_apps()
entries = self.fetch_entries(apps)
@@ -390,28 +591,30 @@ class DataMonitor(DataHandler):
monitor_data = self.fetch_monitor_data()
matched_data = self.match_widgets(monitor_data, widgets)
# 保存数据
if matched_data.empty:
print("没有匹配到数据")
return False
today_file = self.data_dir / f"snapshot_{self.today}.csv"
widget_file = self.data_dir / f"all_widgets_{self.today}.csv"
if not Utils.safe_csv_write(matched_data, today_file):
raise Exception("Failed to save snapshot data")
print("保存快照数据失败")
return False
if not Utils.safe_csv_write(widgets, widget_file):
raise Exception("Failed to save widget data")
print("保存组件数据失败")
return False
self.archive_old_data()
self.save_last_data(matched_data, widgets)
logger.info("=== Daily snapshot task completed successfully ===")
return True
except Exception as e:
error_task_logger.error(f"Daily snapshot task failed: {str(e)}")
print(f"每日快照任务失败: {e}")
return False
def run_hourly_check(self):
"""执行每小时检查任务"""
logger.info("=== Starting hourly check task ===")
try:
apps = self.fetch_apps()
entries = self.fetch_entries(apps)
@@ -425,32 +628,27 @@ class DataMonitor(DataHandler):
self.save_last_data(current_data, widgets)
logger.info("=== Hourly check task completed successfully ===")
return True
except Exception as e:
error_task_logger.error(f"Hourly check task failed: {str(e)}")
print(f"每小时检查任务失败: {e}")
return False
def main(self):
"""主运行逻辑"""
task_start_time = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
try:
logger.info(f"=== Starting data monitoring task ({self.execution_time}) ===")
if Utils.is_first_run_today():
success = self.run_daily_snapshot()
else:
success = self.run_hourly_check()
common_tools.send_task_status(task_start_time, "字段监控")
logger.info("=== Data monitoring task completed ===")
self.send_task_status(task_start_time, "字段监控")
return success
except Exception as e:
error_task_logger.error(f"Data monitoring task failed: {e}")
common_tools.send_task_error(task_start_time, "字段监控", str(e))
print(f"主运行逻辑失败: {e}")
return False
if __name__ == "__main__":
# 确保输出目录存在
Utils.ensure_dir(Config.OUTPUT_DIR)