Files
saas/test/天猫爬虫并派发.py
T
2025-08-12 13:43:10 +08:00

461 lines
18 KiB
Python

#!/Users/xuyeqiang/opt/miniconda3/envs/f6/bin/python3.9
from pandas import DataFrame
from playwright.sync_api import Playwright, sync_playwright
import re
import pandas as pd
from api import API
import requests
import json
from typing import Optional, List, Dict, Any
import time
import cpca
import numpy as np
import datetime
# Shared client instance of the project-local API wrapper (imported from `api`).
api_instance = API()
# Directory that exported result files are written to (run() saves an .xlsx there).
output_dir = "output" # name of the output directory
# Create the output directory if it does not exist yet.
import os
os.makedirs(output_dir, exist_ok=True)
# Init script injected into every page: makes navigator.webdriver read as undefined.
# run() relies on it so the slider verification is not rejected.
js = """
Object.defineProperties(navigator, {webdriver:{get:()=>undefined}});
"""
def data_batch_create(data: dict, max_retries: int = 20, timeout: float = 30) -> Optional[Dict[str, Any]]:
    """Create a single record in a Jiandaoyun form.

    Args:
        data: Request description. Must contain ``api_key`` (sent as the
            ``app_id``), ``entry_id`` (the form id) and ``data`` (the field
            payload). Optional keys ``is_start_workflow``,
            ``is_start_trigger`` and ``transaction_id`` are forwarded when
            present.
        max_retries: Number of retries after the first attempt, i.e. up to
            ``max_retries + 1`` requests are sent.
        timeout: Seconds to wait for each HTTP request before it counts as a
            failed attempt.

    Returns:
        The decoded JSON body of the first successful (HTTP 200) response, or
        ``None`` when every attempt failed.

    Raises:
        KeyError: If ``data`` lacks ``api_key``, ``entry_id`` or ``data``.
    """
    url = 'https://api.jiandaoyun.com/api/v5/app/entry/data/create'
    headers = {
        # NOTE(review): the credential is hard-coded in source; consider
        # loading it from configuration or the environment instead.
        'Authorization': "Bearer qygHulymo1fekJk4CIZyNKjyQAzG8CFN",
        'Content-Type': 'application/json'
    }
    # Example of the expected data['data'] shape (widget id -> {"value": ...}):
    #   {"_widget_1731650067055": {"value": "..."},
    #    "_widget_1731650067056": {"value": "..."}}
    payload = json.dumps({
        "app_id": data['api_key'],  # application id
        "entry_id": data['entry_id'],  # form id
        "data": data['data'],
        "is_start_workflow": data.get('is_start_workflow', "false"),
        "is_start_trigger": data.get('is_start_trigger', "false"),
        "transaction_id": data.get('transaction_id', "")
    })
    attempts = max_retries + 1
    for attempt in range(attempts):
        try:
            # A timeout keeps a stalled connection from blocking the script
            # forever; requests raises a RequestException subclass on expiry.
            res = requests.post(url=url, data=payload, headers=headers, timeout=timeout)
            res.raise_for_status()  # raises for 4xx/5xx responses
            data_get = res.json()
            print("返回结果:", data_get)
            if res.status_code == 200:
                return data_get
            print("请求失败, 将重新请求")
        except requests.exceptions.RequestException as e:
            print(f"请求异常: {e}, 将重新请求")
        if attempt < attempts - 1:
            time.sleep(3)  # short pause between attempts, skipped after the last one
    print(f"超过最大重试次数({max_retries}),放弃此次请求")
    return None
