"""Keep only the most recently created row per store from an Excel sheet."""

import argparse
from pathlib import Path

import pandas as pd


def _as_clean_text(values: pd.Series) -> pd.Series:
    """Return *values* as strings, with missing entries as empty strings."""
    # The NA mask is taken from the original values: once astype(str) has run,
    # NaN/None have already become the literal text "nan"/"None" and a later
    # fillna() can no longer find them.
    return values.astype(str).where(values.notna(), "")


def _parse_times(texts: pd.Series) -> pd.Series:
    """Parse timestamp strings; unparseable entries become NaT.

    The result carries a fresh 0..n-1 index so callers can use it positionally.
    """
    texts = texts.reset_index(drop=True)
    parsed = pd.to_datetime(texts, errors="coerce")

    # pandas >= 2.0 infers a single format from the first value and coerces
    # every value written in a different layout to NaT. Retry those values one
    # at a time so a column mixing e.g. "2026-03-01 10:00:00" and
    # "2026/03/02 12:00" is still parsed.
    unparsed = parsed.isna() & (texts.str.strip() != "")
    if unparsed.any():
        retried = texts[unparsed].map(
            lambda text: pd.to_datetime(text, errors="coerce")
        )
        parsed = parsed.fillna(pd.to_datetime(retried, errors="coerce"))
    return parsed


def keep_latest_by_time(df: pd.DataFrame, store_col: str, time_col: str) -> pd.DataFrame:
    """Return one row per store: the row with the latest creation time.

    Rows whose time cannot be parsed rank before every valid time, so such a
    row is kept only when its store has no parseable time at all. When two
    rows of a store share the same time, the one appearing later in *df* wins.

    Args:
        df: Input rows. It is not modified.
        store_col: Name of the column holding the store code.
        time_col: Name of the column holding the creation time.

    Returns:
        A new frame with a fresh 0..n-1 index, ordered by ascending parsed
        time (unparseable times first). The store and time columns are
        returned as text, with missing values as empty strings.

    Raises:
        ValueError: If *store_col* or *time_col* is not a column of *df*.
    """
    for required in (store_col, time_col):
        if required not in df.columns:
            raise ValueError(f"缺少列: {required}")

    # The original index is discarded in the result anyway; dropping it up
    # front keeps every step below purely positional.
    working = df.copy().reset_index(drop=True)
    working[store_col] = _as_clean_text(working[store_col])
    working[time_col] = _as_clean_text(working[time_col])

    # Sort keys live in their own frame so that no helper column can collide
    # with (and silently remove) a column of the caller's data.
    sort_keys = pd.DataFrame(
        {
            "when": _parse_times(working[time_col]),
            "position": range(len(working)),
        }
    )
    order = sort_keys.sort_values(
        by=["when", "position"],
        ascending=[True, True],
        kind="mergesort",
        na_position="first",
    )["position"].to_numpy()

    latest = working.iloc[order].groupby(store_col, sort=False).tail(1)
    return latest.reset_index(drop=True)


def build_parser() -> argparse.ArgumentParser:
    """Build the command-line argument parser for this script."""
    parser = argparse.ArgumentParser()
    parser.add_argument("--input", "-i", required=False, help="输入 Excel 路径(.xlsx)")
    parser.add_argument("--output", "-o", required=False, help="输出 Excel 路径(.xlsx)")
    parser.add_argument("--sheet", default="需要保留一条", help="Sheet 名称")
    parser.add_argument("--store-col", default="门店编码", help="门店编码列名")
    parser.add_argument("--time-col", default="创建时间", help="创建时间列名")
    parser.add_argument("--codes-output", required=False, help="可选:输出门店编码清单(.txt 或 .csv)")
    parser.add_argument("--demo", action="store_true", help="运行内置示例(不读写 Excel)")
    return parser


def _run_demo(store_col: str, time_col: str) -> int:
    """Run the built-in sample through keep_latest_by_time and print the result."""
    demo_df = pd.DataFrame(
        [
            {"门店编码": "A001", "创建时间": "2026-03-01 10:00:00", "其他": "x"},
            {"门店编码": "A001", "创建时间": "2026-03-05 09:00:00", "其他": "y"},
            {"门店编码": "B002", "创建时间": "2026/03/02 12:00", "其他": "m"},
            {"门店编码": "B002", "创建时间": "无效时间", "其他": "n"},
        ]
    )
    result = keep_latest_by_time(demo_df, store_col=store_col, time_col=time_col)
    print(result)
    print("门店编码:", ",".join(result[store_col].astype(str).tolist()))
    return 0


def _write_store_codes(codes_path: Path, store_col: str, store_codes: list) -> None:
    """Write the store codes as a one-column CSV (.csv) or one code per line."""
    if codes_path.suffix.lower() == ".csv":
        # utf-8-sig adds a BOM to the CSV output.
        pd.DataFrame({store_col: store_codes}).to_csv(
            codes_path, index=False, encoding="utf-8-sig"
        )
    else:
        codes_path.write_text("\n".join(store_codes), encoding="utf-8")


def main() -> int:
    """Command-line entry point.

    Reads the requested sheet, keeps the latest row per store, writes the
    result to an .xlsx file and optionally writes the list of store codes.

    Returns:
        0 on success.

    Raises:
        SystemExit: With a message when --input is missing, the input file
            does not exist, or the sheet / required columns cannot be found.
    """
    args = build_parser().parse_args()
    if args.demo:
        return _run_demo(args.store_col, args.time_col)

    if not args.input:
        raise SystemExit("缺少参数 --input")
    input_path = Path(args.input).expanduser().resolve()
    if not input_path.exists():
        raise SystemExit(f"输入文件不存在: {input_path}")

    try:
        df = pd.read_excel(input_path, sheet_name=args.sheet, dtype=str).fillna("")
        latest = keep_latest_by_time(df, store_col=args.store_col, time_col=args.time_col)
    except ValueError as exc:
        # A missing sheet or column is a user input problem: report it the
        # same way as the other input errors instead of dumping a traceback.
        raise SystemExit(str(exc)) from exc

    if args.output:
        output_path = Path(args.output).expanduser().resolve()
    else:
        output_path = input_path.with_name(f"{input_path.stem}_保留最新.xlsx")
    with pd.ExcelWriter(output_path, engine="openpyxl") as writer:
        latest.to_excel(writer, sheet_name="保留最新", index=False)

    store_codes = latest[args.store_col].astype(str).tolist()
    print(f"保留行数: {len(latest)}")
    print(f"门店编码数量: {len(store_codes)}")
    if args.codes_output:
        codes_path = Path(args.codes_output).expanduser().resolve()
        _write_store_codes(codes_path, args.store_col, store_codes)
    print(f"输出文件: {output_path}")
    return 0


if __name__ == "__main__":
    raise SystemExit(main())