# -*- coding: utf-8 -*-
"""
Kuaifutong (快富通) data export script.

What it does: log in to Kuaifutong -> export customer records (personal and
company) plus membership-card item details -> write an Excel workbook.

Usage: python 快富通数据导出.py [username] [password] [--limit 50]
"""
|
|
|
|
import requests
|
|
import sys
|
|
import io
|
|
import time
|
|
import re
|
|
from datetime import datetime
|
|
from pathlib import Path
|
|
from concurrent.futures import ThreadPoolExecutor, as_completed
|
|
|
|
# Force UTF-8 output on Windows, where the console code page may be unable to
# encode the Chinese text this script prints. Characters that still cannot be
# encoded are replaced instead of raising UnicodeEncodeError.
if sys.platform == "win32":
    for _stream_name in ("stdout", "stderr"):
        _stream = getattr(sys, _stream_name)
        if _stream is None:
            # No stream attached (e.g. started without a console): nothing to fix.
            continue
        if hasattr(_stream, "reconfigure"):
            # Python 3.7+: change the encoding in place and keep the same object.
            _stream.reconfigure(encoding="utf-8", errors="replace")
        elif hasattr(_stream, "buffer"):
            # Fallback: wrap the underlying binary buffer in a new text layer.
            setattr(
                sys,
                _stream_name,
                io.TextIOWrapper(_stream.buffer, encoding="utf-8", errors="replace"),
            )
|
|
|
|
def p(msg=""):
    """Print *msg* on its own line and flush stdout right away."""
    print(msg, end="\n", flush=True)
|
|
|
|
# openpyxl is installed on demand so the script also runs on a machine where
# it is missing.
try:
    import openpyxl
    from openpyxl.styles import Font, Alignment, PatternFill, Border, Side
except ImportError:
    import importlib
    import subprocess

    # Run pip through the interpreter that is executing this script; a bare
    # "pip" found on PATH may belong to a different Python installation, in
    # which case the retry import below would still fail.
    subprocess.check_call(
        [sys.executable, "-m", "pip", "install", "openpyxl"],
        stdout=subprocess.DEVNULL,
    )
    # Make the import system notice the package that was just installed.
    importlib.invalidate_caches()
    import openpyxl
    from openpyxl.styles import Font, Alignment, PatternFill, Border, Side
|
|
|
|
|
|
# ============ Configuration ============
# Root of the API; every endpoint below is built from it.
BASE_URL = "https://fos-api.lunz.cn/api"
# Sent as the "appkey" header on every request.
APP_KEY = "8e240000-3e12-0016-1e0f-08d58267a484"

# Endpoints.
LOGIN_URL = f"{BASE_URL}/Membership/Login"
MEMBER_LIST_URL = f"{BASE_URL}/member/GetMemberinfoList"
MEMBER_PRODUCT_URL = f"{BASE_URL}/memberProduc/GetMemProductByMemId"

# Number of records requested per page of the member list.
PAGE_SIZE = 200
# Single thread: concurrency is disabled.
MAX_WORKERS = 1

# The workbook is written to the current user's Desktop folder.
OUTPUT_DIR = Path.home().joinpath("Desktop")
|
|
|
|
|
|
def progress_bar(current, total, prefix="", width=30):
    """Render a one-line text progress bar.

    Args:
        current: Number of finished work items.
        total: Total number of work items; a value <= 0 is shown as 0%.
        prefix: Text placed in front of the bar.
        width: Number of character cells used for the bar itself.

    Returns:
        A string of the form ``"{prefix} |{bar}| {current}/{total} ({pct})"``.
    """
    pct = current / total if total > 0 else 0
    # Clamp only the drawn part: an out-of-range ``current`` must not make the
    # bar shorter or longer than ``width`` cells. The printed percentage still
    # reports the real ratio.
    filled = min(max(int(width * pct), 0), width)
    bar = "\u2588" * filled + "\u2591" * (width - filled)
    return f"{prefix} |{bar}| {current}/{total} ({pct:.0%})"