def entry_data_list(data: dict, replace: bool = False, max_retries: int = 20, timeout: float = 30) -> Dict:
    """Fetch all records of a Jiandaoyun form, following pagination.

    Pages of 100 records are requested until an empty page is returned. The
    ``_id`` of the last record of each page is sent as ``data_id`` in the next
    request.

    Args:
        data: Must contain ``api_key`` (sent as the ``app_id``) and
            ``entry_id`` (the form id).
        replace: Currently unused; kept for interface compatibility.
        max_retries: Number of retries after the first attempt for each page.
        timeout: Seconds to wait for each HTTP request before it counts as a
            failed attempt.

    Returns:
        ``{'data': [...]}`` where the list holds the records of all pages in
        order. If a page could not be fetched within the retry budget,
        pagination stops and a single trailing ``None`` marks the result as
        incomplete.

    Raises:
        KeyError: If ``data`` lacks ``api_key`` or ``entry_id``.
    """
    url = 'https://api.jiandaoyun.com/api/v5/app/entry/data/list'
    headers = {
        # NOTE(review): the credential is hard-coded in source; consider
        # loading it from configuration or the environment instead.
        'Authorization': "Bearer qygHulymo1fekJk4CIZyNKjyQAzG8CFN",
        'Content-Type': 'application/json'
    }
    all_data_batches = []  # flat list of the records collected so far
    last_data_id = None
    attempts = max_retries + 1
    while True:
        payload = json.dumps({
            "app_id": data['api_key'],  # application id
            "entry_id": data['entry_id'],  # form id
            "limit": 100,
            "data_id": last_data_id
        })
        page_rows = None  # stays None when every attempt for this page failed
        for attempt in range(attempts):
            try:
                res = requests.post(url=url, data=payload, headers=headers, timeout=timeout)
                res.raise_for_status()  # raises for 4xx/5xx responses
                # A missing or empty 'data' entry both mean "no more records".
                page_rows = res.json().get('data') or []
                break
            except requests.exceptions.RequestException as e:
                print(f"请求异常: {e}, 将重新请求")
                if attempt < attempts - 1:
                    time.sleep(0.1)  # short pause between attempts
        if page_rows is None:
            # Give up instead of re-requesting the same page forever.
            print(f"超过最大重试次数({max_retries}),放弃此次请求")
            all_data_batches.append(None)
            break
        if not page_rows:
            break
        all_data_batches.extend(page_rows)
        last_data_id = page_rows[-1].get('_id')
        print(f"已获取 {len(all_data_batches)} 条数据")
    return {'data': all_data_batches}
