Files
F6--/其它系统脚本/duiyingguanxi_zidongjiaoyan.py
2026-01-30 11:28:35 +08:00

186 lines
7.3 KiB
Python

import pandas as pd
import numpy as np
import requests
import json
import time
import re
from datetime import datetime
from dateutil.relativedelta import relativedelta
from pathlib import Path
from urllib.parse import quote
from io import BytesIO
ROOT = Path('.').absolute() # 当前工作目录
# 生成 token,参数不需要修改
def generateToken() -> str:
""" 生成 token """
token_api = 'https://api.dingtalk.com/v1.0/oauth2/accessToken'
# 该信息在钉钉开放应用中
data = {
"appKey": "ding5kqocon5s9oph5uq",
"appSecret": 'HL1jgsIIfLAC0eTH0A1m4mwxUDqbgsiPeCCGGE3ocM6qJBTIW7Ivt9drxF_Z4Kb_'
}
res = requests.post(token_api, json=data)
token = res.json()['accessToken']
return token
def instances(TOKEN,formInstanceId,data_new) -> str:
""" 函数功能:更新表单内容 """
api = f'https://api.dingtalk.com//v1.0/yida/forms/instances'
headers = {
"Content-Type": "application/json",
"x-acs-dingtalk-access-token": TOKEN
}
payload = {
"appType" : "APP_TNVBVZ3K8G56HG03Z45Q",
"systemToken" : "CH7669818R0WN18TYTYJ42PE6GY22WZN0BYWKD1",
"userId" : "2268275546837446", # 曹伟 id
"language" : "zh_CN",
"useLatestVersion" : "false",
"formInstanceId" : formInstanceId,
"updateFormDataJson" : json.dumps(data_new, cls=NpEncoder),
}
res = requests.put(api, headers=headers, json=payload)
return res.json()
def read_form_instances(token,formUuid,createFromTimeGMT,createToTimeGMT,page=1,n = 100,searchField={}):
""" 函数功能:读取普通表单的所有数据 """
api = f'https://api.dingtalk.com/v1.0/yida/forms/instances/search'
headers = {
"Content-Type": "application/json",
"x-acs-dingtalk-access-token": token
}
formData = {
'currentPage':page,
'pageSize':n,
"appType": "APP_TNVBVZ3K8G56HG03Z45Q",
"systemToken": "CH7669818R0WN18TYTYJ42PE6GY22WZN0BYWKD1",
"userId": "yida_pub_account",
"language": "zh_CN",
"formUuid": formUuid,
"createFromTimeGMT":createFromTimeGMT,
"createToTimeGMT":createToTimeGMT,
"searchFieldJson":json.dumps(searchField)
}
res = requests.post(api, headers=headers, json=formData)
return res.json()
def read_form_instancesV2(token,formUuid,page=1,n = 100,searchField={}):
""" 函数功能:读取普通表单的所有数据 """
api = f'https://api.dingtalk.com/v1.0/yida/forms/instances/search'
headers = {
"Content-Type": "application/json",
"x-acs-dingtalk-access-token": token
}
formData = {
'currentPage':page,
'pageSize':n,
"appType": "APP_TNVBVZ3K8G56HG03Z45Q",
"systemToken": "CH7669818R0WN18TYTYJ42PE6GY22WZN0BYWKD1",
"userId": "yida_pub_account",
"language": "zh_CN",
"formUuid": formUuid,
"searchFieldJson":json.dumps(searchField)
}
res = requests.post(api, headers=headers, json=formData)
return res.json()
def timeStamp(timeNum):
""" 函数功能:将时间戳(毫秒) 转化为时间日期格式"""
timeStamp = float(timeNum/1000)
timeArray = time.localtime(timeStamp)
otherStyleTime = time.strftime("%Y-%m-%d %H:%M:%S", timeArray)
return otherStyleTime
def get_time_range_hour(n):
""" 获取近n个小时的时间戳(单位是毫秒)"""
def delay_time(time_str, years=0, months=0, days=0, hours=0, minutes=0, seconds=0):
if type(time_str) == str:
time_str = datetime.strptime(time_str, '%Y-%m-%d %H:%M:%S')
ret = time_str + relativedelta(years=years, months=months, days=days, hours=hours, minutes=minutes, seconds=seconds)
return ret
# 获得当前时间
now_time = datetime.now()
endTime = int(time.mktime(time.strptime(now_time.strftime('%Y/%m/%d %H:%M:%S'),'%Y/%m/%d %H:%M:%S'))) * 1000 - 1000
# n小时前的时间
ret2 = delay_time(now_time, hours=-n)
startTime = int(time.mktime(time.strptime(ret2.strftime('%Y/%m/%d %H:%M:%S'),'%Y/%m/%d %H:%M:%S'))) * 1000
# print(f'时间区间:[{startTime}-{endTime}]')
return startTime,endTime
def remove_duplicates(data):
new_data = {}
for key, value in data.items():
if value not in new_data.values():
new_data[key] = value
return new_data
class NpEncoder(json.JSONEncoder):
def default(self, obj):
if isinstance(obj, np.integer):
return int(obj)
elif isinstance(obj, np.floating):
return float(obj)
elif isinstance(obj, np.ndarray):
return obj.tolist()
else:
return super(NpEncoder, self).default(obj)
TOKEN = generateToken()
# 对应关系维护
FORMID = 'FORM-JD8668C1CAC7X39ND7GNN4O8D2UA3L21P3ODL22' # 普通表单
stting = {
'材料数据':['材料名称','品牌','零件号','规格型号','库存数量','采购均价','计量单位'],
'材料销售历史数据':['源单号','材料名称','数量','金额','材料备注'],
'储值卡数据':['卡名称','持卡人手机号','绑定车牌号','卡号','剩余面额','剩余实额','卡有效期'],
'单据历史数据':['工单号','车牌号','里程','金额','开单时间','工单备注'],
'供应商信息数据':['供应商名称','联系人','手机','联系电话'],
'客户车辆信息':['客户姓名','车牌号','手机号码','VIN码'],
'套餐卡数据':['卡名称','持卡人姓名','持卡人手机号','绑定车牌号','卡号','卡有效期','内容名称','剩余次数','单次面额','单次实额','卡说明'],
'项目施工历史数据':['源单号','项目名称','工时数','工时费'],
'项目信息数据':['项目名称','所需工时1','工时单价1']
}
""" 24小时读取一遍数据 """
# 获取全量公有
form_data = read_form_instancesV2(token=TOKEN, formUuid=FORMID, page=1, n=100,searchField={'selectField_ldo3qxal': '公有'})
PAGES = form_data.get('totalCount')//100 + 1
ALL_DATA_staff = {}
""" 获取全量数据 """
for i in range(1, PAGES+1):
form_data = read_form_instancesV2(token=TOKEN, formUuid=FORMID, page=i, n=100,searchField={'selectField_ldo3qxal': '公有'})
for data in form_data.get('data'):
ALL_DATA_staff[data['formData']['textField_ldo3qxao'] + data['formData']['textField_ldo3qxap']]=data['formInstanceId']
# 获取前一天私有
TIME_RANGE = {'24小时内':[timeStamp(t) for t in get_time_range_hour(24)]} # 当前时刻向前推1个小时
data_siyou = read_form_instances(token=TOKEN,formUuid=FORMID,page=1,n=100,createFromTimeGMT=[*TIME_RANGE['24小时内']][0],createToTimeGMT=[*TIME_RANGE['24小时内']][1],searchField={'selectField_ldo3qxal': '私有'})
ALL_DATA_staff_V2 = {}
for i in range(0,len(data_siyou['data'])):
if set(stting[data_siyou['data'][i]['formData']['textField_ldo3qxao']]).issubset(set(json.loads(data_siyou['data'][i]['formData']['textField_ldo3qxaq']))):
ALL_DATA_staff_V2[data_siyou['data'][i]['formInstanceId']]=data_siyou['data'][i]['formData']['textField_ldo3qxao']+data_siyou['data'][i]['formData']['textField_ldo3qxap']
ALL_DATA_staff_V3 = remove_duplicates(ALL_DATA_staff_V2)
for id in ALL_DATA_staff_V3:
data_new = {
'selectField_ldo3qxal':'公有',
'textField_ldo3qxan':'区域公用'
}
res = instances(TOKEN,id,data_new)
print(res,id)
time.sleep(0.5)