Files
F6--/张阳脚本/tianmao_auto_BD.py
T
2026-01-30 11:28:35 +08:00

845 lines
46 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
import sys
from playwright.sync_api import Playwright, sync_playwright
import re
import pandas as pd
import json
import numpy as np
from decimal import Decimal
from datetime import datetime, UTC, timedelta, timezone
from typing import Optional
import requests
from playwright.sync_api import TimeoutError as PlaywrightTimeoutError
import time
# 保存为CSV文件
output_dir = "output" # 设置输出目录
# 创建输出目录(如果不存在)
import os
os.makedirs(output_dir, exist_ok=True)
js = """
Object.defineProperties(navigator, {webdriver:{get:()=>undefined}});
"""
def data_batch_create(data: dict, max_retries: int = 20) -> Optional[requests.Response]: # 新建单条数据
"""
新建单条表单数据
:param max_retries: 最大重试次数
:param data: 应该包含应用id、表单id,以及新建的数据data['data']
:return: 返回创建后简道云返回的信息
"""
url = 'https://api.jiandaoyun.com/api/v5/app/entry/data/create'
headers = {
'Authorization': "Bearer qygHulymo1fekJk4CIZyNKjyQAzG8CFN", # 曹伟应用api测试 app_key
'Content-Type': 'application/json'
}
"""
data 样式 # 后续优化发送数据样式 目前输入字段,后续优化输入表单名称
jiandaoyun_data['data'] = {"_widget_1731650067055":{"value":f'{username}{password}'},
"_widget_1731650067056":{"value": f"{group}"}}
"""
payload = json.dumps({
"app_id": data['api_key'], # 应用ID
"entry_id": data['entry_id'], # 表单ID
"data": data['data'],
"is_start_workflow": data.get('is_start_workflow', "false"),
"is_start_trigger": data.get('is_start_trigger', "false"),
"transaction_id": data.get('transaction_id', "")
}
)
retries = 0
while retries <= max_retries:
try:
res = requests.post(url=url, data=payload, headers=headers)
res.raise_for_status() # 检查HTTP响应状态码,如果不等于200会抛出异常
data_get = res.json()
print("返回结果:", data_get)
if res.status_code == 200:
return data_get
else:
print("请求失败, 将重新请求")
retries += 1
time.sleep(3) # 在重试之间稍作停顿
except requests.exceptions.RequestException as e:
print(f"请求异常: {e}, 将重新请求")
retries += 1
time.sleep(3) # 在重试之间稍作停顿
if retries > max_retries:
print(f"超过最大重试次数({max_retries}),放弃此次请求")
def send_task_status(task_start_time: str, task_name: str) -> None:
"""
将任务状态发送到简道云(开始时间为北京时间,需转换到 UTC)
:param task_start_time: 任务开始时间(字符串格式:"%Y-%m-%d %H:%M:%S",表示北京时间 UTC+8
:param task_name: 任务名称
"""
print(1)
try:
# 1. 获取当前 UTC 时间(时区感知对象)
end_time_utc = datetime.now(UTC) # ✅ 替代 utcnow()
# 2. 解析传入的北京时间(UTC+8)
task_start_naive = datetime.strptime(task_start_time, "%Y-%m-%d %H:%M:%S")
# 3. 转换为 UTC 时间(减去 8 小时,并附加 UTC 时区)
task_start_utc = task_start_naive - timedelta(hours=8)
task_start_utc = task_start_utc.replace(tzinfo=timezone.utc) # 显式标记为 UTC
# 4. 计算运行时间(时区感知对象可直接相减)
run_time = end_time_utc - task_start_utc
run_time_sec = int(run_time.total_seconds())
# 5. 格式化时间为 UTC 的 ISO 8601 格式(带 "Z"
today_utc = end_time_utc.strftime("%Y-%m-%d")
task_end_iso = end_time_utc.strftime("%Y-%m-%dT%H:%M:%SZ")
task_start_iso = task_start_utc.strftime("%Y-%m-%dT%H:%M:%SZ")
print(task_end_iso)
print(task_start_iso)
# 6. 构造请求数据(所有时间以 UTC 格式发送)
payload = {
"api_key": "6694d3c4fcb69ca9a111a6c4",
"entry_id": "67ede908eb9c22261016466e",
"data": {
"_widget_1744873387500": {"value": today_utc}, # UTC 日期
"_widget_1743644977694": {"value": task_name},
"_widget_1744873387501": {"value": task_start_iso}, # UTC 开始时间
"_widget_1744873387502": {"value": task_end_iso}, # UTC 结束时间
"_widget_1744873387504": {"value": run_time_sec},
}
}
# 7. 发送请求
response = data_batch_create(payload)
print(response)
except Exception as e:
print(f"Error: {e}")
def replace_decimals(obj):
if isinstance(obj, dict):
return {k: replace_decimals(v) for k, v in obj.items()}
elif isinstance(obj, list):
return [replace_decimals(item) for item in obj]
elif isinstance(obj, Decimal):
return float(obj) # 或者 str(obj)
return obj
class NpEncoder(json.JSONEncoder):
def default(self, obj):
if isinstance(obj, np.integer):
return int(obj)
elif isinstance(obj, np.floating):
return float(obj)
elif isinstance(obj, np.ndarray):
return obj.tolist()
else:
return super(NpEncoder, self).default(obj)
class runTianMao():
def __init__(self):
self.task_start_time = None
def run(self, playwright: Playwright):
"""运行程序"""
self.task_start_time = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
try:
# 1.登录逻辑
browser = playwright.chromium.launch(headless=False)
# 修改这行代码(增加高度并启用全窗口模式)
context = browser.new_context(
viewport={'width': 1700, 'height': 900}, # 设置足够大的高度
no_viewport=True # 允许页面自由扩展高度
)
# Open new page
page = context.new_page()
page.add_init_script(js) # 隐藏 webdriver属性,不然拖动滑块会失败。
# Go to https://fws.carzone365.com/#/store/quitAudit
page.goto("https://fws-tmyc.tmallyc.com/#/store/quitAudit")
# Click [placeholder="请输入用户名"]
page.click("[placeholder=\"请输入用户名\"]",timeout=3000)
# Fill [placeholder="请输入用户名"]
page.fill("[placeholder=\"请输入用户名\"]", "18362916786",timeout=3000)
# Click [placeholder="请输入密码"]
page.click("[placeholder=\"请输入密码\"]",timeout=3000)
# Fill [placeholder="请输入密码"]
page.fill("[placeholder=\"请输入密码\"]", "F6@12456",timeout=3000)
""" 拖拽滑块验证 """
deltaX = 50000
steps = 100
element = page.wait_for_selector("text=请按住滑块,拖动到最右边")
boundingBox = element.bounding_box()
df = pd.DataFrame()
def select_service_item(page, primary_text="保养服务", secondary_text="更换机油"):
"""选择服务项目(两级级联菜单),低版本 Playwright 兼容写法"""
try:
# 方法1:使用Playwright的滚动API
page.evaluate("window.scrollTo(0, 0)")
# 或者方法2:点击页面顶部的元素(如果有)
# page.click('//*[@id="app"]/section/section/header', timeout=5000)
print("已回到页面顶部")
except Exception as e:
print(f"滚动到顶部时出错: {str(e)}")
pass
# 1. 点击级联选择框触发下拉菜单
page.click('//*[@id="app"]/section/section/main/div/div[3]/div[1]/div[3]/span/span', timeout=5000) # 服务管理
# 等待菜单出现
page.wait_for_selector(".el-cascader-menu", timeout=10000)
# 获取所有菜单容器
menus = page.locator(".el-cascader-menu")
# 等待一点时间确保加载完成
page.wait_for_timeout(1000)
# 点击一级菜单项(精确匹配)
try:
first_menu = menus.nth(0).locator(f'xpath=//li[normalize-space()="{primary_text}"]')
first_menu.click(timeout=5000)
except Exception as e:
raise Exception(f"未找到一级菜单项 '{primary_text}' 或不可点击") from e
# 等待二级菜单出现
page.wait_for_timeout(1000)
# 确认二级菜单已加载并可用
second_menus_container = menus.nth(1)
try:
second_menu = second_menus_container.locator(f'xpath=//li[normalize-space()="{secondary_text}"]')
second_menu.click(timeout=5000)
page.click('//*[@id="app"]/section/section/main/div/div[3]/div[2]/div[2]/button[2]')
except Exception as e:
raise Exception(f"未找到二级菜单项 '{secondary_text}' 或不可点击") from e
def check_20inch_items(page, value):
"""拒绝审批(支持分页)"""
page.click('//*[@id="app"]/section/section/main/div/div[3]/div[2]/div[2]/button[2]', timeout=5000) # 点击查询
# 等待表格加载完成
try:
page.wait_for_selector('table.el-table__body', timeout=20000)
time.sleep(2)
except:
print("拒绝审批类目下无数据")
return 0
# 查询出一共有多少条数据
input_string = page.text_content('xpath=//*[@id="app"]/section/section/main/div/div[4]/div[3]/div/span[1]',
timeout=5000)
# 使用正则表达式提取数字部分
numbers = re.findall(r'\d+', input_string)
# 将提取到的数字部分转换为整数列表
numbers = [int(num) for num in numbers][0]
print(f'numbers:{numbers}')
# 计算总页数
# total_pages = (numbers + 100 - 1) // 100
total_pages = (numbers + 100 - 1) // 100
checked_total = 0
# 进入最后一页
ul_element = page.locator('xpath=//*[@class="el-pager"]')
list_items = ul_element.locator('li').all()
if len(list_items) > 1:
list_items[-1].click(timeout=5000)
for current_page in range(1, total_pages + 1):
print(f"\n正在处理第 {current_page}/{total_pages} 页...")
# 等待当前页数据加载
page.wait_for_timeout(3000) # 增加等待时间确保数据加载
# 获取当前页所有行
rows = page.query_selector_all('table.el-table__body tbody tr.el-table__row')
print(f"当前页找到 {len(rows)} 行数据")
checked_page = 0
for i, row in enumerate(rows, start=1):
try:
# 获取服务项目文本 - 使用更可靠的选择器 # 加列名定位
service_cell = row.query_selector('td:nth-child(5) .cell') or \
row.query_selector('td.el-table_1_column_5 .cell')
if service_cell:
service_text = service_cell.inner_text().strip()
# 检查是否包含value
if value in service_text:
print(f"{i}行 找到包含'{value}'的服务项目: {service_text}")
# 勾选复选框 - 使用更可靠的选择器
checkbox = row.query_selector('td:first-child .el-checkbox__input') or \
row.query_selector('td.el-table_1_column_1 .el-checkbox__input')
if checkbox:
# 确保复选框可见并可点击
checkbox.scroll_into_view_if_needed()
checkbox.wait_for_element_state("stable", timeout=5000)
# 获取当前勾选状态
is_checked = checkbox.evaluate('el => el.getAttribute("aria-checked")')
if is_checked != "true":
checkbox.click(timeout=5000)
print(f"已勾选第{i}行的复选框")
checked_page += 1
else:
print(f"{i}行复选框已勾选")
else:
print(f"{i}行未找到复选框")
else:
print(f"{i}行未找到服务项目单元格")
except Exception as e:
print(f"处理第{i}行时出错: {str(e)}")
continue
checked_total += checked_page
print(f"本页勾选了 {checked_page} 个包含{value}的项目")
# time.sleep(1)
# 校验是否只选择了1个
if checked_page == 1:
# 找到被勾选的行
checked_row = None
for row in rows:
# 方法1:直接查找label元素(推荐)
checkbox_label = row.query_selector('td.el-table_1_column_1 label.el-checkbox') or \
row.query_selector('td:first-child label.el-checkbox')
if checkbox_label:
# 正确检查label的aria-checked属性
is_checked = checkbox_label.get_attribute('aria-checked') == "true"
if is_checked:
checked_row = row
break
print("被勾选的行:", checked_row)
if checked_row:
# 获取最后一列的按钮
last_td_button = checked_row.query_selector('td:last-child button') or \
checked_row.query_selector('td.el-table__cell:last-child button')
if last_td_button:
try:
time.sleep(1)
last_td_button.scroll_into_view_if_needed()
last_td_button.wait_for_element_state("stable", timeout=5000)
last_td_button.click()
print("已点击该行最后一个td中的按钮")
page.click('//*[@class="el-radio"]/span[1]/span', timeout=5000) # 点击不通过
text_area = page.locator(
'//*[@id="app"]/section/section/main/div/div[5]/div/div/div[2]/div/div/div[3]/div/textarea')
text_area.fill(value) # 拒绝原因
page.click('//*[@class="el-button el-button--primary"]/span', timeout=5000) # 点击确定
page.click(
'//*[@class="el-button el-button--default el-button--small el-button--primary "]/span',
timeout=5000) # 点击确定
try:
page.click(
'//*[@class="el-button el-button--default el-button--small el-button--primary "]/span',
timeout=5000) # 点击确定
except:
pass
except Exception as e:
print(f"点击按钮时出错: {str(e)}")
elif checked_page > 1:
# 批量审批不通过
page.click('//*[@class="el-button batch-button el-button--primary el-button--mini is-plain"]/span',
timeout=5000) # 点击批量
page.click('//*[@class="el-radio"]/span[1]/span', timeout=5000) # 点击不通过
text_area = page.locator(
'//*[@id="app"]/section/section/main/div/div[5]/div/div/div[2]/div/div/div[3]/div/textarea')
text_area.fill(value) # 拒绝原因为轮胎
page.click('//*[@class="el-button el-button--primary"]/span', timeout=5000) # 点击确定
# time.sleep(1)
page.click('//*[@class="el-button el-button--default el-button--small el-button--primary "]',
timeout=5000) # 点击确定
try:
page.click('//*[@class="el-button el-button--default el-button--small el-button--primary "]',
timeout=5000) # 点击确定
except:
pass
# /html/body/div[4]/div/div[3]/button[2]
else:
pass
time.sleep(30)
# 如果不是第一页,点击上一页
if current_page < total_pages:
print("准备翻页...")
try:
# 等待下一页按钮可点击
page.click('//*[@class="btn-prev"]')
print("已点击上一页按钮")
time.sleep(0.5)
except Exception as e:
print(f"翻页时出错: {str(e)}")
break
print(f"\n共勾选了 {checked_total} 个包含'{value}'的项目")
def allow_items(page, value):
"""批量审批通过"""
# 等待表格加载完成
try:
page.wait_for_selector('table.el-table__body', timeout=20000)
time.sleep(2)
except:
print(f"{value}审批类目下无数据")
return 0
# 查询出一共有多少条数据
input_string = page.text_content('xpath=//*[@id="app"]/section/section/main/div/div[4]/div[3]/div/span[1]',timeout=20000)
# 使用正则表达式提取数字部分
numbers = re.findall(r'\d+', input_string)
# 将提取到的数字部分转换为整数列表
numbers = [int(num) for num in numbers][0]
print(f'numbers:{numbers}')
# 计算总页数
# total_pages = (numbers + 100 - 1) // 100
total_pages = (numbers + 100 - 1) // 100
is_one = numbers % 100
print(f'is_one:{is_one}')
# 进入最后一页
ul_element = page.locator('xpath=//*[@class="el-pager"]')
list_items = ul_element.locator('li').all()
if len(list_items) > 1:
list_items[-1].click(timeout=5000)
checked_total = 0
for current_page in range(1, total_pages + 1):
print(f"\n正在处理第 {current_page}/{total_pages} 页...")
# 等待当前页数据加载
page.wait_for_timeout(3000) # 增加等待时间确保数据加载
# 获取当前页所有行
rows = page.query_selector_all('table.el-table__body tbody tr.el-table__row')
print(f"当前页找到 {len(rows)} 行数据")
check = 0
for i, row in enumerate(rows, start=1):
checkbox = row.query_selector('td:first-child .el-checkbox__input') or \
row.query_selector('td.el-table_1_column_1 .el-checkbox__input')
if checkbox:
# 确保复选框可见并可点击
checkbox.scroll_into_view_if_needed()
checkbox.wait_for_element_state("stable", timeout=5000)
# 获取当前勾选状态
is_checked = checkbox.evaluate('el => el.getAttribute("aria-checked")')
if is_checked != "true":
checkbox.click(timeout=5000)
print(f"已勾选第{i}行的复选框")
check += 1
if is_one == 1 and check == 1:
checked_row = None
for row in rows:
# 方法1:直接查找label元素(推荐)
checkbox_label = row.query_selector('td.el-table_1_column_1 label.el-checkbox') or \
row.query_selector('td:first-child label.el-checkbox')
if checkbox_label:
# 正确检查label的aria-checked属性
is_checked = checkbox_label.get_attribute('aria-checked') == "true"
if is_checked:
checked_row = row
break
print("被勾选的行:", checked_row)
if checked_row:
# 获取最后一列的按钮
last_td_button = checked_row.query_selector('td:last-child button') or \
checked_row.query_selector('td.el-table__cell:last-child button')
if last_td_button:
try:
last_td_button.scroll_into_view_if_needed()
last_td_button.wait_for_element_state("stable", timeout=5000)
last_td_button.click()
page.click('//*[@class="el-radio is-checked"]/span[1]/span', timeout=5000) # 点击通过
page.click('//*[@class="el-button el-button--primary"]/span', timeout=5000) # 点击确定
page.click(
'//*[@class="el-button el-button--default el-button--small el-button--primary "]/span',
timeout=5000) # 点击确定
try:
page.click(
'//*[@class="el-button el-button--default el-button--small el-button--primary "]/span',
timeout=5000) # 点击确定
except:
pass
except:
print("审批通过失败")
else:
# 批量审批通过
# page.click('//*[@id="has-gutter"]/tr/th[1]/div/label/span/span',timeout=5000)
page.click('//*[@class="el-button batch-button el-button--primary el-button--mini is-plain"]/span',
timeout=5000) # 点击批量
page.click('//*[@class="el-radio is-checked"]/span[1]/span', timeout=5000) # 点击通过
page.click('//*[@class="el-button el-button--primary"]/span', timeout=5000) # 点击确定
page.click('//*[@class="el-button el-button--default el-button--small el-button--primary "]/span',
timeout=5000) # 点击确定
try:
page.click(
'//*[@class="el-button el-button--default el-button--small el-button--primary "]/span',
timeout=1000) # 点击确定
except:
pass
time.sleep(10)
# 如果不是第一页,点击上一页
if current_page < total_pages:
print("准备翻页...")
try:
# 等待下一页按钮可点击
page.click('//*[@class="btn-prev"]')
print("已点击上一页按钮")
time.sleep(0.5)
except Exception as e:
print(f"翻页时出错: {str(e)}")
break
print(f"{value}批量通过了{numbers}条数据")
if boundingBox:
x = boundingBox.get('x') + boundingBox.get('width') / 2
y = boundingBox.get('y') + boundingBox.get('height') / 2
page.mouse.move(x, y)
page.mouse.down()
x1 = x + deltaX
page.mouse.move(x1, y, steps=steps)
page.mouse.up()
page.wait_for_timeout(1000)
page.click('xpath=//*[@id="app"]//button[contains(@class,"login-btn")]',timeout=5000) # 登录
""" 开始自动化点击操作 """
page.click('//*[@id="app"]/section/section/aside/ul/li[2]/ul/li[5]/div/div',timeout=5000) # 服务管理
page.click('//*[@id="app"]/section/section/main/div/div[3]/div[1]/div[4]/div/div',timeout=5000) # 点击状态
page.click('xpath=/html/body/div/div[1]/div[1]/ul/li[1]/span',timeout=5000) # 审批中
# page.fill("[placeholder=\"请输入门店ID\"]", "317069990") # 测试用,输入测试门店
page.click('//*[@id="app"]/section/section/main/div/div[3]/div[2]/div[2]/button[2]',timeout=5000) # 点击查询
page.click('xpath=//*[@id="app"]//input[@placeholder="请选择"]',timeout=5000) # 选择每页显示的行数
page.click('xpath=//span[text()="100条/页"]')
def reset_and_filter_page(page):
"""重置页面并重新应用筛选条件"""
try:
# 检查页面是否已关闭
if page.is_closed():
print("页面已关闭,正在重新初始化...")
# 这里需要重新初始化页面,比如登录等操作
browser = playwright.chromium.launch(headless=False)
# 修改这行代码(增加高度并启用全窗口模式)
context = browser.new_context(
viewport={'width': 1700, 'height': 900}, # 设置足够大的高度
no_viewport=True # 允许页面自由扩展高度
)
page = context.new_page()
page.add_init_script(js) # 隐藏 webdriver属性,不然拖动滑块会失败。
# Go to https://fws.carzone365.com/#/store/quitAudit
page.goto("https://fws-tmyc.tmallyc.com/#/store/quitAudit")
# Click [placeholder="请输入用户名"]
page.click("[placeholder=\"请输入用户名\"]")
# Fill [placeholder="请输入用户名"]
page.fill("[placeholder=\"请输入用户名\"]", "18362916786")
# Click [placeholder="请输入密码"]
page.click("[placeholder=\"请输入密码\"]")
# Fill [placeholder="请输入密码"]
page.fill("[placeholder=\"请输入密码\"]", "F6@12456")
""" 拖拽滑块验证 """
deltaX = 50000
steps = 100
element = page.wait_for_selector("text=请按住滑块,拖动到最右边")
boundingBox = element.bounding_box()
if boundingBox:
x = boundingBox.get('x') + boundingBox.get('width') / 2
y = boundingBox.get('y') + boundingBox.get('height') / 2
page.mouse.move(x, y)
page.mouse.down()
x1 = x + deltaX
page.mouse.move(x1, y, steps=steps)
page.mouse.up()
page.wait_for_timeout(1000)
page.click('xpath=//*[@id="app"]//button[contains(@class,"login-btn")]') # 登录
""" 开始自动化点击操作 """
page.click('//*[@id="app"]/section/section/aside/ul/li[2]/ul/li[5]/div/div',
timeout=5000) # 服务管理
page.click('//*[@id="app"]/section/section/main/div/div[3]/div[1]/div[4]/div/div',
timeout=5000) # 点击状态
page.click('xpath=/html/body/div/div[1]/div[1]/ul/li[1]/span', timeout=5000) # 审批中
# page.fill("[placeholder=\"请输入门店ID\"]", "317069990", timeout=5000) # 测试用,输入测试门店
page.click('//*[@id="app"]/section/section/main/div/div[3]/div[2]/div[2]/button[2]',
timeout=5000) # 点击查询
page.click('xpath=//*[@id="app"]//input[@placeholder="请选择"]', timeout=5000) # 选择每页显示的行数
page.click('xpath=//span[text()="100条/页"]', timeout=5000)
print("正在重置页面状态...")
page.reload()
# 重新设置筛选条件
""" 开始自动化点击操作 """
page.click('//*[@id="app"]/section/section/aside/ul/li[2]/ul/li[5]/div/div', timeout=5000) # 服务管理
page.click('//*[@id="app"]/section/section/main/div/div[3]/div[1]/div[4]/div/div',
timeout=5000) # 点击状态
page.click('xpath=/html/body/div/div[1]/div[1]/ul/li[1]/span', timeout=5000) # 审批中
# page.fill("[placeholder=\"请输入门店ID\"]", "317069990", timeout=5000) # 测试用,输入测试门店
page.click('//*[@id="app"]/section/section/main/div/div[3]/div[2]/div[2]/button[2]',
timeout=5000) # 点击查询
page.click('xpath=//*[@id="app"]//input[@placeholder="请选择"]', timeout=5000) # 选择每页显示的行数
page.click('xpath=//span[text()="100条/页"]', timeout=5000)
print("页面重置完成")
return page
except Exception as e:
print(f"重置页面时出错: {str(e)}")
# 如果出错,尝试完全重新初始化
sys.exit(1)
def execute_with_retry(page, operation, *args, operation_name="操作", max_retries=3, retry_delay=5):
"""失败重试机制"""
retry_count = 0
while retry_count < max_retries:
try:
# 如果是筛选操作,先重置页面
if operation.__name__ == "select_service_item":
reset_and_filter_page(page)
operation(page, *args)
return True
except PlaywrightTimeoutError as e:
retry_count += 1
if retry_count < max_retries:
print(f"{operation_name}{retry_count} 次尝试失败(超时),{retry_delay}秒后重试...")
time.sleep(retry_delay)
reset_and_filter_page(page)
operation(page, *args)
print("已重置页面,准备重试...")
else:
print(f"{operation_name} 连续{max_retries}次尝试失败(超时),跳过此项。错误: {str(e)}")
return False
except Exception as e:
retry_count += 1
if retry_count < max_retries:
print(
f"{operation_name}{retry_count} 次尝试失败,{retry_delay}秒后重试... 错误: {str(e)}")
time.sleep(retry_delay)
reset_and_filter_page(page)
operation(page, *args)
else:
print(f"{operation_name} 连续{max_retries}次尝试失败,程序终止。错误: {str(e)}")
sys.exit(1) # 终止程序,返回错误码1
return False
def process_tire_rejections(page):
"""处理驳回操作"""
# 轮胎---20寸以上轮胎更换项目,全部驳回
if execute_with_retry(page, select_service_item, "轮胎轮毂", "轮胎更换",
operation_name="选择轮胎轮毂服务"):
execute_with_retry(page, check_20inch_items, "20寸", operation_name="检查20寸轮胎")
# 轮胎---货车轮胎安装 默认驳回
if execute_with_retry(page, select_service_item, "轮胎轮毂", "轮胎更换",
operation_name="选择轮胎轮毂服务"):
execute_with_retry(page, check_20inch_items, "货车轮胎", operation_name="检查货车轮胎")
# 货车行车记录仪安装,全部驳回
if execute_with_retry(page, select_service_item, "装潢服务", "安装行车记录仪",
operation_name="选择行车记录仪服务"):
execute_with_retry(page, check_20inch_items, '货车', operation_name="检查货车记录仪")
# # 宝马记录仪全部驳回
# if execute_with_retry(page, select_service_item, "装潢服务", "安装行车记录仪",
# operation_name="选择行车记录仪服务"):
# execute_with_retry(page, check_20inch_items, '宝马', operation_name="检查宝马记录仪")
for i in range(1, 4): # 循环处理3次
process_tire_rejections(page)
print("批量审批通过开始...")
df = pd.DataFrame([[self.task_start_time, "批量审批通过开始", "开始"]])
df.to_csv("天猫安装服务审批批量审批.csv")
# 批量审批通过
allow_list = [
# 有驳回操作的优先处理
("轮胎轮毂", "轮胎更换"),
# ("装潢服务", "安装行车记录仪"),
# 洗车服务
("洗车服务", "精致洗车"), ("洗车服务", "标准洗车"),
# 保养服务(来自4张图片的全部内容)
("保养服务", "更换机油"), ("保养服务", "添加机油"), ("保养服务", "更换机油滤清器"),
("保养服务", "更换空气滤清器"), ("保养服务", "更换空调滤清器"), ("保养服务", "更换防冻液"),
("保养服务", "添加防冻液"), ("保养服务", "更换刹车油"), ("保养服务", "更换变速箱油"),
("保养服务", "节气门清洗"), ("保养服务", "燃油系统养护"), ("保养服务", "刹车系统养护"),
("保养服务", "水箱清洗"), ("保养服务", "进气系统清洗"),
("保养服务", "三元催化清洗"), ("保养服务", "喷油嘴清洗"), ("保养服务", "润滑系统养护"),
("保养服务", "燃烧室除积碳"), ("保养服务", "空调管路清洗"), ("保养服务", "更换雨刮片"),
("保养服务", "更换火花塞"), ("保养服务", "更换燃油滤清器"), ("保养服务", "更换蓄电池"),
("保养服务", "空调清洗"), ("保养服务", "更换转向助力油"), ("保养服务", "检测"),
("保养服务", "变速箱系统养护"), ("保养服务", "水箱防锈保护"),
# 维修服务(来自4张图片的全部内容)
("维修服务", "更换刹车片"), ("维修服务", "更换刹车盘"), ("维修服务", "更换减震器"),
("维修服务", "更换灯泡"), ("维修服务", "更换空调制冷剂"), ("维修服务", "更换大灯总成"),
("维修服务", "更换水泵"), ("维修服务", "更换水箱"), ("维修服务", "更换气门室盖垫"),
("维修服务", "更换皮带"), ("维修服务", "更换平衡杆球头"), ("维修服务", "更换倒车镜"),
("维修服务", "更换行李箱盖撑杆"), ("维修服务", "更换车门玻璃升降器"), ("维修服务", "更换尾灯"),
("维修服务", "更换点火线圈"), ("维修服务", "更换雾灯总成"), ("维修服务", "更换倒车镜片"),
("维修服务", "更换天窗开关"), ("维修服务", "更换摆臂"), ("维修服务", "更换摆臂球头"),
("维修服务", "更换变速箱油底壳"), ("维修服务", "更换车门锁"), ("维修服务", "更换车身下护板"),
("维修服务", "更换大灯控制模块"), ("维修服务", "更换发动机支架胶"), ("维修服务", "更换方向盘"),
("维修服务", "更换高位刹车灯"), ("维修服务", "更换减震器缓冲块"), ("维修服务", "更换节气门"),
("维修服务", "更换牌照"), ("维修服务", "更换喷油嘴"), ("维修服务", "更换日间行车灯"),
("维修服务", "更换仪表台"), ("维修服务", "更换雨刮电机"), ("维修服务", "更换雨刮喷水管"),
("维修服务", "更换遮阳帘"), ("维修服务", "更换制动分泵"), ("维修服务", "调整"),
("维修服务", "检修"), ("维修服务", "维修拆装"), ("维修服务", "更换显示器支架"),
("维修服务", "更换平衡杆连杆"), ("维修服务", "更换附件皮带套装"), ("维修服务", "更换蓄电池线束"),
("维修服务", "更换蓄电池模块"), ("维修服务", "更换天窗遮阳"),
# 轮胎轮毂
("轮胎轮毂", "补胎"), ("轮胎轮毂", "动平衡"),
("轮胎轮毂", "四轮定位"), ("轮胎轮毂", "轮毂更换"), ("轮胎轮毂", "轮胎换位"),
# 美容服务
("美容服务", "轮毂清洗"),
("美容服务", "内饰清洁养护"), ("美容服务", "臭氧消毒"), ("美容服务", "发动机舱养护"),
("美容服务", "车窗玻璃养护"), ("美容服务", "车漆护理"), ("美容服务", "大灯灯壳修复"),
# 装潢服务
# ("装潢服务", "车窗贴膜"), ("装潢服务", "车身贴膜"), # 不审批贴膜业务
("装潢服务", "安装车顶行李架"),
("装潢服务", "安装底盘装甲"), ("装潢服务", "更换氙气灯"),
("装潢服务", "胎压监测安装"), ("装潢服务", "DVD导航安装"), ("装潢服务", "安装后视镜"),
("装潢服务", "安装座垫套"), ("装潢服务", "安装挡泥板"), ("装潢服务", "倒车雷达安装"),
("装潢服务", "安装儿童安全座椅"), ("装潢服务", "汽车包围安装"), ("装潢服务", "防盗器安装"),
("装潢服务", "车底防护板安装"), ("装潢服务", "安装防盗报警器"), ("装潢服务", "安装自动升窗器"),
("装潢服务", "安装车载电视"), ("装潢服务", "安装音响"), ("装潢服务", "安装脚垫"),
("装潢服务", "安装地胶"), ("装潢服务", "安装音响喇叭"), ("装潢服务", "安装防静电天线"),
("装潢服务", "安装脚踏板"), ("装潢服务", "保险杠更换"), ("装潢服务", "电动尾门安装"),
("装潢服务", "安装方向盘套"), ("装潢服务", "安装ETC"), ("装潢服务", "安装定位器"),
("装潢服务", "安装尾翼"), ("装潢服务", "安装雨挡"), ("装潢服务", "安装轮眉"),
("装潢服务", "安装弹簧助力器"), ("装潢服务", "隔音服务"), ("装潢服务", "安装灯具"),
("装潢服务", "安装中网"), ("装潢服务", "排气管改装"), ("装潢服务", "安装饰条"),
("装潢服务", "遥控器改装"), ("装潢服务", "安装一键启动"), ("装潢服务", "安装刹车卡钳装饰罩"),
("装潢服务", "安装后备箱垫"), ("装潢服务", "安装窗帘"), ("装潢服务", "包真皮"),
("装潢服务", "安装尾喉"), ("装潢服务", "安装密封条"), ("装潢服务", "安装抬头显示/HUD"),
("装潢服务", "安装并线辅助"), ("装潢服务", "安装扶手箱"), ("装潢服务", "安装方向盘"),
("装潢服务", "安装换挡杆"), ("装潢服务", "安装安定器"), ("装潢服务", "安装透镜"),
("装潢服务", "安装水箱保护网"), ("装潢服务", "安装灯饰框"), ("装潢服务", "安装电吸门"),
("装潢服务", "改装"),
# 钣金喷漆
("钣金喷漆", "保险杠喷漆"), ("钣金喷漆", "车门喷漆"), ("钣金喷漆", "叶子板喷漆"),
("钣金喷漆", "发动机盖喷漆"), ("钣金喷漆", "车顶喷漆"), ("钣金喷漆", "行李箱盖喷漆"),
("钣金喷漆", "倒车镜喷漆"), ("钣金喷漆", "立柱喷漆"), ("钣金喷漆", "裙边喷漆"),
("钣金喷漆", "更换保险杠"), ("钣金喷漆", "更换车顶行李架"),
("其他服务", "救援服务"), ("其他服务", "其他业务"),
("改装服务", "内饰改装"), ("改装服务", "外观改装")
]
for i in range(1, 4): # 循环两次
for item in allow_list:
print(f"正在审批 {item}")
retry_count = 0
success = False
while retry_count < 3 and not success:
try:
if retry_count > 0:
reset_and_filter_page(page)
# 尝试执行操作
print(f"正在选择 {item[0]} - {item[1]}")
if i > 1 and item[0] == "轮胎轮毂" and (
item[1] == "轮胎更换" or item[1] == "安装行车记录仪"):
break
select_service_item(page, item[0], item[1])
allow_items(page, item)
success = True # 如果执行到这里没有异常,标记为成功
except Exception as e:
retry_count += 1
if retry_count < 3:
print(f"{retry_count} 次尝试失败,正在重试... 错误: {str(e)}")
else:
print(f"{retry_count} 次尝试失败,跳过此项。错误: {str(e)}")
break # 退出while循环,继续下一个item
if success:
print(f"成功审批 {item}")
else:
print(f"跳过 {item},因连续3次尝试失败")
page.wait_for_timeout(2000)
send_task_status(self.task_start_time, "天猫门店安装服务审批")
df = pd.DataFrame([[self.task_start_time, "天猫门店安装服务审批", "完成"]])
df.to_csv("天猫安装服务审批.csv")
except Exception as e:
print(f"出现错误: {str(e)}")
send_task_status(self.task_start_time, "天猫门店安装服务审批失败")
df = pd.DataFrame([[self.task_start_time, "天猫门店安装服务审批", "失败"]])
df.to_csv("天猫安装服务审批.csv")
def main(self):
self.task_start_time = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
with sync_playwright() as playwright:
self.run(playwright)
if __name__ == '__main__':
runTianMao().main()