# -*- coding: utf-8 -*-
"""Synchronise DingTalk Yida "node service to-do" forms with Hologres data.

The script runs top to bottom when executed:

1. Load yesterday's rows of ``holo_ads_report_saas_profile_ngv_detail_d``
   from Hologres into the ``data_NGV`` DataFrame.
2. Fetch a DingTalk access token.
3. Collect the ``orgCode`` of every RUNNING instance of the first process
   form (original comment: "[流程]新签服务流程") into ``ALL_DATA_code``.
4. Collect every RUNNING instance of the second process form (original
   comment: "新签节点化服务待办") into ``ALL_DATA_node``.
5. For each node instance: set its run-status radio field and rewrite the
   three deadline date fields (+30 / +60 / +90 days).
6. For each node instance whose org code appears in ``data_NGV``: refresh the
   activity flag, the ``is_g`` flag, the 30-day bill-day count and fill a few
   blank fields.

Every per-record step is best-effort: a failure is logged and the batch
continues with the next step.
"""

import datetime
import json
import logging
import time
from collections import defaultdict
from pathlib import Path
from urllib.parse import quote

import numpy as np
import pandas as pd
import psycopg2
import requests

logger = logging.getLogger(__name__)

ROOT = Path('.').absolute()  # current working directory

# NOTE(review): database credentials, the DingTalk app secret and the Yida
# system token are hard-coded in this file. They are kept unchanged so the
# script behaves as before, but they should be moved to a secret store.
YIDA_API_BASE = 'https://api.dingtalk.com//v1.0/yida'
APP_TYPE = "APP_UYZ0KG6L0CCNV80GZ66O"
SYSTEM_TOKEN = "XA966F81JAJOFCVVVKO64E9MIIZV1EWE5SFMKJ2"
YIDA_USER_ID = "yida_pub_account"  # original comment: super-administrator account
SIGNING_FORM_UUID = "FORM-9X766NA1SOATBRSH2K42D8BJCX7L326HI64TKG5"
NODE_FORM_UUID = "FORM-L89662816B04LXH893M4K50Q7MIZ1SVQI08ALU2"

PAGE_SIZE = 100
REQUEST_TIMEOUT = 60  # seconds; without a timeout a stalled call hangs forever
MS_PER_DAY = 86_400_000


class NpEncoder(json.JSONEncoder):
    """JSON encoder that converts NumPy scalars and arrays to plain Python."""

    def default(self, obj):
        if isinstance(obj, np.bool_):
            return bool(obj)
        if isinstance(obj, np.integer):
            return int(obj)
        if isinstance(obj, np.floating):
            return float(obj)
        if isinstance(obj, np.ndarray):
            return obj.tolist()
        return super().default(obj)


def _load_ngv_detail(date_id):
    """Return all NGV detail rows whose ``date_id`` equals *date_id*.

    The connection is always closed, even when the query fails.
    """
    sql = (
        'SELECT * FROM "public"."holo_ads_report_saas_profile_ngv_detail_d" '
        'WHERE "date_id" = %s;'
    )
    conn = psycopg2.connect(
        database="f6_bi",
        user="BASIC$ro_caowei",
        password="!ro_caowei123",
        host="hgprecn-cn-nif1vnv0y002-cn-shanghai.hologres.aliyuncs.com",
        port="80",
    )
    try:
        with conn.cursor() as cursor:
            # Bound as a string so the rendered SQL is "date_id" = 'YYYYMMDD',
            # exactly what the previous f-string produced.
            cursor.execute(sql, (str(date_id),))
            rows = cursor.fetchall()
            columns = [column[0] for column in cursor.description]
    finally:
        conn.close()
    return pd.DataFrame(list(rows), columns=columns)


def _auth_headers(token):
    """Build the JSON headers that carry the DingTalk access token."""
    return {
        "Content-Type": "application/json",
        "x-acs-dingtalk-access-token": token,
    }


def generateToken() -> str:
    """Request a DingTalk access token and return it.

    Raises:
        KeyError: if the response carries no ``accessToken``.
    """
    token_api = 'https://api.dingtalk.com/v1.0/oauth2/accessToken'
    # Original comment: these values come from the DingTalk open-platform app.
    data = {
        "appKey": "ding5kqocon5s9oph5uq",
        "appSecret": 'HL1jgsIIfLAC0eTH0A1m4mwxUDqbgsiPeCCGGE3ocM6qJBTIW7Ivt9drxF_Z4Kb_',
    }
    res = requests.post(token_api, json=data, timeout=REQUEST_TIMEOUT)
    return res.json()['accessToken']


