4.18备份

This commit is contained in:
2026-04-18 09:22:23 +08:00
parent cc445c405a
commit cc4148b67d
20 changed files with 13949 additions and 543 deletions
+100
View File
@@ -0,0 +1,100 @@
# 可以引用一些第三方库.
import requests
import time
from datetime import datetime, timedelta, timezone
from typing import Any, Dict, List, Optional
version = str(triggerConf.get('_widget_17758020150661'))
research_leve1 = str(triggerConf.get('_widget_17758020157781'))
research_leve2 =str(triggerConf.get('_widget_17758020169011'))
# 简道云 API 调用 TokenBearer 形式)
api_key="675b900991ad2491c69389ca"
# 简道云 API 通用请求头
entry_id ="69c0affbdef38560ddc3d0d0"
API_TOKEN = "Bearer qygHulymo1fekJk4CIZyNKjyQAzG8CFN"
HEADERS = {'Authorization': API_TOKEN, 'Content-Type': 'application/json'}
def entry_data_list(
data: Dict[str, Any],
max_retries: int = 5,
# 获取多条表单数据(自动分页:用上一页最后一条的 _id 作为 data_id
limit: int = 90,
timeout: int = 10,
) -> Dict[str, List[Dict[str, Any]]]:
url = 'https://api.jiandaoyun.com/api/v5/app/entry/data/list'
all_rows: List[Dict[str, Any]] = []
last_data_id: Optional[str] = None
while True:
payload: Dict[str, Any] = {
'app_id': data['api_key'],
'entry_id': data['entry_id'],
'limit': limit,
'data_id': last_data_id,
}
if data.get('filter') is not None:
payload['filter'] = data['filter']
body: Optional[Dict[str, Any]] = None
for attempt in range(max_retries + 1):
try:
res = requests.post(url=url, json=payload, headers=HEADERS, timeout=timeout)
res.raise_for_status()
# 网络抖动/超时等异常,按最大重试次数重试
body = res.json()
break
except requests.RequestException:
if attempt >= max_retries:
raise
time.sleep(0.5)
batch = (body or {}).get('data') or []
if not batch:
# 下一页从本页最后一条数据之后继续拉取
break
all_rows.extend(batch)
last_data_id = batch[-1].get('_id')
return {'data': all_rows}
filter_not_empty = {
'rel': 'and',
'cond': [
{
'field': '_widget_1774235645167',
'type': 'text',
'method': 'eq',
'value': [version],
},
{
'field': '_widget_1774235645172',
'type': 'text',
'method': 'eq',
'value': [research_leve1],
}
],
}
# 拉取满足过滤条件的所有数据
result = entry_data_list({'api_key': api_key, 'entry_id': entry_id, 'filter': filter_not_empty})
data_list = result.get("data")
all_data = []
for data in data_list :
# 先检查字段是否存在,避免KeyError
if "_widget_1774235645156" in data and "_widget_1774235645158" in data:
if research_leve2 in data["_widget_1774235645156"]:
item = data["_widget_1774235645158"]
if isinstance(item, list):
all_data.extend(item)
else:
all_data.append(item)
all_data = list(set(all_data))
all_data = ["业绩管理","AI海报"]
return {
"all_data": ["业绩管理","AI海报"],
"other":all_data
}
+116
View File
@@ -0,0 +1,116 @@
# 可以引用一些第三方库.
import requests
import time
import json
from datetime import datetime, timedelta, timezone
from typing import Any, Dict, List, Optional
version = str(triggerConf.get('_widget_17758020150661'))
research_leve1 = triggerConf.get('_widget_17758020157781')
research_leve2 = triggerConf.get('_widget_17758020169011')
# 统一处理可能是JSON字符串或者原生list的情况
def parse_list_value(value):
if isinstance(value, list):
return value
try:
# 尝试解析JSON数组字符串
return json.loads(value)
except (json.JSONDecodeError, TypeError):
# 解析失败就返回单个元素的列表
return [str(value)]
research_leve1_list = parse_list_value(research_leve1)
research_leve2_list = parse_list_value(research_leve2)
# 简道云 API 调用 TokenBearer 形式)
api_key="675b900991ad2491c69389ca"
# 简道云 API 通用请求头
entry_id ="69c0affbdef38560ddc3d0d0"
API_TOKEN = "Bearer qygHulymo1fekJk4CIZyNKjyQAzG8CFN"
HEADERS = {'Authorization': API_TOKEN, 'Content-Type': 'application/json'}
def entry_data_list(
data: Dict[str, Any],
max_retries: int = 5,
# 获取多条表单数据(自动分页:用上一页最后一条的 _id 作为 data_id
limit: int = 90,
timeout: int = 10,
) -> Dict[str, List[Dict[str, Any]]]:
url = 'https://api.jiandaoyun.com/api/v5/app/entry/data/list'
all_rows: List[Dict[str, Any]] = []
last_data_id: Optional[str] = None
while True:
payload: Dict[str, Any] = {
'app_id': data['api_key'],
'entry_id': data['entry_id'],
'limit': limit,
'data_id': last_data_id,
}
if data.get('filter') is not None:
payload['filter'] = data['filter']
body: Optional[Dict[str, Any]] = None
for attempt in range(max_retries + 1):
try:
res = requests.post(url=url, json=payload, headers=HEADERS, timeout=timeout)
res.raise_for_status()
# 网络抖动/超时等异常,按最大重试次数重试
body = res.json()
break
except requests.RequestException:
if attempt >= max_retries:
raise
time.sleep(0.5)
batch = (body or {}).get('data') or []
if not batch:
# 下一页从本页最后一条数据之后继续拉取
break
all_rows.extend(batch)
last_data_id = batch[-1].get('_id')
return {'data': all_rows}
filter_not_empty = {
'rel': 'and',
'cond': [
{
'field': '_widget_1774235645167',
'type': 'text',
'method': 'eq',
'value': [version],
},
{
'field': '_widget_1774235645172',
'type': 'text',
'method': 'eq',
'value': research_leve1_list,
}
],
}
# 拉取满足过滤条件的所有数据
result = entry_data_list({'api_key': api_key, 'entry_id': entry_id, 'filter': filter_not_empty})
data_list = result.get("data")
all_data = []
for data in data_list :
# 先检查字段是否存在,避免KeyError
if "_widget_1774235645156" in data and "_widget_1774235645158" in data:
field_value = data["_widget_1774235645156"]
# 判断两个list是否有交集(只要有一个共同元素就算匹配)
if any(item in field_value for item in research_leve2_list):
item = data["_widget_1774235645158"]
if isinstance(item, list):
all_data.extend(item)
else:
all_data.append(item)
all_data = list(set(all_data))
return {
"all_data": ["业绩管理","AI海报"],
"other": all_data
}