# Files
# F6--/张阳脚本/竞品系统数据导出/kuaixiuge_export.py
# T
# 2026-06-02 15:08:26 +08:00
#
# 582 lines
# 21 KiB
# Python
#
"""
快修哥系统数据导出脚本
登录入口: http://139.129.162.9/
字段: ucode=公司代码, uname=用户名, upwd=密码
- 客户信息(2015年起) -> 桌面/快修哥_客户信息.xlsx
- 库存统计 -> 桌面/快修哥_库存统计.xlsx
"""
import requests
import time
import os
import sys
import re
import pandas as pd
from bs4 import BeautifulSoup
from tqdm import tqdm
import urllib3
# Requests in this script are sent with verify=False, so silence the
# InsecureRequestWarning that urllib3 would otherwise print for each of them.
urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)

# Line-buffer stdout so progress shows up immediately; feature-test the method
# because not every stdout object provides ``reconfigure``.
if hasattr(sys.stdout, 'reconfigure'):
    sys.stdout.reconfigure(line_buffering=True)

DESKTOP = os.path.join(os.path.expanduser("~"), "Desktop")

# ── Login configuration ──────────────────────────────────────
BASE = "http://139.129.162.9"
HOST_DOMAIN = "139.129.162.9"
# Credentials can be supplied through the environment so they do not have to
# live in the source; the literals are the previous hard-coded defaults.
COMPANY_CODE = os.environ.get("KXG_COMPANY_CODE", "25375")
USERNAME = os.environ.get("KXG_USERNAME", "admin")
PASSWORD = os.environ.get("KXG_PASSWORD", "123456")

TEST_MODE = True  # Test mode: each export stops after TEST_LIMIT rows
TEST_LIMIT = 50

# One shared session so cookies set at login are reused by every request.
session = requests.Session()
session.headers.update({
    "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 "
                  "(KHTML, like Gecko) Chrome/132.0.0.0 Safari/537.36",
    "Accept": "text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8",
    "Accept-Language": "zh-CN,zh;q=0.9",
    "Connection": "keep-alive",
})
def log(msg=""):
    """Print ``msg`` followed by a newline and flush stdout right away."""
    print(msg, flush=True)
def get_val(soup, name):
    """Return the ``value`` attribute of an ``<input>`` looked up by id, then by name.

    Args:
        soup: Parsed document exposing ``find``.
        name: Value matched first against the ``id`` attribute and, when that
            finds nothing, against the ``name`` attribute.

    Returns:
        The input's ``value`` attribute, or ``""`` when no input matches or
        the matching input has no ``value``.
    """
    element = soup.find("input", {"id": name})
    if not element:
        element = soup.find("input", {"name": name})
    if not element:
        return ""
    return element.get("value", "")
def login():
    """Log in to the Kuaixiuge system through the module-level ``session``.

    The login page is fetched first and the credentials are then posted to the
    same URL. Success is judged from the cookies held by the session and from
    the final URL after redirects.

    Returns:
        bool: ``False`` when the login page cannot be fetched (network error
        or non-200 status) or the login POST fails at the network level.
        ``True`` otherwise - including the case where success cannot be
        confirmed, which is a deliberate best-effort so the export still runs.
    """
    log("正在登录快修哥系统...")
    login_url = f"{BASE}/login.aspx"
    try:
        r = session.get(login_url, timeout=20, verify=False)
    except requests.RequestException as e:
        # Report a network failure as a failed login instead of a traceback.
        log(f"获取登录页失败: {e}")
        return False
    if r.status_code != 200:
        log(f"获取登录页失败: HTTP {r.status_code}")
        return False

    payload = {
        "ucode": COMPANY_CODE,
        "uname": USERNAME,
        "upwd": PASSWORD,
        "windowSize": "1614",
        "DeviceVersion": "",
        "ipAdress": "",  # spelled as submitted to the server; do not "fix"
        "Location": "",
    }
    headers = {
        "Content-Type": "application/x-www-form-urlencoded",
        "Referer": login_url,
        "Origin": BASE,
    }
    try:
        r2 = session.post(login_url, data=payload, headers=headers,
                          timeout=20, verify=False, allow_redirects=True)
    except requests.RequestException as e:
        log(f"登录请求失败: {e}")
        return False

    log(f"登录后URL: {r2.url}")
    cookies = session.cookies.get_dict()
    log(f"Cookie keys: {list(cookies.keys())}")

    if "hksdms" in cookies or "ASP.NET_SessionId" in cookies:
        log("登录成功!")
        return True
    if r2.url != login_url and "login" not in r2.url.lower():
        log("登录成功(URL跳转)")
        return True

    # Neither signal was present; carry on and let later requests decide.
    log("登录状态不确定,继续尝试...")
    return True
