Files
F6--/其它系统脚本/huifang_xinqian_180.py
2026-01-30 11:28:35 +08:00

398 lines
18 KiB
Python

import holidays
from datetime import date
# 创建一个中国节假日对象
cn_holidays = holidays.China()
# 获取当前日期
input_date = date.today()
switch = ""
# 判断日期是否为节假日
if input_date in cn_holidays:
switch = "节假日"
print(f"{input_date} 是节假日")
else:
switch = "非节假日"
print(f"{input_date} 不是节假日")
date_list = []
for data in cn_holidays:
date_list.append(str(data))
from datetime import date, timedelta
def get_saturdays(year):
start_date = date(year, 1, 1)
end_date = date(year, 12, 31)
saturdays = []
while start_date <= end_date:
if start_date.weekday() == 6: # 周日的weekday值为6
saturdays.append(start_date)
start_date += timedelta(days=1)
return saturdays
current_year = date.today().year
saturdays = get_saturdays(current_year)
print(saturdays)
for data in saturdays:
date_list.append(str(data))
date_list = list(set(date_list))
date_list = sorted(date_list, reverse=False)
# -*- coding: utf-8 -*-
import psycopg2
import pandas as pd
# 获得连接
conn = psycopg2.connect(database="f6_bi", user="BASIC$ro_caowei", password="!ro_caowei123", host="hgprecn-cn-nif1vnv0y002-cn-shanghai.hologres.aliyuncs.com", port="80")
# 获得游标对象,一个游标对象可以对数据库进行执行操作
cursor = conn.cursor()
import datetime
now_time = datetime.datetime.now()
yes_time = now_time + datetime.timedelta(days=-1)
yes_time_nyr = int(yes_time.strftime('%Y%m%d'))# 获取前一天日期
# sql语句 建表
sql =f"""SELECT * FROM "public"."holo_ads_report_saas_profile_ngv_detail_d" WHERE "date_id" = '{yes_time_nyr}' ;"""
# 执行语句
cursor.execute(sql)
# 获取结果集的每一行
rows = cursor.fetchall()
# 获取所有字段名
all_fields = cursor.description
#执行结果转化为dataframe
col = []
for i in all_fields:
col.append(i[0])
data_NGV_j = pd.DataFrame(list(rows),columns=col)
# data_NGV.to_excel(r'C:\Users\admin\Desktop\NGV明细.xlsx')
# 关闭数据库连接
cursor.close()
conn.close()
import pandas as pd
import pandas as pd
import numpy as np
import requests
import json
import time
import re
from datetime import datetime
from dateutil.relativedelta import relativedelta
from pathlib import Path
from urllib.parse import quote
from io import BytesIO
ROOT = Path('.').absolute() # 当前工作目录
# 生成 token,参数不需要修改
def generateToken() -> str:
""" 生成 token """
token_api = 'https://api.dingtalk.com/v1.0/oauth2/accessToken'
# 该信息在钉钉开放应用中
data = {
"appKey": "ding5kqocon5s9oph5uq",
"appSecret": 'HL1jgsIIfLAC0eTH0A1m4mwxUDqbgsiPeCCGGE3ocM6qJBTIW7Ivt9drxF_Z4Kb_'
}
res = requests.post(token_api, json=data)
token = res.json()['accessToken']
return token
def read_instances(token, formUuid, page, n,formatted_today,formatted_today_two):
""" 函数功能:读取普通表单的所有数据 """
api = f'https://api.dingtalk.com//v1.0/yida/forms/instances/search'
headers = {
"Content-Type": "application/json",
"x-acs-dingtalk-access-token": token
}
formData = {
"appType" : "APP_UYZ0KG6L0CCNV80GZ66O",
"systemToken" : "XA966F81JAJOFCVVVKO64E9MIIZV1EWE5SFMKJ2",
"userId" : "yida_pub_account",
"language" : "zh_CN",
"formUuid" : formUuid,
"createFromTimeGMT" : formatted_today,
"createToTimeGMT" : formatted_today_two,
"currentPage" : page,
"pageSize" : n
}
res = requests.post(api, headers=headers, json=formData)
return res.json()
def initiate_process(TOKEN,formUuid,processCode,formData):
""" 发起宜搭审批流程 """
api = f'https://api.dingtalk.com//v1.0/yida/processes/instances/start'
headers = {
"Content-Type": "application/json",
"x-acs-dingtalk-access-token": TOKEN
}
payload = {
"appType" : "APP_UYZ0KG6L0CCNV80GZ66O",
"systemToken" : "XA966F81JAJOFCVVVKO64E9MIIZV1EWE5SFMKJ2",
"userId" : "yida_pub_account",
"language" : "zh_CN",
"formUuid" : formUuid,
"formDataJson" : json.dumps(formData, cls=NpEncoder),
"processCode" : processCode,
}
res = requests.post(api, headers=headers, json=payload)
return res.json()
def read_instances_new(token, formUuid, page, n):
""" 函数功能:读取普通表单的所有数据 """
api = f'https://api.dingtalk.com//v1.0/yida/forms/instances/search'
headers = {
"Content-Type": "application/json",
"x-acs-dingtalk-access-token": token
}
formData = {
"appType" : "APP_UYZ0KG6L0CCNV80GZ66O",
"systemToken" : "XA966F81JAJOFCVVVKO64E9MIIZV1EWE5SFMKJ2",
"userId" : "yida_pub_account",
"language" : "zh_CN",
"formUuid" : formUuid,
"currentPage" : page,
"pageSize" : n
}
res = requests.post(api, headers=headers, json=formData)
return res.json()
def read_instances_ngv(token, formUuid, page, n,searchField):
""" 函数功能:读取普通表单的所有数据 """
api = f'https://api.dingtalk.com//v1.0/yida/forms/instances/search'
headers = {
"Content-Type": "application/json",
"x-acs-dingtalk-access-token": token
}
formData = {
"appType" : "APP_UYZ0KG6L0CCNV80GZ66O",
"systemToken" : "XA966F81JAJOFCVVVKO64E9MIIZV1EWE5SFMKJ2",
"userId" : "yida_pub_account",
"language" : "zh_CN",
"formUuid" : formUuid,
"searchFieldJson": json.dumps(searchField),
"currentPage" : page,
"pageSize" : n
}
res = requests.post(api, headers=headers, json=formData)
return res.json()
class NpEncoder(json.JSONEncoder):
def default(self, obj):
if isinstance(obj, np.integer):
return int(obj)
elif isinstance(obj, np.floating):
return float(obj)
elif isinstance(obj, np.ndarray):
return obj.tolist()
else:
return super(NpEncoder, self).default(obj)
TOKEN = generateToken()
import datetime
kaishiriqi = 173
kaishiriqi_1 = 172
date_one = 1
now_time = datetime.datetime.now()
# now_time = now_time + datetime.timedelta(days=+3)
if now_time.strftime("%Y-%m-%d") in date_list:
date_one = 0
print("开始次数:",date_one)
print("当前日期:",now_time)
for i in range(1,10):
new_date = now_time + datetime.timedelta(days=-i)
new_date = new_date.strftime("%Y-%m-%d")
print("遍历日期:",new_date)
if new_date in date_list:
date_one = date_one + 1
print("节假日期:",new_date)
else:
break
print("遍历次数:",date_one)
# now_time = datetime.datetime.now()
for i in range(0,date_one):
now_time = now_time + datetime.timedelta(days=-i)
today = now_time + datetime.timedelta(days=-kaishiriqi) # 根据周日和法定节假日 输出每日需要筛选的日期范围
formatted_today = today.strftime("%Y-%m-%d")
today_two = now_time + datetime.timedelta(days=-kaishiriqi_1)
formatted_today_two = today_two.strftime("%Y-%m-%d")
print(formatted_today,formatted_today_two)
timestamp_ms = int(time.time() * 1000) # 当然日期时间戳 毫秒级
'''读取省市小六技术专家区域客服区域客成 '''
FORMID = "FORM-TP866D918DFCA4FW79YZU5X43FO32QZJQDZJL7" #省市小六技术专家区域客服区域客成
# 读取流程表单数据
form_data = read_instances_new(token=TOKEN, formUuid=FORMID, page=1, n=100)
PAGES = form_data.get('totalCount')//100 + 1
textField_gif29wy = {}
textField_3athky8 = {}
textField_3hgho1m = {}
textField_nc7gskc = {}
textField_qk1e5di = {}
textField_m3hchxc = {} # 市
""" 获取全量数据 """
for i in range(1, PAGES+1):
form_data = read_instances_new(token=TOKEN, formUuid=FORMID, page=i, n=100)
for data in form_data.get('data'):
textField_gif29wy[data['formData']['textField_m3hchxc']]=data['formData']['textField_gif29wy'] #区域客成id
textField_3athky8[data['formData']['textField_3hgho1m']]=data['formData']['textField_3athky8'] #区域客服id
textField_3hgho1m[data['formData']['textField_3hgho1m']]=data['formData']['textField_3hgho1m'] #小六id
textField_nc7gskc[data['formData']['textField_3hgho1m']]=data['formData']['textField_nc7gskc'] #技术专家id
textField_qk1e5di[data['formData']['textField_3hgho1m']]=data['formData']['textField_lntniove'] #区域经理id
textField_m3hchxc[data['formData']['textField_m3hchxc']]=data['formData']['textField_3hgho1m'] #小六id-市
print(f'读取到省市小六技术专家区域客服区域客成表单中 {len(textField_gif29wy)} 条数据!')
# [流程]新签服务流程 读取待处理数据
json_data = []
read_data = read_instances(TOKEN, "FORM-9X766NA1SOATBRSH2K42D8BJCX7L326HI64TKG5", 1, 100,formatted_today,formatted_today_two)
PAGES = read_data.get('totalCount')//100 + 1
print(formatted_today,read_data.get('totalCount'))
for a in range(1,PAGES + 1):
read_data = read_instances(TOKEN, "FORM-9X766NA1SOATBRSH2K42D8BJCX7L326HI64TKG5", a, 100,formatted_today,formatted_today_two)
for i in range(0,len(read_data["data"])):
json_data.append(read_data["data"][i]['formData'])
# [流程]新签服务流程 读取待处理数据
json_data_all = []
for i in range(0,1000,100):
now_time = datetime.datetime.now()
today = now_time + datetime.timedelta(days=-(i+100))
formatted_today = today.strftime("%Y-%m-%d")
today_two = now_time + datetime.timedelta(days=-i)
formatted_today_two = today_two.strftime("%Y-%m-%d")
timestamp_ms = int(time.time() * 1000) # 当然日期时间戳 毫秒级
read_data = read_instances(TOKEN, "FORM-9X766NA1SOATBRSH2K42D8BJCX7L326HI64TKG5", 1, 100,formatted_today,formatted_today_two)
PAGES = read_data.get('totalCount')//100 + 1
print(formatted_today,formatted_today_two,read_data.get('totalCount'))
for a in range(1,PAGES + 1):
read_data = read_instances(TOKEN, "FORM-9X766NA1SOATBRSH2K42D8BJCX7L326HI64TKG5", a, 100,formatted_today,formatted_today_two)
for i in range(0,len(read_data["data"])):
json_data_all.append(read_data["data"][i]['formData'])
df = pd.DataFrame(json_data_all)
# 将所有字段设置为字符串类型
df = df.astype(str)
data_NGV = data_NGV_j[['org_code', 'id_own_group']]
data_NGV = data_NGV.rename(columns={'org_code': 'orgCode'})
result = pd.merge(df, data_NGV, on='orgCode', how='left')
data_details_not_null = result.sort_values(by='openTime', ascending=True).drop_duplicates(subset='id_own_group')
# 重置索引
data_details_not_null = data_details_not_null.reset_index(drop=True)
data_details_all = data_details_not_null.copy() # 替换名称 v2
# 获取待回访人员信息+去重
name_list = []
for data in json_data:
name_list.append(data['employeeField_kyi1dqth_id'])
unique_arr = []
[unique_arr.append(x) for x in name_list if x not in unique_arr]
print(unique_arr)
# 每日工作计划
formUuid = "FORM-4V966N81OMEEH85QDP8XW4AH1AIZ2GKRLZTML21"
processCode = "TPROC--4V966N81OMEEH85QDP8XW4AH1AIZ21ORLZTML31"
# 遍历数据并进行创建
from datetime import datetime, timedelta
for name in unique_arr:
tableField_lnsi0v71 = {}
tableField_lnsi0v71['dateField_lmelabb1'] = ""
tableField_lnsi0v71['employeeField_lmeqk429'] = ""
for data in json_data:
if data['employeeField_kyi1dqth_id'] == name and data['orgCode'] in data_details_all['orgCode'].values:
# 过滤近7天有派发记录的数据
# 获取当前日期
today = datetime.now()
# 计算前7天的日期
seven_days_ago = today - timedelta(days=7)
# 计算前8天的日期
seven_days_8 = today - timedelta(days=8)
# 过滤近7天有派发记录的数据
form_data_ces = read_instances_ngv(token=TOKEN, formUuid="FORM-KW766OD13WEEDPB6DSRTSD3IC2S131YMYKSML2", page=1, n=100, searchField={'textField_lmeqk42b': data["orgCode"]})
if form_data_ces['totalCount'] == 0:
date_string = seven_days_8.strftime("%Y-%m-%dT%H:%MZ")
else:
date_string = form_data_ces['data'][0]['createdTimeGMT']
date_format = "%Y-%m-%dT%H:%MZ"
date_object = datetime.strptime(date_string, date_format)
if date_object > seven_days_ago:
print("近7天内有派发记录,不重复派发!",data["orgCode"])
else:
print(textField_m3hchxc[data['city']])
data_one = {} # 子表单
data_two = {} # 主表单
# 子表单
data_one['textField_lnsi0v72'] = data['orgName']
data_one['textField_lnsi0v6s'] = data["saasEdition"] # 销售版本
try:
data_one['employeeField_lnsi0v6u'] = textField_nc7gskc[name[0]] # 技术专家
data_one['employeeField_lnsi0v6v'] = textField_qk1e5di[name[0]] # 区域经理
if textField_3hgho1m[name[0]] =="":
data_one['employeeField_lnsi0v6w'] = textField_m3hchxc[data['city']] # 运营顾问
else:
data_one['employeeField_lnsi0v6w'] = textField_3hgho1m[name[0]] # 运营顾问
except:
data_one['employeeField_lnsi0v6u'] = data['technician_id'][0] # 技术专家
data_one['employeeField_lnsi0v6v'] = data['manager_id'][0] # 区域经理
data_one['employeeField_lnsi0v6w'] = textField_m3hchxc[data['city']] # 运营顾问
data_one['dateField_lnsi0v6x'] = str(timestamp_ms) # 计划日期
data_one['textField_lnsi0v6y'] = data["orgName"] # 门店名称-拷贝
data_one['textField_lnsi0v6z'] = data["orgCode"] # 门店编码
data_one['selectField_lnsi0v6q'] = "强化培训" # 拜访目的
# data_one['textField_lnsi0v70'] = data_result.loc[i,"id_own_group"] # 公司id
data_one['textareaField_lnsi0v6r'] = '''
新签180天后:
1.客户日常使用问题解答;
2.跟客户查看主营分析,推荐进销存功能深度使用;
3.跟客户查看车辆分析,推荐短信及其他营销功能;
4.说明转介绍政策,索要转介绍;''' # 拜访目的说明
# 加入数据
tableField_lnsi0v71['tableField_lnsi0v71'] = [data_one]
# 主表单
# data_two['dateField_lmelabb1'] = str(timestamp_ms)
# data_two['employeeField_lmeqk429'] = name
tableField_lnsi0v71['dateField_lmelabb1'] = str(timestamp_ms)
try:
if name[0] =="":
tableField_lnsi0v71['employeeField_lmeqk429'] = textField_m3hchxc[data['city']] # 运营顾问
else:
tableField_lnsi0v71['employeeField_lmeqk429'] = name[0]
tableField_lnsi0v71['employeeField_lmeqk423'] = textField_qk1e5di[name[0]] # 区域经理
tableField_lnsi0v71['employeeField_lmeqk424'] = textField_nc7gskc[name[0]] # 技术专家
except:
tableField_lnsi0v71['employeeField_lmeqk429'] = textField_m3hchxc[data['city']] # 运营顾问
tableField_lnsi0v71['employeeField_lmeqk423'] = data['manager_id'][0] # 区域经理
tableField_lnsi0v71['employeeField_lmeqk424'] = data['technician_id'][0] # 技术专家
# 开户时间 大于3年派发给技术专家
try:
timestamp = data['openTime']
date_obj = datetime.fromtimestamp(timestamp / 1000)
current_date = datetime.now()
time_difference = current_date - date_obj
if time_difference > timedelta(days=365 * 3):
tableField_lnsi0v71['employeeField_lmeqk429'] = f"['{textField_gif29wy[data['city']]}', '{tableField_lnsi0v71['employeeField_lmeqk424']}']"
except:
pass
tableField_lnsi0v71['textField_lmu0523h'] = "自动派发回访计划" # 状态备用
# 富文本 超链接 NGV
try:
form_data_ngv = read_instances_ngv(token=TOKEN, formUuid="FORM-ZK866D91O9LA4NIHCARG2DPIPCXF3Z087PPHL91", page=1, n=100, searchField={'textField_zc1iowp': data["orgCode"]})
tableField_lnsi0v71['editorField_lodoplg0'] = ["root",{},["p",{},["span",{"data-type":"text"},["span",{"data-type":"leaf"},""]],["a",{"href":"https://f6car.aliwork.com/APP_UYZ0KG6L0CCNV80GZ66O/formDetail/FORM-ZK866D91O9LA4NIHCARG2DPIPCXF3Z087PPHL91?formInstId="+form_data_ngv['data'][0]['formInstanceId']+"&isAdmin=true"},["span",{"data-type":"text"},["span",{"unlink":{},"data-type":"leaf"},"点击查看门店NGV"]]],["span",{"data-type":"text"},["span",{"unlink":{},"data-type":"leaf"},""]]]] # 富文本 超链接 NGV
except:
pass
res=initiate_process(TOKEN,formUuid,processCode,tableField_lnsi0v71)
print(res,tableField_lnsi0v71)