def run(playwright: Playwright) -> DataFrame:
    """Log in to the carzone365 console and scrape the store-approval table.

    The function opens a visible Chromium window, logs in (including the
    slider verification), switches the table to 100 rows per page, and reads
    every row. For each row it opens the detail page in a new tab to read two
    extra fields, and parses the address column with ``cpca``. After each
    page the accumulated rows are written to ``天猫门店审批.xlsx`` inside
    ``output_dir``.

    Args:
        playwright: An active Playwright instance (from ``sync_playwright``).

    Returns:
        A DataFrame with one row per scraped table row. It is an empty
        DataFrame without columns when the table reports zero records.

    Raises:
        ValueError: If the total record count cannot be parsed from the
            pagination text.
    """
    browser = playwright.chromium.launch(headless=False)
    try:
        context = browser.new_context(viewport={'width': 1700, 'height': 1080})
        page = context.new_page()
        page.add_init_script(js)  # hide navigator.webdriver, otherwise the slider drag fails
        page.goto("https://fws.carzone365.com/#/store/quitAudit")
        # NOTE(review): login credentials are hard-coded in source; consider
        # loading them from configuration or the environment instead.
        page.click("[placeholder=\"请输入用户名\"]")
        page.fill("[placeholder=\"请输入用户名\"]", "17710217084")
        page.click("[placeholder=\"请输入密码\"]")
        page.fill("[placeholder=\"请输入密码\"]", "123456F6!")

        # Slider verification: press on the slider text and drag far to the right.
        deltaX = 50000
        steps = 100
        element = page.wait_for_selector("text=请按住滑块，拖动到最右边")
        boundingBox = element.bounding_box()
        df = pd.DataFrame()
        if boundingBox:
            x = boundingBox.get('x') + boundingBox.get('width') / 2
            y = boundingBox.get('y') + boundingBox.get('height') / 2
            page.mouse.move(x, y)
            page.mouse.down()
            page.mouse.move(x + deltaX, y, steps=steps)
            page.mouse.up()
        page.wait_for_timeout(1000)
        page.click('xpath=//*[@id="app"]//button[contains(@class,"login-btn")]')  # log in

        # Navigate to the store-approval list and show 100 rows per page.
        page.click('xpath=//*[@id="app"]/section/section/aside/ul/li[2]/ul/li[2]/div/div')
        page.click('xpath=//*[@id="app"]//input[@placeholder="请选择"]')
        page.click('xpath=//span[text()="100条/页"]')
        page.wait_for_timeout(2000)
        page.click('xpath=//*[@id="app"]/section/section/main/div/div[3]/div[2]/div[2]/button[2]/span')  # query
        page.wait_for_timeout(1000)

        # The pagination text carries the total record count as its first number.
        input_string = page.text_content('xpath=//*[@id="app"]/section/section/main/div/div[4]/div[3]/div/span[1]')
        count_match = re.search(r'\d+', input_string or "")
        if count_match is None:
            raise ValueError(f"无法从分页文本中解析总条数: {input_string!r}")
        numbers = int(count_match.group())
        print(f'numbers:{numbers}')

        page_size = 100
        total_pages = (numbers + page_size - 1) // page_size
        # A full last page has page_size rows, not zero.
        last_page_data = numbers % page_size or page_size
        print("最后一页显示的数据条数:", last_page_data)

        data = []
        for page_new in range(1, total_pages + 1):
            print(f"处理第 {page_new} 页的数据")
            rows_on_page = last_page_data if page_new == total_pages else page_size
            for i in range(1, rows_on_page + 1):
                # Read the eight visible text columns of table row i.
                cells = [page.text_content(_store_table_cell_xpath(i, column)) for column in range(1, 9)]
                if cells[0] == "编辑门店":
                    continue
                # The button in column 9 opens the detail page in a new tab.
                with context.expect_page() as new_page_info:
                    page.click(
                        f'//*[@id="app"]/section/section/main/div/div[4]/div[2]/div[3]/table/tbody/tr[{i}]/td[9]/div/button')
                new_page = new_page_info.value
                print(f"跳转到新页面: {new_page.url}")
                try:
                    string9 = new_page.text_content(
                        'xpath=/html/body/section/section/section/main/div/div[2]/div[3]/div/div[3]/span[2]')
                    string10 = new_page.text_content(
                        'xpath=/html/body/section/section/section/main/div/div[2]/div[3]/div/div[4]/span[2]')
                finally:
                    # Always close the tab so a failed read does not leak it.
                    new_page.close()
                page.bring_to_front()  # return focus to the list page

                # Parse the address column (4th cell) with cpca.
                df_address = cpca.transform([cells[3]])
                string11 = string12 = string13 = ""
                # NOTE(review): the three lookup keys below are empty strings in
                # this file; presumably they were the province/city/district
                # column names of the cpca result - TODO confirm.
                for _, address_row in df_address.iterrows():
                    string11 = address_row['']
                    string12 = address_row['']
                    string13 = address_row['']
                data.append(cells + [string9, string10, string11, string12, string13])
            print(data)
            # NOTE(review): no click on a "next page" control is visible here,
            # so later iterations appear to re-read the same page - TODO confirm.
            if page_new != total_pages:
                try:
                    page.wait_for_timeout(1000)
                except Exception as wait_error:
                    # Best-effort pause: report the problem instead of hiding it.
                    print(f"等待翻页时出错: {wait_error}")
            # Rebuild and save the sheet after every page so partial results survive.
            df = pd.DataFrame(data,
                              columns=["类型", "门店名称", "门店id", "门店地址", "分类", "申请人", "状态", "申请时间",
                                       "负责人", "联系电话", "", "", ""])
            df.to_excel(os.path.join(output_dir, "天猫门店审批.xlsx"), index=False)
            time.sleep(1)
        page.wait_for_timeout(1000)
        context.close()
        return df
    finally:
        # Closing the browser also releases any context or tab left open by an error.
        browser.close()


def _store_table_cell_xpath(row_number: int, column_number: int) -> str:
    """Return the selector of one text cell in the store-approval table."""
    return ('xpath=//*[@id="app"]/section/section/main/div/div[4]/div[2]/div[3]/table/tbody/tr['
            + str(row_number) + ']/td[' + str(column_number) + ']/div')
