# -*- coding: utf-8 -*-
"""Sync yesterday's NGV detail rows from Hologres into a DingTalk Yida form.

The script runs top to bottom when executed:

1. Query ``holo_ads_report_saas_profile_ngv_detail_d`` for the rows whose
   ``date_id`` is yesterday and load them into the ``data_NGV`` DataFrame.
2. Fetch every RUNNING instance of the renewal-service process form.
3. For each instance whose org-code field matches an ``org_code`` row, update
   three form fields (activity flag, ``is_g`` flag, 30-day bill-day count)
   whenever they disagree with the database.
"""
import datetime
import json
import time
from collections import defaultdict
from contextlib import closing
from pathlib import Path
from urllib.parse import quote

import numpy as np
import pandas as pd
import psycopg2
import requests

# ---------------------------------------------------------------------------
# Step 1: load yesterday's NGV detail rows from Hologres
# ---------------------------------------------------------------------------

now_time = datetime.datetime.now()
yes_time = now_time + datetime.timedelta(days=-1)
yes_time_nyr = int(yes_time.strftime('%Y%m%d'))  # yesterday as YYYYMMDD

# The date is sent as a bound parameter instead of being formatted into the
# SQL text. It is passed as a string so the rendered literal is still quoted
# ('YYYYMMDD'), exactly like the original hand-built statement.
sql = """SELECT * FROM "public"."holo_ads_report_saas_profile_ngv_detail_d" WHERE "date_id" = %s;"""

# SECURITY(review): these database credentials are hard-coded in source.
# They are kept unchanged so the script keeps working, but they should be
# moved to environment variables or a secret store.
# closing() guarantees the cursor and the connection are released even when
# the query fails.
with closing(psycopg2.connect(
    database="f6_bi",
    user="BASIC$ro_caowei",
    password="!ro_caowei123",
    host="hgprecn-cn-nif1vnv0y002-cn-shanghai.hologres.aliyuncs.com",
    port="80",
)) as conn, closing(conn.cursor()) as cursor:
    cursor.execute(sql, (str(yes_time_nyr),))
    rows = cursor.fetchall()
    # cursor.description holds one entry per result column; item 0 is its name.
    all_fields = cursor.description

col = [field[0] for field in all_fields]
data_NGV = pd.DataFrame(list(rows), columns=col)

# ---------------------------------------------------------------------------
# Step 2: DingTalk / Yida helpers
# ---------------------------------------------------------------------------

ROOT = Path('.').absolute()  # current working directory

# Seconds to wait on each DingTalk HTTP call; without a timeout a stalled
# connection would hang the script forever.
REQUEST_TIMEOUT = 30

# Values shared by every Yida request in this script.
# SECURITY(review): the system token (and the app key/secret used in
# generateToken) are hard-coded; move them to a secret store.
APP_TYPE = "APP_UYZ0KG6L0CCNV80GZ66O"
SYSTEM_TOKEN = "XA966F81JAJOFCVVVKO64E9MIIZV1EWE5SFMKJ2"
YIDA_USER_ID = "yida_pub_account"  # super-administrator account
YIDA_LANGUAGE = "zh_CN"


def _yida_headers(token):
    """Return the JSON headers carrying the DingTalk access token."""
    return {
        "Content-Type": "application/json",
        "x-acs-dingtalk-access-token": token,
    }


def _yida_base_payload():
    """Return a fresh dict with the app-level fields every request body needs."""
    return {
        "appType": APP_TYPE,
        "systemToken": SYSTEM_TOKEN,
        "userId": YIDA_USER_ID,
        "language": YIDA_LANGUAGE,
    }


class NpEncoder(json.JSONEncoder):
    """JSON encoder that converts NumPy scalars and arrays to plain Python."""

    def default(self, obj):
        if isinstance(obj, np.integer):
            return int(obj)
        if isinstance(obj, np.floating):
            return float(obj)
        if isinstance(obj, np.ndarray):
            return obj.tolist()
        return super().default(obj)


