Files
saas/test/宜搭续约待办转换简道云数据源.py
2026-01-14 15:13:44 +08:00

524 lines
20 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
import ast
import codecs
import json
import os
import re
import unicodedata
from datetime import datetime

import pandas as pd
from chardet import detect
# ==============================
# 核心修复:处理数组类型数据的工具函数
# ==============================
def is_empty_value(value):
    """Return True when *value* should be treated as "no data".

    Empty means one of:
    * ``None``;
    * a string that is empty after ``strip()``;
    * a list, tuple or set with no elements;
    * a pandas/NumPy missing scalar (``NaN``, ``NaT``, ``pd.NA``).

    Non-empty containers and any other non-scalar object (e.g. a dict) are
    never considered empty.

    Containers are checked *before* ``pd.isna``. On a list, ``pd.isna`` returns
    an element-wise array, and using that array in an ``if`` raises
    ``ValueError`` for lists with more than one element.
    """
    if value is None:
        return True
    if isinstance(value, str):
        return value.strip() == ''
    if isinstance(value, (list, tuple, set)):
        return len(value) == 0
    # Only scalars reach pd.isna, so its result is a single boolean.
    if pd.api.types.is_scalar(value):
        return bool(pd.isna(value))
    return False
def clean_special_characters(value):
    """Strip problematic control/format characters from a string or a sequence of strings.

    Behaviour by input type:
    * list/tuple: returns a new ``list``. Each ``str`` item is cleaned with
      ``_clean_single_string``; other items are kept as-is. An empty list/tuple
      is returned unchanged.
    * empty values (per ``is_empty_value``): returned unchanged.
    * str: cleaned with ``_clean_single_string``.
    * anything else (numbers, bools, dicts, ...): returned unchanged.
    """
    # Sequences are handled first so they never hit the pd.isna check inside
    # is_empty_value, which cannot reduce a multi-element list to one boolean.
    if isinstance(value, (list, tuple)):
        if not value:
            return value
        return [
            _clean_single_string(item) if isinstance(item, str) else item
            for item in value
        ]
    if is_empty_value(value):
        return value
    if isinstance(value, str):
        return _clean_single_string(value)
    return value
def _clean_single_string(text):
"""清理单个字符串的特殊字符(内部调用)"""
try:
# 移除控制字符(保留换行、制表符)
cleaned = ''.join(
char for char in text
if unicodedata.category(char)[0] != 'C' or char in '\n\t'
)
# 移除特定无法编码的字符(如右至左标记)
special_chars = ['\u202d', '\u202c', '\u202a', '\u202b', '\u200b']
for char in special_chars:
cleaned = cleaned.replace(char, '')
# 替换全角空格为半角空格
cleaned = cleaned.replace('\u3000', ' ')
return cleaned
except:
return text
def get_safe_encoding(file_path=None):
    """Return the encoding to read *file_path* with: UTF-8-SIG first, GB18030 second.

    How the choice is made:
    1. The first 10000 bytes are strictly decoded as UTF-8; a UTF-8 BOM is
       valid UTF-8. If that succeeds, ``'utf-8-sig'`` is returned, so a BOM is
       stripped when present.
    2. Otherwise, if the same bytes decode as GB18030 (a superset of
       GBK/GB2312), ``'gb18030'`` is returned.
    3. In every other case the result is ``'utf-8-sig'`` and the caller's
       reader reports the error. This includes *file_path* being ``None``,
       missing, or not a regular file.

    Only that first 10000-byte prefix is examined. Incremental decoders are
    used with ``final=False``, so a multi-byte character cut off at the limit
    is not treated as invalid.

    The previous version ran chardet but returned ``'utf-8-sig'`` whatever it
    detected, so GBK files could never be read. A trial decode gives a
    deterministic answer for this two-way choice.
    """
    default_encoding = 'utf-8-sig'
    if not file_path or not os.path.isfile(file_path):
        return default_encoding
    with open(file_path, 'rb') as f:
        sample = f.read(10000)
    # Ordered candidates: (codec used to validate the sample, encoding returned).
    candidates = (('utf-8', default_encoding), ('gb18030', 'gb18030'))
    for codec_name, encoding in candidates:
        try:
            codecs.getincrementaldecoder(codec_name)().decode(sample, final=False)
        except UnicodeDecodeError:
            continue
        return encoding
    return default_encoding