def load_cus_data():
    """Return the records of the customer-service roster form.

    The form is queried through ``api_instance.entry_data_list``; the records
    are taken from the ``data`` entry of its response (``None`` if absent).
    """
    roster_query = {
        "api_key": "66f3a68c6e56814df2c6b1af",
        "entry_id": "6809d4ef063ece5c83fc61ad",
    }
    roster_response = api_instance.entry_data_list(roster_query)
    # The API wrapper returns the rows wrapped under the "data" key.
    return roster_response.get("data")
def row_to_dict(row, field_mapping):
    """Convert one data row into a ``{widget_id: {"value": ...}}`` dict.

    Only columns that are present in ``row`` are emitted. Values that
    ``pd.isna`` reports as missing are replaced by ``None``.
    """

    def _clean(cell):
        # Missing values (NaN / None / NaT) are sent as None.
        return cell if not pd.isna(cell) else None

    return {
        widget_id: {"value": _clean(row[col_name])}
        for col_name, widget_id in field_mapping.items()
        if col_name in row
    }
def today_customer_service_list1():
    """Build today's dispatch order from the customer-service roster form.

    Every roster record is reduced to
    ``[name, username, state, start_marker, record_id]``. The roster is then
    rotated so that the first record whose start marker equals ``""`` comes
    first, and the usernames of the records whose state equals ``""`` are
    collected in that order.

    NOTE(review): both comparisons are against the empty string in this file,
    while an existing comment refers to looking for "是" - TODO confirm the
    intended marker values.

    Returns:
        A tuple ``(today_customer_service_list, is_customer_service_data_id,
        all_customer_service_list)``:

        * the usernames to dispatch to, in rotation order;
        * the ``_id`` of the last roster record carrying the start marker, or
          ``None`` when no record carries it;
        * the reduced roster rows in their original order.
    """
    all_customer_service_list = []
    # Stays None when no record carries the marker, instead of being unbound.
    is_customer_service_data_id = None
    for row_items in load_cus_data() or []:
        member = row_items.get("_widget_1740042824214", {})
        customer_service_name_id = member.get("username", {})
        customer_service_name = member.get("name", {})
        customer_service_state = row_items.get("_widget_1740117343937", {})
        is_last_day_end = row_items.get("_widget_1740042824216", {})
        customer_service_data_id = row_items.get("_id", {})
        print(customer_service_name, customer_service_name_id, customer_service_state, is_last_day_end)
        all_customer_service_list.append(
            [customer_service_name, customer_service_name_id, customer_service_state, is_last_day_end,
             customer_service_data_id])
        if is_last_day_end == "":  # this record marks where the next round starts
            is_customer_service_data_id = customer_service_data_id

    # Index of the first marked record; the roster is rotated to start there.
    split_index = next(
        (index for index, row in enumerate(all_customer_service_list) if row[3] == ""),
        None,
    )
    if split_index is not None:
        print(f"找到索引 {split_index}")
        today_customer_service_start_list = (
            all_customer_service_list[split_index:] + all_customer_service_list[:split_index]
        )
    else:
        # No marker found: keep the roster order unchanged.
        today_customer_service_start_list = all_customer_service_list

    today_customer_service_list = [row[1] for row in today_customer_service_start_list if row[2] == ""]
    return today_customer_service_list, is_customer_service_data_id, all_customer_service_list