def fetch_post(url, data, extra_headers=None):
    """POST form data through the shared session, retrying on request errors.

    Args:
        url: Target URL.
        data: Form fields, sent as ``application/x-www-form-urlencoded``.
        extra_headers: Optional mapping merged over the default headers.

    Returns:
        str: The response body text of the first successful attempt.

    Raises:
        requests.RequestException: Re-raised from the final attempt when all
            attempts fail (connection errors, timeouts and HTTP error
            statuses reported by ``raise_for_status``).
    """
    max_tries = 5
    retry_delay = 2  # seconds to wait between attempts

    headers = {"Content-Type": "application/x-www-form-urlencoded"}
    if extra_headers:
        headers.update(extra_headers)

    for attempt in range(1, max_tries + 1):
        try:
            r = session.post(url, data=data, headers=headers, timeout=30, verify=False)
            r.raise_for_status()
            return r.text
        except requests.RequestException as e:
            # Only transport/HTTP failures are worth retrying; anything else
            # is a programming error and should surface immediately.
            if attempt == max_tries:
                raise
            log(f" 请求失败({e}),第{attempt}次重试...")
            time.sleep(retry_delay)
def extract_table(soup, table_class="table-theme1"):
    """Extract a table from the page as ``(headers, rows)``.

    The table with class ``table_class`` is preferred; otherwise the first
    ``<table>`` in the document is used.

    Args:
        soup: Parsed document exposing ``find``.
        table_class: CSS class of the table to look for first.

    Returns:
        tuple: ``(None, None)`` when the page has no table. Otherwise
        ``(header_list, rows)`` where ``header_list`` holds the stripped text
        of every ``<th>`` and ``rows`` is a list of stripped cell-text lists,
        one per ``<tr>`` that contains ``<td>`` cells. When the table has no
        ``<th>``, the first ``<tr>``'s cells are used as the header and that
        row is left out of ``rows``.
    """
    table = soup.find("table", class_=table_class) or soup.find("table")
    if not table:
        return None, None

    header_list = [th.text.strip() for th in table.find_all("th")]
    table_rows = table.find_all("tr")
    if not header_list and table_rows:
        # No <th>: the first row doubles as the header. Drop it from the data
        # so it is not exported as a record (and does not poison callers'
        # de-duplication with the header's first cell).
        header_list = [td.text.strip() for td in table_rows[0].find_all("td")]
        table_rows = table_rows[1:]

    rows = []
    for row in table_rows:
        cells = row.find_all("td")
        if cells:
            rows.append([c.text.strip() for c in cells])
    return header_list, rows