# ==============================
# 第一步:生成 expanded_yd_data.csv(彻底修复数组问题)
# ==============================
def _join_if_sequence(cell):
    """Render a list/tuple cell as a comma-separated string; return other values unchanged."""
    if isinstance(cell, (list, tuple)):
        return ','.join(map(str, cell))
    return cell


def _parse_data_cell(raw):
    """Parse one ``data`` cell into a Python object; return ``{}`` if empty or unparsable.

    The text is first tried as a Python literal with ``ast.literal_eval``
    (e.g. ``"{'a': None}"``), then as JSON with ``json.loads``
    (e.g. ``'{"a": null}'``). The parsed value is passed through
    ``clean_special_characters``. Parse failures are printed.
    """
    if is_empty_value(raw):
        return {}
    text = clean_special_characters(raw) if isinstance(raw, str) else str(raw)
    try:
        try:
            parsed = ast.literal_eval(text)
        except (ValueError, SyntaxError, TypeError):
            # JSON spells None/True/False as null/true/false, which literal_eval rejects.
            parsed = json.loads(text)
        return clean_special_characters(parsed)
    except (ValueError, SyntaxError, TypeError) as e:
        print(f"Parse error on: {repr(str(raw))[:100]}... Error: {str(e)[:50]}")
        return {}


def generate_expanded_csv(output_dir):
    """Expand ``converted_yd_data.csv`` into ``expanded_yd_data.csv``.

    Steps:
    1. Read ``<output_dir>/converted_yd_data.csv`` using the encoding chosen by
       ``get_safe_encoding``.
    2. Clean special characters in the other string columns.
    3. Parse the ``data`` column into dicts and flatten them with
       ``pd.json_normalize``. List cells become ``"a,b"`` strings.
    4. Keep only rows where ``instanceStatus`` is ``"RUNNING"`` and
       ``textField_kto3q3ev`` is empty (each filter is skipped with a warning
       if its column is absent).
    5. Write the result as UTF-8-SIG.

    Args:
        output_dir: directory holding the input file; it is created if missing
            and also receives the output file.

    Returns:
        The path of the written ``expanded_yd_data.csv``. Returns ``None`` if
        the input is missing or unreadable, has no ``data`` column, or the
        write fails (each case is printed).
    """
    os.makedirs(output_dir, exist_ok=True)
    converted_csv_path = os.path.join(output_dir, "converted_yd_data.csv")
    if not os.path.exists(converted_csv_path):
        print(f"❌ 错误:converted_yd_data.csv 不存在,路径:{converted_csv_path}")
        return None

    encoding = get_safe_encoding(converted_csv_path)
    try:
        df = pd.read_csv(converted_csv_path, encoding=encoding)
        print(f"✅ 成功读取 converted_yd_data.csv(编码:{encoding}),数据规模:{df.shape[0]}× {df.shape[1]}")
    except Exception as e:
        print(f"❌ 读取 converted_yd_data.csv 失败:{str(e)}")
        return None
    if 'data' not in df.columns:
        print(f"❌ 错误:converted_yd_data.csv 中缺少 'data'")
        return None

    # Clean the string-typed (object dtype) columns other than 'data'.
    for col in df.columns:
        if col != 'data' and df[col].dtype == 'object':
            df[col] = df[col].apply(clean_special_characters)

    non_null_data = df['data'].dropna()
    sample = non_null_data.iloc[0] if not non_null_data.empty else ""
    print("Sample of 'data' column (after cleaning):")
    print(repr(str(sample))[:200])  # str() keeps long containers from flooding the log
    if isinstance(sample, str):
        print("Detected string format, parsing with ast.literal_eval...")
        df['data'] = df['data'].apply(_parse_data_cell)

    # json_normalize only accepts dict records. Any other parse result (list,
    # number, unparsed raw value) becomes an empty record, so the row is kept
    # without expanded fields instead of crashing the whole run.
    records = [cell if isinstance(cell, dict) else {} for cell in df['data']]
    non_dict_count = sum(1 for cell in df['data'] if not isinstance(cell, dict))
    if non_dict_count:
        print(f"⚠️ 警告:{non_dict_count} 行 'data' 不是字典,已按空记录处理")
    expanded = pd.json_normalize(records)
    for col in expanded.columns:
        if expanded[col].dtype == 'object':
            expanded[col] = expanded[col].apply(_join_if_sequence)  # ['a','b'] -> "a,b"
        expanded[col] = expanded[col].apply(clean_special_characters)

    other_cols = df.drop(columns=['data'])
    new_df = pd.concat([other_cols.reset_index(drop=True), expanded.reset_index(drop=True)], axis=1)

    if 'instanceStatus' in new_df.columns:
        status = new_df['instanceStatus'].astype(str).str.strip()
        new_df = new_df[status == "RUNNING"]
        print(f"✅ 过滤后(instanceStatus=RUNNING):{new_df.shape[0]}")
    else:
        print(f"⚠️ 警告:缺少 'instanceStatus' 列,跳过状态过滤")

    col = "textField_kto3q3ev"
    if col in new_df.columns:
        # Blank order code: None/NaN/whitespace/empty list, or the literal texts
        # "nan"/"None". The column is not overwritten with astype(str), so missing
        # values are written as '' (na_rep) rather than the text "nan".
        order_code = new_df[col].apply(_join_if_sequence)
        is_blank = order_code.apply(
            lambda v: is_empty_value(v) or str(v).strip() in ('nan', 'None')
        ).astype(bool)  # bool dtype keeps this a row mask even when empty
        new_df = new_df[is_blank]
        print(f"✅ 过滤后({col}为空):{new_df.shape[0]}")
    else:
        print(f"⚠️ 警告:缺少 '{col}' 列,跳过订单编码过滤")

    expanded_csv_path = os.path.join(output_dir, "expanded_yd_data.csv")
    try:
        new_df.to_csv(
            expanded_csv_path,
            index=False,
            encoding='utf-8-sig',
            na_rep='',
            errors='replace'
        )
        print(f"✅ Expanded data saved to: {expanded_csv_path}(编码:utf-8-sig")
        return expanded_csv_path
    except Exception as e:
        print(f"❌ 保存 expanded_yd_data.csv 失败:{str(e)}")
        return None
