Files
F6--/张阳脚本/竞品系统数据导出/H1会员卡.py
T
2026-04-18 09:22:23 +08:00

282 lines
8.8 KiB
Python

import argparse
import csv
import json
import os
import re
import time
from html import unescape
from typing import Dict, Iterable, List, Optional, Sequence, Tuple
from urllib.parse import urlencode
import requests
# Scheme and host of the SCRM admin site that the page URLs are built from.
BASE_URL = "https://scrm.h1cd.com"

# Cookies used when the H1_COOKIES_JSON environment variable is not set.
# NOTE(review): "adminun", "adminpd" and "PHPSESSID" look like a real login
# name, a password token and a live session id committed to source control --
# confirm, and prefer supplying them through H1_COOKIES_JSON instead.
DEFAULT_COOKIES: Dict[str, str] = {
    "showSmsActivity": "1",
    "showEasyMoney": "1",
    # Stored URL-encoded, exactly as it is sent.
    "LOGIN_URL": "https%3A%2F%2Fscrm.h1cd.com%2Flogin-h1cd.html",
    "adminpd": "jVISiRrtcJplFhLoCuUIxK9XG5ekdfwzq%2B0y482ZKxE%3D",
    "adminun": "15224781773",
    "uid": "10291",
    "PHPSESSID": "nbn58laakng0rv5iqln82a6qpu",
}
# Browser-like request headers; the User-Agent and sec-ch-ua values identify
# the client as Microsoft Edge 146 on Windows.  Long values are split with
# implicit string concatenation only; the resulting strings are unchanged.
DEFAULT_HEADERS: Dict[str, str] = {
    "Accept": (
        "text/html,application/xhtml+xml,application/xml;q=0.9,"
        "image/avif,image/webp,image/apng,*/*;q=0.8,"
        "application/signed-exchange;v=b3;q=0.7"
    ),
    "Accept-Language": "zh-CN,zh;q=0.9,en;q=0.8,en-GB;q=0.7,en-US;q=0.6",
    "Connection": "keep-alive",
    "Sec-Fetch-Dest": "iframe",
    "Sec-Fetch-Mode": "navigate",
    "Sec-Fetch-Site": "same-origin",
    "Sec-Fetch-User": "?1",
    "Upgrade-Insecure-Requests": "1",
    "User-Agent": (
        "Mozilla/5.0 (Windows NT 10.0; Win64; x64) "
        "AppleWebKit/537.36 (KHTML, like Gecko) "
        "Chrome/146.0.0.0 Safari/537.36 Edg/146.0.0.0"
    ),
    "sec-ch-ua": '"Chromium";v="146", "Not-A.Brand";v="24", "Microsoft Edge";v="146"',
    "sec-ch-ua-mobile": "?0",
    "sec-ch-ua-platform": '"Windows"',
}
# Default query-string filters for the listing request.  The same four keys
# are exposed as command-line options in main().
DEFAULT_PARAMS: Dict[str, str] = {
    "type": "",
    "expired": "",
    "storeId": "0",
    "search": "",
}
# Pre-compiled patterns used to scrape tables out of raw HTML.
# re.I and re.S are the short aliases of re.IGNORECASE and re.DOTALL.

# A whole <table>...</table> element, matched non-greedily.
_TABLE_RE = re.compile(r"<table\b[^>]*>.*?</table>", re.I | re.S)
# A whole <tr>...</tr> element.
_TR_RE = re.compile(r"<tr\b[^>]*>.*?</tr>", re.I | re.S)
# The inner HTML (group 1) of a <th> or <td> cell.
_CELL_RE = re.compile(r"<t[hd]\b[^>]*>(.*?)</t[hd]>", re.I | re.S)
# Any tag at all; used to strip markup from cell contents.
_TAG_RE = re.compile(r"<[^>]+>", re.S)
# <br>, <br/>, <br /> and spaced variants.
_BR_RE = re.compile(r"<\s*br\s*/?\s*>", re.I)
# A run of non-breaking spaces, literal or still entity-encoded.
_NBSP_RE = re.compile(r"(\xa0|&nbsp;)+", re.I)
def _clean_html_text(raw: str) -> str:
    """Turn the inner HTML of a table cell into plain, tidy text.

    Steps, in order: ``<br>`` variants become newlines, every remaining tag
    is removed, HTML entities are decoded, runs of non-breaking spaces become
    one ordinary space, carriage returns are dropped, and whitespace is
    normalised (no spaces/tabs before a newline, at most one blank line in a
    row, runs of spaces/tabs collapsed to one space).

    Args:
        raw: HTML fragment taken from inside a ``<td>`` or ``<th>``.

    Returns:
        The cleaned text with leading and trailing whitespace removed.
    """
    # Strip markup first so that decoded entities such as "&lt;b&gt;" are
    # never mistaken for tags.
    text = _TAG_RE.sub("", _BR_RE.sub("\n", raw))
    text = _NBSP_RE.sub(" ", unescape(text))
    text = text.replace("\r", "").strip()

    whitespace_rules = (
        (r"[ \t]+\n", "\n"),   # trailing blanks before a line break
        (r"\n{3,}", "\n\n"),   # three or more newlines -> one blank line
        (r"[ \t]{2,}", " "),   # runs of spaces/tabs -> single space
    )
    for pattern, replacement in whitespace_rules:
        text = re.sub(pattern, replacement, text)
    return text.strip()