def parse_total_pages(soup, first_page_rows):
    """Estimate the total number of result pages from the first page.

    Two sources are combined and the larger value wins:

    * the first "<number> 条" occurrence in the page text, divided (rounding
      up) by the page size, taken as ``max(first_page_rows, 10)``;
    * the largest numeric link text inside the element with id
      ``AspNetPager1``.

    Args:
        soup: Parsed first result page.
        first_page_rows: Number of data rows found on the first page.

    Returns:
        int: Estimated page count, never less than 1.
    """
    total_pages = 1
    all_text = soup.get_text()
    m = re.search(r'\s*(\d+)\s*条', all_text)
    if m:
        total_records = int(m.group(1))
        rows_per_page = max(first_page_rows, 10)
        # Ceiling division; clamp so a "0 条" match cannot yield zero pages.
        total_pages = max(1, (total_records + rows_per_page - 1) // rows_per_page)
        log(f"{total_records} 条记录,每页 {rows_per_page} 条,预计 {total_pages}")

    pager = soup.find(id="AspNetPager1")
    if pager:
        nums = []
        for a in pager.find_all("a"):
            try:
                nums.append(int(a.text.strip()))
            except ValueError:
                # Non-numeric pager links are expected; skip them.
                continue
        if nums:
            total_pages = max(total_pages, max(nums))
    return total_pages
def get_hidden_fields(soup):
    """Collect the ASP.NET postback fields of the current page.

    Returns:
        dict: ``"vs"``, ``"vs_gen"`` and ``"ev"`` mapped to the values of the
        ``__VIEWSTATE``, ``__VIEWSTATEGENERATOR`` and ``__EVENTVALIDATION``
        inputs (``""`` for any that is missing).
    """
    field_ids = {
        "vs": "__VIEWSTATE",
        "vs_gen": "__VIEWSTATEGENERATOR",
        "ev": "__EVENTVALIDATION",
    }
    return {key: get_val(soup, field_id) for key, field_id in field_ids.items()}
# ══════════════════════════════════════════════════════════════
# 1. 客户信息 (从2015年起)
# ══════════════════════════════════════════════════════════════
# Names of the hidden form inputs that the customer query form submits with
# empty values (see make_customer_data). Order is preserved from the original.
CUSTOMER_HIDDEN_FIELDS = [
    "HiddenCountyear",
    "HiddenCount120",
    "HiddenCount240",
    "HiddenTtime",
    "HiddenOpenId",
    "HiddenBdtime",
    "HiddenVipId",
    "HiddenLasttime",
    "HiddenHfstate",
    "HiddenLastgch",
    "HiddenJyId",
    "HiddenHkscode",
    "HiddenBrand",
    "HiddenBrandname",
    "HiddenChassisnumber",
    "HiddenEngineDesc",
    "HiddenEngineStyle",
    "HiddenFamilyname",
    "HiddenGearbox",
    "HiddenGearboxName",
    "HiddenLyid",
    "HiddenProductyear",
    "HiddenVehiclename",
    "HiddenVehiclesale",
    "HiddenVin",
    "HiddenYearpattern",
    "HiddenDrivetype",
    "HiddenModelbrandlogourl",
    "HiddenModelbrandmfr",
    "HiddenModelid",
    "HiddenFueltype",
    "HiddenKilowattpower",
    "HiddenListedyear",
    "HiddenListedmonth",
    "HiddenStopyear",
    "HiddenBodynumdoors",
    "HiddenTransmissiondescription",
    "HiddenMakename",
    "HiddenModelbrandid",
    "HiddenMakeid",
    "HiddenIschoosevehicletype",
]
def make_customer_data(page_no, viewstate, event_val, vs_generator, is_first=False):
    """Build the form payload for the customer list page.

    Args:
        page_no: Page number requested through the ``AspNetPager1`` postback;
            ignored when ``is_first`` is true.
        viewstate: Current ``__VIEWSTATE`` value.
        event_val: Current ``__EVENTVALIDATION`` value.
        vs_generator: Current ``__VIEWSTATEGENERATOR`` value.
        is_first: When true, build the initial search submission (``Button3``)
            instead of a pager postback.

    Returns:
        dict: Form fields covering 2015-01-01 through today, with every name
        in ``CUSTOMER_HIDDEN_FIELDS`` present and empty.
    """
    from datetime import datetime

    if is_first:
        target, argument = "", ""
    else:
        target, argument = "AspNetPager1", str(page_no)

    form = {
        "__EVENTTARGET": target,
        "__EVENTARGUMENT": argument,
        "__VIEWSTATE": viewstate,
        "__VIEWSTATEGENERATOR": vs_generator,
        "__VIEWSTATEENCRYPTED": "",
        "__EVENTVALIDATION": event_val,
        "TextTime1": "2015-01-01",
        "TextTime2": datetime.now().strftime("%Y-%m-%d"),
        "TextCname": "",
        "txtVin": "",
        "Txtcartype": "",
        "txtEngineno": "",
    }
    if is_first:
        form["Button3"] = "搜索"
    # AspNetPager1_input is intentionally not sent (the browser's
    # form.submit() does not include it).
    form.update(dict.fromkeys(CUSTOMER_HIDDEN_FIELDS, ""))
    return form
def reset_customer_session(page_url, extra_h):
    """Re-open the customer page and re-run the search.

    If the fetched page looks like the login form, ``login()`` is called and
    the page is fetched once more.

    Args:
        page_url: URL of the customer list page.
        extra_h: Extra headers passed to the search POST.

    Returns:
        tuple: ``(soup, ok)``. ``ok`` is ``False`` (with the soup of the GET
        response) when the page carries no ``__VIEWSTATE``; otherwise ``soup``
        is the parsed result of the search POST and ``ok`` is ``True``.
    """
    response = session.get(page_url, timeout=20, verify=False)
    if "登录" in response.text and "ucode" in response.text:
        log(" Session过期,重新登录...")
        login()
        response = session.get(page_url, timeout=20, verify=False)

    landing = BeautifulSoup(response.text, "html.parser")
    hidden = get_hidden_fields(landing)
    if not hidden["vs"]:
        log(" 重置后仍无VIEWSTATE")
        return landing, False

    search_form = make_customer_data(
        1, hidden["vs"], hidden["ev"], hidden["vs_gen"], is_first=True
    )
    result_html = fetch_post(page_url, search_form, extra_h)
    return BeautifulSoup(result_html, "html.parser"), True
def export_customer_info():
    """Crawl the customer list (2015-01-01 onwards) and write it to Excel.

    The first page comes from the initial search; later pages are requested
    one by one through pager postbacks that reuse the hidden fields of the
    previously fetched page. Rows are de-duplicated on their first column.
    In test mode the crawl stops once ``TEST_LIMIT`` rows were collected.

    Returns:
        str | None: Path of the written workbook, or ``None`` when the
        session could not be initialised or the first page has no table rows.
    """
    log("\n========== 导出客户信息(2015年起) ==========")
    page_url = f"{BASE}/carinfo.aspx?clientWidth=1614"
    ref = f"{BASE}/carinfo.aspx"
    extra_h = {"Referer": ref, "Origin": BASE, "Host": HOST_DOMAIN}
    data_list = []
    seen_ids = set()  # first-column values already collected (de-duplication)

    # Initialise: GET the page and run the search for page 1.
    soup, ok = reset_customer_session(page_url, extra_h)
    if not ok:
        log("初始化失败")
        return None
    header_list, rows = extract_table(soup)
    if not rows:
        log("未找到客户信息表格")
        return None
    data_list.extend(rows)
    seen_ids.update(r[0] for r in rows if r and r[0])

    # float('inf') disables the cap outside test mode; the slices below are
    # only ever reached with the finite TEST_LIMIT.
    effective_limit = TEST_LIMIT if TEST_MODE else float('inf')
    if len(data_list) >= effective_limit:
        data_list = data_list[:effective_limit]

    total_pages = parse_total_pages(soup, len(rows))
    log(f"客户信息共 {total_pages} 页,开始爬取...")
    if TEST_MODE:
        log(f"[测试模式] 目标 {TEST_LIMIT} 条,当前已 {len(data_list)}")

    if len(data_list) < effective_limit:
        page_idx = 2          # page currently being fetched
        max_attempts = 5      # attempts per page before the session is reset
        max_resets = 3        # resets allowed after failed pages (whole crawl)
        reset_count = 0
        vs_reset_streak = 0   # consecutive resets caused by a missing VIEWSTATE
        with tqdm(total=total_pages - 1, desc="客户信息") as pbar:
            while page_idx <= total_pages:
                # Test mode: stop once the target row count is reached.
                if len(data_list) >= effective_limit:
                    if TEST_MODE:
                        log(f"[测试模式] 已达 {effective_limit} 条,停止翻页")
                    break

                hf = get_hidden_fields(soup)
                if not hf["vs"]:
                    # Bound this recovery path: without the streak counter a
                    # page that never yields a VIEWSTATE would loop forever.
                    vs_reset_streak += 1
                    if vs_reset_streak > max_resets:
                        log(f"{page_idx}页已重置{vs_reset_streak}次仍失败,跳过剩余页")
                        break
                    log(f"{page_idx}页VIEWSTATE丢失,重新初始化...")
                    soup, ok = reset_customer_session(page_url, extra_h)
                    if not ok:
                        log(" 初始化失败,跳过剩余页")
                        break
                    # The reset already re-ran the search and left us on page
                    # 1; walk forward to the page just before the target.
                    log(f" 重新翻页到第{page_idx}页...")
                    for skip_pg in range(2, page_idx):
                        hf = get_hidden_fields(soup)
                        if not hf["vs"]:
                            log(f" 翻页到{skip_pg}时VIEWSTATE丢失,放弃")
                            break
                        html = fetch_post(page_url, make_customer_data(
                            skip_pg, hf["vs"], hf["ev"], hf["vs_gen"]
                        ), extra_h)
                        soup = BeautifulSoup(html, "html.parser")
                    continue

                attempt = 0
                success = False
                while attempt < max_attempts:
                    attempt += 1
                    html = fetch_post(page_url, make_customer_data(
                        page_idx, hf["vs"], hf["ev"], hf["vs_gen"]
                    ), extra_h)
                    soup = BeautifulSoup(html, "html.parser")
                    _, page_rows = extract_table(soup)

                    if not page_rows:
                        log(f"{page_idx}页无数据(尝试{attempt}/{max_attempts})")
                        hf = get_hidden_fields(soup)
                        if not hf["vs"]:
                            break
                        continue

                    # A page whose first row was already collected is treated
                    # as a repeat of an earlier page and retried.
                    first_id = page_rows[0][0]
                    if first_id and first_id in seen_ids:
                        log(f"{page_idx}页首条ID={first_id}已存在(尝试{attempt}/{max_attempts})")
                        hf = get_hidden_fields(soup)
                        if not hf["vs"]:
                            break
                        continue

                    # New data: keep only rows whose id was not seen before.
                    new_rows = []
                    for r in page_rows:
                        rid = r[0] if r else ""
                        if rid and rid in seen_ids:
                            continue
                        new_rows.append(r)
                        if rid:
                            seen_ids.add(rid)
                    data_list.extend(new_rows)
                    if TEST_MODE and len(data_list) > effective_limit:
                        data_list = data_list[:effective_limit]
                    pbar.update(1)
                    success = True
                    break

                if not success:
                    reset_count += 1
                    if reset_count > max_resets:
                        log(f"{page_idx}页已重置{reset_count}次仍失败,跳过剩余页")
                        break
                    log(f"{page_idx}页连续{max_attempts}次失败,重新初始化session({reset_count}/{max_resets})...")
                    soup, ok = reset_customer_session(page_url, extra_h)
                    if not ok:
                        log(" 初始化失败,跳过剩余页")
                        break
                    # page_idx is not advanced: retry the same page.
                    continue

                vs_reset_streak = 0
                page_idx += 1

    log(f"客户信息共抓取 {len(data_list)} 条(去重后)")

    # pandas raises ValueError when the number of column names differs from
    # the data width; align both so a ragged table cannot discard the crawl.
    columns = list(header_list) if header_list else None
    if columns:
        width = max(len(columns), max((len(r) for r in data_list), default=0))
        columns += [f"列{i + 1}" for i in range(len(columns), width)]
        data_list = [list(r) + [None] * (width - len(r)) for r in data_list]

    df = pd.DataFrame(data_list, columns=columns)
    df = df.dropna(how="all")
    df = df[df.apply(lambda row: any(str(v).strip() for v in row), axis=1)]

    # Drop the action columns (edit / delete) and unnamed columns.
    op_cols = [c for c in df.columns if c in ("编辑", "删除") or str(c).startswith("Unnamed")]
    if op_cols:
        df = df.drop(columns=op_cols)

    out = os.path.join(DESKTOP, "快修哥_客户信息.xlsx")
    df.to_excel(out, index=False)
    log(f"客户信息已保存: {out}")
    return out
# ══════════════════════════════════════════════════════════════
# 2. 库存统计
# ══════════════════════════════════════════════════════════════
def make_stock_data(page_no, viewstate, vs_gen, event_val, is_first=False):
    """Build the form payload for the stock statistics page.

    Args:
        page_no: Page number requested through the ``AspNetPager1`` postback;
            ignored when ``is_first`` is true.
        viewstate: Current ``__VIEWSTATE`` value.
        vs_gen: Current ``__VIEWSTATEGENERATOR`` value.
        event_val: Current ``__EVENTVALIDATION`` value.
        is_first: When true, build the initial search submission (``Button3``)
            instead of a pager postback.

    Returns:
        dict: Form fields with all three text filters left empty.
    """
    d = {
        "__EVENTTARGET": "" if is_first else "AspNetPager1",
        "__EVENTARGUMENT": "" if is_first else str(page_no),
        "__VIEWSTATE": viewstate,
        "__VIEWSTATEGENERATOR": vs_gen,
        "__EVENTVALIDATION": event_val,
        "__VIEWSTATEENCRYPTED": "",
        "TextPjmc": "",
        "TextKwh": "",
        "TextVehicle": "",
    }
    if is_first:
        # The search button is only part of the initial query submission.
        d["Button3"] = "搜索"
    return d
def reset_stock_session(page_url, extra_h):
    """Re-open the stock page and re-run the search.

    If the fetched page looks like the login form, ``login()`` is called and
    the page is fetched once more.

    Args:
        page_url: URL of the stock statistics page.
        extra_h: Extra headers passed to the search POST.

    Returns:
        tuple: ``(soup, ok)``. ``ok`` is ``False`` (with the soup of the GET
        response) when the page carries no ``__VIEWSTATE``; otherwise ``soup``
        is the parsed result of the search POST and ``ok`` is ``True``.
    """
    response = session.get(page_url, timeout=20, verify=False)
    if "登录" in response.text and "ucode" in response.text:
        log(" Session过期,重新登录...")
        login()
        response = session.get(page_url, timeout=20, verify=False)

    landing = BeautifulSoup(response.text, "html.parser")
    hidden = get_hidden_fields(landing)
    if not hidden["vs"]:
        log(" 重置后仍无VIEWSTATE")
        return landing, False

    search_form = make_stock_data(
        1, hidden["vs"], hidden["vs_gen"], hidden["ev"], is_first=True
    )
    result_html = fetch_post(page_url, search_form, extra_h)
    return BeautifulSoup(result_html, "html.parser"), True
def export_stock():
    """Crawl the stock statistics list and write it to Excel.

    The first page comes from the initial search; later pages are requested
    one by one through pager postbacks that reuse the hidden fields of the
    previously fetched page. Rows are de-duplicated on their first column.
    In test mode the crawl stops once ``TEST_LIMIT`` rows were collected.

    Returns:
        str | None: Path of the written workbook, or ``None`` when the
        session could not be initialised or the first page has no table rows.
    """
    log("\n========== 导出库存统计 ==========")
    page_url = f"{BASE}/stockpj.aspx?clientWidth=1614"
    ref = f"{BASE}/stockpj.aspx"
    extra_h = {"Referer": ref, "Origin": BASE, "Host": HOST_DOMAIN}
    data_list = []
    seen_ids = set()  # first-column values already collected (de-duplication)

    soup, ok = reset_stock_session(page_url, extra_h)
    if not ok:
        log("初始化失败")
        return None
    header_list, rows = extract_table(soup)
    if not rows:
        log("未找到库存表格")
        return None
    data_list.extend(rows)
    seen_ids.update(r[0] for r in rows if r and r[0])

    # float('inf') disables the cap outside test mode; the slices below are
    # only ever reached with the finite TEST_LIMIT.
    effective_limit = TEST_LIMIT if TEST_MODE else float('inf')
    if len(data_list) >= effective_limit:
        data_list = data_list[:effective_limit]

    total_pages = parse_total_pages(soup, len(rows))
    log(f"库存统计共 {total_pages} 页,开始爬取...")
    if TEST_MODE:
        log(f"[测试模式] 目标 {TEST_LIMIT} 条,当前已 {len(data_list)}")

    if len(data_list) < effective_limit:
        page_idx = 2          # page currently being fetched
        max_attempts = 5      # attempts per page before the session is reset
        max_resets = 3        # resets allowed after failed pages (whole crawl)
        reset_count = 0
        vs_reset_streak = 0   # consecutive resets caused by a missing VIEWSTATE
        with tqdm(total=total_pages - 1, desc="库存统计") as pbar:
            while page_idx <= total_pages:
                # Test mode: stop once the target row count is reached.
                if len(data_list) >= effective_limit:
                    if TEST_MODE:
                        log(f"[测试模式] 已达 {effective_limit} 条,停止翻页")
                    break

                hf = get_hidden_fields(soup)
                if not hf["vs"]:
                    # Bound this recovery path: without the streak counter a
                    # page that never yields a VIEWSTATE would loop forever.
                    vs_reset_streak += 1
                    if vs_reset_streak > max_resets:
                        log(f"{page_idx}页已重置{vs_reset_streak}次仍失败,跳过剩余页")
                        break
                    log(f"{page_idx}页VIEWSTATE丢失,重新初始化...")
                    soup, ok = reset_stock_session(page_url, extra_h)
                    if not ok:
                        break
                    # The reset already re-ran the search and left us on page
                    # 1; walk forward to the page just before the target.
                    log(f" 重新翻页到第{page_idx}页...")
                    for skip_pg in range(2, page_idx):
                        hf = get_hidden_fields(soup)
                        if not hf["vs"]:
                            break
                        html = fetch_post(page_url, make_stock_data(
                            skip_pg, hf["vs"], hf["vs_gen"], hf["ev"]
                        ), extra_h)
                        soup = BeautifulSoup(html, "html.parser")
                    continue

                attempt = 0
                success = False
                while attempt < max_attempts:
                    attempt += 1
                    html = fetch_post(page_url, make_stock_data(
                        page_idx, hf["vs"], hf["vs_gen"], hf["ev"]
                    ), extra_h)
                    soup = BeautifulSoup(html, "html.parser")
                    _, page_rows = extract_table(soup)

                    if not page_rows:
                        log(f"{page_idx}页无数据(尝试{attempt}/{max_attempts})")
                        hf = get_hidden_fields(soup)
                        if not hf["vs"]:
                            break
                        continue

                    # A page whose first row was already collected is treated
                    # as a repeat of an earlier page and retried.
                    first_id = page_rows[0][0]
                    if first_id and first_id in seen_ids:
                        log(f"{page_idx}页首条ID={first_id}已存在(尝试{attempt}/{max_attempts})")
                        hf = get_hidden_fields(soup)
                        if not hf["vs"]:
                            break
                        continue

                    # New data: keep only rows whose id was not seen before.
                    new_rows = []
                    for r in page_rows:
                        rid = r[0] if r else ""
                        if rid and rid in seen_ids:
                            continue
                        new_rows.append(r)
                        if rid:
                            seen_ids.add(rid)
                    data_list.extend(new_rows)
                    if TEST_MODE and len(data_list) > effective_limit:
                        data_list = data_list[:effective_limit]
                    pbar.update(1)
                    success = True
                    break

                if not success:
                    reset_count += 1
                    if reset_count > max_resets:
                        log(f"{page_idx}页已重置{reset_count}次仍失败,跳过剩余页")
                        break
                    log(f"{page_idx}页连续{max_attempts}次失败,重新初始化session({reset_count}/{max_resets})...")
                    soup, ok = reset_stock_session(page_url, extra_h)
                    if not ok:
                        break
                    # page_idx is not advanced: retry the same page.
                    continue

                vs_reset_streak = 0
                page_idx += 1

    log(f"库存统计共抓取 {len(data_list)} 条(去重后)")

    # pandas raises ValueError when the number of column names differs from
    # the data width; align both so a ragged table cannot discard the crawl.
    columns = list(header_list) if header_list else None
    if columns:
        width = max(len(columns), max((len(r) for r in data_list), default=0))
        columns += [f"列{i + 1}" for i in range(len(columns), width)]
        data_list = [list(r) + [None] * (width - len(r)) for r in data_list]

    df = pd.DataFrame(data_list, columns=columns)
    df = df.dropna(how="all")
    df = df[df.apply(lambda row: any(str(v).strip() for v in row), axis=1)]

    out = os.path.join(DESKTOP, "快修哥_库存统计.xlsx")
    df.to_excel(out, index=False)
    log(f"库存统计已保存: {out}")
    return out
if __name__ == "__main__":
    # Script entry point: log in, run both exports, then print a summary.
    ok = login()
    if not ok:
        log("登录失败")
        sys.exit(1)

    # Each export is guarded on its own so that a failure in one does not
    # prevent the other from running.
    customer_path = None
    try:
        customer_path = export_customer_info()
    except Exception as e:
        log(f"客户信息导出失败: {e}")

    stock_path = None
    try:
        stock_path = export_stock()
    except Exception as e:
        log(f"库存导出失败: {e}")

    log("\n========== 全部完成 ==========")
    if customer_path:
        log(f" 客户信息: {customer_path}")
    if stock_path:
        log(f" 库存统计: {stock_path}")