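# Source listing metadata (captured from the repository file viewer):
#   Path:     F6--/张阳脚本/快修哥客户信息导出.py
#   Modified: 2026-06-02 15:08:26 +08:00
#   Size:     194 lines, 7.7 KiB
#   Language: Python
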
import requests
from bs4 import BeautifulSoup
import pandas as pd
from tqdm import tqdm
import time
import os
# Page that post_page() submits the search/paging form to.
url = "http://www.kuaixiuge.com/carinfo.aspx?clientWidth=1286"

# All input/output files live on this user's Windows desktop.
_DESKTOP_DIR = "C:\\Users\\hp_z66\\Desktop"
# Final Excel workbook.
output_path = _DESKTOP_DIR + "\\快修哥客户信息导出.xlsx"
# Checkpoint workbook written periodically while paging.
temp_path = _DESKTOP_DIR + "\\快修哥客户信息导出_temp.xlsx"
# Reference HTML page supplied by the user.
html_file = _DESKTOP_DIR + "\\page1_ref.html"
# Shared HTTP session: every request reuses the browser-like headers and the
# logged-in cookie configured below.
session = requests.Session()

# SECURITY NOTE(review): this is a captured login cookie committed to source.
# It expires, and it exposes account details to anyone who can read the file.
# Prefer supplying a fresh value through the KUAIXIUGE_COOKIE environment
# variable; the literal below is kept only as a backward-compatible fallback.
_FALLBACK_COOKIE = "ASP.NET_SessionId=ciw1fuls0atkqaj4gkxhzngy; Hm_lvt_ab3baaa579f771d051a6b0baad5a8cfe=1777102209; HMACCOUNT=0838F6FCCBE848D9; iswatchme=0; setaddat=0; hksdms=username2=admin&truename2=%e8%91%9b&id=9864&wxusername2=&zb=false&qx=111-11111111111-11111111111111-0-1111-11111111111111-111111111-1111111111-111111111111-1111-0-0-0-0-0-0-0-0-0-0&login=1&actname=%e7%ae%a1%e7%90%86%e5%91%98&act=%e7%ae%a1%e7%90%86%e5%91%98&username=admin&truename=%e8%91%9b&userid=11955&valid=True&wxusername=&uniqueKey=8472d715-0f01-4b27-aa1c-90471f6cafa3&timeunitprice=0.00&allowquickout=True&telqx=1&tel=&StoreName=%e6%9c%ac%e4%bf%a1%e6%b1%bd%e8%bd%a6%e6%8a%a4%e7%90%86&attestationTel=13952699256&StoreName2=%e6%9c%ac%e4%bf%a1%e6%b1%bd%e8%bd%a6%e6%8a%a4%e7%90%86&vipid=SAAS9864&zonecode=1004&zone=%e6%b1%9f%e8%8b%8f&CustomerID=176575&IsInitialized=1&ScrmModuleValidTime=&isScrmModule=False&isBasicModule=True&isTechnologyModule=True&isPartsManageModule=True&isBusinessImprovementModule=False; SERVERID=000e421eb0ab0efb9790874bd5c8f758|1777105134|1777102207; Hm_lpvt_ab3baaa579f771d051a6b0baad5a8cfe=1777105137"

session.headers.update({
    "Accept": "text/html,application/xhtml+xml,application/xml;q=0.9,image/avif,image/webp,image/apng,*/*;q=0.8",
    "Accept-Language": "zh-CN,zh;q=0.9",
    "Cache-Control": "max-age=0",
    "Connection": "keep-alive",
    "Content-Type": "application/x-www-form-urlencoded",
    # An unset or empty environment variable falls back to the captured cookie.
    "Cookie": os.environ.get("KUAIXIUGE_COOKIE") or _FALLBACK_COOKIE,
    "Origin": "http://www.kuaixiuge.com",
    "Referer": "http://www.kuaixiuge.com/carinfo.aspx?clientWidth=1286",
    "Upgrade-Insecure-Requests": "1",
    "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/147.0.0.0 Safari/537.36",
})
# Hidden fields of the search form. Every one is submitted as an empty string;
# the tuple keeps them in the order they are sent.
_HIDDEN_FIELD_NAMES = (
    "HiddenCountyear", "HiddenCount120", "HiddenCount240",
    "HiddenTtime", "HiddenOpenId", "HiddenBdtime",
    "HiddenVipId", "HiddenLasttime", "HiddenHfstate",
    "HiddenLastgch", "HiddenJyId", "HiddenHkscode",
    "HiddenBrand", "HiddenBrandname", "HiddenChassisnumber",
    "HiddenEngineDesc", "HiddenEngineStyle", "HiddenFamilyname",
    "HiddenGearbox", "HiddenGearboxName", "HiddenLyid",
    "HiddenProductyear", "HiddenVehiclename", "HiddenVehiclesale",
    "HiddenVin", "HiddenYearpattern", "HiddenDrivetype",
    "HiddenModelbrandlogourl", "HiddenModelbrandmfr", "HiddenModelid",
    "HiddenFueltype", "HiddenKilowattpower", "HiddenListedyear",
    "HiddenListedmonth", "HiddenStopyear", "HiddenBodynumdoors",
    "HiddenTransmissiondescription", "HiddenMakename",
    "HiddenModelbrandid", "HiddenMakeid", "HiddenIschoosevehicletype",
)
form_hidden = dict.fromkeys(_HIDDEN_FIELD_NAMES, "")
def post_page(vs, ev, eventtarget="", eventarg="", button=None,
              *, start_date="2015-01-01", end_date="2026-04-25"):
    """POST the search form to ``url`` and return the response HTML.

    Args:
        vs: ``__VIEWSTATE`` value taken from the previously loaded page.
        ev: ``__EVENTVALIDATION`` value taken from the same page.
        eventtarget: value sent as ``__EVENTTARGET``.
        eventarg: value sent as ``__EVENTARGUMENT``.
        button: name of a submit button to include in the form; omitted
            when falsy.
        start_date: value sent as ``TextTime1`` (presumably the lower bound
            of the date filter -- confirm against the page).
        end_date: value sent as ``TextTime2`` (presumably the upper bound).

    Returns:
        The response body as text.

    Raises:
        requests.RequestException: if the third attempt also fails
            (connection error, timeout, or non-2xx status).
    """
    data = {
        "__EVENTTARGET": eventtarget,
        "__EVENTARGUMENT": eventarg,
        "__VIEWSTATE": vs,
        "__VIEWSTATEGENERATOR": "B80C0CC7",
        "__VIEWSTATEENCRYPTED": "",
        "__EVENTVALIDATION": ev,
        "TextTime1": start_date,
        "TextTime2": end_date,
        "TextCname": "",
        "txtVin": "",
        "Txtcartype": "",
        "txtEngineno": "",
        "AspNetPager1_input": "",
        **form_hidden,
    }
    if button:
        data[button] = "搜索"

    max_attempts = 3
    for attempt in range(1, max_attempts + 1):
        try:
            resp = session.post(url, data=data, timeout=30)
            resp.raise_for_status()
            return resp.text
        except requests.RequestException as e:
            # Only transport/HTTP failures are worth retrying; anything else
            # is a programming error and should surface immediately.
            if attempt == max_attempts:
                raise
            print(f" 请求失败({e}),第{attempt}次重试...")
            time.sleep(3)
def parse_html(html):
    """Pull the data table and the postback state out of a result page.

    Returns a 5-tuple ``(headers, rows, vs, ev, soup)``:

    * ``headers`` -- stripped text of every ``<th>`` in the table.
    * ``rows`` -- one list of stripped ``<td>`` texts per ``<tr>`` that
      contains at least one ``<td>``.
    * ``vs`` / ``ev`` -- ``value`` of the ``__VIEWSTATE`` and
      ``__EVENTVALIDATION`` inputs, or ``""`` when the input is absent.
    * ``soup`` -- the parsed document.

    If the page has no ``<table class="table-theme1">``, the first four
    items are all ``None``.
    """
    soup = BeautifulSoup(html, "html.parser")
    table = soup.find("table", class_="table-theme1")
    if table is None:
        return None, None, None, None, soup

    headers = [cell.text.strip() for cell in table.find_all("th")]

    # Rows without any <td> (e.g. the header row) are dropped.
    cells_per_row = (tr.find_all("td") for tr in table.find_all("tr"))
    rows = [
        [cell.text.strip() for cell in cells]
        for cells in cells_per_row
        if cells
    ]

    def hidden_value(field_id):
        # Value of the <input id=field_id>, or "" if the input is missing.
        field = soup.find("input", id=field_id)
        return "" if field is None else field.get("value", "")

    vs = hidden_value("__VIEWSTATE")
    ev = hidden_value("__EVENTVALIDATION")
    return headers, rows, vs, ev, soup
def main():
    """Export every page of the customer table to an Excel workbook.

    Steps:
      1. Read the initial ``__VIEWSTATE`` / ``__EVENTVALIDATION`` from the
         saved reference page (``html_file``).
      2. Request page 1 through the pager; if no table comes back, retry
         once through the search button.
      3. Walk the remaining pages, checkpointing to ``temp_path`` every 50
         pages, then write all rows to ``output_path``.

    Problems are reported on stdout. When paging stops early, the rows
    collected so far are still written to ``output_path``.
    """
    print("=" * 50)
    print("快修哥客户信息导出")
    print("=" * 50)

    # Take the initial postback state from the user-supplied HTML page.
    print("\n[1/3] 读取参考页面...")
    with open(html_file, "r", encoding="utf-8") as f:
        ref_html = f.read()
    ref_soup = BeautifulSoup(ref_html, "html.parser")
    vs_input = ref_soup.find("input", id="__VIEWSTATE")
    ev_input = ref_soup.find("input", id="__EVENTVALIDATION")
    if vs_input is None or ev_input is None:
        # Without both hidden inputs no postback can be built.
        print(" 参考页面中缺少 __VIEWSTATE 或 __EVENTVALIDATION,无法继续")
        return
    init_vs = vs_input.get("value", "")
    init_ev = ev_input.get("value", "")
    print(f" VIEWSTATE 长度: {len(init_vs)}")
    print(f" EVENTVALIDATION 长度: {len(init_ev)}")

    # Jump to page 1 using the state from the reference page.
    print("\n[2/3] 请求第1页...")
    try:
        html = post_page(init_vs, init_ev, eventtarget="AspNetPager1", eventarg="1")
        headers, rows, vs, ev, soup = parse_html(html)
        if not headers:
            print(" 未找到数据表格!")
            # The VIEWSTATE may not match; fall back to a plain search.
            print(" 尝试直接搜索...")
            html = post_page(init_vs, init_ev, button="Button3")
            headers, rows, vs, ev, soup = parse_html(html)
            if not headers:
                print(" 仍然无法获取数据,Cookie 可能已过期")
                return
        print(f" 第1页成功: {len(headers)}列 x {len(rows)}")
        print(f" 表头: {headers}")
        # Show the total record count when the page exposes it.
        label = soup.find("span", id="Label1")
        if label:
            print(f" 总记录: {label.text.strip()}")
    except Exception as e:
        print(f" 请求失败: {e}")
        return

    all_data = list(rows)

    # NOTE(review): last page number as seen in the reference HTML; it is
    # not read from the live pager, so update it when the data set grows.
    total_pages = 134
    print(f"\n[3/3] 开始翻页 (共{total_pages}页)...")
    consecutive_empty = 0
    skipped_pages = []
    for page_num in tqdm(range(2, total_pages + 1), desc="翻页进度"):
        try:
            html = post_page(vs, ev, eventtarget="AspNetPager1", eventarg=str(page_num))
            _, page_rows, new_vs, new_ev, _ = parse_html(html)
            # parse_html returns None for both tokens when the table is
            # missing. Keep the last good state in that case; otherwise the
            # next POST would go out without valid postback state and the
            # "skip up to 3 empty pages" logic could never recover.
            if new_vs is not None:
                vs, ev = new_vs, new_ev

            if not page_rows:
                consecutive_empty += 1
                skipped_pages.append(page_num)
                if consecutive_empty >= 3:
                    print(f"\n 连续3页无数据,停止于第{page_num}")
                    break
            else:
                consecutive_empty = 0
                all_data.extend(page_rows)
                # Checkpoint every 50 pages so a crash loses little work.
                if page_num % 50 == 0:
                    df_temp = pd.DataFrame(all_data, columns=headers)
                    df_temp.to_excel(temp_path, index=False)
                    print(f"\n 临时保存: {len(all_data)}")
            # Pause between requests, including after an empty page.
            time.sleep(0.3)
        except Exception as e:
            # Best effort: stop paging but still save what was collected.
            print(f"\n{page_num}页失败: {e},停止")
            break

    if skipped_pages:
        # Make dropped pages visible instead of silently losing them.
        print(f"\n 以下页未解析到数据,已跳过: {skipped_pages}")

    # Write the final result.
    print(f"\n总共获取 {len(all_data)} 行数据")
    df = pd.DataFrame(all_data, columns=headers)
    df.to_excel(output_path, index=False)
    print(f"已保存到: {output_path}")

    # Remove the checkpoint only after the final workbook is safely written,
    # so a failed final save does not destroy the only copy of the data.
    if os.path.exists(temp_path):
        os.remove(temp_path)


if __name__ == "__main__":
    main()