接车宝排查问题及续约待办派发数据源加载

This commit is contained in:
z66
2025-11-10 14:11:15 +08:00
parent 5f1d052f2f
commit 9798071f68
6 changed files with 806 additions and 75 deletions
-14
View File
@@ -31,20 +31,6 @@
</Attribute>
</value>
</entry>
<entry key="\test\feibiao.csv">
<value>
<Attribute>
<option name="separator" value="," />
</Attribute>
</value>
</entry>
<entry key="\test\异常服务待办派发.csv">
<value>
<Attribute>
<option name="separator" value="," />
</Attribute>
</value>
</entry>
</map>
</option>
</component>
+51 -61
View File
@@ -25,6 +25,7 @@ class CommonModule:
# 创建一个存储日期的集合,用于去重
self.date_set = set()
self.conn = Config.CONN_INFO
self.renewal_conn = Config.CONN_INFO_RENEWAL
def time_to_UTC(self, time_input):
"""
@@ -123,7 +124,7 @@ class CommonModule:
return data_NGV
except Exception as e:
print(f"Error occurred: {e}")
error_task_logger.error(f"获取NGV明细失败: {e}")
return None
def get_yichang_details(self, days_back=1):
@@ -170,11 +171,49 @@ class CommonModule:
return data_yichang
except Exception as e:
print(f"Error occurred: {e}")
error_task_logger.error(f"获取异常明细时出错: {e}")
return None
def get_renewal_details(self, ):
"""
从固定的数据库中获取续约待办数据
"""
try:
# 获得连接
conn = psycopg2.connect(**self.renewal_conn)
cursor = conn.cursor()
# 获取指定天数前的日期
now_time = datetime.now()
yes_time = now_time + timedelta(days=-2)
yes_time_nyr = int(yes_time.strftime('%Y%m%d')) # 获取前两天日期
# 获取指定天数前的日期
today = date.today()
days_to_add = 120
future_date = str(today + timedelta(days=days_to_add))
print("距离今天还有{}天的日期是:{}".format(days_to_add, future_date))
sql = f"""SELECT * FROM "public"."holo_ads_report_saas_profile_ngv_detail_d" WHERE "date_id" = '{yes_time_nyr}' and "expiry_time" like '%{future_date}%';"""
# 执行语句并获取结果集
cursor.execute(sql)
rows = cursor.fetchall()
all_fields = cursor.description # 获取所有字段名
# 执行结果转化为dataframe
col = [i[0] for i in all_fields]
data_NGV = pd.DataFrame(list(rows), columns=col)
# 关闭数据库连接
cursor.close()
conn.close()
return data_NGV
except Exception as e:
error_task_logger.error(f"获取续约待办数据时出错: {e}")
return None
def get_saas_data(self):
pass
def get_jcb_details(self, ):
"""
@@ -260,7 +299,7 @@ class CommonModule:
return data_NGV
except Exception as e:
print(f"Error occurred: {e}")
error_task_logger.error(f"获取借车宝NGV明细时出错: {e}")
return None
def get_syxcx_details(self, ):
@@ -313,7 +352,7 @@ class CommonModule:
return data_SY
except Exception as e:
print(f"Error occurred: {e}")
error_task_logger.error(f"获取私域小程序数据时出错: {e}")
return None
def get_commission_details(self, ):
@@ -363,12 +402,12 @@ class CommonModule:
return data_commission
except Exception as e:
print(f"Error occurred: {e}")
error_task_logger.error(f"获取小六提成数据时出错: {e}")
return None
def get_differentindustries_details(self, ):
"""
从f6operation_data_relay数据库中获取小六提成数据。
从f6operation_data_relay数据库中获取异业合作数据。
返回pandas DataFrame。
"""
@@ -413,7 +452,7 @@ class CommonModule:
return data_commission
except Exception as e:
print(f"Error occurred: {e}")
error_task_logger.error(f"获取异业合作数据时出错: {e}")
return None
def get_perforamnce_details(self, ):
@@ -467,62 +506,13 @@ class CommonModule:
return data_commission
except Exception as e:
print(f"Error occurred: {e}")
error_task_logger.error(f"获取履约表数据时出错: {e}")
return None
def get_commission_details(self, ):
"""
从f6operation_data_relay数据库中获取小六提成数据。
返回pandas DataFrame。
"""
try:
# 获得连接并创建游标
conn = pymysql.connect(
host=Config.BI_CONN_host,
database=Config.BI_CONN_INFO_database,
user=Config.BI_CONN_INFO_user,
password=Config.BI_CONN_INFO_password,
# charset='utf8mb4', # 设置字符集以避免编码问题
# cursorclass=pymysql.cursors.DictCursor # 返回字典形式的结果
)
cursor = conn.cursor()
# 获取指定天数前的日期
# now_time = datetime.now()
# target_time = now_time + timedelta(days=-days_back)
# SQL 查询语句
sql = f"""
SELECT * FROM JianDaoYun_DailyVisit_Commission;
"""
# 执行查询并获取结果
cursor.execute(sql)
rows = cursor.fetchall() # pymysql 的 DictCursor 会返回字典列表
# 将结果转换为 DataFrame
if rows: # 如果有数据
data_commission = pd.DataFrame(rows)
else: # 如果没有数据,返回空 DataFrame
data_commission = pd.DataFrame()
# 关闭游标
cursor.close()
headers = [
"门店id", "提成类型_二级分类", "提成基数(本月)", "提成基数(上月)", "公司id", "门店编码", "门店名称"
]
data_commission.columns = headers
return data_commission
except Exception as e:
print(f"Error occurred: {e}")
return None
def get_GroupNotification_details(self, ):
"""
从f6operation_data_relay数据库中获取小六提成数据。
从f6operation_data_relay数据库中获取短信数据支撑数据。
返回pandas DataFrame。
"""
@@ -568,7 +558,7 @@ class CommonModule:
return data_commission
except Exception as e:
print(f"Error occurred: {e}")
error_task_logger.error(f"获取短信数据支撑数据时出错: {e}")
return None
from datetime import datetime, timedelta, UTC, timezone
+3
View File
@@ -8525,3 +8525,6 @@ Traceback (most recent call last):
File "D:\ProgramTools\anaconda3\envs\saas\Lib\site-packages\pandas\core\indexes\range.py", line 417, in get_loc
raise KeyError(key)
KeyError: 'org_code'
2025-11-10 09:33:28,889 - JCB_efficient_car_pickup.py - error_task_logger - ERROR - 接车宝日常派发执行出错:获取接车宝数据失败,返回None
2025-11-10 09:34:26,005 - JCB_efficient_car_pickup.py - error_task_logger - ERROR - 接车宝日常派发执行出错:获取接车宝数据失败,返回None
2025-11-10 09:44:51,934 - JCB_efficient_car_pickup.py - error_task_logger - ERROR - 接车宝日常派发执行出错:获取接车宝数据失败,返回None
+8
View File
@@ -13,6 +13,14 @@ class Config:
"port": "80"
} # SaaS-NGV 数据库链接配置-postgresql
CONN_INFO_RENEWAL = {
"database": "f6_bi",
"user": "LTAI5tMJsijFA9BS1R6uBpUT",
"password": "PajEQMIRWNRcipd8mYvlud2KHWJr6N",
"host": "hgpostcn-cn-m1e4gikbu00l-cn-shanghai.hologres.aliyuncs.com",
"port": "80"
} # 续约回访数据库链接配置-postgresql
HS_DB_Config = {
'host': "f6-public.rwlb.rds.aliyuncs.com",
'user': "rw_operation_data_relay",
+627
View File
@@ -0,0 +1,627 @@
import datetime
starttime = datetime.datetime.now()
# -*- coding: utf-8 -*-
import psycopg2
import pandas as pd
from datetime import date, timedelta
# 获得连接
# conn = psycopg2.connect(database="f6_bi", user="BASIC$ro_caowei", password="!ro_caowei123", host="hgprecn-cn-nif1vnv0y002-cn-shanghai.hologres.aliyuncs.com", port="80")
conn = psycopg2.connect(database="f6_bi", user="LTAI5tMJsijFA9BS1R6uBpUT", password="PajEQMIRWNRcipd8mYvlud2KHWJr6N", host="hgpostcn-cn-m1e4gikbu00l-cn-shanghai.hologres.aliyuncs.com", port="80")
# 获得游标对象,一个游标对象可以对数据库进行执行操作
cursor = conn.cursor()
import datetime
now_time = datetime.datetime.now()
yes_time = now_time + datetime.timedelta(days=-2)
yes_time_nyr = int(yes_time.strftime('%Y%m%d'))# 获取前一天日期
today = date.today()
days_to_add = 120
future_date = str(today + timedelta(days=days_to_add))
# 输出结果
print("距离今天还有{}天的日期是:{}".format(days_to_add, future_date))
# sql语句 建表
sql =f"""SELECT * FROM "public"."holo_ads_report_saas_profile_ngv_detail_d" WHERE "date_id" = '{yes_time_nyr}' and "expiry_time" like '%{future_date}%';"""
# 执行语句
cursor.execute(sql)
# 获取结果集的每一行
rows = cursor.fetchall()
# 获取所有字段名
all_fields = cursor.description
#执行结果转化为dataframe
col = []
for i in all_fields:
col.append(i[0])
data_NGV = pd.DataFrame(list(rows),columns=col)
# data_NGV.to_excel(r'C:\Users\admin\Desktop\NGV明细.xlsx')
# 关闭数据库连接
cursor.close()
conn.close()
# 基础函数配置
import pandas as pd
import pandas as pd
import requests
from pathlib import Path
from urllib.parse import quote
import json
import numpy as np
import time
from datetime import date, timedelta
ROOT = Path('.').absolute() # 当前工作目录
textField_lrzoowld = "正常" # 运行状态
textField_lrzoowlb = "" # 信息说明
def generateToken() -> str:
""" 生成 token """
token_api = 'https://api.dingtalk.com/v1.0/oauth2/accessToken'
# 该信息在钉钉开放应用中
data = {
"appKey": "ding5kqocon5s9oph5uq",
"appSecret": 'HL1jgsIIfLAC0eTH0A1m4mwxUDqbgsiPeCCGGE3ocM6qJBTIW7Ivt9drxF_Z4Kb_'
}
res = requests.post(token_api, json=data)
token = res.json()['accessToken']
return token
def read_instances(token, formUuid, page, n):
""" 函数功能:读取普通表单的所有数据 """
api = f'https://api.dingtalk.com//v1.0/yida/forms/instances/search'
headers = {
"Content-Type": "application/json",
"x-acs-dingtalk-access-token": token
}
formData = {
"appType" : "APP_UYZ0KG6L0CCNV80GZ66O",
"systemToken" : "XA966F81JAJOFCVVVKO64E9MIIZV1EWE5SFMKJ2",
"userId" : "yida_pub_account",
"language" : "zh_CN",
"formUuid" : formUuid,
"currentPage" : page,
"pageSize" : n
}
res = requests.post(api, headers=headers, json=formData)
return res.json()
def read_delete(token, formInstanceId):
""" 函数功能:调用本接口删除表单数据。 """
api = f'https://api.dingtalk.com//v1.0/yida/forms/instances'
headers = {
"Content-Type": "application/json",
"x-acs-dingtalk-access-token": token
}
formData = {
"appType" : "APP_UYZ0KG6L0CCNV80GZ66O",
"systemToken" : "XA966F81JAJOFCVVVKO64E9MIIZV1EWE5SFMKJ2",
"userId" : "yida_pub_account",
"language" : "zh_CN",
"formInstanceId" : formInstanceId
}
res = requests.delete(api, headers=headers, json=formData)
return res.json()
def read_new(FORMID,formData):
""" 通过实例id 获取表单内容 """
api = f'https://api.dingtalk.com/v1.0/yida/forms/instances'
headers = {
"Content-Type": "application/json",
"x-acs-dingtalk-access-token": TOKEN
}
payload = {
"formUuid" : FORMID,
"appType" : "APP_UYZ0KG6L0CCNV80GZ66O",
"formDataJson" : json.dumps(formData, cls=NpEncoder),
"systemToken" : "XA966F81JAJOFCVVVKO64E9MIIZV1EWE5SFMKJ2",
"language" : "zh_CN",
"userId" : "yida_pub_account"
}
res = requests.post(api, headers=headers, json=payload)
print(res.json())
return res.json()
def component(FORMID,TOKEN):
""" 获取组件信息 """
api = f'https://api.dingtalk.com//v1.0/yida/forms/formFields'
headers = {
"Content-Type": "application/json",
"x-acs-dingtalk-access-token": TOKEN
}
payload = {
"formUuid" : FORMID,
"appType" : "APP_UYZ0KG6L0CCNV80GZ66O",
# "formDataJson" : json.dumps(formData, cls=NpEncoder),
"systemToken" : "XA966F81JAJOFCVVVKO64E9MIIZV1EWE5SFMKJ2",
# "language" : "zh_CN",
"userId" : "yida_pub_account"
}
res = requests.get(api, headers=headers, json=payload)
return res.json()
def initiate_process(TOKEN,formData):
""" 发起宜搭审批流程 """
api = f'https://api.dingtalk.com//v1.0/yida/processes/instances/start'
headers = {
"Content-Type": "application/json",
"x-acs-dingtalk-access-token": TOKEN
}
payload = {
"appType" : "APP_UYZ0KG6L0CCNV80GZ66O",
"systemToken" : "XA966F81JAJOFCVVVKO64E9MIIZV1EWE5SFMKJ2",
"userId" : "yida_pub_account",
"language" : "zh_CN",
"formUuid" : "FORM-PE866MD1MJMU0WGLYRFLYEN5YN9L1I55Z7ZUK22",
"formDataJson" : json.dumps(formData, cls=NpEncoder),
"processCode" : "TPROC--PE866MD1MJMU0WGLYRFLYEN5YN9L1885Z7ZUK32",
}
res = requests.post(api, headers=headers, json=payload)
return res.json()
def delete_in_batches(FORMID,TOKEN,ALL_DATA_instance):
""" 批量删除表单实例 """
api = f'https://api.dingtalk.com//v1.0/yida/forms/instances/batchRemove'
headers = {
"Content-Type": "application/json",
"x-acs-dingtalk-access-token": TOKEN
}
payload = {
"formUuid" : FORMID,
"appType" : "APP_UYZ0KG6L0CCNV80GZ66O",
"asynchronousExecution" : "true",
"systemToken" : "XA966F81JAJOFCVVVKO64E9MIIZV1EWE5SFMKJ2",
"formInstanceIdList" : json.dumps(ALL_DATA_instance, cls=NpEncoder),
"userId" : "yida_pub_account",
"executeExpression" : "false" # 不触发
}
res = requests.post(api, headers=headers, json=payload)
return res.json()
def delete_in(TOKEN,formInstanceIdList):
""" 逐条删除表单实例 """
api = f'https://api.dingtalk.com//v1.0/yida/forms/instances?appType=APP_UYZ0KG6L0CCNV80GZ66O&systemToken=XA966F81JAJOFCVVVKO64E9MIIZV1EWE5SFMKJ2&userId=yida_pub_account&language=zh_CN&formInstanceId={formInstanceIdList}'
headers = {
"Content-Type": "application/json",
"x-acs-dingtalk-access-token": TOKEN
}
res = requests.delete(api, headers=headers)
return res.json()
def read_instances_ngv(token, formUuid, page, n,searchField):
""" 函数功能:读取普通表单的所有数据 """
api = f'https://api.dingtalk.com//v1.0/yida/forms/instances/search'
headers = {
"Content-Type": "application/json",
"x-acs-dingtalk-access-token": token
}
formData = {
"appType" : "APP_UYZ0KG6L0CCNV80GZ66O",
"systemToken" : "XA966F81JAJOFCVVVKO64E9MIIZV1EWE5SFMKJ2",
"userId" : "yida_pub_account",
"language" : "zh_CN",
"formUuid" : formUuid,
"searchFieldJson": json.dumps(searchField),
"currentPage" : page,
"pageSize" : n
}
res = requests.post(api, headers=headers, json=formData)
return res.json()
class NpEncoder(json.JSONEncoder):
def default(self, obj):
if isinstance(obj, np.integer):
return int(obj)
elif isinstance(obj, np.floating):
return float(obj)
elif isinstance(obj, np.ndarray):
return obj.tolist()
else:
return super(NpEncoder, self).default(obj)
import binascii
import time
import random
from pyDes import des, CBC, PAD_PKCS5
import requests
def des_encrypt(s):
"""
DES 加密
:param s: 原始字符串
:return: 加密后字符串,16进制
"""
secret_key = 'HwdMBW8o'
iv = secret_key
k = des(secret_key, CBC, iv, pad=None, padmode=PAD_PKCS5)
en = k.encrypt(s, padmode=PAD_PKCS5)
return binascii.b2a_base64(en, newline=False)
def des_descrypt(s):
"""
DES 解密
:param s: 加密后的字符串,16进制
:return: 解密后的字符串
"""
secret_key = 'HwdMBW8o'
iv = secret_key
k = des(secret_key, CBC, iv, pad=None, padmode=PAD_PKCS5)
de = k.decrypt(binascii.a2b_base64(s), padmode=PAD_PKCS5)
return de
TOKEN = generateToken()
'''读取省市小六技术专家区域客服区域客成 '''
FORMID = "FORM-8C7E8036770B495D99862638F87FA8BFOEEN" #省市小六技术专家区域客服区域客成
try:
# 读取流程表单数据
form_data = read_instances(token=TOKEN, formUuid=FORMID, page=1, n=100)
PAGES = form_data.get('totalCount')//100 + 1
textField_gif29wy = {}
textField_3athky8 = {}
textField_3hgho1m = {}
textField_nc7gskc = {}
""" 获取全量数据 """
for i in range(1, PAGES+1):
# form_data = read_processes_instances(token=TOKEN, formUuid=FORMID, createFromTimeGMT=CREATE_FROM, createToTimeGMT=CREATE_TO, page=i, n=100, searchField={'textField_l7if5ff9': '否'})
form_data = read_instances(token=TOKEN, formUuid=FORMID, page=i, n=100)
for data in form_data.get('data'):
textField_gif29wy[data['formData']['textField_m3hchxc']]=data['formData']['textField_gif29wy'] #区域客成id # 根据市做判断
textField_3athky8[data['formData']['textField_m3hchxc']]=data['formData']['textField_3athky8'] #区域客服id
# textField_3hgho1m[data['formData']['textField_3hgho1m']]=data['formData']['textField_3hgho1m'] #小六id
# textField_nc7gskc[data['formData']['textField_nc7gskc']]=data['formData']['textField_nc7gskc'] #技术专家id
print(f'读取到省市小六技术专家区域客服区域客成表单中 {len(textField_gif29wy)} 条数据!')
'''遍历数据进行新建'''
data_NGV = data_NGV.astype('string')
data_NGV = data_NGV.fillna('',inplace=False)
group_grade = {
"普通客户(VIP":10,
"重要客户(SVIP":20,
"区域KAMVP":30,
"全国KAFMVP":50
}
# 过滤数据
for i in range(0,len(data_NGV["date_fmt"])):
try:
t = time.time()
ts = int(round(t * 1000))
randint = random.randint(100000000, 999999999)
req = data_NGV['id_own_org'][i] + "_" + str(ts) + "_" + str(randint)
str_en = des_encrypt(req)
req_new = str_en.decode('utf-8')
url = f"http://manage.f6yc.com/hive-admin/py/yida/renewal/orgInfo"
data = {
'req':req_new,
't':ts,
'r':randint
}
res = requests.post(url,data=data)
formData = res.json()['data']['yidaFormData']
# 过期日期的时间戳
expire_timestamp = int(formData['dateField_ksirro5l'])/1000
# 获取距离过期日期前120天,前90天,前60天,前30天的日期
expire_date = datetime.datetime.fromtimestamp(expire_timestamp)
before_90_days = expire_date - datetime.timedelta(days=90)
before_60_days = expire_date - datetime.timedelta(days=60)
before_30_days = expire_date - datetime.timedelta(days=30)
# print(formData)
formData['dateField_ljzefdm4'] = str(int(before_90_days.timestamp()*1000)) # 90天限制日期
formData['dateField_ljzefdm5'] = str(int(before_60_days.timestamp()*1000)) # 60天限制日期
formData['dateField_ljzefdm6'] = str(int(before_30_days.timestamp()*1000)) # 30天限制日期
employeeField_kykw5ege = str(formData['employeeField_kykw5ege'])
employeeField_ksydghrd = str(formData['employeeField_ksydghrd'])
formData['employeeField_ljz6gvwc'] = f"['{textField_gif29wy[formData['textField_kuj8nx01']]}', '{textField_3athky8[formData['textField_kuj8nx01']]}']" # 区域客成+区域客服
# formData['employeeField_ljz6416i'] = f"[{textField_gif29wy[formData['textField_kuj8nx01']]}, {employeeField_kykw5ege}]" # 区域客成+小六
formData['employeeField_ljz6416i'] = f"['{textField_gif29wy[formData['textField_kuj8nx01']]}', '{employeeField_kykw5ege}']" # 区域客成+小六
formData['employeeField_ljz6416j'] = f"['{textField_gif29wy[formData['textField_kuj8nx01']]}', '{employeeField_ksydghrd}']" # 区域客成+技术专家
formData['employeeField_ljz6gvwd'] = textField_3athky8[formData['textField_kuj8nx01']] # 区域客服
formData['employeeField_ksydght0'] = textField_gif29wy[formData['textField_kuj8nx01']] # 区域客成
if employeeField_kykw5ege =="1824534815658365" or employeeField_kykw5ege =="0627252740652855":
if employeeField_ksydghrd =="":
employeeField_kykw5ege = '0627252740652855'
formData['employeeField_ljz6gvwc'] = '0627252740652855' # 区域客成+区域客服
formData['employeeField_ljz6416i'] = '0627252740652855' # 区域客成+小六
formData['employeeField_ljz6416j'] = '0627252740652855' # 区域客成+技术专家
formData['employeeField_ljz6gvwd'] = '0627252740652855' # 区域客服
formData['employeeField_ksydght0'] = '0627252740652855' # 区域客成
formData['employeeField_kykw5ege'] = '0627252740652855' # 专属运营顾问
formData['employeeField_ksirro5o'] = '0627252740652855' # 续约绩效归属人
else:
employeeField_kykw5ege = employeeField_ksydghrd
formData['employeeField_ljz6gvwc'] = employeeField_ksydghrd # 区域客成+区域客服
formData['employeeField_ljz6416i'] = employeeField_ksydghrd # 区域客成+小六
formData['employeeField_ljz6416j'] = employeeField_ksydghrd # 区域客成+技术专家
formData['employeeField_ljz6gvwd'] = employeeField_ksydghrd # 区域客服
formData['employeeField_ksydght0'] = employeeField_ksydghrd # 区域客成
formData['employeeField_kykw5ege'] = employeeField_ksydghrd # 专属运营顾问
formData['employeeField_ksirro5o'] = employeeField_ksydghrd # 续约绩效归属人
if formData['textField_kycfic6o'] == "区域KAMVP" or formData['textField_kycfic6o'] == "全国KAFMVP":
formData['employeeField_ljz6gvwc'] = f"['{employeeField_kykw5ege}', '{employeeField_ksydghrd}']" # 小六+技术专家
formData['employeeField_ljz6416i'] = f"['{employeeField_kykw5ege}', '{employeeField_ksydghrd}']" # 小六+技术专家
formData['employeeField_ljz6416j'] = f"['{employeeField_kykw5ege}', '{employeeField_ksydghrd}']" # 小六+技术专家
formData['employeeField_ljz6gvwd'] = f"['{employeeField_kykw5ege}', '{employeeField_ksydghrd}']" # 小六+技术专家
try:
formData['textField_ksirro5g'] = group_grade[data_NGV['group_grade'][i]]
formData['textField_kycfic6o'] = data_NGV['group_grade'][i]
except:
pass
try:
formData['textField_liwg9trm'] = res.json()['data']['franchiseGroupInfo']['groupName']
except:
pass
# 富文本 超链接 NGV
try:
form_data_ngv = read_instances_ngv(token=TOKEN, formUuid="FORM-ZK866D91O9LA4NIHCARG2DPIPCXF3Z087PPHL91", page=1, n=100, searchField={'textField_l8nc9f2': data_NGV['id_own_org'][i]})
formData['editorField_m3gn517y'] = ["root",{},["p",{},["span",{"data-type":"text"},["span",{"data-type":"leaf"},""]],["a",{"href":"https://f6car.aliwork.com/APP_UYZ0KG6L0CCNV80GZ66O/formDetail/FORM-ZK866D91O9LA4NIHCARG2DPIPCXF3Z087PPHL91?formInstId="+form_data_ngv['data'][0]['formInstanceId']+"&isAdmin=true"},["span",{"data-type":"text"},["span",{"unlink":{},"data-type":"leaf"},"点击查看门店NGV"]]],["span",{"data-type":"text"},["span",{"unlink":{},"data-type":"leaf"},""]]]] # 富文本 超链接 NGV
except:
pass
res_new = initiate_process(TOKEN,formData)
time.sleep(2)
print(res_new)
# 回传信息-------------------------------------------------------------------------------------------------------------
default_new = True
a_len = 1
while default_new:
t = time.time()
ts = int(round(t * 1000))
randint = random.randint(100000000, 999999999)
req = res_new['result'] + "|" + formData['textField_kuntp6fk'] + "|" + formData['textField_kuntp6fl']+ "|" + formData['employeeField_kykw5ege'] + "_" + str(ts) + "_" + str(randint)
str_en = des_encrypt(req)
print(str_en.decode('utf-8'))
req_new = str_en.decode('utf-8')
url = f"http://manage.f6yc.com/hive-admin/py/yida/renewal/insertRenewalFormsData"
data = {
'req':req_new,
't':ts,
'r':randint
}
res = requests.post(url,data=data)
res.json()
if res.json()['message'] == "SUCCESS":
default_new = False
a_len = a_len + 1
if a_len > 5:
default_new = False
time.sleep(1)
'''校验是否新建正常'''
FORMID = "FORM-L8966281PTZA73CDBTGQBDLM628M2P4X1OYHL0"
if a_len < 5:
print("数据新建成功!")
else:
def start_instance_process(token: str, name):
"""发送宜搭表单 -- 发起流程表单
Args:
token
data:需要发送的数据字典
"""
yida_api = "https://api.dingtalk.com/v1.0/yida/processes/instances/start"
headers = {
"Content-Type": "application/json",
"x-acs-dingtalk-access-token": token
}
send_data = {
"textField_l9fe0uiw": name,
"textField_l9fe0uiv": name
}
payload = {
"appType": "APP_TNVBVZ3K8G56HG03Z45Q",
"systemToken": "CH7669818R0WN18TYTYJ42PE6GY22WZN0BYWKD1",
"userId": "yida_pub_account",# 超级管理员账号
"language": "zh_CN",
"formUuid": "FORM-UX866Q61GNLAZBCIEDF77BGVIIR83K82WYPHLH2",
"formDataJson": json.dumps(send_data),
"processCode":"TPROC--UX866Q61GNLAZBCIEDF77BGVIIR83M92WYPHLI2"
}
res = requests.post(yida_api, headers=headers, json=payload)
return res
try:
name = f"[流程]续约服务流程 新建后接口回传失败,请检查!{data_NGV['id_own_org'][i]}"
res_yujing = start_instance_process(TOKEN,name)
except:
textField_lrzoowld = "异常" # 运行状态
textField_lrzoowlb = "[流程]续约服务流程 新建后接口回传失败,请检查" # 信息说明
except:
'''校验是否新建正常'''
FORMID = "FORM-L8966281PTZA73CDBTGQBDLM628M2P4X1OYHL0"
def start_instance_process(token: str, name):
"""发送宜搭表单 -- 发起流程表单
Args:
token
data:需要发送的数据字典
"""
yida_api = "https://api.dingtalk.com/v1.0/yida/processes/instances/start"
headers = {
"Content-Type": "application/json",
"x-acs-dingtalk-access-token": token
}
send_data = {
"textField_l9fe0uiw": name,
"textField_l9fe0uiv": name
}
payload = {
"appType": "APP_TNVBVZ3K8G56HG03Z45Q",
"systemToken": "CH7669818R0WN18TYTYJ42PE6GY22WZN0BYWKD1",
"userId": "yida_pub_account",# 超级管理员账号
"language": "zh_CN",
"formUuid": "FORM-UX866Q61GNLAZBCIEDF77BGVIIR83K82WYPHLH2",
"formDataJson": json.dumps(send_data),
"processCode":"TPROC--UX866Q61GNLAZBCIEDF77BGVIIR83M92WYPHLI2"
}
res = requests.post(yida_api, headers=headers, json=payload)
return res
try:
name = f"[流程]续约服务流程 未成功新建,请检查!{data_NGV['id_own_org'][i]}"
res_yujing = start_instance_process(TOKEN,name)
textField_lrzoowld = "异常" # 运行状态
textField_lrzoowlb = "[流程]续约服务流程 未成功新建,请检查" # 信息说明
except:
pass
except:
'''校验是否新建正常'''
FORMID = "FORM-L8966281PTZA73CDBTGQBDLM628M2P4X1OYHL0"
def start_instance_process(token: str, name):
"""发送宜搭表单 -- 发起流程表单
Args:
token
data:需要发送的数据字典
"""
yida_api = "https://api.dingtalk.com/v1.0/yida/processes/instances/start"
headers = {
"Content-Type": "application/json",
"x-acs-dingtalk-access-token": token
}
send_data = {
"textField_l9fe0uiw": name,
"textField_l9fe0uiv": name
}
payload = {
"appType": "APP_TNVBVZ3K8G56HG03Z45Q",
"systemToken": "CH7669818R0WN18TYTYJ42PE6GY22WZN0BYWKD1",
"userId": "yida_pub_account",# 超级管理员账号
"language": "zh_CN",
"formUuid": "FORM-UX866Q61GNLAZBCIEDF77BGVIIR83K82WYPHLH2",
"formDataJson": json.dumps(send_data),
"processCode":"TPROC--UX866Q61GNLAZBCIEDF77BGVIIR83M92WYPHLI2"
}
res = requests.post(yida_api, headers=headers, json=payload)
return res
try:
name = f"[流程]续约服务流程 表单数据读取失败,请检查!{data_NGV['id_own_org'][i]}"
res_yujing = start_instance_process(TOKEN,name)
except:
textField_lrzoowld = "异常" # 运行状态
textField_lrzoowlb = "[流程]续约服务流程 表单数据读取失败,请检查" # 信息说明
try:
import requests
import json
import numpy as np
def generateToken() -> str:
""" 生成 token """
token_api = 'https://api.dingtalk.com/v1.0/oauth2/accessToken'
# 该信息在钉钉开放应用中
data = {
"appKey": "ding5kqocon5s9oph5uq",
"appSecret": 'HL1jgsIIfLAC0eTH0A1m4mwxUDqbgsiPeCCGGE3ocM6qJBTIW7Ivt9drxF_Z4Kb_'
}
res = requests.post(token_api, json=data)
token = res.json()['accessToken']
return token
def start_instance_process(token: str, send_data):
"""发送宜搭表单 -- 发起流程表单
Args:
token
data:需要发送的数据字典
"""
yida_api = "https://api.dingtalk.com/v1.0/yida/processes/instances/start"
headers = {
"Content-Type": "application/json",
"x-acs-dingtalk-access-token": token
}
payload = {
"appType": "APP_TNVBVZ3K8G56HG03Z45Q",
"systemToken": "CH7669818R0WN18TYTYJ42PE6GY22WZN0BYWKD1",
"userId": "yida_pub_account",# 超级管理员账号
"language": "zh_CN",
"formUuid": "FORM-96D58EF2219240C7B1F55F9CA463CD2D4MGC",
"formDataJson": json.dumps(send_data),
"processCode":"TPROC--5Q966D918T1I1AZM68NASC6TS13P3QOL3PZRLC"
}
res = requests.post(yida_api, headers=headers, json=payload)
return res
class NpEncoder(json.JSONEncoder):
def default(self, obj):
if isinstance(obj, np.integer):
return int(obj)
elif isinstance(obj, np.floating):
return float(obj)
elif isinstance(obj, np.ndarray):
return obj.tolist()
else:
return super(NpEncoder, self).default(obj)
'''校验是否正常运行'''
TOKEN = generateToken()
import datetime
endtime = datetime.datetime.now()
implement = (endtime - starttime).seconds
send_data = {
"textField_ls01al4o": implement, #运行耗时
"textField_lrzoowl8": "yida_xuyuedaiban_paifa", # 程序名称
"textField_lrzoowl9": "每天早上9点重新派发续约跟进任务", # 功能简述
"textField_lrzoowld": textField_lrzoowld, # 运行状态
"textField_lrzoowlb": textField_lrzoowlb # 信息说明
}
res_yujing = start_instance_process(TOKEN,send_data)
except:
pass
+117
View File
@@ -0,0 +1,117 @@
# 标准库
import os
import time
import random
import json
import binascii
from datetime import date, timedelta, datetime
from urllib.parse import quote
from pathlib import Path
# 第三方库
import numpy as np
import pandas as pd
import requests
from pyDes import des, CBC, PAD_PKCS5
import mysql.connector
from mysql.connector import Error
# PostgreSQL(如果你用到了)
import psycopg2
# 自定义模块
from config import Config
from api import API
from back_ground_module import CommonModule
from log_config import configure_task_logger, configure_error_task_logger
logger = configure_task_logger()
error_task_logger = configure_error_task_logger()
api_instance = API()
common_module = CommonModule()
output_dir = "output" # 设置输出目录
os.makedirs(output_dir, exist_ok=True)
class RenewalToDo:
def __init__(self):
self.json_list = None
self.data_NGV = None
self.staff_id_list = None
self.NGV_data_list = None
@staticmethod
def des_encrypt(s):
"""
DES 加密
:param s: 原始字符串
:return: 加密后字符串,16进制
"""
secret_key = 'HwdMBW8o'
iv = secret_key
k = des(secret_key, CBC, iv, pad=None, padmode=PAD_PKCS5)
en = k.encrypt(s, padmode=PAD_PKCS5)
return binascii.b2a_base64(en, newline=False)
@staticmethod
def des_descrypt(s):
"""
DES 解密
:param s: 加密后的字符串,16进制
:return: 解密后的字符串
"""
secret_key = 'HwdMBW8o'
iv = secret_key
k = des(secret_key, CBC, iv, pad=None, padmode=PAD_PKCS5)
de = k.decrypt(binascii.a2b_base64(s), padmode=PAD_PKCS5)
return de
def load_all_data(self):
# 获取NGV数据
payload = {"api_key": "675b900991ad2491c69389ca", "entry_id": "675bb02bd2d53c2034c665e4"}
self.NGV_data_list = api_instance.entry_data_list(payload).get("data")
# 获取简道云员工id
payload = {"api_key": "6694d3c4fcb69ca9a111a6c4",
"entry_id": "6769204a1902c9341340a1bc",
}
staff_id = api_instance.entry_data_list(payload)
self.staff_id_list = staff_id.get("data") # api请求格式,将数据封装在data字典里
# 省市区人员关系表
payload = {"api_key": "675b900991ad2491c69389ca", "entry_id": "676512ac3e54dc3159460c0a"}
json_dict = api_instance.entry_data_list(payload)
if json_dict and "data" in json_dict:
self.json_list = json_dict.get("data")
else:
print("加载省市区人员关系表失败")
self.json_list = []
# 数据库获取续约回访数据
self.data_NGV = common_module.get_renewal_details()
def process_data(self):
"""
:return:
"""
def main(self):
task_start_time = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
try:
logger.info("任务开始")
# step1: 获取数据
self.load_all_data()
logger.info("加载数据完成")
# step2:数据处理
self.process_data()
common_module.send_task_status(task_start_time, "续约回访待办")
except Exception as e:
error_task_logger.error(f"续约回访待办发生错误{e}")
common_module.send_task_error(task_start_time, "续约回访待办", str(e))
if __name__ == '__main__':
RenewalToDo().main()