|
|
|
|
|
|
def login(username: str, password: str) -> dict:
    """Log in and return the ``Data`` object of the login response.

    Posts ``username`` and ``password`` as JSON to ``LOGIN_URL``.

    Args:
        username: Account name.
        password: Account password.

    Returns:
        The ``Data`` dict of the response. This function reads ``tokenId`` and
        ``qxentity`` from it.

    Raises:
        RuntimeError: If the response body is not JSON or the API reports
            ``Success`` as falsy. ``RuntimeError`` is a subclass of
            ``Exception``, so callers catching ``Exception`` keep working.
        KeyError: If a successful response lacks ``Data`` or ``tokenId``.
        requests.RequestException: On network failures or timeout.
    """
    headers = {
        "accept": "application/json, text/plain, */*",
        "appkey": APP_KEY,
        "content-type": "application/json",
        "origin": "https://fos.lunz.cn",
    }
    payload = {"username": username, "password": password}
    p("[login] ...")
    resp = requests.post(LOGIN_URL, headers=headers, json=payload, timeout=30)
    try:
        data = resp.json()
    except ValueError as err:
        # An HTML error page or empty body would otherwise surface as an
        # opaque JSON decode error with no hint about the HTTP status.
        raise RuntimeError(
            f"login failed: HTTP {resp.status_code}, response is not JSON"
        ) from err

    if not data.get("Success"):
        raise RuntimeError(f"login failed: {data.get('Messages', data)}")

    login_data = data["Data"]
    token = login_data["tokenId"]
    # "qxentity" may be present but null; fall back to an empty dict so the
    # log line below cannot crash a login that actually succeeded.
    store_name = (login_data.get("qxentity") or {}).get("businessproductunitName", "?")
    p(f"[login] OK store={store_name} token={token[:8]}...")
    return login_data
|
|
|
|
|
|
def build_headers(login_data: dict) -> dict:
    """Build the request headers used for authenticated API calls.

    Args:
        login_data: The dict returned by ``login``; its ``tokenId`` and
            ``qxentity`` entries are read.

    Returns:
        A new dict of header names to values.

    Raises:
        KeyError: If ``login_data`` or its ``qxentity`` lacks a required key.
    """
    entity = login_data["qxentity"]
    auth_headers = {
        "accept": "application/json, text/plain, */*",
        "appkey": APP_KEY,
        "audl_user": entity["memberuserid"],
        "authtoken": login_data["tokenId"],
        "content-type": "application/json",
        "openchainsign": entity["openChainSign"],
        "origin": "https://fos.lunz.cn",
        "signstring": entity["signstring"],
        "ucuserid": entity["userid"],
    }
    return auth_headers
|
|
|
|
|
|
def fetch_all_members(headers: dict, qxentity: dict, mem_type: int, limit: int = 0) -> list:
    """Download the member list of one type, page by page.

    Pages of ``PAGE_SIZE`` records are requested from ``MEMBER_LIST_URL``
    until a page comes back shorter than ``PAGE_SIZE``, the API reports an
    error, or ``limit`` records have been collected.

    Args:
        headers: Request headers (see ``build_headers``).
        qxentity: The ``qxentity`` object from the login response; it is sent
            back unchanged in every request body.
        mem_type: Sent as ``memType``. ``1`` is logged as personal customers
            ("个人客户"); any other value is logged as company customers
            ("单位客户").
        limit: Maximum number of records to return; ``0`` or less means no limit.

    Returns:
        The collected member dicts. On an API error the records gathered so
        far are returned (best effort) and the error is printed.

    Raises:
        requests.RequestException: On network failures or timeout.
    """
    type_name = "个人客户" if mem_type == 1 else "单位客户"
    all_data = []
    page_index = 1

    while True:
        payload = {
            "paging": {
                "pageSize": PAGE_SIZE,
                "pageIndex": page_index,
                "sort": [],
                "filters": [
                    {"field": "IsDisplay", "op": "eq", "Term": "1"},
                    {"field": "Enabled", "op": "eq", "Term": "1"},
                ],
            },
            "qxentity": qxentity,
            "searchValue": "",
            "memType": mem_type,
        }

        p(f" [{type_name}] page {page_index} ...")
        resp = requests.post(MEMBER_LIST_URL, headers=headers, json=payload, timeout=30)
        try:
            result = resp.json()
        except ValueError:
            # Treat a non-JSON body like any other API error: report it and
            # keep the pages that were already downloaded.
            p(f" [{type_name}] ERROR: HTTP {resp.status_code}, response is not JSON")
            break

        if not result.get("Success"):
            p(f" [{type_name}] ERROR: {result.get('Messages', result)}")
            break

        # "Data" may be present but null; extending a list with None would raise.
        items = result.get("Data") or []
        all_data.extend(items)
        p(f" [{type_name}] page {page_index} +{len(items)} total {len(all_data)}")

        # Stop as soon as the requested number of records is reached.
        if limit > 0 and len(all_data) >= limit:
            all_data = all_data[:limit]
            break

        # A short page means this was the last one.
        if len(items) < PAGE_SIZE:
            break
        page_index += 1

    p(f"[{type_name}] DONE {len(all_data)}")
    return all_data
