import os import pandas as pd import json import ast import re from datetime import datetime from chardet import detect import unicodedata # ============================== # 核心修复:处理数组类型数据的工具函数 # ============================== def is_empty_value(value): """判断值是否为空(支持数组、字符串、数值等所有类型)""" if pd.isna(value): return True if value is None: return True if isinstance(value, str) and value.strip() == '': return True if isinstance(value, (list, tuple, set)) and len(value) == 0: return True return False def clean_special_characters(value): """清理特殊字符(支持数组类型,避免歧义错误)""" # 处理空值 if is_empty_value(value): return value # 处理数组类型(如 ['a', 'b']) if isinstance(value, (list, tuple)): cleaned_list = [] for item in value: if isinstance(item, str): cleaned_list.append(_clean_single_string(item)) else: cleaned_list.append(item) return cleaned_list # 处理字符串类型 elif isinstance(value, str): return _clean_single_string(value) # 其他类型(数值、布尔等)直接返回 else: return value def _clean_single_string(text): """清理单个字符串的特殊字符(内部调用)""" try: # 移除控制字符(保留换行、制表符) cleaned = ''.join( char for char in text if unicodedata.category(char)[0] != 'C' or char in '\n\t' ) # 移除特定无法编码的字符(如右至左标记) special_chars = ['\u202d', '\u202c', '\u202a', '\u202b', '\u200b'] for char in special_chars: cleaned = cleaned.replace(char, '') # 替换全角空格为半角空格 cleaned = cleaned.replace('\u3000', ' ') return cleaned except: return text def get_safe_encoding(file_path=None): """获取安全的编码格式(优先UTF-8-SIG)""" if file_path and os.path.exists(file_path): with open(file_path, 'rb') as f: raw_data = f.read(10000) result = detect(raw_data) detected = result['encoding'] if detected in ['utf-8', 'utf-8-sig', 'gbk', 'gb2312']: return 'utf-8-sig' return 'utf-8-sig' # ============================== # 第一步:生成 expanded_yd_data.csv(彻底修复数组问题) # ============================== def generate_expanded_csv(output_dir): """生成展开后的源文件(支持数组类型数据)""" os.makedirs(output_dir, exist_ok=True) converted_csv_path = os.path.join(output_dir, "converted_yd_data.csv") if not os.path.exists(converted_csv_path): print(f"❌ 错误:converted_yd_data.csv 不存在,路径:{converted_csv_path}") return None # 读取文件(兼容不同编码) encoding = get_safe_encoding(converted_csv_path) try: df = pd.read_csv(converted_csv_path, encoding=encoding) print(f"✅ 成功读取 converted_yd_data.csv(编码:{encoding}),数据规模:{df.shape[0]}行 × {df.shape[1]}列") except Exception as e: print(f"❌ 读取 converted_yd_data.csv 失败:{str(e)}") return None if 'data' not in df.columns: print(f"❌ 错误:converted_yd_data.csv 中缺少 'data' 列") return None # 清理非data列的特殊字符(仅处理字符串列) non_data_cols = [col for col in df.columns if col != 'data'] for col in non_data_cols: if df[col].dtype == 'object': # 仅处理字符串类型列 df[col] = df[col].apply(clean_special_characters) # 检查并解析 data 列(核心:处理数组类型) sample = df['data'].dropna().iloc[0] if not df['data'].dropna().empty else "" print("Sample of 'data' column (after cleaning):") print(repr(str(sample))[:200]) # 转为字符串避免数组打印过长 if isinstance(sample, str): print("Detected string format, parsing with ast.literal_eval...") def safe_literal_eval(x): if is_empty_value(x): return {} try: # 先清理字符串,再解析 cleaned_x = clean_special_characters(x) if isinstance(x, str) else str(x) parsed = ast.literal_eval(cleaned_x) # 解析后再次清理(处理数组中的特殊字符) return clean_special_characters(parsed) except (ValueError, SyntaxError, TypeError) as e: print(f"Parse error on: {repr(str(x))[:100]}... Error: {str(e)[:50]}") return {} df['data'] = df['data'].apply(safe_literal_eval) # 展开 data 列(处理数组类型,转为字符串存储) def flatten_expanded_data(expanded_df): """展平数据,将数组转为字符串""" for col in expanded_df.columns: if expanded_df[col].dtype == 'object': expanded_df[col] = expanded_df[col].apply( lambda x: ','.join(map(str, x)) if isinstance(x, (list, tuple)) else x ) return expanded_df expanded = pd.json_normalize(df['data']) expanded = flatten_expanded_data(expanded) # 数组转字符串(如 ['a','b'] → "a,b") # 清理展开后的数据 for col in expanded.columns: expanded[col] = expanded[col].apply(clean_special_characters) # 合并数据 other_cols = df.drop(columns=['data']) new_df = pd.concat([other_cols.reset_index(drop=True), expanded.reset_index(drop=True)], axis=1) # 数据过滤(修复数组转字符串后的判断逻辑) if 'instanceStatus' in new_df.columns: # 确保过滤条件处理字符串 new_df['instanceStatus'] = new_df['instanceStatus'].astype(str) new_df = new_df[new_df["instanceStatus"].str.strip() == "RUNNING"] print(f"✅ 过滤后(instanceStatus=RUNNING):{new_df.shape[0]}行") else: print(f"⚠️ 警告:缺少 'instanceStatus' 列,跳过状态过滤") col = "textField_kto3q3ev" if col in new_df.columns: # 处理数组转字符串后的空值判断 new_df[col] = new_df[col].apply( lambda x: ','.join(map(str, x)) if isinstance(x, (list, tuple)) else x ).astype(str) mask2 = (new_df[col].str.strip() == "") | (new_df[col].str.strip() == "nan") new_df = new_df[mask2] print(f"✅ 过滤后({col}为空):{new_df.shape[0]}行") else: print(f"⚠️ 警告:缺少 '{col}' 列,跳过订单编码过滤") # 保存文件(UTF-8-SIG编码) expanded_csv_path = os.path.join(output_dir, "expanded_yd_data.csv") try: new_df.to_csv( expanded_csv_path, index=False, encoding='utf-8-sig', na_rep='', errors='replace' ) print(f"✅ Expanded data saved to: {expanded_csv_path}(编码:utf-8-sig)") return expanded_csv_path except Exception as e: print(f"❌ 保存 expanded_yd_data.csv 失败:{str(e)}") return None # ============================== # 第二步:格式转换核心函数(兼容数组处理结果) # ============================== def convert_to_data_ngv_format( source_csv_path, target_csv_path, main_output_path, additional_output_path, doc_output_path ): print("\n=== 开始格式转换(目标格式:data_NGV.csv)===") try: # 读取目标文件 if not os.path.exists(target_csv_path): raise FileNotFoundError(f"目标文件不存在:{target_csv_path}") # 兼容目标文件的编码 try: df_target = pd.read_csv(target_csv_path, encoding='utf-8-sig') target_encoding = 'utf-8-sig' except: df_target = pd.read_csv(target_csv_path, encoding='gbk') target_encoding = 'gbk' print(f"✅ 成功读取目标文件:{target_csv_path}(编码:{target_encoding})") print(f" 数据规模:{df_target.shape[0]}行 × {df_target.shape[1]}列") # 读取源文件(已处理数组问题) df_source = pd.read_csv(source_csv_path, encoding='utf-8-sig') print(f"✅ 成功读取源文件:{source_csv_path}(编码:utf-8-sig)") print(f" 数据规模:{df_source.shape[0]}行 × {df_source.shape[1]}列") except Exception as e: print(f"❌ 文件读取失败:{str(e)}") return False # 字段映射(适配 data_NGV 格式) print("\n=== 建立字段映射关系 ===") field_mapping = { 'createTimeGMT': 'saas_create_time', 'modifiedTimeGMT': 'etl_time', 'title': 'org_remark', 'instanceStatus': 'active_status_fmt', 'processInstanceId': 'id_own_org', 'actionExecutor': 'technician', 'originator': 'salesmen', 'textField_kuj8nx00': 'province_name', 'textField_kuj8nx01': 'city_name' } # 筛选有效映射 valid_mapping = {} for source_field, target_field in field_mapping.items(): if target_field in df_target.columns and source_field in df_source.columns: valid_mapping[source_field] = target_field print(f" {source_field} → {target_field}") if len(valid_mapping) == 0: print(f"⚠️ 警告:未找到有效字段映射,将仅填充默认值") # 字段分类 target_columns = set(df_target.columns) source_columns = set(df_source.columns) additional_columns = list(source_columns - target_columns - set(valid_mapping.keys())) print(f"\n=== 字段统计 ===") print(f" 目标文件总字段数:{len(target_columns)}") print(f" 源文件总字段数:{len(source_columns)}") print(f" 有效映射字段数:{len(valid_mapping)}") print(f" 额外字段数(放入单独文件):{len(additional_columns)}") # 创建结果数据结构 df_main = df_target.iloc[0:0].copy() df_main.insert(0, 'main_file_id', '') # 关联ID列 df_additional = pd.DataFrame(columns=['main_file_id'] + additional_columns) # 数据处理工具函数(兼容字符串化的数组) def extract_org_info(title_text): if is_empty_value(title_text): return '', '' title_str = str(title_text).strip() org_name_match = re.search(r'门店名称:([^,,\n]+)', title_str) org_code_match = re.search(r'门店编码:([^,,\n]+)', title_str) org_name = org_name_match.group(1).strip() if org_name_match else '' org_code = org_code_match.group(1).strip() if org_code_match else '' return org_name, org_code def extract_technician(executor_text): if is_empty_value(executor_text): return '' executor_str = str(executor_text).strip() # 匹配字符串化的数组中的中文姓名(如 "{'nameInChinese':'何钊'}") name_match = re.search(r"'nameInChinese':\s*'([^']+)'", executor_str) return name_match.group(1).strip() if name_match else '' def convert_gmt_time(gmt_str): if is_empty_value(gmt_str): return None try: time_str = str(gmt_str).replace('T', ' ').replace('Z', '').strip() return pd.to_datetime(time_str) except: return None # 逐行处理数据 print(f"\n=== 开始数据处理(共{len(df_source)}行) ===") for idx, source_row in df_source.iterrows(): main_file_id = f"REC-{idx:06d}" # 处理主文件数据 main_row = pd.Series(index=df_main.columns, dtype='object') main_row['main_file_id'] = main_file_id # 填充映射字段(处理字符串化的数组) for source_field, target_field in valid_mapping.items(): value = source_row[source_field] if not is_empty_value(value): # 将字符串化的数组转为普通字符串(如 "a,b" → "a,b") if isinstance(value, (list, tuple)): main_row[target_field] = ','.join(map(str, value)) else: main_row[target_field] = str(value).strip() else: main_row[target_field] = '' # 处理关键业务字段 create_time = convert_gmt_time(source_row.get('createTimeGMT')) if create_time: main_row['date_fmt'] = create_time.strftime('%Y/%m/%d') main_row['date_id'] = int(create_time.strftime('%Y%m%d')) main_row['pt'] = main_row['date_id'] # 提取门店信息 org_name, org_code = extract_org_info(source_row.get('title')) if org_name: main_row['org_name'] = org_name main_row['group_name'] = org_name if org_code: main_row['org_code'] = org_code # 提取处理人 technician = extract_technician(source_row.get('actionExecutor')) if technician: main_row['technician'] = technician # 填充默认值 default_values = { 'org_type': '一般', 'org_status': '留存', 'group_grade': '普通客户(VIP)', 'is_active': 1, 'active_status_fmt': '活跃', 'province_name': main_row.get('province_name', '未知'), 'city_name': main_row.get('city_name', '未知'), 'area_name': '未知', 'is_wechat': 0, 'is_mini_app': 0, 'id_own_group': 0, 'org_code': main_row.get('org_code', '') } for col, default_val in default_values.items(): if col in main_row.index and is_empty_value(main_row[col]): main_row[col] = default_val df_main.loc[idx] = main_row # 处理额外字段文件 additional_row = pd.Series(index=df_additional.columns, dtype='object') additional_row['main_file_id'] = main_file_id for col in additional_columns: if col in source_row.index: value = source_row[col] if not is_empty_value(value): # 统一转为字符串存储(兼容数组) if isinstance(value, (list, tuple)): additional_row[col] = ','.join(map(str, value)) else: additional_row[col] = str(value).strip() else: additional_row[col] = '' df_additional.loc[idx] = additional_row # 进度提示(大数据量优化) if (idx + 1) % 500 == 0 or (idx + 1) == len(df_source): print(f" 已处理 {idx + 1}/{len(df_source)} 行") # 数据类型优化 print(f"\n=== 优化数据类型 ===") numeric_cols = ['date_id', 'pt', 'is_active', 'is_wechat', 'is_mini_app', 'id_own_group', 'active_user_count', 'limit_user_count'] for col in numeric_cols: if col in df_main.columns: # 处理字符串格式的数值 df_main[col] = pd.to_numeric( df_main[col].astype(str).str.replace(',', ''), # 移除数组转字符串的逗号 errors='coerce' ).fillna(0).astype(int) # 填充空值 df_main = df_main.fillna('') df_additional = df_additional.fillna('') # 保存文件 print(f"\n=== 保存结果文件 ===") try: # 保存主文件(匹配 data_NGV 格式) df_main.to_csv( main_output_path, index=False, encoding='utf-8-sig', na_rep='', errors='replace' ) print(f"✅ 主文件保存成功: {main_output_path}(编码:utf-8-sig)") # 保存额外字段文件 df_additional.to_csv( additional_output_path, index=False, encoding='utf-8-sig', na_rep='', errors='replace' ) print(f"✅ 额外字段文件保存成功: {additional_output_path}(编码:utf-8-sig)") # 生成说明文档 generate_relation_doc(doc_output_path, main_output_path, additional_output_path, len(target_columns), len(source_columns), len(valid_mapping)) print(f"✅ 关联说明文档保存成功: {doc_output_path}") return True except Exception as e: print(f"❌ 文件保存失败: {str(e)}") return False # ============================== # 辅助函数:生成关联说明文档 # ============================== def generate_relation_doc(doc_path, main_file, additional_file, target_col_count, source_col_count, mapping_count): doc_content = f"""# 数据格式转换结果说明 ## 目标格式文件:data_NGV.csv ### 一、文件概述 1. **主文件(匹配目标格式)** - 文件名:{os.path.basename(main_file)} - 格式来源:完全匹配 data_NGV.csv 结构 - 数据规模:{pd.read_csv(main_file, encoding='utf-8-sig').shape[0]}行 × {target_col_count + 1}列 - 编码格式:UTF-8-SIG(兼容所有字符和数组数据) 2. **额外字段文件** - 文件名:{os.path.basename(additional_file)} - 包含内容:源文件中 data_NGV.csv 没有的字段 - 数据规模:{pd.read_csv(additional_file, encoding='utf-8-sig').shape[0]}行 × {pd.read_csv(additional_file, encoding='utf-8-sig').shape[1]}列 - 编码格式:UTF-8-SIG ### 二、关键处理说明 1. **数组数据处理**:源文件中的数组(如 ['a','b'])已转为字符串("a,b")存储,避免格式错误 2. **特殊字符清理**:自动移除无法编码的控制字符(如右至左标记 \u202d) 3. **编码统一**:所有文件使用 UTF-8-SIG 编码,兼容中文和特殊字符 ### 三、关联方法 1. **关联字段**:`main_file_id`(格式:REC-000000) 2. **Excel导入**:数据→自文本/CSV→选择文件→编码选择"UTF-8"→完成 ### 四、使用建议 1. 主文件可直接用于业务系统,与 data_NGV.csv 格式完全兼容 2. 额外字段文件用于原始数据追溯,通过 main_file_id 关联 3. 数组转字符串后的数据可通过 Excel 的"文本分列"功能恢复为数组 """ with open(doc_path, 'w', encoding='utf-8') as f: f.write(doc_content) # ============================== # 主执行函数 # ============================== def main(): # 配置文件路径(请根据您的实际位置修改!) base_dir = "D:\\Idea Project\\SaaS_V1.7\\test\\output" target_csv_path = "D:\\Idea Project\\SaaS_V1.7\\test\\output\\data_NGV.csv" # 生成 expanded_yd_data.csv(解决数组问题) expanded_csv_path = generate_expanded_csv(base_dir) if not expanded_csv_path: print("❌ 生成 expanded_yd_data.csv 失败,终止转换") return # 配置输出路径 main_output_path = os.path.join(base_dir, "主文件_匹配data_NGV格式.csv") additional_output_path = os.path.join(base_dir, "额外字段文件_源文件特有列.csv") doc_output_path = os.path.join(base_dir, "格式转换说明.md") # 执行转换 success = convert_to_data_ngv_format( source_csv_path=expanded_csv_path, target_csv_path=target_csv_path, main_output_path=main_output_path, additional_output_path=additional_output_path, doc_output_path=doc_output_path ) if success: print(f"\n🎉 格式转换全部完成!") print(f"📁 输出目录:{base_dir}") print(f"🔔 重要:Excel导入时需选择'UTF-8'编码,数组数据已转为逗号分隔字符串") else: print(f"\n❌ 格式转换失败,请查看日志信息排查问题") # ============================== # 执行入口 # ============================== if __name__ == "__main__": # API模块导入(不影响核心功能) try: from yd_api import YDAPI from api import API print("✅ 成功导入 API 模块") except ImportError: print("⚠️ 警告:未找到 yd_api 或 api 模块,跳过API初始化(不影响格式转换)") # 执行主流程 main()