# ==============================
# 第二步:格式转换核心函数(兼容数组处理结果)
# ==============================
def convert_to_data_ngv_format(
    source_csv_path,
    target_csv_path,
    main_output_path,
    additional_output_path,
    doc_output_path
):
    """Reshape the expanded YiDa records into the column layout of ``data_NGV.csv``.

    Only the header (column names) of *target_csv_path* is used as the schema.
    Each row of *source_csv_path* produces two output rows:

    * In *main_output_path*: ``main_file_id`` followed by every target column.
      Values come from ``field_mapping``, from fields extracted from
      ``title`` / ``actionExecutor`` / ``createTimeGMT``, and from fixed defaults.
    * In *additional_output_path*: ``main_file_id`` followed by every source
      column that is neither a target column nor a mapped source field, in
      source column order.

    Both CSVs are written as UTF-8-SIG. ``generate_relation_doc`` then writes a
    Markdown summary to *doc_output_path*.

    Returns:
        True if all outputs were written. False if reading the inputs or writing
        the outputs failed; the error is printed, not raised.
    """
    print("\n=== 开始格式转换(目标格式:data_NGV.csv===")
    try:
        if not os.path.exists(target_csv_path):
            raise FileNotFoundError(f"目标文件不存在:{target_csv_path}")
        try:
            df_target = pd.read_csv(target_csv_path, encoding='utf-8-sig')
            target_encoding = 'utf-8-sig'
        except UnicodeDecodeError:
            # Not valid UTF-8: retry as GBK.
            df_target = pd.read_csv(target_csv_path, encoding='gbk')
            target_encoding = 'gbk'
        print(f"✅ 成功读取目标文件:{target_csv_path}(编码:{target_encoding}")
        print(f" 数据规模:{df_target.shape[0]}× {df_target.shape[1]}")
        df_source = pd.read_csv(source_csv_path, encoding='utf-8-sig')
        print(f"✅ 成功读取源文件:{source_csv_path}(编码:utf-8-sig")
        print(f" 数据规模:{df_source.shape[0]}× {df_source.shape[1]}")
    except Exception as e:
        print(f"❌ 文件读取失败:{str(e)}")
        return False

    # Source field -> target column; only pairs present in both files are used.
    print("\n=== 建立字段映射关系 ===")
    field_mapping = {
        'createTimeGMT': 'saas_create_time',
        'modifiedTimeGMT': 'etl_time',
        'title': 'org_remark',
        'instanceStatus': 'active_status_fmt',
        'processInstanceId': 'id_own_org',
        'actionExecutor': 'technician',
        'originator': 'salesmen',
        'textField_kuj8nx00': 'province_name',
        'textField_kuj8nx01': 'city_name'
    }
    valid_mapping = {}
    for source_field, target_field in field_mapping.items():
        if target_field in df_target.columns and source_field in df_source.columns:
            valid_mapping[source_field] = target_field
            print(f" {source_field}{target_field}")
    if len(valid_mapping) == 0:
        print(f"⚠️ 警告:未找到有效字段映射,将仅填充默认值")

    target_columns = set(df_target.columns)
    source_columns = set(df_source.columns)
    # Keep source column order. A set difference would change the column order
    # between runs, because string hashing is randomized per process.
    additional_columns = [
        col for col in df_source.columns
        if col not in target_columns and col not in valid_mapping
    ]
    print(f"\n=== 字段统计 ===")
    print(f" 目标文件总字段数:{len(target_columns)}")
    print(f" 源文件总字段数:{len(source_columns)}")
    print(f" 有效映射字段数:{len(valid_mapping)}")
    print(f" 额外字段数(放入单独文件):{len(additional_columns)}")

    def extract_org_info(title_text):
        """Return (store name, store code) parsed from ``门店名称:``/``门店编码:`` in the title; '' if absent."""
        if is_empty_value(title_text):
            return '', ''
        title_str = str(title_text).strip()
        org_name_match = re.search(r'门店名称:([^,\n]+)', title_str)
        org_code_match = re.search(r'门店编码:([^,\n]+)', title_str)
        org_name = org_name_match.group(1).strip() if org_name_match else ''
        org_code = org_code_match.group(1).strip() if org_code_match else ''
        return org_name, org_code

    def extract_technician(executor_text):
        """Return the first ``'nameInChinese': '...'`` value in the stringified executor, else ''."""
        if is_empty_value(executor_text):
            return ''
        executor_str = str(executor_text).strip()
        name_match = re.search(r"'nameInChinese':\s*'([^']+)'", executor_str)
        return name_match.group(1).strip() if name_match else ''

    def convert_gmt_time(gmt_str):
        """Parse a timestamp string ('T' and 'Z' removed) into a pd.Timestamp; None if empty/unparsable."""
        if is_empty_value(gmt_str):
            return None
        try:
            time_str = str(gmt_str).replace('T', ' ').replace('Z', '').strip()
            return pd.to_datetime(time_str)
        except (ValueError, TypeError, OverflowError):
            return None

    def to_cell_text(value):
        """Render a source cell as text: '' when empty, "a,b" for lists/tuples, else str().strip()."""
        if is_empty_value(value):
            return ''
        if isinstance(value, (list, tuple)):
            return ','.join(map(str, value))
        return str(value).strip()

    # Fallbacks for columns still empty after mapping/extraction. These used to
    # read main_row.get(col, ...), which returned the unset NaN, so '未知' never
    # applied to province_name/city_name.
    # NOTE(review): '普通客户(VIP' has no closing parenthesis; confirm the exact
    # label expected in data_NGV.csv before changing it.
    default_values = {
        'org_type': '一般',
        'org_status': '留存',
        'group_grade': '普通客户(VIP',
        'is_active': 1,
        'active_status_fmt': '活跃',
        'province_name': '未知',
        'city_name': '未知',
        'area_name': '未知',
        'is_wechat': 0,
        'is_mini_app': 0,
        'id_own_group': 0,
        'org_code': '',
    }

    main_columns = ['main_file_id'] + list(df_target.columns)
    additional_header = ['main_file_id'] + additional_columns
    main_records = []
    additional_records = []
    total_rows = len(df_source)
    print(f"\n=== 开始数据处理(共{len(df_source)}行) ===")
    for idx, source_row in df_source.iterrows():
        main_file_id = f"REC-{idx:06d}"

        # Every output column starts unset (None); keys outside main_columns
        # are dropped when the DataFrame is built below.
        main_row = dict.fromkeys(main_columns)
        main_row['main_file_id'] = main_file_id
        for source_field, target_field in valid_mapping.items():
            main_row[target_field] = to_cell_text(source_row[source_field])

        # NOTE(review): the 'Z' suffix is dropped and no timezone conversion is
        # done, so the derived date is the calendar date of the raw timestamp —
        # confirm that is the intended business date.
        create_time = convert_gmt_time(source_row.get('createTimeGMT'))
        if create_time is not None and not pd.isna(create_time):
            main_row['date_fmt'] = create_time.strftime('%Y/%m/%d')
            main_row['date_id'] = int(create_time.strftime('%Y%m%d'))
            main_row['pt'] = main_row['date_id']

        org_name, org_code = extract_org_info(source_row.get('title'))
        if org_name:
            main_row['org_name'] = org_name
            main_row['group_name'] = org_name
        if org_code:
            main_row['org_code'] = org_code

        technician = extract_technician(source_row.get('actionExecutor'))
        if technician:
            main_row['technician'] = technician

        for col, default_val in default_values.items():
            if col in main_row and is_empty_value(main_row[col]):
                main_row[col] = default_val
        main_records.append(main_row)

        additional_row = {'main_file_id': main_file_id}
        for col in additional_columns:
            additional_row[col] = to_cell_text(source_row[col])
        additional_records.append(additional_row)

        if (idx + 1) % 500 == 0 or (idx + 1) == total_rows:
            print(f" 已处理 {idx + 1}/{total_rows}")

    # Build each frame once. Enlarging with .loc reallocates the frame on every
    # append, which is quadratic in the row count.
    df_main = pd.DataFrame(main_records, columns=main_columns)
    df_additional = pd.DataFrame(additional_records, columns=additional_header)

    print(f"\n=== 优化数据类型 ===")
    numeric_cols = ['date_id', 'pt', 'is_active', 'is_wechat', 'is_mini_app',
                    'id_own_group', 'active_user_count', 'limit_user_count']
    for col in numeric_cols:
        if col in df_main.columns:
            # Commas are removed before parsing ('1,234' -> 1234); unparsable -> 0.
            df_main[col] = pd.to_numeric(
                df_main[col].astype(str).str.replace(',', ''),
                errors='coerce'
            ).fillna(0).astype(int)

    df_main = df_main.fillna('')
    df_additional = df_additional.fillna('')

    print(f"\n=== 保存结果文件 ===")
    try:
        df_main.to_csv(
            main_output_path,
            index=False,
            encoding='utf-8-sig',
            na_rep='',
            errors='replace'
        )
        print(f"✅ 主文件保存成功: {main_output_path}(编码:utf-8-sig")
        df_additional.to_csv(
            additional_output_path,
            index=False,
            encoding='utf-8-sig',
            na_rep='',
            errors='replace'
        )
        print(f"✅ 额外字段文件保存成功: {additional_output_path}(编码:utf-8-sig")
        generate_relation_doc(doc_output_path, main_output_path, additional_output_path,
                              len(target_columns), len(source_columns), len(valid_mapping))
        print(f"✅ 关联说明文档保存成功: {doc_output_path}")
        return True
    except Exception as e:
        print(f"❌ 文件保存失败: {str(e)}")
        return False
