186 lines
7.3 KiB
Python
186 lines
7.3 KiB
Python
import pandas as pd
|
|
import numpy as np
|
|
import requests
|
|
import json
|
|
import time
|
|
import re
|
|
from datetime import datetime
|
|
from dateutil.relativedelta import relativedelta
|
|
from pathlib import Path
|
|
from urllib.parse import quote
|
|
from io import BytesIO
|
|
|
|
ROOT = Path('.').absolute() # 当前工作目录
|
|
|
|
|
|
# 生成 token,参数不需要修改
|
|
def generateToken() -> str:
|
|
""" 生成 token """
|
|
|
|
token_api = 'https://api.dingtalk.com/v1.0/oauth2/accessToken'
|
|
|
|
# 该信息在钉钉开放应用中
|
|
data = {
|
|
"appKey": "ding5kqocon5s9oph5uq",
|
|
"appSecret": 'HL1jgsIIfLAC0eTH0A1m4mwxUDqbgsiPeCCGGE3ocM6qJBTIW7Ivt9drxF_Z4Kb_'
|
|
}
|
|
|
|
res = requests.post(token_api, json=data)
|
|
token = res.json()['accessToken']
|
|
|
|
return token
|
|
|
|
def instances(TOKEN,formInstanceId,data_new) -> str:
|
|
""" 函数功能:更新表单内容 """
|
|
api = f'https://api.dingtalk.com//v1.0/yida/forms/instances'
|
|
|
|
headers = {
|
|
"Content-Type": "application/json",
|
|
"x-acs-dingtalk-access-token": TOKEN
|
|
}
|
|
|
|
payload = {
|
|
"appType" : "APP_TNVBVZ3K8G56HG03Z45Q",
|
|
"systemToken" : "CH7669818R0WN18TYTYJ42PE6GY22WZN0BYWKD1",
|
|
"userId" : "2268275546837446", # 曹伟 id
|
|
"language" : "zh_CN",
|
|
"useLatestVersion" : "false",
|
|
"formInstanceId" : formInstanceId,
|
|
"updateFormDataJson" : json.dumps(data_new, cls=NpEncoder),
|
|
}
|
|
|
|
res = requests.put(api, headers=headers, json=payload)
|
|
|
|
return res.json()
|
|
|
|
def read_form_instances(token,formUuid,createFromTimeGMT,createToTimeGMT,page=1,n = 100,searchField={}):
|
|
""" 函数功能:读取普通表单的所有数据 """
|
|
|
|
api = f'https://api.dingtalk.com/v1.0/yida/forms/instances/search'
|
|
|
|
headers = {
|
|
"Content-Type": "application/json",
|
|
"x-acs-dingtalk-access-token": token
|
|
}
|
|
|
|
formData = {
|
|
'currentPage':page,
|
|
'pageSize':n,
|
|
"appType": "APP_TNVBVZ3K8G56HG03Z45Q",
|
|
"systemToken": "CH7669818R0WN18TYTYJ42PE6GY22WZN0BYWKD1",
|
|
"userId": "yida_pub_account",
|
|
"language": "zh_CN",
|
|
"formUuid": formUuid,
|
|
"createFromTimeGMT":createFromTimeGMT,
|
|
"createToTimeGMT":createToTimeGMT,
|
|
"searchFieldJson":json.dumps(searchField)
|
|
}
|
|
|
|
res = requests.post(api, headers=headers, json=formData)
|
|
return res.json()
|
|
|
|
def read_form_instancesV2(token,formUuid,page=1,n = 100,searchField={}):
|
|
""" 函数功能:读取普通表单的所有数据 """
|
|
|
|
api = f'https://api.dingtalk.com/v1.0/yida/forms/instances/search'
|
|
|
|
headers = {
|
|
"Content-Type": "application/json",
|
|
"x-acs-dingtalk-access-token": token
|
|
}
|
|
|
|
formData = {
|
|
'currentPage':page,
|
|
'pageSize':n,
|
|
"appType": "APP_TNVBVZ3K8G56HG03Z45Q",
|
|
"systemToken": "CH7669818R0WN18TYTYJ42PE6GY22WZN0BYWKD1",
|
|
"userId": "yida_pub_account",
|
|
"language": "zh_CN",
|
|
"formUuid": formUuid,
|
|
"searchFieldJson":json.dumps(searchField)
|
|
}
|
|
|
|
res = requests.post(api, headers=headers, json=formData)
|
|
return res.json()
|
|
def timeStamp(timeNum):
|
|
""" 函数功能:将时间戳(毫秒) 转化为时间日期格式"""
|
|
timeStamp = float(timeNum/1000)
|
|
timeArray = time.localtime(timeStamp)
|
|
otherStyleTime = time.strftime("%Y-%m-%d %H:%M:%S", timeArray)
|
|
return otherStyleTime
|
|
|
|
def get_time_range_hour(n):
|
|
""" 获取近n个小时的时间戳(单位是毫秒)"""
|
|
|
|
def delay_time(time_str, years=0, months=0, days=0, hours=0, minutes=0, seconds=0):
|
|
if type(time_str) == str:
|
|
time_str = datetime.strptime(time_str, '%Y-%m-%d %H:%M:%S')
|
|
ret = time_str + relativedelta(years=years, months=months, days=days, hours=hours, minutes=minutes, seconds=seconds)
|
|
return ret
|
|
|
|
# 获得当前时间
|
|
now_time = datetime.now()
|
|
endTime = int(time.mktime(time.strptime(now_time.strftime('%Y/%m/%d %H:%M:%S'),'%Y/%m/%d %H:%M:%S'))) * 1000 - 1000
|
|
# n小时前的时间
|
|
ret2 = delay_time(now_time, hours=-n)
|
|
startTime = int(time.mktime(time.strptime(ret2.strftime('%Y/%m/%d %H:%M:%S'),'%Y/%m/%d %H:%M:%S'))) * 1000
|
|
|
|
# print(f'时间区间:[{startTime}-{endTime}]')
|
|
return startTime,endTime
|
|
def remove_duplicates(data):
|
|
new_data = {}
|
|
for key, value in data.items():
|
|
if value not in new_data.values():
|
|
new_data[key] = value
|
|
return new_data
|
|
class NpEncoder(json.JSONEncoder):
|
|
def default(self, obj):
|
|
if isinstance(obj, np.integer):
|
|
return int(obj)
|
|
elif isinstance(obj, np.floating):
|
|
return float(obj)
|
|
elif isinstance(obj, np.ndarray):
|
|
return obj.tolist()
|
|
else:
|
|
return super(NpEncoder, self).default(obj)
|
|
|
|
TOKEN = generateToken()
|
|
# 对应关系维护
|
|
FORMID = 'FORM-JD8668C1CAC7X39ND7GNN4O8D2UA3L21P3ODL22' # 普通表单
|
|
stting = {
|
|
'材料数据':['材料名称','品牌','零件号','规格型号','库存数量','采购均价','计量单位'],
|
|
'材料销售历史数据':['源单号','材料名称','数量','金额','材料备注'],
|
|
'储值卡数据':['卡名称','持卡人手机号','绑定车牌号','卡号','剩余面额','剩余实额','卡有效期'],
|
|
'单据历史数据':['工单号','车牌号','里程','金额','开单时间','工单备注'],
|
|
'供应商信息数据':['供应商名称','联系人','手机','联系电话'],
|
|
'客户车辆信息':['客户姓名','车牌号','手机号码','VIN码'],
|
|
'套餐卡数据':['卡名称','持卡人姓名','持卡人手机号','绑定车牌号','卡号','卡有效期','内容名称','剩余次数','单次面额','单次实额','卡说明'],
|
|
'项目施工历史数据':['源单号','项目名称','工时数','工时费'],
|
|
'项目信息数据':['项目名称','所需工时1','工时单价1']
|
|
}
|
|
""" 24小时读取一遍数据 """
|
|
# 获取全量公有
|
|
form_data = read_form_instancesV2(token=TOKEN, formUuid=FORMID, page=1, n=100,searchField={'selectField_ldo3qxal': '公有'})
|
|
PAGES = form_data.get('totalCount')//100 + 1
|
|
ALL_DATA_staff = {}
|
|
""" 获取全量数据 """
|
|
for i in range(1, PAGES+1):
|
|
form_data = read_form_instancesV2(token=TOKEN, formUuid=FORMID, page=i, n=100,searchField={'selectField_ldo3qxal': '公有'})
|
|
for data in form_data.get('data'):
|
|
ALL_DATA_staff[data['formData']['textField_ldo3qxao'] + data['formData']['textField_ldo3qxap']]=data['formInstanceId']
|
|
# 获取前一天私有
|
|
TIME_RANGE = {'24小时内':[timeStamp(t) for t in get_time_range_hour(24)]} # 当前时刻向前推1个小时
|
|
data_siyou = read_form_instances(token=TOKEN,formUuid=FORMID,page=1,n=100,createFromTimeGMT=[*TIME_RANGE['24小时内']][0],createToTimeGMT=[*TIME_RANGE['24小时内']][1],searchField={'selectField_ldo3qxal': '私有'})
|
|
ALL_DATA_staff_V2 = {}
|
|
for i in range(0,len(data_siyou['data'])):
|
|
if set(stting[data_siyou['data'][i]['formData']['textField_ldo3qxao']]).issubset(set(json.loads(data_siyou['data'][i]['formData']['textField_ldo3qxaq']))):
|
|
ALL_DATA_staff_V2[data_siyou['data'][i]['formInstanceId']]=data_siyou['data'][i]['formData']['textField_ldo3qxao']+data_siyou['data'][i]['formData']['textField_ldo3qxap']
|
|
ALL_DATA_staff_V3 = remove_duplicates(ALL_DATA_staff_V2)
|
|
for id in ALL_DATA_staff_V3:
|
|
data_new = {
|
|
'selectField_ldo3qxal':'公有',
|
|
'textField_ldo3qxan':'区域公用'
|
|
}
|
|
res = instances(TOKEN,id,data_new)
|
|
print(res,id)
|
|
time.sleep(0.5) |