def build_cards_url(page: int) -> str:
    """Return the URL of one page of the member-card listing.

    Args:
        page: 1-based page number. Any value of 1 or less maps to the
            un-numbered first page.

    Returns:
        ``.../admin/members/cards.html`` for the first page, otherwise
        ``.../admin/members/cards_<page>.html``.
    """
    page_suffix = "" if page <= 1 else f"_{page}"
    return f"{BASE_URL}/admin/members/cards{page_suffix}.html"
def _load_cookies() -> Dict[str, str]:
    """Return the cookies to send with every request.

    The ``H1_COOKIES_JSON`` environment variable, when set to a non-empty
    value, must hold a JSON object; its keys and values are converted to
    strings with ``str()``.  When the variable is unset or empty, a copy of
    ``DEFAULT_COOKIES`` is returned.

    Returns:
        A new dict mapping cookie name to cookie value.

    Raises:
        json.JSONDecodeError: The variable is set but is not valid JSON.
            This is a ``ValueError`` subclass, and the message names the
            environment variable so the failure can be traced to its source.
        ValueError: The JSON parsed but is not an object.
    """
    env_json = os.environ.get("H1_COOKIES_JSON")
    if not env_json:
        return dict(DEFAULT_COOKIES)

    try:
        loaded = json.loads(env_json)
    except json.JSONDecodeError as err:
        # Same exception type as before, but the message now says which
        # setting was malformed instead of only "Expecting value: ...".
        raise json.JSONDecodeError(
            f"H1_COOKIES_JSON is not valid JSON: {err.msg}", err.doc, err.pos
        ) from err

    if not isinstance(loaded, dict):
        raise ValueError("H1_COOKIES_JSON must be a JSON object")
    return {str(k): str(v) for k, v in loaded.items()}
def _fetch_html(
    session: "requests.Session",
    page: int,
    params: Dict[str, str],
    base_headers: Dict[str, str],
    timeout_seconds: int = 30,
) -> str:
    """Download one listing page and return its HTML as text.

    Args:
        session: Session used to issue the GET request.
        page: 1-based page number to fetch.
        params: Query-string parameters sent with the request and also
            encoded into the ``Referer`` header.
        base_headers: Headers to send; copied, never mutated.
        timeout_seconds: Timeout passed to ``session.get``.

    Returns:
        The decoded response body.

    Raises:
        requests.HTTPError: The server answered with an error status
            (raised by ``raise_for_status``).
    """
    url = build_cards_url(page)

    # The Referer is the previous page of the listing; the first page (and
    # any page number below 2) refers to page 1 itself.
    referer_page = page - 1 if page >= 2 else 1
    headers = dict(base_headers)
    headers["Referer"] = f"{build_cards_url(referer_page)}?{urlencode(params, doseq=True)}"

    resp = session.get(url, params=params, headers=headers, timeout=timeout_seconds)
    resp.raise_for_status()

    # requests pre-fills ``encoding`` with ISO-8859-1 for text/* responses
    # whose Content-Type carries no charset, so testing ``encoding`` alone
    # never triggers the UTF-8 fallback and Chinese text would be decoded as
    # mojibake.  Fall back to UTF-8 whenever the server declared no charset.
    content_type = resp.headers.get("Content-Type", "")
    if not resp.encoding or "charset" not in content_type.lower():
        resp.encoding = "utf-8"
    return resp.text