|
|
|
|
|
|
def _fetch_one_product(args):
    """Fetch the membership cards of one customer.

    Args:
        args: A ``(headers, mer_store_id, member)`` tuple, packed so the
            function can be submitted to an executor with a single argument.

    Returns:
        The list of card dicts from the response. Each card is tagged in place
        with ``_MemName``, ``_MemMobile`` and ``_MemberId`` taken from
        ``member``. An empty list is returned when the API does not report
        success or when any error occurs (best effort: one failing customer
        must not abort the whole export).
    """
    headers, mer_store_id, member = args
    member_id = member.get("MemberId") or member.get("Id")
    params = {"merStoreId": mer_store_id, "memberId": member_id}
    try:
        resp = requests.get(MEMBER_PRODUCT_URL, params=params, headers=headers, timeout=15)
        result = resp.json()
        if result.get("Success"):
            # "Data" may be present but null.
            prods = result.get("Data") or []
            for prod in prods:
                prod["_MemName"] = member.get("MemName", "")
                prod["_MemMobile"] = member.get("MemMobile", "")
                prod["_MemberId"] = member_id
            return prods
    except Exception as exc:
        # Still best effort, but no longer silent: a dropped customer would
        # otherwise be indistinguishable from a customer without cards.
        p(f" [会员卡] member {member_id} failed: {exc!r}")
    return []
|
|
|
|
|
|
def _split_semicolon_field(raw) -> list:
    """Split a ';'-separated field into stripped parts; empty/None gives []."""
    if not raw:
        return []
    return [part.strip() for part in str(raw).split(";")]


def _build_card_row(card: dict, item_name="", total="", remain="", with_card_info=True) -> dict:
    """Build one output row; card-level columns are blanked unless requested."""
    card_info = {
        "实付金额": card.get("ActualPrice", ""),
        "套餐价格": card.get("ProductPrice", ""),
        "支付方式": card.get("PayTypeName", ""),
        "购买渠道": card.get("BuyChannelName", ""),
        "是否有余量": "是" if card.get("IsSurplus") else "否",
        "是否撤销": "是" if card.get("IsRevoke") else "否",
        "到期时间": card.get("ExpiryTime", ""),
        "购买时间": card.get("CreatedAt", ""),
        "操作员工": card.get("BonusEmployName", ""),
        "绑定车牌": card.get("MemCarList", ""),
        "备注": card.get("Remark", ""),
    }
    if not with_card_info:
        card_info = dict.fromkeys(card_info, "")

    row = {
        "客户姓名": card.get("_MemName", ""),
        "手机号": card.get("_MemMobile", ""),
        "客户ID": card.get("_MemberId", ""),
        "会员卡ID": card.get("Id", ""),
        "套餐名称": card.get("ProductName", ""),
        "项目名称": item_name,
        "总次数": total,
        "剩余次数": remain,
    }
    row.update(card_info)
    return row


def expand_card_items(products: list) -> list:
    """Expand each card's one-to-many items into one detail row per item.

    ``BuyProductItemName``, ``BuyProductItemAmount`` and
    ``BuyProductItemExAmount`` are read as ';'-separated lists that are paired
    by position (item name, total count, remaining count).

    Each row holds the customer/card identity columns plus a single item.
    Card-level columns (prices, payment, flags, dates, staff, plates, remark)
    are filled only on the first row emitted for a card and left empty on the
    following rows. A card without any non-empty item name produces exactly
    one row with empty item columns.

    Args:
        products: Card dicts as returned by ``_fetch_one_product``.

    Returns:
        A list of row dicts keyed by the Excel column headers.
    """
    card_items = []
    for card in products:
        names = _split_semicolon_field(card.get("BuyProductItemName", ""))
        amounts = _split_semicolon_field(card.get("BuyProductItemAmount", ""))
        ex_amounts = _split_semicolon_field(card.get("BuyProductItemExAmount", ""))

        # Keep the original position of each name so that the count lists stay
        # aligned even when empty names (e.g. from a trailing ';') are skipped.
        named_items = [(idx, name) for idx, name in enumerate(names) if name]

        if not named_items:
            # No item detail: keep the card as a single row.
            card_items.append(_build_card_row(card))
            continue

        for position, (idx, name) in enumerate(named_items):
            total = amounts[idx] if idx < len(amounts) else ""
            remain = ex_amounts[idx] if idx < len(ex_amounts) else ""
            # Card-level info goes on the first *emitted* row, not on list
            # index 0, so it is not lost when the first name is empty.
            card_items.append(
                _build_card_row(card, name, total, remain, with_card_info=(position == 0))
            )

    return card_items