def read_instances_new(token, formUuid, page, n):
    """Read one page of RUNNING process-form instances.

    Args:
        token: DingTalk access token.
        formUuid: UUID of the process form to list.
        page: 1-based page number.
        n: page size.

    Returns:
        The decoded JSON response.
    """
    api = f'{YIDA_API_BASE}/processes/instances?pageNumber={page}&pageSize={n}'
    formData = {
        "appType": APP_TYPE,
        "systemToken": SYSTEM_TOKEN,
        "userId": YIDA_USER_ID,
        "language": "zh_CN",
        "formUuid": formUuid,
        # Original note: adding "searchFieldJson" here makes Yida demand a
        # storage upgrade, so no server-side filter is sent.
        "instanceStatus": "RUNNING",
    }
    res = requests.post(api, headers=_auth_headers(token), json=formData,
                        timeout=REQUEST_TIMEOUT)
    return res.json()


def read_instances(token, formUuid, page, n):
    """Read one page of ordinary (non-process) form instances.

    Args:
        token: DingTalk access token.
        formUuid: UUID of the form to search.
        page: 1-based page number.
        n: page size.

    Returns:
        The decoded JSON response.
    """
    api = f'{YIDA_API_BASE}/forms/instances/search'
    formData = {
        "appType": APP_TYPE,
        "systemToken": SYSTEM_TOKEN,
        "userId": YIDA_USER_ID,
        "language": "zh_CN",
        "formUuid": formUuid,
        "currentPage": page,
        "pageSize": n,
    }
    res = requests.post(api, headers=_auth_headers(token), json=formData,
                        timeout=REQUEST_TIMEOUT)
    return res.json()


def _put_form_data(token, form_instance_id, form_data):
    """PUT *form_data* onto one form instance and return the decoded reply."""
    payload = {
        "appType": APP_TYPE,
        "systemToken": SYSTEM_TOKEN,
        "userId": YIDA_USER_ID,
        "language": "zh_CN",
        "formInstanceId": form_instance_id,
        "useLatestVersion": "false",
        # NpEncoder lets values taken straight from the DataFrame serialise.
        "updateFormDataJson": json.dumps(form_data, cls=NpEncoder),
    }
    res = requests.put(f'{YIDA_API_BASE}/forms/instances',
                       headers=_auth_headers(token), json=payload,
                       timeout=REQUEST_TIMEOUT)
    return res.json()


def transcation(FORMID, data_new):
    """Update several fields of one form instance.

    Uses the module-level ``TOKEN`` for authentication.

    Args:
        FORMID: id of the form instance to update.
        data_new: mapping of field code to new value.

    Returns:
        The decoded JSON response.
    """
    return _put_form_data(TOKEN, FORMID, data_new)


def update_instances(TOKEN, processInstanceId, code, name):
    """Update a single field of one form instance.

    Args:
        TOKEN: DingTalk access token.
        processInstanceId: id of the instance to update.
        code: field code to write.
        name: new value for that field.

    Returns:
        The decoded JSON response.
    """
    return _put_form_data(TOKEN, processInstanceId, {code: name})


def instances_id(TOKEN, FORMID, id):
    """Fetch one form instance by id.

    Args:
        TOKEN: DingTalk access token.
        FORMID: unused; kept so existing callers keep working.
        id: id of the instance to fetch.

    Returns:
        The decoded JSON response.
    """
    # This endpoint is queried with a numeric userId, unlike the other calls.
    api = (
        f'{YIDA_API_BASE}/forms/instances/{id}'
        f'?appType={APP_TYPE}&systemToken={SYSTEM_TOKEN}'
        '&userId=2268275546837446&language=zh_CN'
    )
    res = requests.get(api, headers=_auth_headers(TOKEN),
                       timeout=REQUEST_TIMEOUT)
    return res.json()