def send_request(df):
    """Assign an owner to every row of ``df`` and push the rows to the BD form.

    Owners are taken round-robin from today's customer-service list and stored
    in a new ``BD-负责人`` column (``df`` is modified in place). The rows are
    then converted with ``row_to_dict``/``fields`` and created in one batch
    through ``api_instance``. Finally the roster form is updated twice: once
    for the record that carried the start marker and once for the record of
    the next person in rotation.

    Args:
        df: DataFrame of scraped stores. ``None`` or an empty frame is a
            no-op: nothing is dispatched and the roster is left untouched.

    Raises:
        ValueError: If there are rows to dispatch but no customer-service
            user is available today.
    """
    if df is None or len(df) == 0:
        print("没有需要派发的数据")
        return

    today_customer_service_list, is_customer_service_data_id, all_customer_service_list = today_customer_service_list1()
    if not today_customer_service_list:
        raise ValueError("今日没有可派发的客服人员")

    # Round-robin assignment: row i goes to user i modulo the number of users.
    service_count = len(today_customer_service_list)
    row_count = len(df)
    df["BD-负责人"] = [today_customer_service_list[i % service_count] for i in range(row_count)]
    # The user right after the last assigned one starts the next round.
    next_dispatcher = today_customer_service_list[row_count % service_count]

    field_mapping = fields()
    new_sign_abnormal_data = [row_to_dict(row, field_mapping) for _, row in df.iterrows()]
    data = {'api_key': '66f3a68c6e56814df2c6b1af', 'entry_id': "6809a1cedfb68ab53de82d43",
            "data_list": new_sign_abnormal_data}  # rows to dispatch
    api_instance.entry_data_batch_create(data)

    # Roster record id of the user who starts the next round.
    next_customer_service_data_id = next(
        (row[4] for row in all_customer_service_list if row[1] == next_dispatcher),
        None,
    )
    # Update for the record that carried the start marker until now.
    data1 = {"api_key": "66f3a68c6e56814df2c6b1af",
             "entry_id": "6809d4ef063ece5c83fc61ad",
             "data_id": is_customer_service_data_id,
             "data":
                 {"_widget_1740042824216": {"value": ""}, }
             }
    # Update for the record of the next starting user.
    # NOTE(review): this writes the same "" value as data1 although it is meant
    # to mark the next starting point - TODO confirm the intended value.
    data2 = {"api_key": "66f3a68c6e56814df2c6b1af",
             "entry_id": "6809d4ef063ece5c83fc61ad",
             "data_id": next_customer_service_data_id,
             "data":
                 {"_widget_1740042824216": {"value": ""}, }}
    # Skip an update whose target record is unknown rather than send data_id=None.
    if is_customer_service_data_id is not None:
        api_instance.entry_data_update(data1)
    if next_customer_service_data_id is not None:
        api_instance.entry_data_update(data2)
def main():
    """Scrape the store-approval list and dispatch the stores not yet in the BD form.

    Steps: run the Playwright scraper, load the records already present in the
    BD form through ``api_instance``, drop every scraped row whose ``门店id``
    is already stored there, and hand the remaining rows to ``send_request``.
    """
    with sync_playwright() as playwright:
        df = run(playwright)

    # Records that were already dispatched to the BD form.
    payload = {"api_key": "66f3a68c6e56814df2c6b1af",
               "entry_id": "6809a1cedfb68ab53de82d43",
               }
    BD_entry = api_instance.entry_data_list(payload)
    BD_list = BD_entry.get("data") or []
    store_id_list = [
        row_items.get("_widget_1744177321451", {})
        for row_items in BD_list
        if row_items is not None  # tolerate placeholder entries in the list
    ]

    if df is None:
        return

    # Collect the duplicates first and drop them in one call, instead of
    # copying the DataFrame once per dropped row.
    duplicate_index = []
    for index, row in df.iterrows():
        if row["门店id"] in store_id_list:
            print("数据已存在，跳过发送请求。")
            duplicate_index.append(index)
    df = df.drop(index=duplicate_index)
    send_request(df)
def fields():
    """Return the mapping from DataFrame column name to form widget id.

    A fresh dict is built on every call. Three entries share the empty-string
    column name, so only the last of them survives in the result.

    NOTE(review): the three empty keys look like column names that were lost;
    confirm the intended names against the columns produced by run().
    """
    column_widget_pairs = [
        ("", "_widget_1744177321450"),
        ("", "_widget_1744182647145"),
        ("", "_widget_1744182647146"),
        ("门店名称", "_widget_1744177321449"),
        ("门店id", "_widget_1744177321451"),
        ("负责人", "_widget_1744177321452"),
        ("联系电话", "_widget_1744177321453"),
        ("BD-负责人", "_widget_1744182647149"),
    ]
    # dict() keeps the position of a key's first occurrence and the value of
    # its last one, exactly like a dict literal with repeated keys.
    return dict(column_widget_pairs)
# Script entry point: run the scrape-and-dispatch workflow only when this file
# is executed directly, not when it is imported.
if __name__ == "__main__":
    main()