# Listing metadata: 524 lines, 20 KiB, Python
import os
|
||
import pandas as pd
|
||
import json
|
||
import ast
|
||
import re
|
||
from datetime import datetime
|
||
from chardet import detect
|
||
import unicodedata
|
||
|
||
|
||
# ==============================
|
||
# 核心修复:处理数组类型数据的工具函数
|
||
# ==============================
|
||
def is_empty_value(value):
    """Return True when *value* carries no data.

    A value is considered empty when it is ``None``, a missing-value scalar
    recognised by ``pd.isna`` (NaN, NaT, ``pd.NA``), a string that is blank
    after stripping whitespace, or a list/tuple/set with no elements.

    Containers are checked *before* ``pd.isna`` is called: ``pd.isna`` on a
    list returns an element-wise array, and using that array in a boolean
    context raises "truth value of an array is ambiguous".

    Args:
        value: Any object (scalar, string, container, array-like).

    Returns:
        bool: True if the value is empty, False otherwise.
    """
    if value is None:
        return True
    if isinstance(value, str):
        return value.strip() == ''
    if isinstance(value, (list, tuple, set)):
        return len(value) == 0

    try:
        missing = pd.isna(value)
    except (TypeError, ValueError):
        # Objects pandas cannot inspect are treated as real data.
        return False

    # Array-likes (ndarray, Series, ...) yield an element-wise result; such a
    # value is only empty when it has no elements at all.
    if getattr(missing, 'ndim', 0) != 0:
        return missing.size == 0
    return bool(missing)
|
||
|
||
|
||
def clean_special_characters(value):
    """Strip special characters from a string or from the strings in a sequence.

    Empty values (as judged by ``is_empty_value``) are returned untouched.
    A string is cleaned with ``_clean_single_string``. For a list or tuple a
    new list is returned in which every string element has been cleaned and
    every other element is kept as-is. Any other type is returned unchanged.
    """
    if is_empty_value(value):
        return value

    if isinstance(value, str):
        return _clean_single_string(value)

    # Numbers, booleans, dicts, etc. pass straight through.
    if not isinstance(value, (list, tuple)):
        return value

    return [
        _clean_single_string(element) if isinstance(element, str) else element
        for element in value
    ]
|
||
|
||
|
||
def _clean_single_string(text):
|
||
"""清理单个字符串的特殊字符(内部调用)"""
|
||
try:
|
||
# 移除控制字符(保留换行、制表符)
|
||
cleaned = ''.join(
|
||
char for char in text
|
||
if unicodedata.category(char)[0] != 'C' or char in '\n\t'
|
||
)
|
||
# 移除特定无法编码的字符(如右至左标记)
|
||
special_chars = ['\u202d', '\u202c', '\u202a', '\u202b', '\u200b']
|
||
for char in special_chars:
|
||
cleaned = cleaned.replace(char, '')
|
||
# 替换全角空格为半角空格
|
||
cleaned = cleaned.replace('\u3000', ' ')
|
||
return cleaned
|
||
except:
|
||
return text
|
||
|
||
|
||
def get_safe_encoding(file_path=None):
    """Pick an encoding for reading *file_path*, preferring UTF-8-SIG.

    The first 10000 bytes of the file are passed to chardet's ``detect``.
    When the detected encoding belongs to the GB family (gbk / gb2312 /
    gb18030) the superset codec ``gb18030`` is returned so such files can
    actually be decoded; previously every branch returned ``utf-8-sig`` and
    the detection result was never used. In all other cases (no path, missing
    file, empty file, nothing detected, UTF-8 variants, anything else)
    ``utf-8-sig`` is returned, which also reads plain UTF-8.

    Args:
        file_path: Path of the file to inspect, or None.

    Returns:
        str: ``'gb18030'`` or ``'utf-8-sig'``.
    """
    default_encoding = 'utf-8-sig'
    if not file_path or not os.path.exists(file_path):
        return default_encoding

    with open(file_path, 'rb') as f:
        raw_data = f.read(10000)
    if not raw_data:
        return default_encoding

    # chardet may report None and uses mixed case (e.g. 'GB2312').
    detected = (detect(raw_data).get('encoding') or '').lower()
    if detected in ('gbk', 'gb2312', 'gb18030'):
        return 'gb18030'
    return default_encoding