# ==============================
# 辅助函数:生成关联说明文档
# ==============================
def generate_relation_doc(doc_path, main_file, additional_file, target_col_count, source_col_count, mapping_count):
    """Write a Markdown note describing the two CSV files produced by the conversion.

    Args:
        doc_path: where the Markdown file is written (UTF-8).
        main_file: path of the main CSV; it is read back to report its row count.
        additional_file: path of the extra-columns CSV; it is read back to
            report its shape.
        target_col_count: number of columns in data_NGV.csv. The main file is
            reported as having one more, for the ``main_file_id`` key.
        source_col_count: number of source columns (accepted but not rendered).
        mapping_count: number of mapped fields (accepted but not rendered).
    """
    # Read each output once; the additional file used to be parsed twice.
    main_rows = pd.read_csv(main_file, encoding='utf-8-sig').shape[0]
    additional_rows, additional_cols = pd.read_csv(additional_file, encoding='utf-8-sig').shape
    # '\\u202d' is escaped so the doc shows the code point as text. Unescaped,
    # the f-string embedded the invisible U+202D (LEFT-TO-RIGHT OVERRIDE)
    # control character itself.
    doc_content = f"""# 数据格式转换结果说明
## 目标格式文件:data_NGV.csv
### 一、文件概述
1. **主文件(匹配目标格式)**
- 文件名:{os.path.basename(main_file)}
- 格式来源:完全匹配 data_NGV.csv 结构
- 数据规模:{main_rows}× {target_col_count + 1}
- 编码格式:UTF-8-SIG(兼容所有字符和数组数据)
2. **额外字段文件**
- 文件名:{os.path.basename(additional_file)}
- 包含内容:源文件中 data_NGV.csv 没有的字段
- 数据规模:{additional_rows}× {additional_cols}
- 编码格式:UTF-8-SIG
### 二、关键处理说明
1. **数组数据处理**:源文件中的数组(如 ['a','b'])已转为字符串("a,b")存储,避免格式错误
2. **特殊字符清理**:自动移除无法编码的控制字符(如左至右覆盖符 \\u202d
3. **编码统一**:所有文件使用 UTF-8-SIG 编码,兼容中文和特殊字符
### 三、关联方法
1. **关联字段**`main_file_id`(格式:REC-000000
2. **Excel导入**:数据→自文本/CSV→选择文件→编码选择"UTF-8"→完成
### 四、使用建议
1. 主文件可直接用于业务系统,与 data_NGV.csv 格式完全兼容
2. 额外字段文件用于原始数据追溯,通过 main_file_id 关联
3. 数组转字符串后的数据可通过 Excel 的"文本分列"功能恢复为数组
"""
    with open(doc_path, 'w', encoding='utf-8') as f:
        f.write(doc_content)