def _parse_table(table_html: str) -> Tuple[List[str], List[List[str]]]:
    """Split one ``<table>`` element into a header row and data rows.

    Rows that have no cells, or whose cells are all empty after cleaning,
    are skipped.  The first remaining row that contains a ``<th>`` tag
    becomes the header; every other row (including later ``<th>`` rows) is
    treated as data.  If no header row exists, names ``col_1``, ``col_2``,
    ... are generated to fit the widest data row.

    Args:
        table_html: HTML of a single table.

    Returns:
        ``(header, rows)``.  Each row is padded with ``""`` or truncated so
        that it has exactly ``len(header)`` cells.  When there are no data
        rows the result is ``(header, [])``.
    """
    header: List[str] = []
    body: List[List[str]] = []

    for row_match in _TR_RE.finditer(table_html):
        row_html = row_match.group(0)
        cells = [_clean_html_text(fragment) for fragment in _CELL_RE.findall(row_html)]
        # any() is False both for "no cells" and for "only blank cells".
        if not any(cells):
            continue
        if not header and re.search(r"<th\b", row_html, re.IGNORECASE):
            header = cells
        else:
            body.append(cells)

    if not body:
        return header, []

    if not header:
        widest = max(map(len, body))
        header = [f"col_{number}" for number in range(1, widest + 1)]

    width = len(header)
    # Padding with a non-positive repeat count adds nothing, and the slice
    # trims rows that are too long, so one expression covers both cases.
    fitted = [(row + [""] * (width - len(row)))[:width] for row in body]
    return header, fitted
def _unique_column_names(names: Sequence[str]) -> List[str]:
    """Return *names* with repeated entries renamed so every name is unique.

    The first occurrence of a name is kept as is.  A later repeat becomes
    ``<name>_<position>`` (``col_<position>`` for an empty name), where
    position is the 1-based column index; underscores are appended in the
    rare case that this still collides with another column.
    """
    reserved = set(names)
    used = set()
    result: List[str] = []
    for position, name in enumerate(names, start=1):
        candidate = name
        if candidate in used:
            candidate = f"{name or 'col'}_{position}"
            while candidate in reserved or candidate in used:
                candidate += "_"
        used.add(candidate)
        result.append(candidate)
    return result


def parse_cards_page(html_text: str) -> Tuple[List[str], List[Dict[str, str]]]:
    """Extract the main data table of a listing page as a list of records.

    Every ``<table>`` in the page is parsed; tables whose header has one
    column or fewer are ignored, and of the rest the one with the most data
    rows wins (the earliest one on a tie).

    Args:
        html_text: Full HTML of the page.

    Returns:
        ``(header, records)`` where each record maps a column name to the
        cell text.  Both are empty when no suitable table with data rows is
        found.  Repeated column names (for example several blank header
        cells) are renamed so that no cell is lost when a row is turned
        into a dict.
    """
    best_header: List[str] = []
    best_rows: List[List[str]] = []
    for table_html in _TABLE_RE.findall(html_text):
        header, rows = _parse_table(table_html)
        if len(header) <= 1:
            continue
        if len(rows) > len(best_rows):
            best_header, best_rows = header, rows

    if not best_rows:
        return best_header, []

    # dict(zip(...)) keeps only the last value for a repeated key, so the
    # column names must be unique before the rows are converted.
    best_header = _unique_column_names(best_header)
    records = [dict(zip(best_header, row)) for row in best_rows]
    return best_header, records
def _merge_headers(existing: List[str], incoming: Sequence[str]) -> List[str]:
    """Append the column names of *incoming* that *existing* does not have.

    Args:
        existing: Column names collected so far; left unmodified.
        incoming: Column names of a newly parsed page.

    Returns:
        A new list: *existing* unchanged, followed by each unseen name of
        *incoming* once, in order of first appearance.
    """
    known = set(existing)
    # dict.fromkeys drops repeats inside *incoming* while keeping order.
    additions = [name for name in dict.fromkeys(incoming) if name not in known]
    return [*existing, *additions]