def generateToken() -> str:
    """Request an access token from DingTalk and return it.

    Raises:
        KeyError: if the response JSON has no ``accessToken`` entry.
    """
    token_api = 'https://api.dingtalk.com/v1.0/oauth2/accessToken'
    # These values come from the DingTalk open-platform application.
    data = {
        "appKey": "ding5kqocon5s9oph5uq",
        "appSecret": 'HL1jgsIIfLAC0eTH0A1m4mwxUDqbgsiPeCCGGE3ocM6qJBTIW7Ivt9drxF_Z4Kb_'
    }
    res = requests.post(token_api, json=data, timeout=REQUEST_TIMEOUT)
    return res.json()['accessToken']


def read_instances_new(token, formUuid, page, n):
    """Read one page of RUNNING instances of a process form.

    Args:
        token: DingTalk access token.
        formUuid: identifier of the form to read.
        page: page number, sent as ``pageNumber``.
        n: page size, sent as ``pageSize``.

    Returns:
        The decoded JSON response body.
    """
    api = f'https://api.dingtalk.com//v1.0/yida/processes/instances?pageNumber={page}&pageSize={n}'
    formData = _yida_base_payload()
    formData["formUuid"] = formUuid
    # "searchFieldJson" is deliberately not sent: adding it makes the API
    # demand an upgrade of the Yida storage plan.
    formData["instanceStatus"] = "RUNNING"
    res = requests.post(api, headers=_yida_headers(token), json=formData,
                        timeout=REQUEST_TIMEOUT)
    return res.json()


def read_instances(token, formUuid, page, n):
    """Read one page of instances of an ordinary (non-process) form.

    Args:
        token: DingTalk access token.
        formUuid: identifier of the form to read.
        page: page number, sent as ``currentPage``.
        n: page size, sent as ``pageSize``.

    Returns:
        The decoded JSON response body.
    """
    api = 'https://api.dingtalk.com//v1.0/yida/forms/instances/search'
    formData = _yida_base_payload()
    formData["formUuid"] = formUuid
    formData["currentPage"] = page
    formData["pageSize"] = n
    res = requests.post(api, headers=_yida_headers(token), json=formData,
                        timeout=REQUEST_TIMEOUT)
    return res.json()


def update_instances(TOKEN, processInstanceId, code, name):
    """Set a single field of a form instance.

    Args:
        TOKEN: DingTalk access token.
        processInstanceId: instance identifier, sent as ``formInstanceId``.
        code: identifier of the form field to change.
        name: new value for that field; NumPy scalars are accepted.

    Returns:
        The decoded JSON response body.
    """
    api = 'https://api.dingtalk.com//v1.0/yida/forms/instances'
    data_new = {code: name}
    payload = _yida_base_payload()
    payload["formInstanceId"] = processInstanceId
    payload["useLatestVersion"] = 'false'
    # NpEncoder is needed because values read from the DataFrame may be
    # NumPy scalars, which the default encoder rejects.
    payload["updateFormDataJson"] = json.dumps(data_new, cls=NpEncoder)
    res = requests.put(api, headers=_yida_headers(TOKEN), json=payload,
                       timeout=REQUEST_TIMEOUT)
    return res.json()


# ---------------------------------------------------------------------------
# Step 3: read every RUNNING instance of the renewal-service process form
# ---------------------------------------------------------------------------

TOKEN = generateToken()

FORMID = "FORM-PE866MD1MJMU0WGLYRFLYEN5YN9L1I55Z7ZUK22"  # renewal-service process
PAGE_SIZE = 100

form_data = read_instances_new(token=TOKEN, formUuid=FORMID, page=1, n=PAGE_SIZE)
total_count = form_data.get('totalCount')
if total_count is None:
    # Without a count the response is not a normal listing; show it instead
    # of failing later with an opaque TypeError.
    raise RuntimeError(f'Unexpected response when listing form instances: {form_data}')

