654 lines
26 KiB
Python
654 lines
26 KiB
Python
#!/Users/xuyeqiang/opt/miniconda3/envs/f6/bin/python3.9
|
||
from pandas import DataFrame
|
||
from playwright.sync_api import Playwright, sync_playwright
|
||
import re
|
||
import pandas as pd
|
||
import requests
|
||
import json
|
||
from typing import Optional, List, Dict, Any
|
||
import time
|
||
import cpca
|
||
import numpy as np
|
||
from decimal import Decimal
|
||
from datetime import datetime, timezone, timedelta, date, UTC
|
||
|
||
# 保存为CSV文件
|
||
output_dir = "output" # 设置输出目录
|
||
|
||
# 创建输出目录(如果不存在)
|
||
import os
|
||
|
||
os.makedirs(output_dir, exist_ok=True)
|
||
|
||
js = """
|
||
Object.defineProperties(navigator, {webdriver:{get:()=>undefined}});
|
||
"""
|
||
|
||
|
||
@staticmethod
|
||
def entry_data_update(data: dict, max_retries: int = 20) -> dict: # 修改数据
|
||
"""
|
||
修改数据
|
||
:param max_retries: 最大重试次数,此处设置100次
|
||
:param data: 简道云插件发送过来的data,包含应用id、表单id、数据id等信息
|
||
:return: 修改数据后简道云返回的结果
|
||
"""
|
||
url = 'https://api.jiandaoyun.com/api/v5/app/entry/data/update'
|
||
|
||
headers = {
|
||
'Authorization': "Bearer qygHulymo1fekJk4CIZyNKjyQAzG8CFN", # 曹伟应用api测试 appKey
|
||
'Content-Type': 'application/json'
|
||
}
|
||
|
||
payload = json.dumps({
|
||
"app_id": data['api_key'], # 应用ID
|
||
"entry_id": data['entry_id'], # 表单ID
|
||
"data_id": data['data_id'], # 数据ID
|
||
"data": data['data']
|
||
}
|
||
)
|
||
print(payload)
|
||
data_get = None
|
||
retries = 0
|
||
while retries <= max_retries:
|
||
try:
|
||
res = requests.post(url=url, data=payload, headers=headers)
|
||
res.raise_for_status() # 检查HTTP响应状态码,如果不等于200会抛出异常
|
||
data_get = res.json()
|
||
print("返回结果:", data_get)
|
||
if res.status_code == 200:
|
||
break # 成功则跳出循环
|
||
else:
|
||
print("请求失败, 将重新请求")
|
||
retries += 1
|
||
time.sleep(3) # 在重试之间稍作停顿
|
||
except requests.exceptions.RequestException as e:
|
||
print(f"请求异常: {e}, 将重新请求")
|
||
retries += 1
|
||
time.sleep(10) # 在重试之间稍作停顿
|
||
if retries > max_retries:
|
||
print(f"超过最大重试次数({max_retries}),放弃此次请求")
|
||
continue
|
||
return data_get
|
||
|
||
|
||
def replace_decimals(obj):
|
||
if isinstance(obj, dict):
|
||
return {k: replace_decimals(v) for k, v in obj.items()}
|
||
elif isinstance(obj, list):
|
||
return [replace_decimals(item) for item in obj]
|
||
elif isinstance(obj, Decimal):
|
||
return float(obj) # 或者 str(obj)
|
||
return obj
|
||
|
||
|
||
class NpEncoder(json.JSONEncoder):
|
||
def default(self, obj):
|
||
if isinstance(obj, np.integer):
|
||
return int(obj)
|
||
elif isinstance(obj, np.floating):
|
||
return float(obj)
|
||
elif isinstance(obj, np.ndarray):
|
||
return obj.tolist()
|
||
else:
|
||
return super(NpEncoder, self).default(obj)
|
||
|
||
|
||
@staticmethod
|
||
def entry_data_batch_create(
|
||
data: dict,
|
||
chunk_size: int = 90,
|
||
max_retries: int = 20
|
||
) -> List[Optional[requests.Response]]: # 新建多条数据 注意简道云限制1次最多100条数据
|
||
"""
|
||
新建多条数据
|
||
:param max_retries: 最大重试次数,此处设置20次
|
||
:param data:应包含数据id、表单id、以及需要新建的信息,新建信息应该是一个列表
|
||
:param chunk_size: 简道云限制批量新建一次最多100条,这里默认值设置为90条一次
|
||
:return:返回请求后的结果
|
||
"""
|
||
data = replace_decimals(data)
|
||
url = 'https://api.jiandaoyun.com/api/v5/app/entry/data/batch_create'
|
||
|
||
headers = {
|
||
'Authorization': "Bearer qygHulymo1fekJk4CIZyNKjyQAzG8CFN", # 曹伟应用api测试 appKey
|
||
'Content-Type': 'application/json'
|
||
}
|
||
|
||
"""
|
||
data_list 样式 # 后续优化发送数据样式 目前输入字段,后续优化输入表单名称
|
||
jiandaoyun_data_list['data_list'] = [{"_widget_1731650067055":{"value":f'{username}{password}'},
|
||
"_widget_1731650067056":{"value": f"{group}"}},
|
||
{"_widget_1731650067055":{"value":f'{username}{password}'},
|
||
"_widget_1731650067056":{"value": f"{group}"}}]
|
||
"""
|
||
|
||
# 获取data_list长度
|
||
total_length = len(data['data_list'])
|
||
print(f"多数据写入行数: {total_length}")
|
||
|
||
# 计算需要发送的次数
|
||
num_chunks = (total_length + chunk_size - 1) // chunk_size # //整除向下取证,需要加上chunk_size - 1保证不会有缺失数据
|
||
print(num_chunks)
|
||
data_get_list = []
|
||
for i in range(num_chunks):
|
||
start_index = i * chunk_size
|
||
end_index = min(start_index + chunk_size, total_length)
|
||
payload = json.dumps({
|
||
"app_id": data['api_key'], # 应用ID
|
||
"entry_id": data['entry_id'], # 表单ID
|
||
"data_list": data['data_list'][start_index:end_index],
|
||
"is_start_workflow": data.get('is_start_workflow', "false"),
|
||
"is_start_trigger": data.get('is_start_trigger', "false"),
|
||
}, cls=NpEncoder)
|
||
retries = 0
|
||
while retries <= max_retries:
|
||
try:
|
||
res = requests.post(url=url, data=payload, headers=headers)
|
||
res.raise_for_status() # 检查HTTP响应状态码,如果不等于200会抛出异常
|
||
data_get = res.json()
|
||
print(i, "返回结果:", data_get)
|
||
if data_get["status"] == "success":
|
||
data_get_list.append(data_get)
|
||
break # 成功则跳出循环
|
||
else:
|
||
print("请求失败, 将重新请求")
|
||
retries += 1
|
||
time.sleep(3) # 在重试之间稍作停顿
|
||
except requests.exceptions.RequestException as e:
|
||
print(f"请求异常: {e}, 将重新请求")
|
||
retries += 1
|
||
time.sleep(0.1) # 在重试之间稍作停顿
|
||
if retries > max_retries:
|
||
print(f"超过最大重试次数({max_retries}),放弃此次请求")
|
||
data_get_list.append(None) # 或者可以选择记录失败的payload以便后续处理
|
||
|
||
return data_get_list
|
||
|
||
|
||
def data_batch_create(data: dict, max_retries: int = 20) -> Optional[requests.Response]: # 新建单条数据
|
||
"""
|
||
新建单条表单数据
|
||
:param max_retries: 最大重试次数
|
||
:param data: 应该包含应用id、表单id,以及新建的数据data['data']
|
||
:return: 返回创建后简道云返回的信息
|
||
"""
|
||
url = 'https://api.jiandaoyun.com/api/v5/app/entry/data/create'
|
||
|
||
headers = {
|
||
'Authorization': "Bearer qygHulymo1fekJk4CIZyNKjyQAzG8CFN", # 曹伟应用api测试 app_key
|
||
'Content-Type': 'application/json'
|
||
}
|
||
|
||
"""
|
||
data 样式 # 后续优化发送数据样式 目前输入字段,后续优化输入表单名称
|
||
jiandaoyun_data['data'] = {"_widget_1731650067055":{"value":f'{username}{password}'},
|
||
"_widget_1731650067056":{"value": f"{group}"}}
|
||
"""
|
||
|
||
payload = json.dumps({
|
||
"app_id": data['api_key'], # 应用ID
|
||
"entry_id": data['entry_id'], # 表单ID
|
||
"data": data['data'],
|
||
"is_start_workflow": data.get('is_start_workflow', "false"),
|
||
"is_start_trigger": data.get('is_start_trigger', "false"),
|
||
"transaction_id": data.get('transaction_id', "")
|
||
}
|
||
)
|
||
retries = 0
|
||
while retries <= max_retries:
|
||
try:
|
||
res = requests.post(url=url, data=payload, headers=headers)
|
||
res.raise_for_status() # 检查HTTP响应状态码,如果不等于200会抛出异常
|
||
data_get = res.json()
|
||
print("返回结果:", data_get)
|
||
if res.status_code == 200:
|
||
return data_get
|
||
else:
|
||
print("请求失败, 将重新请求")
|
||
retries += 1
|
||
time.sleep(3) # 在重试之间稍作停顿
|
||
except requests.exceptions.RequestException as e:
|
||
print(f"请求异常: {e}, 将重新请求")
|
||
retries += 1
|
||
time.sleep(3) # 在重试之间稍作停顿
|
||
if retries > max_retries:
|
||
print(f"超过最大重试次数({max_retries}),放弃此次请求")
|
||
|
||
|
||
def entry_data_list(data: dict, replace: bool = False, max_retries: int = 20) -> Dict: # 获取多条表单数据
|
||
"""
|
||
获取多条表单数据
|
||
:param max_retries: 最大重试次数
|
||
:param replace: 是否替换字段
|
||
:param data:
|
||
api_key: 应用id
|
||
entry_id: 表单id
|
||
:return:
|
||
"""
|
||
|
||
url = 'https://api.jiandaoyun.com/api/v5/app/entry/data/list'
|
||
|
||
headers = {
|
||
'Authorization': "Bearer qygHulymo1fekJk4CIZyNKjyQAzG8CFN", # 曹伟应用api测试 app_key
|
||
'Content-Type': 'application/json'
|
||
}
|
||
all_data_batches = [] # 用于存储每次请求返回的数据批次
|
||
last_data_id = None
|
||
exit_flag = False
|
||
while True:
|
||
payload = json.dumps({
|
||
"app_id": data['api_key'], # 应用ID
|
||
"entry_id": data['entry_id'], # 表单ID
|
||
"limit": 100,
|
||
"data_id": last_data_id
|
||
})
|
||
|
||
retries = 0
|
||
while retries <= max_retries:
|
||
try:
|
||
res = requests.post(url=url, data=payload, headers=headers)
|
||
res.raise_for_status() # 检查HTTP响应状态码,如果不等于200会抛出异常
|
||
data_get = res.json()
|
||
# print("返回结果:", data_get)
|
||
if data_get["data"]:
|
||
all_data_batches.extend(data_get['data'])
|
||
last_data_id = data_get['data'][-1].get('_id')
|
||
print(f"已获取 {len(all_data_batches)} 条数据")
|
||
break # 成功则跳出循环
|
||
else:
|
||
if 'data' not in data_get or len(data_get['data']) == 0:
|
||
exit_flag = True
|
||
break
|
||
print("请求失败, 将重新请求")
|
||
retries += 1
|
||
time.sleep(0.1) # 在重试之间稍作停顿
|
||
except requests.exceptions.RequestException as e:
|
||
print(f"请求异常: {e}, 将重新请求")
|
||
retries += 1
|
||
time.sleep(0.1) # 在重试之间稍作停顿
|
||
if retries > max_retries:
|
||
print(f"超过最大重试次数({max_retries}),放弃此次请求")
|
||
all_data_batches.append(None) # 或者可以选择记录失败的payload以便后续处理
|
||
|
||
if exit_flag:
|
||
break
|
||
|
||
# 构建最终返回的字典
|
||
final_data = {
|
||
'data': all_data_batches # 'data' 键对应的值是列表的列表
|
||
}
|
||
|
||
return final_data
|
||
|
||
|
||
def run(playwright: Playwright) -> DataFrame:
|
||
browser = playwright.chromium.launch(headless=False)
|
||
context = browser.new_context(viewport={'width': 1700, 'height': 1080})
|
||
|
||
# Open new page
|
||
page = context.new_page()
|
||
page.add_init_script(js) # 隐藏 webdriver属性,不然拖动滑块会失败。
|
||
|
||
# Go to https://fws.carzone365.com/#/store/quitAudit
|
||
page.goto("https://fws-tmyc.tmallyc.com/#/store/quitAudit")
|
||
|
||
# Click [placeholder="请输入用户名"]
|
||
page.click("[placeholder=\"请输入用户名\"]")
|
||
|
||
# Fill [placeholder="请输入用户名"]
|
||
page.fill("[placeholder=\"请输入用户名\"]", "18362916786")
|
||
|
||
# Click [placeholder="请输入密码"]
|
||
page.click("[placeholder=\"请输入密码\"]")
|
||
|
||
# Fill [placeholder="请输入密码"]
|
||
page.fill("[placeholder=\"请输入密码\"]", "F6@12456")
|
||
|
||
""" 拖拽滑块验证 """
|
||
deltaX = 50000
|
||
steps = 100
|
||
element = page.wait_for_selector("text=请按住滑块,拖动到最右边")
|
||
boundingBox = element.bounding_box()
|
||
df = pd.DataFrame()
|
||
if boundingBox:
|
||
x = boundingBox.get('x') + boundingBox.get('width') / 2
|
||
y = boundingBox.get('y') + boundingBox.get('height') / 2
|
||
page.mouse.move(x, y)
|
||
page.mouse.down()
|
||
x1 = x + deltaX
|
||
page.mouse.move(x1, y, steps=steps)
|
||
page.mouse.up()
|
||
page.wait_for_timeout(1000)
|
||
page.click('xpath=//*[@id="app"]//button[contains(@class,"login-btn")]') # 登录
|
||
""" 开始自动化点击操作 """
|
||
page.click('xpath=//*[@id="app"]/section/section/aside/ul/li[2]/ul/li[2]/div/div') # 门店审批
|
||
# 将每一页显示的数量设置为100
|
||
page.click('xpath=//*[@id="app"]//input[@placeholder="请选择"]')
|
||
page.click('xpath=//span[text()="100条/页"]')
|
||
page.wait_for_timeout(2000)
|
||
page.click('xpath=//*[@id="app"]/section/section/main/div/div[3]/div[2]/div[2]/button[2]/span') # 查询
|
||
page.wait_for_timeout(1000)
|
||
# 查询出一共有多少条数据
|
||
input_string = page.text_content('xpath=//*[@id="app"]/section/section/main/div/div[4]/div[3]/div/span[1]')
|
||
# 使用正则表达式提取数字部分
|
||
numbers = re.findall(r'\d+', input_string)
|
||
# 将提取到的数字部分转换为整数列表
|
||
numbers = [int(num) for num in numbers][0]
|
||
print(f'numbers:{numbers}')
|
||
# 计算总页数
|
||
total_pages = (numbers + 100 - 1) // 100
|
||
|
||
# 计算最后一页条数
|
||
def calculate_last_page_data(total_numbers):
|
||
data_per_page = 100
|
||
last_page_data = total_numbers % data_per_page
|
||
return last_page_data if last_page_data != 0 else data_per_page
|
||
|
||
last_page_data = calculate_last_page_data(numbers)
|
||
print("最后一页显示的数据条数:", last_page_data)
|
||
|
||
# 如果需要翻页,可以在这里添加翻页的逻辑
|
||
# 创建一个空列表来存储每行的数据
|
||
data = []
|
||
last_page_data_len = 100
|
||
for page_new in range(1, total_pages + 1):
|
||
print(f"处理第 {page_new} 页的数据")
|
||
if page_new == total_pages: last_page_data_len = last_page_data
|
||
for i in range(1, last_page_data_len + 1):
|
||
# 逐条获取明细
|
||
string_1 = page.text_content(
|
||
'xpath=//*[@id="app"]/section/section/main/div/div[4]/div[2]/div[3]/table/tbody/tr[' + str(
|
||
i) + ']/td[1]/div')
|
||
string_2 = page.text_content(
|
||
'xpath=//*[@id="app"]/section/section/main/div/div[4]/div[2]/div[3]/table/tbody/tr[' + str(
|
||
i) + ']/td[2]/div')
|
||
string_3 = page.text_content(
|
||
'xpath=//*[@id="app"]/section/section/main/div/div[4]/div[2]/div[3]/table/tbody/tr[' + str(
|
||
i) + ']/td[3]/div')
|
||
string_4 = page.text_content(
|
||
'xpath=//*[@id="app"]/section/section/main/div/div[4]/div[2]/div[3]/table/tbody/tr[' + str(
|
||
i) + ']/td[4]/div')
|
||
string_5 = page.text_content(
|
||
'xpath=//*[@id="app"]/section/section/main/div/div[4]/div[2]/div[3]/table/tbody/tr[' + str(
|
||
i) + ']/td[5]/div')
|
||
string_6 = page.text_content(
|
||
'xpath=//*[@id="app"]/section/section/main/div/div[4]/div[2]/div[3]/table/tbody/tr[' + str(
|
||
i) + ']/td[6]/div')
|
||
string_7 = page.text_content(
|
||
'xpath=//*[@id="app"]/section/section/main/div/div[4]/div[2]/div[3]/table/tbody/tr[' + str(
|
||
i) + ']/td[7]/div')
|
||
string_8 = page.text_content(
|
||
'xpath=//*[@id="app"]/section/section/main/div/div[4]/div[2]/div[3]/table/tbody/tr[' + str(
|
||
i) + ']/td[8]/div')
|
||
|
||
if string_1 == "编辑门店":
|
||
continue
|
||
# 保存当前页面的上下文
|
||
context = page.context
|
||
|
||
# 点击按钮打开新页面(使用 Promise 等待弹出窗口)
|
||
with context.expect_page() as new_page_info:
|
||
page.click(
|
||
f'//*[@id="app"]/section/section/main/div/div[4]/div[2]/div[3]/table/tbody/tr[{i}]/td[9]/div/button')
|
||
new_page = new_page_info.value
|
||
|
||
print(f"跳转到新页面: {new_page.url}")
|
||
|
||
# 使用新页面对象获取内容
|
||
string9 = new_page.text_content(
|
||
'xpath=/html/body/section/section/section/main/div/div[2]/div[3]/div/div[3]/span[2]')
|
||
|
||
string10 = new_page.text_content(
|
||
'xpath=/html/body/section/section/section/main/div/div[2]/div[3]/div/div[4]/span[2]')
|
||
|
||
# 关闭新页面
|
||
new_page.close()
|
||
|
||
# 确保焦点回到原始页面
|
||
page.bring_to_front()
|
||
|
||
df_address = cpca.transform([string_4])
|
||
string11 = string12 = string13 = ""
|
||
for index, row in df_address.iterrows():
|
||
string11 = row['省']
|
||
string12 = row['市']
|
||
string13 = row['区']
|
||
|
||
# 将数据添加到列表中
|
||
data.append(
|
||
[string_1, string_2, string_3, string_4, string_5, string_6, string_7, string_8, string9, string10,
|
||
string11, string12, string13])
|
||
print(data)
|
||
|
||
if page_new != total_pages:
|
||
try:
|
||
page.wait_for_timeout(1000)
|
||
except:
|
||
pass
|
||
# 创建DataFrame
|
||
|
||
df = pd.DataFrame(data,
|
||
columns=["类型", "门店名称", "门店id", "门店地址", "分类", "申请人", "状态", "申请时间",
|
||
"负责人", "联系电话", "省", "市", "区"])
|
||
df.to_excel(os.path.join(output_dir, "天猫门店审批.xlsx"), index=False)
|
||
|
||
time.sleep(1)
|
||
page.wait_for_timeout(1000)
|
||
context.close()
|
||
browser.close()
|
||
return df
|
||
|
||
|
||
def load_cus_data():
|
||
# 获取接车宝客服表单
|
||
payload = {"api_key": "66f3a68c6e56814df2c6b1af",
|
||
"entry_id": "6809d4ef063ece5c83fc61ad",
|
||
}
|
||
customer_service = entry_data_list(payload)
|
||
customer_service_list = customer_service.get("data") # api请求格式,将数据封装在data字典里
|
||
return customer_service_list
|
||
|
||
|
||
def row_to_dict(row, field_mapping):
|
||
"""将一行数据转换为指定格式的字典"""
|
||
result = {}
|
||
# print(field_mapping)
|
||
for col_name, widget_id in field_mapping.items():
|
||
# print(col_name, widget_id)
|
||
if col_name in row:
|
||
value = row[col_name]
|
||
clean_value = None if pd.isna(value) else value
|
||
result[widget_id] = {"value": clean_value}
|
||
return result
|
||
|
||
|
||
def today_customer_service_list1():
|
||
# 获取今日接车宝派发客服顺序
|
||
today_customer_service_list = []
|
||
all_customer_service_list = []
|
||
today_customer_service_start_list = []
|
||
customer_service_list = load_cus_data()
|
||
for row_items in customer_service_list:
|
||
# print(row_items)
|
||
customer_service_name_id = row_items.get("_widget_1740042824214", {}).get("username", {})
|
||
customer_service_name = row_items.get("_widget_1740042824214", {}).get("name", {})
|
||
customer_service_state = row_items.get("_widget_1740117343937", {})
|
||
is_last_day_end = row_items.get("_widget_1740042824216", {})
|
||
customer_service_data_id = row_items.get("_id", {})
|
||
print(customer_service_name, customer_service_name_id, customer_service_state, is_last_day_end)
|
||
all_customer_service_list.append(
|
||
[customer_service_name, customer_service_name_id, customer_service_state, is_last_day_end,
|
||
customer_service_data_id])
|
||
if is_last_day_end == "是": # 判断是否是下次开始位置
|
||
last_day_end_customer_service = customer_service_name_id
|
||
is_customer_service_data_id = row_items.get("_id", {})
|
||
|
||
split_index = None
|
||
for index, row in enumerate(all_customer_service_list):
|
||
print(row[3])
|
||
if row[3] == "是":
|
||
split_index = index
|
||
print(f"找到索引 {index}")
|
||
break
|
||
|
||
if split_index is not None:
|
||
# 根据索引切割列表
|
||
first_part = all_customer_service_list[split_index:] # 索引位置及之后的行
|
||
second_part = all_customer_service_list[:split_index] # 索引位置之前的行
|
||
# 调换两个子列表的位置并重新组合
|
||
today_customer_service_start_list = first_part + second_part
|
||
else:
|
||
# 如果没有找到“是”,保持原列表不变
|
||
today_customer_service_start_list = all_customer_service_list
|
||
pass
|
||
|
||
for index, row in enumerate(today_customer_service_start_list):
|
||
if row[2] == "开":
|
||
today_customer_service_list.append(row[1])
|
||
|
||
return today_customer_service_list, is_customer_service_data_id, all_customer_service_list
|
||
|
||
|
||
def send_request(df):
|
||
today_customer_service_list, is_customer_service_data_id, all_customer_service_list = today_customer_service_list1()
|
||
# 初始化派发索引
|
||
next_dispatcher_index = 0
|
||
|
||
# 显式循环分配跟进人
|
||
follow_up_persons = []
|
||
for _ in range(len(df)):
|
||
follow_up_person = today_customer_service_list[next_dispatcher_index]
|
||
follow_up_persons.append(follow_up_person)
|
||
next_dispatcher_index = (next_dispatcher_index + 1) % len(today_customer_service_list)
|
||
|
||
# 添加跟进人到 DataFrame
|
||
df["BD-负责人"] = follow_up_persons
|
||
df["线索来源"] = "安装服务"
|
||
|
||
# 获取下一个派发人
|
||
next_dispatcher = today_customer_service_list[next_dispatcher_index]
|
||
field_mapping = fields()
|
||
|
||
new_sign_abnormal_data = [row_to_dict(row, field_mapping) for index, row in
|
||
df.iterrows()]
|
||
data = {'api_key': '66f3a68c6e56814df2c6b1af', 'entry_id': "67f5dc467a9f5b2710da965a",
|
||
"data_list": new_sign_abnormal_data} # 派发数据
|
||
|
||
entry_data_batch_create(data)
|
||
|
||
data1 = {"api_key": "66f3a68c6e56814df2c6b1af",
|
||
"entry_id": "6809d4ef063ece5c83fc61ad",
|
||
"data_id": is_customer_service_data_id,
|
||
"data":
|
||
{"_widget_1740042824216": {"value": ""}, }
|
||
} # 原来的是"_widget_1740042824216": {"value": "是"},修改昨日截至人员
|
||
next_customer_service_data_id = None
|
||
|
||
for index, row in enumerate(all_customer_service_list):
|
||
print(row[3])
|
||
if row[1] == next_dispatcher:
|
||
next_customer_service_data_id = row[4]
|
||
break
|
||
|
||
data2 = {"api_key": "66f3a68c6e56814df2c6b1af",
|
||
"entry_id": "6809d4ef063ece5c83fc61ad",
|
||
"data_id": next_customer_service_data_id,
|
||
"data":
|
||
{"_widget_1740042824216": {"value": "是"}, }} # 明日派发起点人员
|
||
|
||
entry_data_update(data1)
|
||
entry_data_update(data2)
|
||
|
||
def send_task_status( task_start_time: str, task_name: str) -> None:
|
||
"""
|
||
将任务状态发送到简道云(开始时间为北京时间,需转换到 UTC)
|
||
:param task_start_time: 任务开始时间(字符串格式:"%Y-%m-%d %H:%M:%S",表示北京时间 UTC+8)
|
||
:param task_name: 任务名称
|
||
"""
|
||
print(1)
|
||
try:
|
||
# 1. 获取当前 UTC 时间(时区感知对象)
|
||
end_time_utc = datetime.now(UTC) # ✅ 替代 utcnow()
|
||
|
||
# 2. 解析传入的北京时间(UTC+8)
|
||
task_start_naive = datetime.strptime(task_start_time, "%Y-%m-%d %H:%M:%S")
|
||
|
||
# 3. 转换为 UTC 时间(减去 8 小时,并附加 UTC 时区)
|
||
task_start_utc = task_start_naive - timedelta(hours=8)
|
||
task_start_utc = task_start_utc.replace(tzinfo=timezone.utc) # 显式标记为 UTC
|
||
|
||
# 4. 计算运行时间(时区感知对象可直接相减)
|
||
run_time = end_time_utc - task_start_utc
|
||
run_time_sec = int(run_time.total_seconds())
|
||
|
||
# 5. 格式化时间为 UTC 的 ISO 8601 格式(带 "Z")
|
||
today_utc = end_time_utc.strftime("%Y-%m-%d")
|
||
task_end_iso = end_time_utc.strftime("%Y-%m-%dT%H:%M:%SZ")
|
||
task_start_iso = task_start_utc.strftime("%Y-%m-%dT%H:%M:%SZ")
|
||
print(task_end_iso)
|
||
print(task_start_iso)
|
||
|
||
# 6. 构造请求数据(所有时间以 UTC 格式发送)
|
||
payload = {
|
||
"api_key": "6694d3c4fcb69ca9a111a6c4",
|
||
"entry_id": "67ede908eb9c22261016466e",
|
||
"data": {
|
||
"_widget_1744873387500": {"value": today_utc}, # UTC 日期
|
||
"_widget_1743644977694": {"value": task_name},
|
||
"_widget_1744873387501": {"value": task_start_iso}, # UTC 开始时间
|
||
"_widget_1744873387502": {"value": task_end_iso}, # UTC 结束时间
|
||
"_widget_1744873387504": {"value": run_time_sec},
|
||
}
|
||
}
|
||
|
||
# 7. 发送请求
|
||
response = data_batch_create(payload)
|
||
print(response)
|
||
except Exception as e:
|
||
print(f"Error: {e}")
|
||
|
||
|
||
def main():
|
||
task_start_time = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
|
||
with sync_playwright() as playwright:
|
||
df = run(playwright)
|
||
|
||
# 获取接车宝客服表单
|
||
payload = {"api_key": "66f3a68c6e56814df2c6b1af",
|
||
"entry_id": "67f5dc467a9f5b2710da965a",
|
||
}
|
||
|
||
BD_entry = entry_data_list(payload)
|
||
BD_list = BD_entry.get("data")
|
||
store_id_list = []
|
||
for row_items in BD_list:
|
||
store_id = row_items.get("_widget_1744177321451", {})
|
||
store_id_list.append(store_id)
|
||
|
||
if df is not None:
|
||
for index, row in df.iterrows():
|
||
if row["门店id"] in store_id_list:
|
||
print("数据已存在,跳过发送请求。")
|
||
df = df.drop(index) # 删除该行
|
||
continue
|
||
send_request(df)
|
||
|
||
send_task_status(task_start_time, "天猫门店审批转安装服务意向BD")
|
||
|
||
|
||
|
||
|
||
def fields():
|
||
field_mapping = {"省": "_widget_1744177321450", "市": "_widget_1744182647145",
|
||
"区": "_widget_1744182647146", "门店名称": "_widget_1744177321449",
|
||
"门店id": "_widget_1744177321451", "负责人": "_widget_1744177321452",
|
||
"联系电话": "_widget_1744177321453", "BD-负责人": "_widget_1744182647149",
|
||
"线索来源":"_widget_1744187212674",
|
||
}
|
||
return field_mapping
|
||
|
||
|
||
if __name__ == "__main__":
|
||
main()
|