|
|
|
|
|
|
def fetch_all_products(headers: dict, members: list, mer_store_id: str, limit: int = 0) -> list:
    """Fetch the membership cards of many customers and expand them to item rows.

    The per-customer requests run on a thread pool of ``MAX_WORKERS`` threads.

    Args:
        headers: Request headers (see ``build_headers``).
        members: Member dicts as returned by ``fetch_all_members``.
        mer_store_id: Store id sent with every card request.
        limit: If greater than 0, only the first ``limit`` members are queried.

    Returns:
        The item-detail rows produced by ``expand_card_items``, in the same
        order as ``members``. Empty when there are no members.
    """
    if limit > 0:
        members = members[:limit]

    total = len(members)
    if total == 0:
        return []

    p(f"[会员卡] fetching {total} members (workers={MAX_WORKERS}) ...")

    all_products = []
    task_args = [(headers, mer_store_id, m) for m in members]

    with ThreadPoolExecutor(max_workers=MAX_WORKERS) as executor:
        # executor.map yields results in submission order, so the exported rows
        # follow the member order no matter which request finishes first.
        results = executor.map(_fetch_one_product, task_args)
        for done_count, prods in enumerate(results, 1):
            all_products.extend(prods)
            if done_count % 20 == 0 or done_count == total:
                p(f" {progress_bar(done_count, total, prefix='cards')} got {len(all_products)} cards")

    p(f"[会员卡] raw cards: {len(all_products)}")

    # Expand the cards into one row per item.
    card_items = expand_card_items(all_products)
    p(f"[会员卡] expanded items: {len(card_items)}")
    return card_items
|
|
|
|
|
|
# ============ Field definitions ============
# (API field name, Excel column header) pairs for the member sheets, in
# column order.
MEMBER_FIELDS = [
    ("MemberId", "客户ID"),
    ("MemName", "客户姓名"),
    ("MemMobile", "手机号"),
    ("MemSexType", "性别"),
    ("MemCardNo", "卡号"),
    ("MemBirthDay", "生日"),
    ("MemCardGradeName", "会员等级"),
    ("BalancePrice", "余额"),
    ("IntegralAmount", "积分"),
    ("TotalRechargePrice", "累计充值"),
    ("TotalConsumPrice", "累计消费"),
    ("TotalConsumTimes", "消费次数"),
    ("LastConsumTime", "最近消费时间"),
    ("ToStoreTimes", "到店次数"),
    ("MemCarsAmount", "车辆数"),
    ("AllCarPlateNumber", "车牌号"),
    ("AllCarModelName", "车型"),
    ("MemCreatedAt", "注册时间"),
    ("MemCompany", "公司"),
    ("MemAddress", "地址"),
    ("MerStoreName", "所属门店"),
    ("Remark", "备注"),
]

# Column order of the membership-card item-detail sheet. The names double as
# the keys of the row dicts built by expand_card_items.
CARD_ITEM_COLUMNS = [
    "客户姓名",
    "手机号",
    "客户ID",
    "会员卡ID",
    "套餐名称",
    "项目名称",
    "总次数",
    "剩余次数",
    "实付金额",
    "套餐价格",
    "支付方式",
    "购买渠道",
    "是否有余量",
    "是否撤销",
    "到期时间",
    "购买时间",
    "操作员工",
    "绑定车牌",
    "备注",
]
|
|
|
|
|
|
# Control characters that openpyxl refuses to write into a cell
# (tab, newline and carriage return are allowed and therefore kept).
_ILLEGAL_XLSX_CHARS = re.compile(r"[\x00-\x08\x0b\x0c\x0e-\x1f]")


