Files
F6--/张阳脚本/工具/单元格包含多行文本拆分.py
2026-01-30 11:28:35 +08:00

352 lines
15 KiB
Python

import queue
import threading
import tkinter as tk
from pathlib import Path
from tkinter import filedialog, messagebox, ttk

import pandas as pd
from tqdm import tqdm
class ExcelProcessorApp:
    """Tkinter front end that splits multi-line Excel cells into separate rows.

    The user picks a workbook, a sheet, a key ("work order") column and one or
    more groups of columns.  Every selected column whose cell holds several
    lines of text is split so that each line lands on its own output row.
    A produced row that carries exactly one non-null value among the selected
    columns is reported as abnormal; all rows sharing the key of an abnormal
    row are moved out of the "normal" sheet.

    The heavy work runs on a worker thread.  The worker never touches Tk
    widgets directly: it posts callbacks to ``self._ui_queue`` and the Tk
    thread runs them from ``_poll_ui_queue``.
    """

    # Milliseconds between two polls of the worker -> UI callback queue.
    UI_POLL_MS = 50
    # Name of the workbook written next to the input file.
    OUTPUT_FILE_NAME = "拆分后_明细统计.xlsx"

    def __init__(self, root):
        """Build all widgets inside *root* and create the first (empty) group."""
        self.root = root
        self.root.title("Excel 数据处理工具")

        # Main container.
        self.main_frame = tk.Frame(root)
        self.main_frame.pack(padx=10, pady=10)

        # File selection row.
        self.file_label = tk.Label(self.main_frame, text="选择 Excel 文件:")
        self.file_label.grid(row=0, column=0, sticky=tk.W)
        self.file_entry = tk.Entry(self.main_frame, width=50)
        self.file_entry.grid(row=0, column=1, padx=5)
        self.file_button = tk.Button(self.main_frame, text="浏览", command=self.select_file)
        self.file_button.grid(row=0, column=2)

        # Sheet selection row.
        self.sheet_label = tk.Label(self.main_frame, text="选择 Sheet:")
        self.sheet_label.grid(row=1, column=0, sticky=tk.W)
        self.sheet_combobox = ttk.Combobox(self.main_frame, state="readonly")
        self.sheet_combobox.grid(row=1, column=1, padx=5)
        self.sheet_combobox.bind("<<ComboboxSelected>>", self.update_columns)

        # Key (work order number) column selection row.
        self.key_column_label = tk.Label(self.main_frame, text="选择工单号列:")
        self.key_column_label.grid(row=2, column=0, sticky=tk.W)
        self.key_column_combobox = ttk.Combobox(self.main_frame, state="readonly")
        self.key_column_combobox.grid(row=2, column=1, padx=5)

        # Scrollable area that holds one row of widgets per processing group.
        self.groups_frame = tk.LabelFrame(self.main_frame, text="添加处理组")
        self.groups_frame.grid(row=4, column=0, columnspan=2, pady=10, sticky=tk.W + tk.E)
        self.canvas = tk.Canvas(self.groups_frame)
        self.scrollbar = tk.Scrollbar(self.groups_frame, orient="vertical", command=self.canvas.yview)
        self.scrollable_frame = tk.Frame(self.canvas)
        self.scrollable_frame.bind(
            "<Configure>",
            lambda e: self.canvas.configure(scrollregion=self.canvas.bbox("all"))
        )
        self.canvas.create_window((0, 0), window=self.scrollable_frame, anchor="nw")
        self.canvas.configure(yscrollcommand=self.scrollbar.set)
        self.canvas.pack(side="left", fill="both", expand=True)
        self.scrollbar.pack(side="right", fill="y")

        # The "add group" button lives outside the canvas so it never scrolls away.
        self.add_group_button = tk.Button(self.groups_frame, text="添加组", command=self.add_group)
        self.add_group_button.pack(pady=5)

        # Run button.
        self.run_button = tk.Button(self.main_frame, text="运行", command=self.run_processing)
        self.run_button.grid(row=5, column=0, columnspan=2, pady=10)

        # Progress bar and status line.
        self.progress_frame = tk.LabelFrame(self.main_frame, text="运行明细")
        self.progress_frame.grid(row=6, column=0, columnspan=2, pady=10, sticky=tk.W + tk.E)
        self.progress_bar = ttk.Progressbar(self.progress_frame, mode='determinate')
        self.progress_bar.pack(fill=tk.X, pady=5)
        self.status_label = tk.Label(self.progress_frame, text="")
        self.status_label.pack()

        # Path of the workbook produced by the last run.
        self.output_file_path = None
        # Kept for backward compatibility; not read by this class.
        self.selected_columns = []
        # One (frame, select_button) tuple per group, in display order.
        self.group_frames = []
        # Per-group state keyed by the group's frame: its label widget and the
        # list of column names chosen for it.  Keeping the real column objects
        # avoids re-parsing label text (which broke on names containing
        # "," or ": " and on non-string column names).
        self._group_labels = {}
        self._group_columns = {}

        # Workbook state.  df_selected must exist before any file is loaded so
        # that run_processing can test it.
        self.df = None
        self.df_selected = None
        self.sheet_names = []
        self.column_names = []

        # Worker -> UI hand-off.
        self._ui_queue = queue.Queue()
        self._ui_thread = threading.current_thread()
        self._worker = None

        # Start with one group already present.
        self.add_group(is_first=True)

    # ------------------------------------------------------------------ #
    # File / sheet / column selection
    # ------------------------------------------------------------------ #
    def select_file(self):
        """Ask for a workbook path, show it in the entry and load its sheets."""
        file_path = filedialog.askopenfilename()
        if file_path:
            self.file_entry.delete(0, tk.END)
            self.file_entry.insert(0, file_path)
            self.load_sheets(file_path)

    def load_sheets(self, file_path):
        """Read every sheet of *file_path* and populate the sheet combobox.

        Any failure while reading is reported in an error dialog; the
        previously loaded workbook (if any) is left in place.
        """
        try:
            self.df = pd.read_excel(file_path, sheet_name=None)
            self.sheet_names = list(self.df.keys())
            self.sheet_combobox['values'] = self.sheet_names
            if self.sheet_names:
                self.sheet_combobox.current(0)  # select the first sheet by default
                self.update_columns(None)
        except Exception as e:
            messagebox.showerror("错误", f"无法加载文件: {e}")

    def update_columns(self, event=None):
        """Refresh column choices after the selected sheet changed.

        *event* is the Tk event when invoked through the combobox binding and
        is not used.
        """
        sheet_index = self.sheet_combobox.current()
        if self.df is None or not 0 <= sheet_index < len(self.sheet_names):
            return
        self.df_selected = self.df[self.sheet_names[sheet_index]]
        self.column_names = list(self.df_selected.columns)
        self.key_column_combobox['values'] = self.column_names
        if self.column_names:
            self.key_column_combobox.current(0)  # first column is the default key
        else:
            self.key_column_combobox.set('')
        # Forget group selections that name columns the new sheet does not have.
        for frame, _button in self.group_frames:
            previous = self._group_columns.get(frame, [])
            self._group_columns[frame] = [col for col in previous if col in self.column_names]
            self._refresh_group_label(frame)

    # ------------------------------------------------------------------ #
    # Group management
    # ------------------------------------------------------------------ #
    def add_group(self, is_first=False):
        """Append a new, empty processing group to the scrollable list.

        *is_first* is accepted for backward compatibility; every group gets
        the same widgets regardless of its value.
        """
        position = len(self.group_frames) + 1
        group_frame = tk.Frame(self.scrollable_frame)
        group_frame.grid(row=position, column=0, columnspan=2, pady=5, sticky=tk.W + tk.E)

        group_label = tk.Label(group_frame, text=f"{position}:")
        group_label.grid(row=0, column=0, padx=5, sticky=tk.W)

        select_columns_button = tk.Button(
            group_frame, text="选择列",
            command=lambda: self.open_group_column_selection_window(group_frame))
        select_columns_button.grid(row=0, column=1, padx=5, sticky=tk.W)

        remove_button = tk.Button(group_frame, text="移除",
                                  command=lambda: self.remove_group(group_frame))
        remove_button.grid(row=0, column=2, padx=5, sticky=tk.E)

        self.group_frames.append((group_frame, select_columns_button))
        self._group_labels[group_frame] = group_label
        self._group_columns[group_frame] = []
        self._update_scroll_region()

    def open_group_column_selection_window(self, group_frame):
        """Open a multi-select list of the sheet's columns for *group_frame*."""
        if not self.column_names:
            messagebox.showwarning("警告", "请选择一个有效的 Sheet")
            return
        group_number = self._group_index(group_frame)
        if group_number is None:
            return

        column_selection_window = tk.Toplevel(self.root)
        # Title shows this group's own number, not the number of groups.
        column_selection_window.title(f"选择组 {group_number} 的列")

        columns_listbox = tk.Listbox(column_selection_window, selectmode=tk.MULTIPLE,
                                     height=10, exportselection=False)
        already_chosen = self._group_columns.get(group_frame, [])
        for position, col in enumerate(self.column_names):
            columns_listbox.insert(tk.END, col)
            if col in already_chosen:
                # Re-opening the dialog shows the current selection.
                columns_listbox.selection_set(position)
        columns_listbox.pack(pady=5, fill=tk.BOTH, expand=True)

        confirm_button = tk.Button(
            column_selection_window, text="确认",
            command=lambda: self.confirm_group_column_selection(
                columns_listbox, group_frame, column_selection_window))
        confirm_button.pack(pady=5)

    def confirm_group_column_selection(self, columns_listbox, group_frame, window):
        """Store the columns highlighted in *columns_listbox* for *group_frame*.

        Shows a warning and keeps *window* open when nothing is selected.
        """
        selected_columns = [self.column_names[i] for i in columns_listbox.curselection()
                            if i < len(self.column_names)]
        if not selected_columns:
            messagebox.showwarning("警告", "请选择至少一列")
            return
        # The group may have been removed while the dialog was open.
        if group_frame in self._group_labels:
            self._group_columns[group_frame] = selected_columns
            self._refresh_group_label(group_frame)
        window.destroy()

    def remove_group(self, group_frame):
        """Delete *group_frame* and renumber the remaining groups.

        The column selections of the remaining groups are preserved.
        """
        remaining = []
        for frame, button in self.group_frames:
            if frame is group_frame:
                frame.destroy()
                self._group_labels.pop(frame, None)
                self._group_columns.pop(frame, None)
            else:
                remaining.append((frame, button))
        self.group_frames = remaining

        for position, (frame, _button) in enumerate(self.group_frames, start=1):
            frame.grid(row=position, column=0, columnspan=2, pady=5, sticky=tk.W + tk.E)
            self._refresh_group_label(frame)
        self._update_scroll_region()

    def _group_index(self, group_frame):
        """Return the 1-based display position of *group_frame*, or None."""
        for position, (frame, _button) in enumerate(self.group_frames, start=1):
            if frame is group_frame:
                return position
        return None

    def _refresh_group_label(self, group_frame):
        """Rewrite a group's label as "<n>:" or "<n>: col1, col2"."""
        label = self._group_labels.get(group_frame)
        group_number = self._group_index(group_frame)
        if label is None or group_number is None:
            return
        columns = self._group_columns.get(group_frame, [])
        if columns:
            # map(str, ...) so numeric column names can be displayed too.
            label.config(text=f"{group_number}: {', '.join(map(str, columns))}")
        else:
            label.config(text=f"{group_number}:")

    def _update_scroll_region(self):
        """Make the canvas scroll region cover all group rows."""
        self.scrollable_frame.update_idletasks()
        self.canvas.config(scrollregion=self.canvas.bbox("all"))

    # ------------------------------------------------------------------ #
    # Running
    # ------------------------------------------------------------------ #
    def run_processing(self):
        """Validate the inputs and start the split on a worker thread."""
        if self._worker is not None and self._worker.is_alive():
            return  # a run is already in progress
        if self.df_selected is None or self.df_selected.empty:
            messagebox.showwarning("警告", "请选择一个有效的 Sheet")
            return

        # Use the combobox index, not its text, so the key keeps its real type
        # (the text of a numeric column name would not match the DataFrame).
        key_index = self.key_column_combobox.current()
        if not 0 <= key_index < len(self.column_names):
            messagebox.showwarning("警告", "请选择工单号列")
            return
        key_column = self.column_names[key_index]

        # Group name is the group's display number; groups without columns are skipped.
        group_columns = {}
        for position, (frame, _button) in enumerate(self.group_frames, start=1):
            columns = self._group_columns.get(frame, [])
            if columns:
                group_columns[str(position)] = list(columns)
        if not group_columns:
            messagebox.showwarning("警告", "请选择至少一列")
            return

        # The result is written next to the input workbook.
        self.output_file_path = Path(self.file_entry.get()).parent / self.OUTPUT_FILE_NAME

        self.run_button.config(state=tk.DISABLED)
        self.progress_bar['value'] = 0
        self.status_label.config(text="正在处理数据...")
        self.root.update_idletasks()

        self._worker = threading.Thread(target=self.process_data,
                                        args=(key_column, group_columns))
        self._worker.start()
        self._poll_ui_queue()

    def process_data(self, key_column, group_columns):
        """Split the selected sheet and write the result workbook.

        Intended to run on the worker thread; every widget update is routed
        through ``_post_to_ui``.  Any exception is reported to the user
        instead of being raised.
        """
        try:
            normal_df, abnormal_df, abnormal_normal_df, abnormal_original_rows = self.split_rows(
                self.df_selected, key_column, group_columns)
            self._post_to_ui(self.status_label.config, text="正在保存...")
            self._write_output(normal_df, abnormal_df, abnormal_normal_df, abnormal_original_rows)
        except Exception as e:
            # Build the message now: the name `e` is unbound once this block
            # ends, so a callback that formats it later would fail.
            self._post_to_ui(self._on_processing_failed, f"处理失败: {e}")
        else:
            self._post_to_ui(self._on_processing_succeeded, self.output_file_path)

    def _write_output(self, normal_df, abnormal_df, abnormal_normal_df, abnormal_original_rows):
        """Write the four result frames to ``self.output_file_path``."""
        abnormal_export = abnormal_df.copy()
        if '原始行' in abnormal_export.columns:
            # The column holds dicts; a dict is not a value type an Excel cell
            # can store, so export its text form.
            abnormal_export['原始行'] = abnormal_export['原始行'].map(str)
        with pd.ExcelWriter(self.output_file_path, engine='openpyxl') as writer:
            normal_df.to_excel(writer, sheet_name='正常数据', index=False)
            abnormal_export.to_excel(writer, sheet_name='异常数据', index=False)
            abnormal_normal_df.to_excel(writer, sheet_name='异常工单的正常数据', index=False)
            abnormal_original_rows.to_excel(writer, sheet_name='异常工单的原始行', index=False)

    def _on_processing_succeeded(self, output_path):
        """UI-thread callback: re-enable the run button and announce success."""
        self.run_button.config(state=tk.NORMAL)
        self.status_label.config(text=f"处理完成,文件已保存到: {output_path}")
        messagebox.showinfo("成功", f"文件已保存到: {output_path}")

    def _on_processing_failed(self, message):
        """UI-thread callback: re-enable the run button and show *message*."""
        self.run_button.config(state=tk.NORMAL)
        self.status_label.config(text=message)
        messagebox.showerror("错误", message)

    # ------------------------------------------------------------------ #
    # Worker -> UI hand-off
    # ------------------------------------------------------------------ #
    def _post_to_ui(self, callback, *args, **kwargs):
        """Run *callback* on the thread that built the UI.

        Called from that thread it runs immediately; from any other thread it
        is queued for ``_poll_ui_queue``.
        """
        if threading.current_thread() is self._ui_thread:
            callback(*args, **kwargs)
        else:
            self._ui_queue.put((callback, args, kwargs))

    def _poll_ui_queue(self):
        """Run queued callbacks; keep polling while the worker is alive."""
        worker = self._worker
        # Sample liveness BEFORE draining: if the worker was already finished
        # here, everything it posted is in the queue and this drain is final.
        worker_running = worker is not None and worker.is_alive()
        while True:
            try:
                callback, args, kwargs = self._ui_queue.get_nowait()
            except queue.Empty:
                break
            callback(*args, **kwargs)
        if worker_running:
            self.root.after(self.UI_POLL_MS, self._poll_ui_queue)

    def _set_progress(self, done, total):
        """UI-thread callback: show *done* of *total* rows on the progress bar."""
        self.progress_bar['maximum'] = max(total, 1)
        self.progress_bar['value'] = done

    def _report_progress(self, done, total):
        """Default progress sink of split_rows: forward to the progress bar."""
        self._post_to_ui(self._set_progress, done, total)

    # ------------------------------------------------------------------ #
    # Data transformation
    # ------------------------------------------------------------------ #
    @staticmethod
    def _split_cell(value):
        """Return the lines of a text cell, or ``[value]`` for anything else.

        ``\\r\\n``, lone ``\\r`` and lone ``\\n`` all count as line breaks, so
        breaks typed in Excel (``\\n``) are split as well as ``\\r\\n`` ones.
        """
        if isinstance(value, str):
            return value.replace('\r\n', '\n').replace('\r', '\n').split('\n')
        return [value]

    def split_rows(self, df, key_column, group_columns, progress_callback=None):
        """Expand multi-line cells of *df* into one output row per line.

        Args:
            df: the sheet to process.
            key_column: column whose value identifies a work order.
            group_columns: mapping of group name -> list of columns to split.
            progress_callback: optional ``callback(done_rows, total_rows)``;
                defaults to updating this window's progress bar.

        Returns:
            A tuple of four DataFrames:
            ``(normal_df, abnormal_df, abnormal_normal_df, abnormal_original_rows_df)``.

            * ``normal_df`` - expanded rows whose key never produced an
              abnormal row.
            * ``abnormal_df`` - one record per abnormal row with the key, the
              column holding the lone value ('异常列'), that value ('异常值')
              and the source row as a dict ('原始行').
            * ``abnormal_normal_df`` - the non-abnormal expanded rows that
              share a key with an abnormal row.
            * ``abnormal_original_rows_df`` - each source row that produced at
              least one abnormal row, listed once.

        An expanded row is abnormal when exactly one of the split columns is
        non-null in it.
        """
        print(f"开始拆分数据,关键列: {key_column}")
        if progress_callback is None:
            progress_callback = self._report_progress

        # Union of all groups' columns, first occurrence order, no repeats.
        split_columns = list(dict.fromkeys(
            col for columns in group_columns.values() for col in columns))

        normal_rows = []
        abnormal_rows = []
        abnormal_original_rows = []
        abnormal_key_values = set()

        total_rows = len(df)
        progress_callback(0, total_rows)
        last_percent = -1

        # enumerate() gives a true row count; the DataFrame index label need
        # not be an integer position.
        for position, (_label, row) in enumerate(df.iterrows(), start=1):
            all_data = {col: self._split_cell(row[col]) for col in split_columns}
            max_length = max((len(parts) for parts in all_data.values()), default=0)
            key_value = row[key_column]
            original_recorded = False

            for i in range(max_length):
                new_row = row.copy()
                for col, parts in all_data.items():
                    # Columns with fewer lines than the longest one are padded with None.
                    new_row[col] = parts[i] if i < len(parts) else None

                non_null = [(col, new_row[col]) for col in all_data if pd.notna(new_row[col])]
                if len(non_null) == 1:
                    lone_column, lone_value = non_null[0]
                    abnormal_rows.append({
                        key_column: key_value,
                        '异常列': lone_column,  # the column that actually holds the value
                        '异常值': lone_value,
                        '原始行': row.to_dict(),
                    })
                    abnormal_key_values.add(key_value)
                    if not original_recorded:
                        # List the source row once even if it yields several abnormal rows.
                        abnormal_original_rows.append(row)
                        original_recorded = True
                else:
                    normal_rows.append(new_row)

            # Report only when the whole-percent value changes to keep the
            # number of UI updates bounded for large sheets.
            percent = position * 100 // total_rows
            if percent != last_percent:
                last_percent = percent
                progress_callback(position, total_rows)

        # Empty results still need their columns so the key lookups below and
        # the Excel export work.
        normal_df = pd.DataFrame(normal_rows) if normal_rows else pd.DataFrame(columns=df.columns)
        if abnormal_rows:
            abnormal_df = pd.DataFrame(abnormal_rows)
        else:
            abnormal_df = pd.DataFrame(columns=[key_column, '异常列', '异常值', '原始行'])
        if abnormal_original_rows:
            abnormal_original_rows_df = pd.DataFrame(abnormal_original_rows)
        else:
            abnormal_original_rows_df = pd.DataFrame(columns=df.columns)

        # Rows sharing a key with an abnormal row leave the normal sheet.
        belongs_to_abnormal_key = normal_df[key_column].isin(abnormal_key_values)
        abnormal_normal_df = normal_df[belongs_to_abnormal_key]
        normal_df = normal_df[~belongs_to_abnormal_key]

        print("拆分完成")
        return normal_df, abnormal_df, abnormal_normal_df, abnormal_original_rows_df
if __name__ == "__main__":
    # Script entry point: create the Tk root window, attach the application
    # to it, then hand control to the Tk event loop until the window closes.
    # `root` and `app` remain module-level names.
    root = tk.Tk()
    app = ExcelProcessorApp(root)
    root.mainloop()