def export_all_cards(
    output_csv_path: str,
    params: Optional[Dict[str, str]] = None,
    headers: Optional[Dict[str, str]] = None,
    max_pages: int = 200,
    sleep_seconds: float = 0.3,
) -> Tuple[int, int]:
    """Fetch every listing page and write the combined rows to a CSV file.

    Pages are requested in order starting at 1.  Paging stops at the first
    page that yields no records, at the first page that contains only rows
    already collected, or after ``max_pages`` pages.  Rows whose cell values
    were already seen are written once.

    Args:
        output_csv_path: Destination file; parent directories are created.
            Written as UTF-8 with a BOM.
        params: Query-string filters; ``DEFAULT_PARAMS`` when ``None``.
        headers: Request headers; ``DEFAULT_HEADERS`` when ``None``.
        max_pages: Upper bound on the number of pages requested.
        sleep_seconds: Pause between consecutive page requests; values of 0
            or less disable the pause.

    Returns:
        ``(pages_fetched, rows_written)``.

    Raises:
        RuntimeError: No table rows could be parsed from any page.
    """
    cookies = _load_cookies()
    params = dict(DEFAULT_PARAMS if params is None else params)
    headers = dict(DEFAULT_HEADERS if headers is None else headers)

    all_records: List[Dict[str, str]] = []
    merged_header: List[str] = []
    seen_keys: set = set()  # tuples of cell values already collected
    pages_fetched = 0

    # The context manager closes the session's connections even when a
    # request or the parsing raises.
    with requests.Session() as session:
        session.cookies.update(cookies)
        for page in range(1, max_pages + 1):
            html_text = _fetch_html(session=session, page=page, params=params, base_headers=headers)
            page_header, page_records = parse_cards_page(html_text)
            pages_fetched += 1
            if not page_records:
                break

            merged_header = _merge_headers(merged_header, page_header)
            new_rows = 0
            for rec in page_records:
                key = tuple(rec.get(col, "") for col in page_header)
                if key in seen_keys:
                    continue
                seen_keys.add(key)
                all_records.append(rec)
                new_rows += 1

            if new_rows == 0:
                # Nothing new on this page: the server is repeating a page
                # that was already collected (typical for out-of-range page
                # numbers), so further requests would only add duplicates.
                break
            if sleep_seconds > 0 and page < max_pages:
                time.sleep(sleep_seconds)

    if not all_records:
        raise RuntimeError("未解析到任何表格数据(可能是登录失效/页面结构变化/被重定向到登录页)")
    if not merged_header:
        merged_header = sorted({k for r in all_records for k in r.keys()})

    os.makedirs(os.path.dirname(os.path.abspath(output_csv_path)) or ".", exist_ok=True)
    with open(output_csv_path, "w", encoding="utf-8-sig", newline="") as f:
        # restval fills columns a record lacks; extrasaction drops keys that
        # are not part of the merged header.
        writer = csv.DictWriter(f, fieldnames=merged_header, restval="", extrasaction="ignore")
        writer.writeheader()
        writer.writerows(all_records)
    return pages_fetched, len(all_records)
def _self_test() -> None:
    """Parse a small fixed HTML page and check the extracted header and rows.

    Raises:
        AssertionError: The parsed header or a checked cell differs from the
            expected value.
    """
    sample_page = """
<html><body>
<table>
<tr><th>会员名</th><th>卡号</th><th>余额</th></tr>
<tr><td>张三</td><td>NO001</td><td>100</td></tr>
<tr><td>李四</td><td>NO002</td><td>200</td></tr>
</table>
</body></html>
"""
    columns, rows = parse_cards_page(sample_page)
    assert columns == ["会员名", "卡号", "余额"]
    assert rows[0]["卡号"] == "NO001"
    assert rows[1]["余额"] == "200"
def main(argv: Optional[Sequence[str]] = None) -> int:
    """Command-line entry point: run the self-test or export the listing.

    Options: ``--output`` (CSV path), the four listing filters
    ``--storeId``, ``--search``, ``--type`` and ``--expired``,
    ``--max-pages``, ``--sleep`` and ``--self-test``.

    Args:
        argv: Argument list to parse; ``None`` lets argparse read
            ``sys.argv``.

    Returns:
        0 on success.
    """
    parser = argparse.ArgumentParser()
    parser.add_argument("--output", default="H1会员卡.csv")
    # One option per listing filter, each defaulting to DEFAULT_PARAMS.
    for option in ("storeId", "search", "type", "expired"):
        parser.add_argument(f"--{option}", default=DEFAULT_PARAMS[option])
    parser.add_argument("--max-pages", type=int, default=200)
    parser.add_argument("--sleep", type=float, default=0.3)
    parser.add_argument("--self-test", action="store_true")
    args = parser.parse_args(argv)

    if args.self_test:
        _self_test()
        print("self-test ok")
        return 0

    # Key order here is the order the parameters are handed to the request.
    params = {
        key: str(getattr(args, key))
        for key in ("type", "expired", "storeId", "search")
    }
    pages_fetched, rows = export_all_cards(
        output_csv_path=args.output,
        params=params,
        max_pages=args.max_pages,
        sleep_seconds=args.sleep,
    )
    print(f"导出完成: pages={pages_fetched}, rows={rows}, output={os.path.abspath(args.output)}")
    return 0
if __name__ == "__main__":
    # Run only when executed as a script; main()'s return value becomes the
    # process exit status.
    exit_status = main()
    raise SystemExit(exit_status)