Files
2026-04-18 09:22:23 +08:00

117 lines
3.7 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
# 可以引用一些第三方库.
import requests
import time
import json
from datetime import datetime, timedelta, timezone
from typing import Any, Dict, List, Optional
# Inputs read from the trigger configuration. `triggerConf` is not defined in
# this file; presumably the hosting script runtime injects it — TODO confirm.
# `version` is coerced with str(), so if `.get` yields None for a missing widget
# the value becomes the literal string 'None'.
version = str(triggerConf.get('_widget_17758020150661'))
# Two research-level selections (presumably level 1 and level 2, judging by the
# names). Per the comment on parse_list_value, each may arrive either as a
# native list or as a JSON-encoded string.
research_leve1 = triggerConf.get('_widget_17758020157781')
research_leve2 = triggerConf.get('_widget_17758020169011')
def parse_list_value(value):
    """Normalize a value that may be a native list or a JSON-encoded array.

    Args:
        value: A list, a JSON array string, or any other value.

    Returns:
        ``value`` itself when it is already a list; the decoded list when it
        is a JSON array string; otherwise a one-element list containing
        ``str(value)`` (so ``None`` becomes ``['None']``).
    """
    if isinstance(value, list):
        return value
    try:
        parsed = json.loads(value)
    except (json.JSONDecodeError, TypeError):
        # Not JSON text (or not text at all): treat it as a single item.
        return [str(value)]
    # json.loads also decodes scalars and objects ("123", "true", '{"a": 1}').
    # Returning those would hand callers a non-list, so wrap the raw value.
    if isinstance(parsed, list):
        return parsed
    return [str(value)]
# Normalize both research-level inputs to lists before they are used for
# filtering and matching.
research_leve1_list, research_leve2_list = (
    parse_list_value(research_leve1),
    parse_list_value(research_leve2),
)
# Jiandaoyun application and form identifiers. Despite its name, `api_key` is
# sent as the request's `app_id`.
api_key = "675b900991ad2491c69389ca"
entry_id = "69c0affbdef38560ddc3d0d0"

# Bearer-token credential and the headers shared by every Jiandaoyun API call.
# NOTE(review): the token is stored in plain text in this file — consider
# loading it from secure configuration and rotating this value.
API_TOKEN = "Bearer qygHulymo1fekJk4CIZyNKjyQAzG8CFN"
HEADERS = {
    'Authorization': API_TOKEN,
    'Content-Type': 'application/json',
}
def entry_data_list(
    data: Dict[str, Any],
    max_retries: int = 5,
    limit: int = 90,
    timeout: int = 10,
) -> Dict[str, List[Dict[str, Any]]]:
    """Fetch all rows of a Jiandaoyun form, following pages automatically.

    Pages are chained by sending the ``_id`` of the previous page's last row
    as ``data_id`` of the next request. Paging ends on an empty page, or when
    the cursor cannot advance (last row has no ``_id`` or repeats the previous
    cursor), because continuing would request the same page again forever.

    Args:
        data: Request description. ``data['api_key']`` is sent as ``app_id``
            and ``data['entry_id']`` as ``entry_id``; an optional
            ``data['filter']`` is forwarded unchanged.
        max_retries: Number of retries per page after the first failed attempt.
        limit: Page size sent with each request.
        timeout: Per-request timeout in seconds, passed to ``requests.post``.

    Returns:
        ``{'data': rows}`` with the rows of all pages in the order received.

    Raises:
        requests.RequestException: If a page still fails after all retries.
        ValueError: If a response body is still not valid JSON after all
            retries.
    """
    url = 'https://api.jiandaoyun.com/api/v5/app/entry/data/list'
    all_rows: List[Dict[str, Any]] = []
    last_data_id: Optional[str] = None

    while True:
        payload: Dict[str, Any] = {
            'app_id': data['api_key'],
            'entry_id': data['entry_id'],
            'limit': limit,
            'data_id': last_data_id,
        }
        if data.get('filter') is not None:
            payload['filter'] = data['filter']

        body: Optional[Dict[str, Any]] = None
        for attempt in range(max_retries + 1):
            try:
                res = requests.post(url=url, json=payload, headers=HEADERS, timeout=timeout)
                res.raise_for_status()
                body = res.json()
                break
            except (requests.RequestException, ValueError):
                # Transient network errors, timeouts and malformed bodies are
                # retried; ValueError covers JSON decode failures that are not
                # RequestException subclasses in older requests releases.
                if attempt >= max_retries:
                    raise
                time.sleep(0.5)

        batch = (body or {}).get('data') or []
        if not batch:
            break
        all_rows.extend(batch)

        # The next page continues after the last row of this page.
        next_data_id = batch[-1].get('_id')
        if next_data_id is None or next_data_id == last_data_id:
            # No usable cursor: the next request would return the same page
            # again, so stop instead of looping (and accumulating) forever.
            break
        last_data_id = next_data_id

    return {'data': all_rows}
# Filter sent with the list request: every condition must hold ('and'), and
# each one compares a text field to the given value list using method 'eq'.
# NOTE(review): despite the name, no emptiness check is expressed here.
filter_not_empty = {
    'rel': 'and',
    'cond': [
        {'field': field_id, 'type': 'text', 'method': 'eq', 'value': wanted}
        for field_id, wanted in (
            ('_widget_1774235645167', [version]),
            ('_widget_1774235645172', research_leve1_list),
        )
    ],
}
# 拉取满足过滤条件的所有数据
result = entry_data_list({'api_key': api_key, 'entry_id': entry_id, 'filter': filter_not_empty})
data_list = result.get("data")
all_data = []
for data in data_list :
# 先检查字段是否存在,避免KeyError
if "_widget_1774235645156" in data and "_widget_1774235645158" in data:
field_value = data["_widget_1774235645156"]
# 判断两个list是否有交集(只要有一个共同元素就算匹配)
if any(item in field_value for item in research_leve2_list):
item = data["_widget_1774235645158"]
if isinstance(item, list):
all_data.extend(item)
else:
all_data.append(item)
all_data = list(set(all_data))
return {
"all_data": ["业绩管理","AI海报"],
"other": all_data
}