def format_value(val):
    """Convert a raw API value into something safe to write into a cell.

    * ``None`` becomes ``""``.
    * Booleans become "是" / "否".
    * Strings that parse as ISO-8601 timestamps (containing "T", longer than
      18 characters) are reformatted as ``YYYY-MM-DD HH:MM:SS``.
    * Other strings have the control characters rejected by openpyxl removed.
    * Lists/tuples are joined with ", " and dicts are stringified, because
      openpyxl raises ``ValueError`` for container values.
    * Anything else (numbers, ...) is returned unchanged.
    """
    if val is None:
        return ""
    if isinstance(val, bool):
        return "是" if val else "否"
    if isinstance(val, (list, tuple)):
        joined = ", ".join(str(format_value(item)) for item in val)
        return _ILLEGAL_XLSX_CHARS.sub("", joined)
    if isinstance(val, dict):
        return _ILLEGAL_XLSX_CHARS.sub("", str(val))
    if isinstance(val, str):
        if "T" in val and len(val) > 18:
            try:
                dt = datetime.fromisoformat(val.replace("Z", "+00:00"))
            except ValueError:
                # Not a timestamp after all (e.g. free text containing "T").
                pass
            else:
                return dt.strftime("%Y-%m-%d %H:%M:%S")
        return _ILLEGAL_XLSX_CHARS.sub("", val)
    return val
|
|
|
|
|
|
def write_excel(personal_members, company_members, card_items, output_path):
    """Write the exported data to an Excel workbook with three sheets.

    Sheets: "个人客户" (personal customers), "单位客户" (company customers)
    and "会员卡项目明细" (membership-card item details).

    Args:
        personal_members: Member dicts for the first sheet.
        company_members: Member dicts for the second sheet.
        card_items: Row dicts (see ``expand_card_items``) for the third sheet.
        output_path: Destination ``.xlsx`` path; missing parent directories
            are created.
    """
    p("[Excel] writing ...")
    wb = openpyxl.Workbook()

    hdr_font = Font(bold=True, color="FFFFFF", size=11)
    hdr_fill = PatternFill(start_color="4472C4", end_color="4472C4", fill_type="solid")
    # Orange fill used to highlight the per-item columns of the card sheet.
    item_fill = PatternFill(start_color="ED7D31", end_color="ED7D31", fill_type="solid")
    hdr_align = Alignment(horizontal="center", vertical="center", wrap_text=True)
    cell_align = Alignment(vertical="center", wrap_text=True)
    border = Border(left=Side("thin"), right=Side("thin"), top=Side("thin"), bottom=Side("thin"))

    def display_width(value):
        """Approximate rendered width: non-ASCII characters count double."""
        return sum(2 if ord(ch) > 127 else 1 for ch in str(value or ""))

    def write_sheet(ws, headers, keys, data, highlighted=()):
        """Write a header row plus one row per item, then size the columns.

        ``headers[i]`` is the caption of column i and ``keys[i]`` the dict key
        read from every item for that column. Headers listed in
        ``highlighted`` get the orange fill.
        """
        # Track the widest value per column while writing, instead of
        # re-scanning the whole sheet once per column afterwards.
        widths = [max(6, display_width(name)) for name in headers]

        for col, name in enumerate(headers, 1):
            cell = ws.cell(1, col, name)
            cell.font = hdr_font
            cell.fill = item_fill if name in highlighted else hdr_fill
            cell.alignment = hdr_align
            cell.border = border

        for row_idx, item in enumerate(data, 2):
            for col, key in enumerate(keys, 1):
                value = format_value(item.get(key, ""))
                cell = ws.cell(row_idx, col, value)
                cell.alignment = cell_align
                cell.border = border
                widths[col - 1] = max(widths[col - 1], display_width(value))

        # Auto width: widest content plus padding, capped at 50.
        for col, width in enumerate(widths, 1):
            col_letter = openpyxl.utils.get_column_letter(col)
            ws.column_dimensions[col_letter].width = min(width + 4, 50)

    member_headers = [name for _, name in MEMBER_FIELDS]
    member_keys = [key for key, _ in MEMBER_FIELDS]

    ws1 = wb.active
    ws1.title = "个人客户"
    write_sheet(ws1, member_headers, member_keys, personal_members)

    ws2 = wb.create_sheet("单位客户")
    write_sheet(ws2, member_headers, member_keys, company_members)

    ws3 = wb.create_sheet("会员卡项目明细")
    write_sheet(
        ws3,
        CARD_ITEM_COLUMNS,
        CARD_ITEM_COLUMNS,
        card_items,
        highlighted=("项目名称", "总次数", "剩余次数"),
    )

    # The target folder may not exist (e.g. no "Desktop" directory); create it
    # rather than failing after all the data has been downloaded.
    Path(output_path).parent.mkdir(parents=True, exist_ok=True)
    wb.save(str(output_path))
    p(f"[Excel] OK -> {output_path}")