|
||
|
||
|
||
# ==============================
|
||
# 第一步:生成 expanded_yd_data.csv(彻底修复数组问题)
|
||
# ==============================
|
||
def generate_expanded_csv(output_dir):
    """Expand the ``data`` column of converted_yd_data.csv into flat columns.

    Steps:
      1. Read ``<output_dir>/converted_yd_data.csv`` using the encoding
         chosen by ``get_safe_encoding``.
      2. Clean special characters in every object-dtype column except ``data``.
      3. Parse each ``data`` cell with ``ast.literal_eval`` (when the column
         holds strings). Cells that are empty, unparsable, or do not evaluate
         to a dict become ``{}`` so that ``pd.json_normalize`` never receives
         a non-dict record.
      4. Flatten the dicts into columns, join list/tuple cells with commas and
         clean special characters.
      5. Keep only rows whose ``instanceStatus`` is ``RUNNING`` and whose
         ``textField_kto3q3ev`` is blank (each filter is skipped with a
         warning when its column is absent).
      6. Write ``<output_dir>/expanded_yd_data.csv`` as UTF-8-SIG.

    Args:
        output_dir: Directory holding the input file; created if missing and
            also used for the output file.

    Returns:
        The path of the written CSV, or None when the input is missing,
        unreadable, lacks a ``data`` column, or the output cannot be saved.
    """
    os.makedirs(output_dir, exist_ok=True)
    converted_csv_path = os.path.join(output_dir, "converted_yd_data.csv")

    if not os.path.exists(converted_csv_path):
        print(f"❌ 错误:converted_yd_data.csv 不存在,路径:{converted_csv_path}")
        return None

    # Read the file with the detected encoding.
    encoding = get_safe_encoding(converted_csv_path)
    try:
        df = pd.read_csv(converted_csv_path, encoding=encoding)
        print(f"✅ 成功读取 converted_yd_data.csv(编码:{encoding}),数据规模:{df.shape[0]}行 × {df.shape[1]}列")
    except Exception as e:
        print(f"❌ 读取 converted_yd_data.csv 失败:{str(e)}")
        return None

    if 'data' not in df.columns:
        print("❌ 错误:converted_yd_data.csv 中缺少 'data' 列")
        return None

    def join_sequence(x):
        """Turn a list/tuple into a comma-separated string; pass others through."""
        return ','.join(map(str, x)) if isinstance(x, (list, tuple)) else x

    def safe_literal_eval(x):
        """Parse one ``data`` cell into a dict, falling back to ``{}``."""
        if is_empty_value(x):
            return {}
        try:
            # Clean the raw text first, then evaluate it as a Python literal.
            cleaned_x = clean_special_characters(x) if isinstance(x, str) else str(x)
            parsed = ast.literal_eval(cleaned_x)
        except (ValueError, SyntaxError, TypeError) as e:
            print(f"Parse error on: {repr(str(x))[:100]}... Error: {str(e)[:50]}")
            return {}
        if not isinstance(parsed, dict):
            # json_normalize needs dict records; a list or scalar literal
            # would make it fail for the whole column.
            print(f"Parse error on: {repr(str(x))[:100]}... Error: expected dict, got {type(parsed).__name__}")
            return {}
        # String values inside the dict are cleaned after expansion below.
        return parsed

    # Clean special characters in the non-data columns (object dtype only).
    for col in df.columns:
        if col != 'data' and df[col].dtype == 'object':
            df[col] = df[col].apply(clean_special_characters)

    # Inspect one non-null sample to decide whether parsing is needed.
    non_null_data = df['data'].dropna()
    sample = non_null_data.iloc[0] if not non_null_data.empty else ""
    print("Sample of 'data' column (after cleaning):")
    print(repr(str(sample))[:200])  # stringify so long values print compactly

    if isinstance(sample, str):
        print("Detected string format, parsing with ast.literal_eval...")
        df['data'] = df['data'].apply(safe_literal_eval)

    # Whatever path was taken, only dict records may reach json_normalize.
    records = [cell if isinstance(cell, dict) else {} for cell in df['data']]

    # Expand the dicts, join sequences (['a', 'b'] -> "a,b"), then clean.
    expanded = pd.json_normalize(records)
    for col in expanded.columns:
        if expanded[col].dtype == 'object':
            expanded[col] = expanded[col].apply(join_sequence)
        expanded[col] = expanded[col].apply(clean_special_characters)

    # Merge the original non-data columns with the expanded ones.
    other_cols = df.drop(columns=['data'])
    new_df = pd.concat(
        [other_cols.reset_index(drop=True), expanded.reset_index(drop=True)],
        axis=1,
    )

    # Filter 1: keep RUNNING instances only.
    if 'instanceStatus' in new_df.columns:
        new_df['instanceStatus'] = new_df['instanceStatus'].astype(str)
        # .copy() so the column assignment below works on an independent frame.
        new_df = new_df[new_df["instanceStatus"].str.strip() == "RUNNING"].copy()
        print(f"✅ 过滤后(instanceStatus=RUNNING):{new_df.shape[0]}行")
    else:
        print("⚠️ 警告:缺少 'instanceStatus' 列,跳过状态过滤")

    # Filter 2: keep rows whose order-code field is blank.
    col = "textField_kto3q3ev"
    if col in new_df.columns:
        new_df[col] = new_df[col].apply(join_sequence).astype(str)
        stripped = new_df[col].str.strip()
        # After astype(str) a missing value reads as the text "nan".
        new_df = new_df[(stripped == "") | (stripped == "nan")]
        print(f"✅ 过滤后({col}为空):{new_df.shape[0]}行")
    else:
        print(f"⚠️ 警告:缺少 '{col}' 列,跳过订单编码过滤")

    # Save as UTF-8-SIG.
    expanded_csv_path = os.path.join(output_dir, "expanded_yd_data.csv")
    try:
        new_df.to_csv(
            expanded_csv_path,
            index=False,
            encoding='utf-8-sig',
            na_rep='',
            errors='replace'
        )
        print(f"✅ Expanded data saved to: {expanded_csv_path}(编码:utf-8-sig)")
        return expanded_csv_path
    except Exception as e:
        print(f"❌ 保存 expanded_yd_data.csv 失败:{str(e)}")
        return None