# ==============================
# 主执行函数
# ==============================
def main(base_dir="D:\\Idea Project\\SaaS_V1.7\\test\\output", target_csv_path=None):
    """Run the two-step pipeline: expand the YiDa export, then reshape it to data_NGV format.

    Args:
        base_dir: directory that holds ``converted_yd_data.csv``; every output
            file is written here too. The default is the original hard-coded
            Windows path; pass your own directory instead of editing the code.
        target_csv_path: the ``data_NGV.csv`` whose header defines the target
            layout. Defaults to ``data_NGV.csv`` inside *base_dir*, the same
            file the hard-coded path pointed to.
    """
    if target_csv_path is None:
        target_csv_path = os.path.join(base_dir, "data_NGV.csv")

    # Step 1: build expanded_yd_data.csv (parses and flattens the 'data' column).
    expanded_csv_path = generate_expanded_csv(base_dir)
    if not expanded_csv_path:
        print("❌ 生成 expanded_yd_data.csv 失败,终止转换")
        return

    main_output_path = os.path.join(base_dir, "主文件_匹配data_NGV格式.csv")
    additional_output_path = os.path.join(base_dir, "额外字段文件_源文件特有列.csv")
    doc_output_path = os.path.join(base_dir, "格式转换说明.md")

    # Step 2: reshape into the data_NGV column layout.
    success = convert_to_data_ngv_format(
        source_csv_path=expanded_csv_path,
        target_csv_path=target_csv_path,
        main_output_path=main_output_path,
        additional_output_path=additional_output_path,
        doc_output_path=doc_output_path
    )
    if success:
        print(f"\n🎉 格式转换全部完成!")
        print(f"📁 输出目录:{base_dir}")
        print(f"🔔 重要:Excel导入时需选择'UTF-8'编码,数组数据已转为逗号分隔字符串")
    else:
        print(f"\n❌ 格式转换失败,请查看日志信息排查问题")
# ==============================
# 执行入口
# ==============================
if __name__ == "__main__":
    # Optional API modules: they are only imported here, and a missing module
    # just prints a warning before the conversion runs anyway.
    try:
        from yd_api import YDAPI
        from api import API
    except ImportError:
        print("⚠️ 警告:未找到 yd_api 或 api 模块,跳过API初始化(不影响格式转换)")
    else:
        print("✅ 成功导入 API 模块")
    main()