|
|
|
|
|
|
def parse_args():
    """Parse ``sys.argv``: ``[username] [password] [--limit N]``.

    ``--limit N`` and ``--limit=N`` may appear anywhere. The first two other
    arguments are taken as username and password; further ones are ignored.

    Returns:
        A ``(username, password, limit)`` tuple. ``username`` and ``password``
        are ``None`` when not given; ``limit`` defaults to ``0``.

    Raises:
        SystemExit: If ``--limit`` has no value or its value is not an integer.
    """
    args = sys.argv[1:]
    username, password, limit = None, None, 0

    i = 0
    while i < len(args):
        arg = args[i]
        if arg == "--limit" or arg.startswith("--limit="):
            if arg == "--limit":
                # A trailing "--limit" used to be swallowed as the username.
                if i + 1 >= len(args):
                    raise SystemExit("--limit requires an integer value")
                raw_limit = args[i + 1]
                i += 2
            else:
                raw_limit = arg.split("=", 1)[1]
                i += 1
            try:
                limit = int(raw_limit)
            except ValueError:
                raise SystemExit(f"--limit expects an integer, got {raw_limit!r}") from None
            continue

        if username is None:
            username = arg
        elif password is None:
            password = arg
        i += 1

    return username, password, limit
|
|
|
|
|
|
def main():
    """Run the export: log in, download members and cards, write the workbook.

    Username, password and ``--limit`` come from the command line (see
    ``parse_args``); missing credentials are asked for interactively. The
    workbook is written to ``OUTPUT_DIR`` with the store name and a timestamp
    in the file name.
    """
    p("=" * 50)
    p(" 快富通系统数据导出工具")
    p("=" * 50)

    username, password, limit = parse_args()

    # NOTE(security): default credentials are hard-coded here and the password
    # is echoed in the prompt. Kept for backward compatibility; consider
    # moving them out of the source (environment variable / config file).
    if not username:
        username = input("账号 [zhongdexinqcyp]: ").strip() or "zhongdexinqcyp"
    if not password:
        password = input("密码 [ZDX2018]: ").strip() or "ZDX2018"

    limit_msg = f" (limit={limit})" if limit > 0 else ""
    p(f"\nparams: user={username} limit={limit}{limit_msg}")

    # 1. login
    login_data = login(username, password)
    qxentity = login_data["qxentity"]
    headers = build_headers(login_data)
    mer_store_id = qxentity["businessproductunitid"]

    # 2. members
    p("\n--- members ---")
    t0 = time.time()
    personal_members = fetch_all_members(headers, qxentity, mem_type=1, limit=limit)
    company_members = fetch_all_members(headers, qxentity, mem_type=2, limit=limit)
    p(f"[members] {time.time()-t0:.1f}s personal={len(personal_members)} company={len(company_members)}")

    # 3. card items
    p("\n--- card items ---")
    t0 = time.time()
    all_members = personal_members + company_members
    card_items = fetch_all_products(headers, all_members, mer_store_id, limit=limit)
    p(f"[card items] {time.time()-t0:.1f}s")

    # 4. excel
    timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
    # The name may be missing or null in the login response.
    store_name = qxentity.get("businessproductunitName") or "未知门店"
    # The store name becomes part of the file name: replace characters that
    # are illegal in file names so the save cannot fail after the whole
    # download has finished.
    safe_store_name = re.sub(r'[\\/:*?"<>|\r\n\t]', "_", str(store_name)).strip() or "未知门店"
    suffix = f"_limit{limit}" if limit > 0 else ""
    output_path = OUTPUT_DIR / f"快富通导出_{safe_store_name}_{timestamp}{suffix}.xlsx"
    write_excel(personal_members, company_members, card_items, output_path)

    # 5. summary
    p("\n" + "=" * 50)
    p(f" DONE! store: {store_name}")
    p(f" personal: {len(personal_members)} company: {len(company_members)} card_items: {len(card_items)}")
    p(f" file: {output_path}")
    p("=" * 50)


if __name__ == "__main__":
    main()
|