|
||
|
||
|
||
# ==============================
|
||
# 第二步:格式转换核心函数(兼容数组处理结果)
|
||
# ==============================
|
||
def convert_to_data_ngv_format(
    source_csv_path,
    target_csv_path,
    main_output_path,
    additional_output_path,
    doc_output_path
):
    """Convert the expanded source CSV into the column layout of data_NGV.csv.

    The target CSV is only used as a column template. For every source row a
    main record (target columns plus a leading ``main_file_id``) and an
    additional record (source-only columns plus ``main_file_id``) are built.
    Both files are written as UTF-8-SIG and a Markdown description is
    produced through ``generate_relation_doc``.

    Compared with the earlier implementation:
      * rows are collected in lists and the frames are built once, instead of
        growing a DataFrame with ``df.loc[idx] = row`` on every iteration;
      * the ``province_name`` / ``city_name`` defaults ('未知') are really
        applied when the field is empty (previously the "default" was read
        back from the empty row, so it could never take effect);
      * additional columns keep the source column order instead of an
        arbitrary set order;
      * bare ``except:`` clauses are narrowed and an unparsable/NaT timestamp
        is treated as missing.

    Args:
        source_csv_path: Expanded source CSV (read as UTF-8-SIG).
        target_csv_path: Template CSV (UTF-8-SIG, falling back to GBK).
        main_output_path: Destination of the main CSV.
        additional_output_path: Destination of the additional-columns CSV.
        doc_output_path: Destination of the Markdown description.

    Returns:
        bool: True on success, False when reading or saving fails.
    """
    print("\n=== 开始格式转换(目标格式:data_NGV.csv)===")
    try:
        # Read the target (template) file.
        if not os.path.exists(target_csv_path):
            raise FileNotFoundError(f"目标文件不存在:{target_csv_path}")

        # Try UTF-8-SIG first and fall back to GBK on a decoding failure.
        try:
            df_target = pd.read_csv(target_csv_path, encoding='utf-8-sig')
            target_encoding = 'utf-8-sig'
        except UnicodeDecodeError:
            df_target = pd.read_csv(target_csv_path, encoding='gbk')
            target_encoding = 'gbk'

        print(f"✅ 成功读取目标文件:{target_csv_path}(编码:{target_encoding})")
        print(f" 数据规模:{df_target.shape[0]}行 × {df_target.shape[1]}列")

        # Read the source file.
        df_source = pd.read_csv(source_csv_path, encoding='utf-8-sig')
        print(f"✅ 成功读取源文件:{source_csv_path}(编码:utf-8-sig)")
        print(f" 数据规模:{df_source.shape[0]}行 × {df_source.shape[1]}列")

    except Exception as e:
        print(f"❌ 文件读取失败:{str(e)}")
        return False

    # Field mapping: source column -> target column.
    print("\n=== 建立字段映射关系 ===")
    field_mapping = {
        'createTimeGMT': 'saas_create_time',
        'modifiedTimeGMT': 'etl_time',
        'title': 'org_remark',
        'instanceStatus': 'active_status_fmt',
        'processInstanceId': 'id_own_org',
        'actionExecutor': 'technician',
        'originator': 'salesmen',
        'textField_kuj8nx00': 'province_name',
        'textField_kuj8nx01': 'city_name'
    }

    # Keep only mappings whose both ends exist.
    target_columns = set(df_target.columns)
    source_columns = set(df_source.columns)
    valid_mapping = {}
    for source_field, target_field in field_mapping.items():
        if target_field in target_columns and source_field in source_columns:
            valid_mapping[source_field] = target_field
            print(f" {source_field} → {target_field}")

    if len(valid_mapping) == 0:
        print("⚠️ 警告:未找到有效字段映射,将仅填充默认值")

    # Source-only columns go to the additional file, in source order.
    additional_columns = [
        col for col in df_source.columns
        if col not in target_columns and col not in valid_mapping
    ]
    print("\n=== 字段统计 ===")
    print(f" 目标文件总字段数:{len(target_columns)}")
    print(f" 源文件总字段数:{len(source_columns)}")
    print(f" 有效映射字段数:{len(valid_mapping)}")
    print(f" 额外字段数(放入单独文件):{len(additional_columns)}")

    # Output layouts; main_file_id is the join key between the two files.
    main_columns = ['main_file_id'] + [
        col for col in df_target.columns if col != 'main_file_id'
    ]
    main_column_set = set(main_columns)
    additional_layout = ['main_file_id'] + additional_columns

    # Patterns are compiled once, outside the row loop.
    org_name_pattern = re.compile(r'门店名称:([^,,\n]+)')
    org_code_pattern = re.compile(r'门店编码:([^,,\n]+)')
    executor_name_pattern = re.compile(r"'nameInChinese':\s*'([^']+)'")

    def to_text(value):
        """Normalise a cell to text: '' if empty, comma-joined if a sequence."""
        if is_empty_value(value):
            return ''
        if isinstance(value, (list, tuple)):
            return ','.join(map(str, value))
        return str(value).strip()

    def extract_org_info(title_text):
        """Pull (store name, store code) out of the title text."""
        if is_empty_value(title_text):
            return '', ''
        title_str = str(title_text).strip()
        org_name_match = org_name_pattern.search(title_str)
        org_code_match = org_code_pattern.search(title_str)
        org_name = org_name_match.group(1).strip() if org_name_match else ''
        org_code = org_code_match.group(1).strip() if org_code_match else ''
        return org_name, org_code

    def extract_technician(executor_text):
        """Pull the Chinese name out of a stringified executor record."""
        if is_empty_value(executor_text):
            return ''
        name_match = executor_name_pattern.search(str(executor_text).strip())
        return name_match.group(1).strip() if name_match else ''

    def convert_gmt_time(gmt_str):
        """Parse an ISO-like timestamp; return None when missing or invalid."""
        if is_empty_value(gmt_str):
            return None
        try:
            time_str = str(gmt_str).replace('T', ' ').replace('Z', '').strip()
            parsed = pd.to_datetime(time_str)
        except (ValueError, TypeError, OverflowError):
            return None
        # NaT cannot be formatted with strftime, so treat it as missing.
        return None if pd.isna(parsed) else parsed

    # Defaults applied only where the main record is still empty.
    default_values = {
        'org_type': '一般',
        'org_status': '留存',
        'group_grade': '普通客户(VIP)',
        'is_active': 1,
        'active_status_fmt': '活跃',
        'province_name': '未知',
        'city_name': '未知',
        'area_name': '未知',
        'is_wechat': 0,
        'is_mini_app': 0,
        'id_own_group': 0,
        'org_code': ''
    }

    # Row-by-row processing.
    total_rows = len(df_source)
    print(f"\n=== 开始数据处理(共{total_rows}行) ===")
    main_rows = []
    additional_rows = []
    for idx, source_row in df_source.iterrows():
        main_file_id = f"REC-{idx:06d}"

        # --- main record ---
        main_row = {'main_file_id': main_file_id}
        for source_field, target_field in valid_mapping.items():
            main_row[target_field] = to_text(source_row[source_field])

        # Date fields derived from the creation time.
        create_time = convert_gmt_time(source_row.get('createTimeGMT'))
        if create_time is not None:
            main_row['date_fmt'] = create_time.strftime('%Y/%m/%d')
            main_row['date_id'] = int(create_time.strftime('%Y%m%d'))
            main_row['pt'] = main_row['date_id']

        # Store information from the title.
        org_name, org_code = extract_org_info(source_row.get('title'))
        if org_name:
            main_row['org_name'] = org_name
            main_row['group_name'] = org_name
        if org_code:
            main_row['org_code'] = org_code

        # The extracted name overrides the raw mapped executor text.
        technician = extract_technician(source_row.get('actionExecutor'))
        if technician:
            main_row['technician'] = technician

        for col, default_val in default_values.items():
            if col in main_column_set and is_empty_value(main_row.get(col)):
                main_row[col] = default_val

        main_rows.append(main_row)

        # --- additional record ---
        additional_row = {'main_file_id': main_file_id}
        for col in additional_columns:
            additional_row[col] = to_text(source_row[col])
        additional_rows.append(additional_row)

        # Progress report.
        if (idx + 1) % 500 == 0 or (idx + 1) == total_rows:
            print(f" 已处理 {idx + 1}/{total_rows} 行")

    # Build both frames once; keys outside the layout are dropped and
    # missing keys become NaN.
    df_main = pd.DataFrame(main_rows, columns=main_columns, dtype=object)
    df_additional = pd.DataFrame(additional_rows, columns=additional_layout, dtype=object)

    # Numeric columns: strip commas, coerce, default to 0.
    print("\n=== 优化数据类型 ===")
    numeric_cols = ['date_id', 'pt', 'is_active', 'is_wechat', 'is_mini_app',
                    'id_own_group', 'active_user_count', 'limit_user_count']

    for col in numeric_cols:
        if col in df_main.columns:
            df_main[col] = pd.to_numeric(
                df_main[col].astype(str).str.replace(',', '', regex=False),
                errors='coerce'
            ).fillna(0).astype(int)

    # Remaining gaps become empty strings.
    df_main = df_main.fillna('')
    df_additional = df_additional.fillna('')

    # Save the results.
    print("\n=== 保存结果文件 ===")
    try:
        df_main.to_csv(
            main_output_path,
            index=False,
            encoding='utf-8-sig',
            na_rep='',
            errors='replace'
        )
        print(f"✅ 主文件保存成功: {main_output_path}(编码:utf-8-sig)")

        df_additional.to_csv(
            additional_output_path,
            index=False,
            encoding='utf-8-sig',
            na_rep='',
            errors='replace'
        )
        print(f"✅ 额外字段文件保存成功: {additional_output_path}(编码:utf-8-sig)")

        generate_relation_doc(doc_output_path, main_output_path, additional_output_path,
                              len(target_columns), len(source_columns), len(valid_mapping))
        print(f"✅ 关联说明文档保存成功: {doc_output_path}")

        return True

    except Exception as e:
        print(f"❌ 文件保存失败: {str(e)}")
        return False