def _fetch_all_process_instances(token, form_uuid, page_size=PAGE_SIZE):
    """Return every RUNNING instance of *form_uuid* across all pages.

    Page 1 is fetched once and reused, and the page count is a ceiling
    division, so no empty trailing page is requested when the total is an
    exact multiple of *page_size*.

    Raises:
        RuntimeError: if a response lacks ``totalCount`` or ``data``. Failing
            loudly matters here: a silently empty list would make the status
            sync mark every node as completed.
    """
    first_page = read_instances_new(token=token, formUuid=form_uuid,
                                    page=1, n=page_size)
    total = first_page.get('totalCount')
    if total is None:
        raise RuntimeError(
            f'Unexpected response while listing {form_uuid}: {first_page}')
    page_count = -(-total // page_size)  # ceiling division
    records = list(first_page.get('data') or [])
    for page in range(2, page_count + 1):
        page_data = read_instances_new(token=token, formUuid=form_uuid,
                                       page=page, n=page_size)
        if page_data.get('data') is None:
            raise RuntimeError(
                f'Unexpected response on page {page} of {form_uuid}: '
                f'{page_data}')
        records.extend(page_data['data'])
    return records


def _best_effort(step, node, func, *args):
    """Run one per-record step; log and continue if it fails."""
    try:
        func(*args)
    except Exception as exc:  # one bad record must not stop the batch
        logger.warning('%s failed for instance %s: %r',
                       step, node.get('processInstanceId'), exc)


def _sync_run_status(token, index, node, running_org_codes):
    """Set the run-status radio field from membership in *running_org_codes*.

    Nodes lacking the org-code or status field are left untouched.
    """
    try:
        fields = node['data']
        org_code = fields['textField_la80kiyd']
        status = fields['radioField_lfth6zsc']
    except KeyError:
        return
    desired = "运行中" if org_code in running_org_codes else "已完成"
    if status != desired:
        update_instances(token, node['processInstanceId'],
                         'radioField_lfth6zsc', desired)
        print(index, desired, org_code)


def _deadline_fields(timestamp_ms):
    """Return ``(midnight_ms, form_data)`` for a millisecond timestamp.

    The timestamp is truncated to local midnight of its day; the three date
    fields are that midnight plus 30, 60 and 90 days, as strings.
    """
    day = datetime.datetime.fromtimestamp(timestamp_ms / 1000).date()
    midnight = datetime.datetime.combine(day, datetime.time.min)
    midnight_ms = int(midnight.timestamp() * 1000)
    form_data = {
        'dateField_ll7zvjsv': str(midnight_ms + 30 * MS_PER_DAY),
        'dateField_ll7zvjsw': str(midnight_ms + 60 * MS_PER_DAY),
        'dateField_ll7zvjsx': str(midnight_ms + 90 * MS_PER_DAY),
    }
    return midnight_ms, form_data


def _refresh_deadlines(token, node):
    """Recompute the three deadline fields of *node* from its base date.

    Instances whose ``dateField_la8zs59x`` is missing or not numeric are
    skipped.
    """
    instance_id = node['processInstanceId']
    detail = instances_id(token, NODE_FORM_UUID, instance_id)
    try:
        base_ms = int(detail['formData']['dateField_la8zs59x'])
    except (KeyError, TypeError, ValueError):
        return
    midnight_ms, form_data = _deadline_fields(base_ms)
    res = transcation(instance_id, form_data)
    print(res, instance_id, midnight_ms, form_data)


def _sync_active_flag(token, index, node, ngv, row):
    """Align ``textField_lmkblr5v`` ("是"/"否") with ``active_status_fmt``.

    Only an opposite or empty-string value is overwritten; a node without
    the field is left untouched.
    """
    fields = node['data']
    status = ngv.loc[row, "active_status_fmt"]
    if status == "活跃":
        stale_values, new_value = ("否", ""), "是"
    elif status == "不活跃":
        stale_values, new_value = ("是", ""), "否"
    else:
        return
    if 'textField_lmkblr5v' not in fields:
        return
    if fields['textField_lmkblr5v'] in stale_values:
        update_instances(token, node['processInstanceId'],
                         'textField_lmkblr5v', new_value)
        print(index, fields['textField_la80kiyd'], status)


def _sync_is_g_flag(token, index, node, ngv, row):
    """Align ``textField_lmkblr5w`` ("是"/"否") with the ``is_g`` column (1/0).

    Only an opposite or empty-string value is overwritten; a node without
    the field is left untouched.
    """
    fields = node['data']
    is_g = ngv.loc[row, "is_g"]
    if 'textField_lmkblr5w' not in fields:
        return
    current = fields['textField_lmkblr5w']
    print(is_g, current)
    if is_g == 1:
        stale_values, new_value = ("否", ""), "是"
    elif is_g == 0:
        stale_values, new_value = ("是", ""), "否"
    else:
        return
    if current in stale_values:
        update_instances(token, node['processInstanceId'],
                         'textField_lmkblr5w', new_value)
        print(index, fields['textField_la80kiyd'], new_value)


def _sync_bill_days(token, index, node, ngv, row):
    """Write ``bill_day_count_last_30_day`` when the form value differs.

    A missing or non-numeric form value counts as different.
    """
    fields = node['data']
    bill_days = ngv.loc[row, "bill_day_count_last_30_day"]
    try:
        differs = int(fields['textField_lmkblr5x']) != bill_days
    except (KeyError, TypeError, ValueError):
        differs = True
    if differs:
        update_instances(token, node['processInstanceId'],
                         'textField_lmkblr5x', bill_days)
        print(index, fields['textField_la80kiyd'], bill_days)


def _fill_if_blank(token, index, node, field_code, value):
    """Write *value* to *field_code* when it is missing or an empty string."""
    fields = node['data']
    if fields.get(field_code, "") == "":
        update_instances(token, node['processInstanceId'], field_code, value)
        print(index, fields['textField_la80kiyd'], value)


def _sync_ngv_row(token, index, node, ngv, row):
    """Apply every NGV-driven update of row *row* to *node*, step by step."""
    _best_effort('active flag', node,
                 _sync_active_flag, token, index, node, ngv, row)
    _best_effort('is_g flag', node,
                 _sync_is_g_flag, token, index, node, ngv, row)
    _best_effort('bill-day count', node,
                 _sync_bill_days, token, index, node, ngv, row)
    _best_effort('manage model', node,
                 lambda: _fill_if_blank(token, index, node,
                                        'radioField_livfb4km',
                                        ngv.loc[row, "manage_model"]))
    _best_effort('group grade', node,
                 lambda: _fill_if_blank(token, index, node,
                                        'textField_lc8vmo71',
                                        ngv.loc[row, "group_grade"]))
    _best_effort('default visit type', node,
                 _fill_if_blank, token, index, node,
                 'selectField_lmjwct5w', "默认回访")


# --- 1. Load yesterday's NGV detail rows from Hologres ----------------------
now_time = datetime.datetime.now()
yes_time = now_time + datetime.timedelta(days=-1)
yes_time_nyr = int(yes_time.strftime('%Y%m%d'))  # yesterday as YYYYMMDD
data_NGV = _load_ngv_detail(yes_time_nyr)

# --- 2. Authenticate against DingTalk ---------------------------------------
TOKEN = generateToken()

# --- 3. Org codes of all RUNNING instances of the first process form --------
FORMID = SIGNING_FORM_UUID
# Strict indexing on purpose: a malformed record aborts the run instead of
# producing an incomplete list that would mark nodes as completed.
ALL_DATA_code = [
    record["data"]["orgCode"]
    for record in _fetch_all_process_instances(TOKEN, FORMID)
]
print(f'读取到 [流程]新签服务流程表中 {len(ALL_DATA_code)} 条数据!')

# --- 4. All RUNNING instances of the node to-do process form ----------------
FORMID = NODE_FORM_UUID
ALL_DATA_node = _fetch_all_process_instances(TOKEN, FORMID)
print(f'读取到 新签节点化服务待办表单中 {len(ALL_DATA_node)} 条数据!')

# --- 5. Run status and deadline dates ---------------------------------------
running_org_codes = set(ALL_DATA_code)  # O(1) membership tests
for i, node in enumerate(ALL_DATA_node):
    _best_effort('run-status sync', node,
                 _sync_run_status, TOKEN, i, node, running_org_codes)
    _best_effort('deadline refresh', node, _refresh_deadlines, TOKEN, node)

# --- 6. Fields driven by the NGV detail table -------------------------------
# Index row labels by org code once instead of scanning the DataFrame for
# every node; duplicate org codes keep all their rows, in table order.
ngv_rows_by_org = defaultdict(list)
for row_label, org_code in data_NGV["org_code"].items():
    ngv_rows_by_org[org_code].append(row_label)

for i, node in enumerate(ALL_DATA_node):
    fields = node.get('data') or {}
    try:
        matching_rows = ngv_rows_by_org.get(fields['textField_la80kiyd'], ())
    except (KeyError, TypeError):
        continue  # no usable org code on this node
    for row_label in matching_rows:
        _sync_ngv_row(TOKEN, i, node, data_NGV, row_label)