499 lines
20 KiB
Python
499 lines
20 KiB
Python
from typing import Optional, List, Dict, Any
|
||
|
||
import requests
|
||
import json
|
||
import pandas as pd
|
||
from datetime import datetime, timezone, timedelta, date, UTC
|
||
import time
|
||
from decimal import Decimal
|
||
import numpy as np
|
||
import requests
|
||
import json
|
||
|
||
|
||
def replace_decimals(obj):
|
||
if isinstance(obj, dict):
|
||
return {k: replace_decimals(v) for k, v in obj.items()}
|
||
elif isinstance(obj, list):
|
||
return [replace_decimals(item) for item in obj]
|
||
elif isinstance(obj, Decimal):
|
||
return float(obj) # 或者 str(obj)
|
||
return obj
|
||
|
||
|
||
class NpEncoder(json.JSONEncoder):
|
||
def default(self, obj):
|
||
if isinstance(obj, np.integer):
|
||
return int(obj)
|
||
elif isinstance(obj, np.floating):
|
||
return float(obj)
|
||
elif isinstance(obj, np.ndarray):
|
||
return obj.tolist()
|
||
else:
|
||
return super(NpEncoder, self).default(obj)
|
||
|
||
|
||
class update_ID_form:
|
||
"""更新简道云员工ID表"""
|
||
|
||
def __init__(self):
|
||
self.headers = {
|
||
'Authorization': 'Bearer qygHulymo1fekJk4CIZyNKjyQAzG8CFN', # 曹伟应用api测试 app_key
|
||
'Content-Type': 'application/json.json'
|
||
}
|
||
self.url = "https://api.jiandaoyun.com/api/v5/corp/department/user/list"
|
||
self.payload1 = {
|
||
"api_key": "6694d3c4fcb69ca9a111a6c4",
|
||
"entry_id": "6769204a1902c9341340a1bc",
|
||
}
|
||
self.delete_payload = {
|
||
"api_key": "6694d3c4fcb69ca9a111a6c4",
|
||
"entry_id": "6769204a1902c9341340a1bc",
|
||
}
|
||
self.update_payload = {
|
||
"api_key": "6694d3c4fcb69ca9a111a6c4",
|
||
"entry_id": "6769204a1902c9341340a1bc",
|
||
}
|
||
|
||
def get_department_members(self):
|
||
"""获取部门成员及ID表"""
|
||
try:
|
||
payload = json.dumps({
|
||
"dept_no": 1,
|
||
"has_child": True
|
||
})
|
||
response = requests.request("POST", self.url, headers=self.headers, data=payload)
|
||
search_department_member = response.json()
|
||
departments_members = search_department_member.get('users')
|
||
df1 = pd.DataFrame(departments_members)
|
||
return df1
|
||
except Exception as e:
|
||
return None
|
||
|
||
@staticmethod
|
||
def data_batch_create(data: dict, max_retries: int = 20) -> Optional[requests.Response]: # 新建单条数据
|
||
"""
|
||
新建单条表单数据
|
||
:param max_retries: 最大重试次数
|
||
:param data: 应该包含应用id、表单id,以及新建的数据data['data']
|
||
:return: 返回创建后简道云返回的信息
|
||
"""
|
||
url = 'https://api.jiandaoyun.com/api/v5/app/entry/data/create'
|
||
|
||
headers = {
|
||
'Authorization': 'Bearer qygHulymo1fekJk4CIZyNKjyQAzG8CFN', # 曹伟应用api测试 app_key
|
||
'Content-Type': 'application/json.json'
|
||
}
|
||
|
||
"""
|
||
data 样式 # 后续优化发送数据样式 目前输入字段,后续优化输入表单名称
|
||
jiandaoyun_data['data'] = {"_widget_1731650067055":{"value":f'{username}{password}'},
|
||
"_widget_1731650067056":{"value": f"{group}"}}
|
||
"""
|
||
# noinspection DuplicatedCode
|
||
payload = json.dumps({
|
||
"app_id": data['api_key'], # 应用ID
|
||
"entry_id": data['entry_id'], # 表单ID
|
||
"data": data['data'],
|
||
"is_start_workflow": data.get('is_start_workflow', "false"),
|
||
"is_start_trigger": data.get('is_start_trigger', "false"),
|
||
"transaction_id": data.get('transaction_id', "")
|
||
}
|
||
)
|
||
|
||
retries = 0
|
||
while retries <= max_retries:
|
||
try:
|
||
res: requests.Response = requests.post(url=url, data=payload, headers=headers, timeout=10)
|
||
res.raise_for_status() # 检查HTTP响应状态码,如果不等于200会抛出异常
|
||
data_get = res.json()
|
||
if res.status_code == 200:
|
||
return data_get
|
||
else:
|
||
retries += 1
|
||
time.sleep(3) # 在重试之间稍作停顿
|
||
except requests.exceptions.RequestException as e:
|
||
retries += 1
|
||
time.sleep(3) # 在重试之间稍作停顿
|
||
if retries > max_retries:
|
||
return None
|
||
|
||
@staticmethod
|
||
def entry_widget_list(data: dict) -> Optional[Dict[str, Any]]: # 获取表单字段
|
||
"""
|
||
获取表单字段
|
||
:param data: 简道云插件发送过来的data,包含应用id、表单id、数据id等信息
|
||
:return:
|
||
"""
|
||
|
||
url = 'https://api.jiandaoyun.com/api/v5/app/entry/widget/list'
|
||
|
||
headers = {
|
||
'Authorization': "Bearer qygHulymo1fekJk4CIZyNKjyQAzG8CFN", # 曹伟应用api测试 app_key
|
||
'Content-Type': 'application/json.json'
|
||
}
|
||
|
||
payload = json.dumps({
|
||
"app_id": data['api_key'],
|
||
"entry_id": data['entry_id'],
|
||
})
|
||
|
||
res = requests.post(url=url, data=payload, headers=headers, timeout=10)
|
||
return res.json()
|
||
|
||
@staticmethod
|
||
def field_replacement(data: dict, data_get: dict) -> dict:
|
||
"""
|
||
字段替换,将id替换为标签名,即唯一值替换为表单中显示字段的名字
|
||
:param data: 简道云插件发送过来的data,包含表单id、数据id、应用id
|
||
:param data_get: 简道云请求的数据,一般是根据数据id获取到表单的数据
|
||
:return: 将根据数据id获取到的表单数据,进行替换,返回替换后的数据
|
||
"""
|
||
|
||
# 获取表单对应字段标签名称
|
||
widget_list = update_ID_form.entry_widget_list(data)
|
||
|
||
# 检查widget_list是否有效
|
||
if not widget_list or 'widgets' not in widget_list or not isinstance(widget_list['widgets'], list):
|
||
raise ValueError("映射表没有接受到数据")
|
||
|
||
# 创建一个映射表,将_widget_名称映射到label
|
||
name_to_label = {widget['name']: widget['label'] for widget in widget_list['widgets']}
|
||
|
||
def replace_keys(obj):
|
||
"""递归替换字典中的键名"""
|
||
if isinstance(obj, dict):
|
||
new_dict = {}
|
||
for key, value in obj.items():
|
||
new_key = name_to_label.get(key, key)
|
||
new_dict[new_key] = replace_keys(value)
|
||
return new_dict
|
||
elif isinstance(obj, list):
|
||
return [replace_keys(item) for item in obj]
|
||
else:
|
||
return obj
|
||
|
||
# 复制 data_get,避免修改原始数据
|
||
data_get_copy = json.loads(json.dumps(data_get)) # 深拷贝
|
||
|
||
# 替换 data 字段下的所有键
|
||
if 'data' in data_get_copy:
|
||
data_get_copy['data'] = replace_keys(data_get_copy['data'])
|
||
|
||
return data_get_copy
|
||
@staticmethod
|
||
def entry_data_list(data: dict, replace: bool = False, max_retries: int = 20) -> Dict: # 获取多条表单数据
|
||
"""
|
||
获取多条表单数据
|
||
:param max_retries: 最大重试次数
|
||
:param replace: 是否替换字段
|
||
:param data:
|
||
api_key: 应用id
|
||
entry_id: 表单id
|
||
:return:
|
||
"""
|
||
|
||
url = 'https://api.jiandaoyun.com/api/v5/app/entry/data/list'
|
||
|
||
headers = {
|
||
'Authorization': 'Bearer qygHulymo1fekJk4CIZyNKjyQAzG8CFN', # 曹伟应用api测试 app_key
|
||
'Content-Type': 'application/json.json'
|
||
}
|
||
all_data_batches = [] # 用于存储每次请求返回的数据批次
|
||
last_data_id = None
|
||
exit_flag = False
|
||
while True:
|
||
payload = json.dumps({
|
||
"app_id": data['api_key'], # 应用ID
|
||
"entry_id": data['entry_id'], # 表单ID
|
||
"limit": 100,
|
||
"data_id": last_data_id
|
||
})
|
||
retries = 0
|
||
while retries <= max_retries:
|
||
try:
|
||
res = requests.post(url=url, data=payload, headers=headers, timeout=10)
|
||
res.raise_for_status() # 检查HTTP响应状态码,如果不等于200会抛出异常
|
||
data_get = res.json()
|
||
if data_get["data"]:
|
||
all_data_batches.extend(data_get['data'])
|
||
last_data_id = data_get['data'][-1].get('_id')
|
||
break # 成功则跳出循环
|
||
else:
|
||
if 'data' not in data_get or len(data_get['data']) == 0:
|
||
exit_flag = True
|
||
break
|
||
retries += 1
|
||
time.sleep(0.1) # 在重试之间稍作停顿
|
||
except requests.exceptions.RequestException as e:
|
||
retries += 1
|
||
time.sleep(0.1) # 在重试之间稍作停顿
|
||
if retries > max_retries:
|
||
all_data_batches.append(None) # 或者可以选择记录失败的payload以便后续处理
|
||
|
||
if exit_flag:
|
||
break
|
||
|
||
# 构建最终返回的字典
|
||
final_data = {
|
||
'data': all_data_batches # 'data' 键对应的值是列表的列表
|
||
}
|
||
if replace:
|
||
print("进行了替换")
|
||
return_data = update_ID_form.field_replacement(data, final_data) # 字段替换,由id替换为标签名
|
||
|
||
return return_data
|
||
else:
|
||
return final_data
|
||
|
||
def send_task_status(self, task_start_time: str, task_name: str) -> None:
|
||
"""
|
||
将任务状态发送到简道云(开始时间为北京时间,需转换到 UTC)
|
||
:param task_start_time: 任务开始时间(字符串格式:"%Y-%m-%d %H:%M:%S",表示北京时间 UTC+8)
|
||
:param task_name: 任务名称
|
||
"""
|
||
try:
|
||
# 1. 获取当前 UTC 时间(时区感知对象)
|
||
end_time_utc = datetime.now(UTC) # ✅ 替代 utcnow()
|
||
|
||
# 2. 解析传入的北京时间(UTC+8)
|
||
task_start_naive = datetime.strptime(task_start_time, "%Y-%m-%d %H:%M:%S")
|
||
|
||
# 3. 转换为 UTC 时间(减去 8 小时,并附加 UTC 时区)
|
||
task_start_utc = task_start_naive - timedelta(hours=8)
|
||
task_start_utc = task_start_utc.replace(tzinfo=timezone.utc) # 显式标记为 UTC
|
||
|
||
# 4. 计算运行时间(时区感知对象可直接相减)
|
||
run_time = end_time_utc - task_start_utc
|
||
run_time_sec = int(run_time.total_seconds())
|
||
|
||
# 5. 格式化时间为 UTC 的 ISO 8601 格式(带 "Z")
|
||
today_utc = end_time_utc.strftime("%Y-%m-%d")
|
||
task_end_iso = end_time_utc.strftime("%Y-%m-%dT%H:%M:%SZ")
|
||
task_start_iso = task_start_utc.strftime("%Y-%m-%dT%H:%M:%SZ")
|
||
|
||
# 6. 构造请求数据(所有时间以 UTC 格式发送)
|
||
payload = {
|
||
"api_key": "6694d3c4fcb69ca9a111a6c4",
|
||
"entry_id": "67ede908eb9c22261016466e",
|
||
"data": {
|
||
"_widget_1744873387500": {"value": today_utc}, # UTC 日期
|
||
"_widget_1743644977694": {"value": task_name},
|
||
"_widget_1744873387501": {"value": task_start_iso}, # UTC 开始时间
|
||
"_widget_1744873387502": {"value": task_end_iso}, # UTC 结束时间
|
||
"_widget_1744873387504": {"value": run_time_sec},
|
||
}
|
||
}
|
||
|
||
# 7. 发送请求
|
||
response = update_ID_form.data_batch_create(payload)
|
||
except Exception as e:
|
||
pass
|
||
|
||
@staticmethod
|
||
def entry_data_batch_create(
|
||
data: dict,
|
||
chunk_size: int = 90,
|
||
max_retries: int = 20
|
||
) -> List[Optional[requests.Response]]: # 新建多条数据 注意简道云限制1次最多100条数据
|
||
"""
|
||
新建多条数据
|
||
:param max_retries: 最大重试次数,此处设置20次
|
||
:param data:应包含数据id、表单id、以及需要新建的信息,新建信息应该是一个列表
|
||
:param chunk_size: 简道云限制批量新建一次最多100条,这里默认值设置为90条一次
|
||
:return:返回请求后的结果
|
||
"""
|
||
data = replace_decimals(data)
|
||
url = 'https://api.jiandaoyun.com/api/v5/app/entry/data/batch_create'
|
||
|
||
headers = {
|
||
'Authorization': 'Bearer qygHulymo1fekJk4CIZyNKjyQAzG8CFN', # 曹伟应用api测试 appKey
|
||
'Content-Type': 'application/json.json'
|
||
}
|
||
|
||
"""
|
||
data_list 样式 # 后续优化发送数据样式 目前输入字段,后续优化输入表单名称
|
||
jiandaoyun_data_list['data_list'] = [{"_widget_1731650067055":{"value":f'{username}{password}'},
|
||
"_widget_1731650067056":{"value": f"{group}"}},
|
||
{"_widget_1731650067055":{"value":f'{username}{password}'},
|
||
"_widget_1731650067056":{"value": f"{group}"}}]
|
||
"""
|
||
|
||
# 获取data_list长度
|
||
total_length = len(data['data_list'])
|
||
|
||
# 计算需要发送的次数
|
||
num_chunks = (total_length + chunk_size - 1) // chunk_size # //整除向下取证,需要加上chunk_size - 1保证不会有缺失数据
|
||
data_get_list = []
|
||
for i in range(num_chunks):
|
||
start_index = i * chunk_size
|
||
end_index = min(start_index + chunk_size, total_length)
|
||
payload = json.dumps({
|
||
"app_id": data['api_key'], # 应用ID
|
||
"entry_id": data['entry_id'], # 表单ID
|
||
"data_list": data['data_list'][start_index:end_index],
|
||
"is_start_workflow": data.get('is_start_workflow', "false"),
|
||
"is_start_trigger": data.get('is_start_trigger', "false"),
|
||
}, cls=NpEncoder)
|
||
retries = 0
|
||
while retries <= max_retries:
|
||
try:
|
||
res: requests.Response = requests.post(url=url, data=payload, headers=headers, timeout=10)
|
||
res.raise_for_status() # 检查HTTP响应状态码,如果不等于200会抛出异常
|
||
data_get = res.json()
|
||
if data_get["status"] == "success":
|
||
data_get_list.append(data_get)
|
||
break # 成功则跳出循环
|
||
else:
|
||
|
||
retries += 1
|
||
time.sleep(3) # 在重试之间稍作停顿
|
||
except requests.exceptions.RequestException as e:
|
||
|
||
retries += 1
|
||
time.sleep(0.1) # 在重试之间稍作停顿
|
||
if retries > max_retries:
|
||
data_get_list.append(None) # 或者可以选择记录失败的payload以便后续处理
|
||
|
||
return data_get_list
|
||
|
||
def get_existing_id_form(self):
|
||
"""读取现有的ID表"""
|
||
try:
|
||
now_ID_form = update_ID_form.entry_data_list(self.payload1).get('data')
|
||
df = pd.DataFrame(now_ID_form)
|
||
return df
|
||
except Exception as e:
|
||
return None
|
||
|
||
@staticmethod
|
||
def entry_data_batch_delete(
|
||
data: dict,
|
||
chunk_size: int = 90,
|
||
max_retries: int = 20
|
||
) -> List[Optional[requests.Response]]: # 新建多条数据 注意简道云限制1次最多100条数据
|
||
"""
|
||
批量删除数据
|
||
:param data: 应包含应用ID、表单ID、数据ID列表
|
||
:param chunk_size:单词删除最大条数,默认90
|
||
:param max_retries:重试次数,默认20
|
||
:return:
|
||
"""
|
||
|
||
data = replace_decimals(data)
|
||
url = 'https://api.jiandaoyun.com/api/v5/app/entry/data/batch_delete'
|
||
|
||
headers = {
|
||
'Authorization': 'Bearer qygHulymo1fekJk4CIZyNKjyQAzG8CFN', # 曹伟应用api测试 appKey
|
||
'Content-Type': 'application/json.json'
|
||
}
|
||
|
||
# 获取data_list长度
|
||
total_length = len(data['data_ids'])
|
||
|
||
# 计算需要发送的次数
|
||
num_chunks = (total_length + chunk_size - 1) // chunk_size # //整除向下取证,需要加上chunk_size - 1保证不会有缺失数据
|
||
data_get_list = []
|
||
|
||
for i in range(num_chunks):
|
||
start_index = i * chunk_size
|
||
end_index = min(start_index + chunk_size, total_length)
|
||
payload = json.dumps({
|
||
"app_id": data['api_key'], # 应用ID
|
||
"entry_id": data['entry_id'], # 表单ID
|
||
"data_ids": data['data_ids'][start_index:end_index],
|
||
}, cls=NpEncoder)
|
||
|
||
retries = 0
|
||
while retries <= max_retries:
|
||
try:
|
||
res: requests.Response = requests.post(url=url, data=payload, headers=headers, timeout=10)
|
||
res.raise_for_status() # 检查HTTP响应状态码,如果不等于200会抛出异常
|
||
data_get = res.json()
|
||
|
||
if data_get["status"] == "success":
|
||
data_get_list.append(data_get)
|
||
break # 成功则跳出循环
|
||
else:
|
||
|
||
retries += 1
|
||
time.sleep(3) # 在重试之间稍作停顿
|
||
except requests.exceptions.RequestException as e:
|
||
|
||
retries += 1
|
||
time.sleep(0.1) # 在重试之间稍作停顿
|
||
if retries > max_retries:
|
||
data_get_list.append(None) # 或者可以选择记录失败的payload以便后续处理
|
||
|
||
return data_get_list
|
||
|
||
def delete_existing_data(self, df):
|
||
"""批量删除现有数据"""
|
||
try:
|
||
all_data = []
|
||
for index, i in df.iterrows():
|
||
all_data.append(i["_id"])
|
||
self.delete_payload["data_ids"] = all_data
|
||
res = update_ID_form.entry_data_batch_delete(self.delete_payload)
|
||
except Exception as e:
|
||
pass
|
||
|
||
def update_data(self, df1):
|
||
"""批量写入新数据"""
|
||
try:
|
||
all_data1 = []
|
||
for index, i in df1.iterrows():
|
||
all_data1.append({
|
||
"_widget_1734942794144": {"value": i["name"]},
|
||
"_widget_1734942794145": {"value": i["username"]},
|
||
})
|
||
self.update_payload["data_list"] = all_data1
|
||
update_ID_form.entry_data_batch_create(self.update_payload)
|
||
except Exception as e:
|
||
pass
|
||
|
||
def get_out_department_memberships(self):
|
||
"""获取外部公司成员及ID表"""
|
||
try:
|
||
|
||
url = "https://api.jiandaoyun.com/api/v5/corp/guest/user/list"
|
||
|
||
headers = {
|
||
'Authorization': 'Bearer qygHulymo1fekJk4CIZyNKjyQAzG8CFN',
|
||
'Content-Type': 'application/json.json'
|
||
}
|
||
|
||
response = requests.post(url, headers=headers)
|
||
all_data = []
|
||
member_list = response.json().get("member_list", [])
|
||
for member in member_list:
|
||
name = member.get("name")
|
||
username = member.get("username") # 用户id
|
||
all_data.append({"name": name, "username": username})
|
||
df2 = pd.DataFrame(all_data)
|
||
return df2
|
||
except:
|
||
print("获取外部公司成员及ID表失败")
|
||
|
||
def main(self):
|
||
"""主函数"""
|
||
task_start_time = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
|
||
try:
|
||
df1 = self.get_department_members()
|
||
df2 = self.get_out_department_memberships()
|
||
if df1 is not None:
|
||
df = self.get_existing_id_form()
|
||
if df is not None:
|
||
self.delete_existing_data(df)
|
||
self.update_data(df1)
|
||
self.update_data(df2)
|
||
|
||
self.send_task_status(task_start_time, "简道云员工ID表更新")
|
||
except Exception as e:
|
||
print(str(e))
|
||
|
||
|
||
if __name__ == '__main__':
|
||
daily_task = update_ID_form()
|
||
daily_task.main()
|