|
||
|
||
|
||
# ==============================
|
||
# 辅助函数:生成关联说明文档
|
||
# ==============================
|
||
def generate_relation_doc(doc_path, main_file, additional_file, target_col_count, source_col_count, mapping_count):
    """Write a Markdown note describing the two generated CSV files.

    Each CSV is read exactly once (as UTF-8-SIG) to report its size. The
    example code point in the "special characters" item is escaped so that
    the visible text ``\\u202d`` is written, rather than the invisible
    control character itself.

    Args:
        doc_path: Destination of the Markdown file (written as UTF-8).
        main_file: Path of the main CSV.
        additional_file: Path of the additional-columns CSV.
        target_col_count: Number of target columns; the reported main column
            count is this value plus one (the ``main_file_id`` column).
        source_col_count: Accepted for interface compatibility; not used in
            the document.
        mapping_count: Accepted for interface compatibility; not used in the
            document.
    """
    main_row_count = pd.read_csv(main_file, encoding='utf-8-sig').shape[0]
    additional_row_count, additional_col_count = pd.read_csv(
        additional_file, encoding='utf-8-sig'
    ).shape

    doc_content = f"""# 数据格式转换结果说明
## 目标格式文件:data_NGV.csv

### 一、文件概述
1. **主文件(匹配目标格式)**
   - 文件名:{os.path.basename(main_file)}
   - 格式来源:完全匹配 data_NGV.csv 结构
   - 数据规模:{main_row_count}行 × {target_col_count + 1}列
   - 编码格式:UTF-8-SIG(兼容所有字符和数组数据)

2. **额外字段文件**
   - 文件名:{os.path.basename(additional_file)}
   - 包含内容:源文件中 data_NGV.csv 没有的字段
   - 数据规模:{additional_row_count}行 × {additional_col_count}列
   - 编码格式:UTF-8-SIG

### 二、关键处理说明
1. **数组数据处理**:源文件中的数组(如 ['a','b'])已转为字符串("a,b")存储,避免格式错误
2. **特殊字符清理**:自动移除无法编码的控制字符(如右至左标记 \\u202d)
3. **编码统一**:所有文件使用 UTF-8-SIG 编码,兼容中文和特殊字符

### 三、关联方法
1. **关联字段**:`main_file_id`(格式:REC-000000)
2. **Excel导入**:数据→自文本/CSV→选择文件→编码选择"UTF-8"→完成

### 四、使用建议
1. 主文件可直接用于业务系统,与 data_NGV.csv 格式完全兼容
2. 额外字段文件用于原始数据追溯,通过 main_file_id 关联
3. 数组转字符串后的数据可通过 Excel 的"文本分列"功能恢复为数组
"""
    with open(doc_path, 'w', encoding='utf-8') as f:
        f.write(doc_content)