# Ceiling division: the former "// 100 + 1" asked for one page too many
# whenever the total was an exact multiple of the page size.
PAGES = max(1, (total_count + PAGE_SIZE - 1) // PAGE_SIZE)

# The first page is already in hand, so only pages 2..PAGES are requested.
# "or []" tolerates a response whose "data" entry is missing or null.
ALL_DATA_node = list(form_data.get('data') or [])
for page_number in range(2, PAGES + 1):
    form_data = read_instances_new(token=TOKEN, formUuid=FORMID, page=page_number, n=PAGE_SIZE)
    ALL_DATA_node.extend(form_data.get('data') or [])
print(f'读取到 续约服务流程表单中 {len(ALL_DATA_node)} 条数据!')

# ---------------------------------------------------------------------------
# Step 4: reconcile form fields with the database rows
# ---------------------------------------------------------------------------

ORG_CODE_FIELD = 'textField_ksydghqw'    # form field matched against org_code
ACTIVE_FIELD = 'textField_ksydghr5'      # "是"/"否" flag driven by active_status_fmt
IS_G_FIELD = 'textField_ksydghr8'        # "是"/"否" flag driven by is_g
BILL_DAYS_FIELD = 'textField_ksydghr7'   # mirrors bill_day_count_last_30_day


def _index_rows_by_org_code(frame):
    """Map each org_code to the list of row labels carrying it, in row order."""
    index = defaultdict(list)
    for label, org_code in zip(frame.index, frame["org_code"]):
        index[org_code].append(label)
    return index


def _matching_rows(instance, rows_by_org):
    """Return the row labels whose org_code equals the instance's org-code field."""
    org_code = instance['data'].get(ORG_CODE_FIELD)
    if org_code is None:
        # No org code on the form instance: nothing to match against.
        return ()
    return rows_by_org.get(org_code, ())


def _sync_yes_no_field(token, instances, frame, rows_by_org, column, field_id, on_value, off_value):
    """Flip a "是"/"否" form field so that it agrees with a database column.

    The field is set to "是" when the column equals ``on_value`` and the form
    currently says "否", and to "否" when the column equals ``off_value`` and
    the form currently says "是". Any other combination is left untouched.
    """
    for position, instance in enumerate(instances):
        # Read once: the locally cached form data is not refreshed after an
        # update, so every matching row is compared with the fetched value.
        current = instance['data'].get(field_id)
        for label in _matching_rows(instance, rows_by_org):
            db_value = frame.loc[label, column]
            if db_value == on_value and current == "否":
                target = "是"
            elif db_value == off_value and current == "是":
                target = "否"
            else:
                continue
            update_instances(token, instance['processInstanceId'], field_id, target)
            # Log the instance position and the value the field had before
            # the update.
            print(position, current)


def _sync_value_field(token, instances, frame, rows_by_org, column, field_id):
    """Copy a database column into a form field wherever the two differ."""
    for instance in instances:
        current = instance['data'].get(field_id)
        for label in _matching_rows(instance, rows_by_org):
            db_value = frame.loc[label, column]
            # NOTE(review): if the form stores this value as text while the
            # column is numeric, the two never compare equal and the value is
            # re-sent on every run — confirm the types on both sides.
            if current != db_value:
                update_instances(token, instance['processInstanceId'], field_id, db_value)


# Built once so each instance is matched by dictionary lookup rather than by
# rescanning the whole DataFrame three times.
ROWS_BY_ORG_CODE = _index_rows_by_org_code(data_NGV)

# The three passes run one after another, in the same order as before.
_sync_yes_no_field(TOKEN, ALL_DATA_node, data_NGV, ROWS_BY_ORG_CODE,
                   "active_status_fmt", ACTIVE_FIELD, "活跃", "不活跃")
_sync_yes_no_field(TOKEN, ALL_DATA_node, data_NGV, ROWS_BY_ORG_CODE,
                   "is_g", IS_G_FIELD, "1", "0")
_sync_value_field(TOKEN, ALL_DATA_node, data_NGV, ROWS_BY_ORG_CODE,
                  "bill_day_count_last_30_day", BILL_DAYS_FIELD)