|
||
|
||
|
||
# ==============================
|
||
# 主执行函数
|
||
# ==============================
|
||
def main(
    base_dir="D:\\Idea Project\\SaaS_V1.7\\test\\output",
    target_csv_path="D:\\Idea Project\\SaaS_V1.7\\test\\output\\data_NGV.csv",
):
    """Run the whole pipeline: expand the source CSV, then convert its format.

    The two locations used to be hard-coded inside the function; they are now
    keyword parameters whose defaults are the original values, so calling
    ``main()`` with no arguments behaves as before.

    Args:
        base_dir: Directory that holds converted_yd_data.csv and receives all
            output files.
        target_csv_path: Path of the data_NGV.csv template file.
    """
    # Step 1: build expanded_yd_data.csv.
    expanded_csv_path = generate_expanded_csv(base_dir)
    if not expanded_csv_path:
        print("❌ 生成 expanded_yd_data.csv 失败,终止转换")
        return

    # Output locations.
    main_output_path = os.path.join(base_dir, "主文件_匹配data_NGV格式.csv")
    additional_output_path = os.path.join(base_dir, "额外字段文件_源文件特有列.csv")
    doc_output_path = os.path.join(base_dir, "格式转换说明.md")

    # Step 2: run the conversion.
    success = convert_to_data_ngv_format(
        source_csv_path=expanded_csv_path,
        target_csv_path=target_csv_path,
        main_output_path=main_output_path,
        additional_output_path=additional_output_path,
        doc_output_path=doc_output_path
    )

    if success:
        print("\n🎉 格式转换全部完成!")
        print(f"📁 输出目录:{base_dir}")
        print("🔔 重要:Excel导入时需选择'UTF-8'编码,数组数据已转为逗号分隔字符串")
    else:
        print("\n❌ 格式转换失败,请查看日志信息排查问题")
|
||
|
||
|
||
# ==============================
|
||
# 执行入口
|
||
# ==============================
|
||
if __name__ == "__main__":
    # Optional API modules: their absence only produces a warning and does
    # not stop the format conversion.
    try:
        from yd_api import YDAPI
        from api import API
    except ImportError:
        print("⚠️ 警告:未找到 yd_api 或 api 模块,跳过API初始化(不影响格式转换)")
    else:
        print("✅ 成功导入 API 模块")

    # Run the main pipeline.
    main()