Files
2026-01-30 11:28:35 +08:00

411 lines
18 KiB
Python

import tkinter as tk
from tkinter import filedialog, messagebox, ttk
import openpyxl
import xlrd
from openpyxl.reader.excel import load_workbook
from pyexcel_io.manager import get_data, save_data
from pyexcel import Sheet, Book
from pathlib import Path
from tqdm import tqdm
import threading
import re
class ExcelProcessorApp:
    """Tkinter front end that splits multi-line Excel cells into one row per line.

    The user picks a workbook, a sheet, a key ("工单号") column and one or more
    groups of columns. For every data row, each selected cell that holds a
    string is split on CRLF and the row is repeated once per resulting line.
    Expanded rows in which exactly one of the selected columns still has a
    value are reported as abnormal. Results are written to a new workbook
    placed next to the input file.

    Widget updates that originate from the worker thread are scheduled onto
    the Tk event loop with ``root.after`` instead of touching widgets
    directly from that thread.
    """

    def __init__(self, root):
        """Build all widgets inside *root* and create the first column group."""
        self.root = root
        self.root.title("Excel 数据处理工具")
        # Main container.
        self.main_frame = tk.Frame(root)
        self.main_frame.pack(padx=10, pady=10)
        # File chooser row.
        self.file_label = tk.Label(self.main_frame, text="选择 Excel 文件:")
        self.file_label.grid(row=0, column=0, sticky=tk.W)
        self.file_entry = tk.Entry(self.main_frame, width=50)
        self.file_entry.grid(row=0, column=1, padx=5)
        self.file_button = tk.Button(self.main_frame, text="浏览", command=self.select_file)
        self.file_button.grid(row=0, column=2)
        # Sheet chooser row.
        self.sheet_label = tk.Label(self.main_frame, text="选择 Sheet:")
        self.sheet_label.grid(row=1, column=0, sticky=tk.W)
        self.sheet_combobox = ttk.Combobox(self.main_frame, state="readonly")
        self.sheet_combobox.grid(row=1, column=1, padx=5)
        self.sheet_combobox.bind("<<ComboboxSelected>>", self.update_columns)
        # Key (work-order number) column chooser row.
        self.key_column_label = tk.Label(self.main_frame, text="选择工单号列:")
        self.key_column_label.grid(row=2, column=0, sticky=tk.W)
        self.key_column_combobox = ttk.Combobox(self.main_frame, state="readonly")
        self.key_column_combobox.grid(row=2, column=1, padx=5)
        # Column-group section.
        self.groups_frame = tk.LabelFrame(self.main_frame, text="添加处理组")
        self.groups_frame.grid(row=4, column=0, columnspan=2, pady=10, sticky=tk.W + tk.E)
        # Canvas + scrollbar hosting the group rows.
        self.canvas = tk.Canvas(self.groups_frame)
        self.scrollbar = tk.Scrollbar(self.groups_frame, orient="vertical", command=self.canvas.yview)
        self.scrollable_frame = tk.Frame(self.canvas)
        self.scrollable_frame.bind(
            "<Configure>",
            lambda e: self.canvas.configure(scrollregion=self.canvas.bbox("all"))
        )
        self.canvas.create_window((0, 0), window=self.scrollable_frame, anchor="nw")
        self.canvas.configure(yscrollcommand=self.scrollbar.set)
        self.canvas.pack(side="left", fill="both", expand=True)
        self.scrollbar.pack(side="right", fill="y")
        # The "add group" button lives outside the canvas.
        self.add_group_button = tk.Button(self.groups_frame, text="添加组", command=self.add_group)
        self.add_group_button.pack(pady=5)
        # Run button.
        self.run_button = tk.Button(self.main_frame, text="运行", command=self.run_processing)
        self.run_button.grid(row=5, column=0, columnspan=2, pady=10)
        # Progress section.
        self.progress_frame = tk.LabelFrame(self.main_frame, text="运行明细")
        self.progress_frame.grid(row=6, column=0, columnspan=2, pady=10, sticky=tk.W + tk.E)
        self.progress_bar = ttk.Progressbar(self.progress_frame, mode='determinate')
        self.progress_bar.pack(fill=tk.X, pady=5)
        self.status_label = tk.Label(self.progress_frame, text="")
        self.status_label.pack()
        # Path of the most recently requested output workbook.
        self.output_file_path = None
        self.selected_columns = []
        # One (frame, select_button) tuple per group, in display order.
        self.group_frames = []
        # Per-group state keyed by the group's frame widget. Keeping the
        # chosen columns here (instead of parsing them back out of the label
        # text) preserves column names that contain ", " or ": " and header
        # values that are not strings.
        self._group_labels = {}
        self._group_columns = {}
        self.sheet_names = []
        self.column_names = []
        # Start with one group that cannot be removed.
        self.add_group(is_first=True)

    # ------------------------------------------------------------------
    # Small internal helpers
    # ------------------------------------------------------------------
    def _on_ui_thread(self, func, *args, **kwargs):
        """Schedule ``func(*args, **kwargs)`` to run from the Tk event loop."""
        self.root.after(0, lambda: func(*args, **kwargs))

    def _refresh_scroll_region(self):
        """Recompute the canvas scroll region after the group list changed."""
        self.scrollable_frame.update_idletasks()
        self.canvas.config(scrollregion=self.canvas.bbox("all"))

    def _group_number(self, group_frame):
        """Return the 1-based position of *group_frame*, or None if it is gone."""
        for number, (frame, _button) in enumerate(self.group_frames, start=1):
            if frame is group_frame:
                return number
        return None

    @staticmethod
    def _format_group_label(number, columns):
        """Return the label text for group *number* with its chosen *columns*."""
        if not columns:
            return f"{number}:"
        return f"{number}: {', '.join(str(col) for col in columns)}"

    @staticmethod
    def _column_index(header_row, name):
        """Return the position of column *name* in *header_row*.

        An exact match is preferred; otherwise header values are compared by
        their string form, because combobox values come back as strings even
        when the header cell holds a number.

        Raises:
            ValueError: if no header cell matches *name*.
        """
        for position, title in enumerate(header_row):
            if title == name:
                return position
        wanted = str(name)
        for position, title in enumerate(header_row):
            if title is not None and str(title) == wanted:
                return position
        raise ValueError(f"找不到列: {name}")

    @staticmethod
    def _expand_row(row, split_indexes, separator="\r\n"):
        """Expand one source row into one output row per line of its split cells.

        Args:
            row: sequence of cell values for one source row.
            split_indexes: positions of the cells to split.
            separator: line break that separates entries inside a cell.

        Returns:
            A list of ``(new_row, filled)`` tuples. ``new_row`` is a copy of
            *row* whose split cells hold the entry for that line (or None
            when the cell had fewer lines). ``filled`` lists the split
            positions that are not None in ``new_row``. The list is empty
            when *split_indexes* is empty.
        """
        # NOTE(review): only CRLF is treated as a line break, as in the
        # original code; cells that use a bare "\n" are not split. Confirm
        # against real input data.
        pieces = {}
        for idx in split_indexes:
            value = row[idx]
            pieces[idx] = value.split(separator) if isinstance(value, str) else [value]
        depth = max((len(parts) for parts in pieces.values()), default=0)
        expanded = []
        for line_no in range(depth):
            new_row = list(row)
            filled = []
            for idx, parts in pieces.items():
                new_row[idx] = parts[line_no] if line_no < len(parts) else None
                if new_row[idx] is not None:
                    filled.append(idx)
            expanded.append((new_row, filled))
        return expanded

    @staticmethod
    def _row_as_text(row):
        """Render a whole row as a single string so it fits in one cell."""
        if isinstance(row, (list, tuple)):
            return " | ".join("" if value is None else str(value) for value in row)
        return row

    @staticmethod
    def _read_sheet(ws):
        """Return ``(header_row, data_rows)`` as lists for a worksheet.

        Objects exposing ``iter_rows`` (openpyxl worksheets) are read through
        it; anything else is read through ``nrows`` / ``ncols`` /
        ``cell_value`` (the xlrd sheet interface used for ``.xls`` files).
        """
        if hasattr(ws, "iter_rows"):
            rows = [list(values) for values in ws.iter_rows(values_only=True)]
        else:
            rows = [
                [ws.cell_value(r, c) for c in range(ws.ncols)]
                for r in range(ws.nrows)
            ]
        if not rows:
            return [], []
        return rows[0], rows[1:]

    # ------------------------------------------------------------------
    # File / sheet / column selection
    # ------------------------------------------------------------------
    def select_file(self):
        """Ask the user for an Excel file and load its sheet names."""
        # A tuple of patterns is used because the ";"-joined form is only
        # understood by the Windows file dialog.
        file_path = filedialog.askopenfilename(
            filetypes=[("Excel files", ("*.xls", "*.xlsx"))]
        )
        if file_path:
            self.file_entry.delete(0, tk.END)
            self.file_entry.insert(0, file_path)
            self.load_sheets(file_path)

    def load_sheets(self, file_path):
        """Populate the sheet combobox from *file_path* and select the first sheet."""
        try:
            data = get_data(file_path)
        except Exception as e:  # UI boundary: report any reader failure
            messagebox.showerror("错误", f"无法加载文件: {e}")
            return
        self.sheet_names = list(data.keys())
        self.sheet_combobox['values'] = self.sheet_names
        if self.sheet_names:
            self.sheet_combobox.current(0)
            self.update_columns(None)

    def update_columns(self, event):
        """Reload the column names of the selected sheet (first row = header).

        *event* is the Tk event when triggered by the combobox and is unused.
        """
        selected_sheet = self.sheet_combobox.get()
        if not selected_sheet:
            return
        try:
            data = get_data(self.file_entry.get(), sheet_name=selected_sheet)
            rows = data[selected_sheet]
            # An empty sheet has no header row at all.
            self.column_names = list(rows[0]) if rows else []
        except Exception as e:  # UI boundary: report any reader failure
            messagebox.showerror("错误", f"无法加载文件: {e}")
            return
        self.key_column_combobox['values'] = self.column_names
        if self.column_names:
            self.key_column_combobox.current(0)
        else:
            self.key_column_combobox.set("")

    # ------------------------------------------------------------------
    # Column groups
    # ------------------------------------------------------------------
    def add_group(self, is_first=False):
        """Append a new column group row; the first group has no remove button."""
        group_index = len(self.group_frames) + 1
        group_frame = tk.Frame(self.scrollable_frame)
        group_frame.grid(row=group_index, column=0, columnspan=2, pady=5, sticky=tk.W + tk.E)
        group_label = tk.Label(group_frame, text=self._format_group_label(group_index, []))
        group_label.grid(row=0, column=0, padx=5, sticky=tk.W)
        select_columns_button = tk.Button(
            group_frame, text="选择列",
            command=lambda: self.open_group_column_selection_window(group_frame))
        select_columns_button.grid(row=0, column=1, padx=5, sticky=tk.W)
        if not is_first and self.group_frames:
            remove_button = tk.Button(group_frame, text="移除",
                                      command=lambda: self.remove_group(group_frame))
            remove_button.grid(row=0, column=2, padx=5, sticky=tk.E)
        self.group_frames.append((group_frame, select_columns_button))
        self._group_labels[group_frame] = group_label
        self._group_columns[group_frame] = []
        self._refresh_scroll_region()

    def open_group_column_selection_window(self, group_frame):
        """Open a dialog with a multi-select list of columns for *group_frame*."""
        if not self.column_names:
            messagebox.showwarning("警告", "请选择一个有效的 Sheet")
            return
        number = self._group_number(group_frame)
        if number is None:
            return
        column_selection_window = tk.Toplevel(self.root)
        column_selection_window.title(f"选择组 {number} 的列")
        columns_listbox = tk.Listbox(column_selection_window, selectmode=tk.MULTIPLE,
                                     height=10, exportselection=False)
        # Remember exactly what was offered so that the confirmation maps
        # list positions back to the right names even if the sheet selection
        # changes while this dialog is open.
        columns_listbox.offered_columns = list(self.column_names)
        for col in columns_listbox.offered_columns:
            columns_listbox.insert(tk.END, col)
        columns_listbox.pack(pady=5, fill=tk.BOTH, expand=True)
        confirm_button = tk.Button(
            column_selection_window, text="确认",
            command=lambda: self.confirm_group_column_selection(
                columns_listbox, group_frame, column_selection_window))
        confirm_button.pack(pady=5)

    def confirm_group_column_selection(self, columns_listbox, group_frame, window):
        """Store the columns picked in *columns_listbox* for *group_frame*."""
        offered = getattr(columns_listbox, "offered_columns", self.column_names)
        selected_columns = [offered[i] for i in columns_listbox.curselection()
                            if i < len(offered)]
        if not selected_columns:
            messagebox.showwarning("警告", "请选择至少一列")
            return
        number = self._group_number(group_frame)
        if number is None:
            # The group was removed while the dialog was open.
            window.destroy()
            return
        self._group_columns[group_frame] = selected_columns
        self._group_labels[group_frame].config(
            text=self._format_group_label(number, selected_columns))
        window.destroy()

    def remove_group(self, group_frame):
        """Remove *group_frame* and renumber the remaining groups."""
        for entry in self.group_frames:
            if entry[0] is group_frame:
                self.group_frames.remove(entry)
                self._group_labels.pop(group_frame, None)
                self._group_columns.pop(group_frame, None)
                group_frame.destroy()
                break
        # Re-grid and relabel the survivors, keeping their chosen columns.
        for number, (frame, _button) in enumerate(self.group_frames, start=1):
            frame.grid(row=number, column=0, columnspan=2, pady=5, sticky=tk.W + tk.E)
            self._group_labels[frame].config(
                text=self._format_group_label(number, self._group_columns.get(frame, [])))
        self._refresh_scroll_region()

    # ------------------------------------------------------------------
    # Processing
    # ------------------------------------------------------------------
    def run_processing(self):
        """Validate the current selections and start processing in a worker thread."""
        sheet_name = self.sheet_combobox.get()
        if not sheet_name:
            messagebox.showwarning("警告", "请选择一个有效的 Sheet")
            return
        key_column = self.key_column_combobox.get()
        if not key_column:
            messagebox.showwarning("警告", "请选择工单号列")
            return
        # Group name (its 1-based position as text) -> chosen columns.
        # Groups for which nothing was chosen are skipped.
        group_columns = {}
        for number, (frame, _button) in enumerate(self.group_frames, start=1):
            columns = self._group_columns.get(frame)
            if columns:
                group_columns[str(number)] = list(columns)
        if not group_columns:
            # Without any split column no output rows would be produced.
            messagebox.showwarning("警告", "请至少为一个组选择列")
            return
        file_path = self.file_entry.get()
        self.output_file_path = Path(file_path).parent / "拆分后_明细统计.xlsx"
        self.run_button.config(state=tk.DISABLED)
        self.progress_bar['value'] = 0
        self.status_label.config(text="正在处理数据...")
        self.root.update_idletasks()
        # Widget values are read here and handed to the worker so that the
        # worker does not need to query widgets itself.
        processing_thread = threading.Thread(
            target=self.process_data,
            args=(key_column, group_columns),
            kwargs={"file_path": file_path, "sheet_name": sheet_name},
            daemon=True,
        )
        processing_thread.start()

    def process_data(self, key_column, group_columns, file_path=None, sheet_name=None):
        """Load the sheet, split its rows and write the result workbook.

        Args:
            key_column: name of the key (work-order number) column.
            group_columns: mapping of group name to the list of columns to split.
            file_path: input workbook; defaults to the file entry's content.
            sheet_name: sheet to process; defaults to the sheet combobox value.

        Any exception is caught and reported through the status label and an
        error dialog; the run button is re-enabled in every case.
        """
        try:
            if file_path is None:
                file_path = self.file_entry.get()
            if sheet_name is None:
                sheet_name = self.sheet_combobox.get()
            suffix = Path(file_path).suffix.lower()
            if suffix == '.xlsx':
                wb = load_workbook(filename=file_path)
                ws = wb[sheet_name]
            elif suffix == '.xls':
                wb = xlrd.open_workbook(filename=file_path)
                ws = wb.sheet_by_name(sheet_name)
            else:
                raise ValueError("不支持的文件格式")
            normal_rows, abnormal_rows, abnormal_normal_rows, abnormal_original_rows = \
                self.split_rows(ws, key_column, group_columns)
            self._on_ui_thread(self.status_label.config, text="正在保存...")
            output_file_path = Path(file_path).parent / "拆分后_明细统计.xlsx"
            self.output_file_path = output_file_path
            self.write_to_excel(normal_rows, abnormal_rows, abnormal_normal_rows,
                                abnormal_original_rows, output_file_path,
                                key_column=key_column)
            self._on_ui_thread(self.status_label.config,
                               text=f"处理完成,文件已保存到: {output_file_path}")
            self._on_ui_thread(messagebox.showinfo, "成功",
                               f"文件已保存到: {output_file_path}")
        except Exception as e:  # thread boundary: surface every failure to the user
            # Format the message now: the name bound by ``except`` is cleared
            # when this block ends, before the scheduled callback runs.
            message = f"处理失败: {e}"
            self._on_ui_thread(self.status_label.config, text=message)
            self._on_ui_thread(messagebox.showerror, "错误", message)
        finally:
            self._on_ui_thread(self.run_button.config, state=tk.NORMAL)

    def split_rows(self, ws, key_column, group_columns):
        """Split every data row of *ws* and classify the resulting rows.

        Args:
            ws: an openpyxl worksheet or an xlrd sheet; the first row is the header.
            key_column: name of the key (work-order number) column.
            group_columns: mapping of group name to the list of columns to split.

        Returns:
            ``(normal_rows, abnormal_rows, abnormal_normal_rows,
            abnormal_original_rows)`` where

            * ``normal_rows`` are expanded rows whose key never appears in an
              abnormal row,
            * ``abnormal_rows`` are dicts with the key value (under
              *key_column*), ``'异常列'``, ``'异常值'`` and ``'原始行'``,
            * ``abnormal_normal_rows`` are the non-abnormal expanded rows that
              share a key with an abnormal row,
            * ``abnormal_original_rows`` are the source rows that produced at
              least one abnormal row (each listed once).

        Raises:
            ValueError: if *key_column* or a split column is not in the header.
        """
        print(f"开始拆分数据,关键列: {key_column}")
        normal_rows = []
        abnormal_rows = []
        abnormal_key_values = set()
        abnormal_original_rows = []

        header_row, data_rows = self._read_sheet(ws)
        key_col_index = self._column_index(header_row, key_column)
        # Resolve every split column to its position once, up front. Columns
        # listed in more than one group are only split once.
        split_columns = {}
        for columns in group_columns.values():
            for col in columns:
                split_columns.setdefault(self._column_index(header_row, col), col)
        split_indexes = list(split_columns)

        total = len(data_rows)
        self._on_ui_thread(self.progress_bar.configure, maximum=max(total, 1))
        # Publish progress roughly once per percent to avoid flooding the
        # event loop with one callback per row.
        step = max(1, total // 100)

        for position, row in tqdm(enumerate(data_rows, start=1), total=total,
                                  desc="Processing Rows"):
            key_value = row[key_col_index]
            row_recorded = False
            for new_row, filled in self._expand_row(row, split_indexes):
                # NOTE(review): with a single split column every expanded row
                # has at most one filled cell and is therefore classified as
                # abnormal, exactly as before. Confirm this is intended.
                if len(filled) == 1:
                    lone_index = filled[0]
                    abnormal_rows.append({
                        key_column: key_value,
                        '异常列': split_columns[lone_index],
                        '异常值': new_row[lone_index],
                        '原始行': row,
                    })
                    abnormal_key_values.add(key_value)
                    if not row_recorded:
                        abnormal_original_rows.append(row)
                        row_recorded = True
                else:
                    normal_rows.append(new_row)
            if position % step == 0 or position == total:
                self._on_ui_thread(self.progress_bar.configure, value=position)

        # Move the regular rows of any key that has an abnormal row into
        # their own list so that the whole work order can be reviewed together.
        abnormal_normal_rows = [row for row in normal_rows
                                if row[key_col_index] in abnormal_key_values]
        normal_rows = [row for row in normal_rows
                       if row[key_col_index] not in abnormal_key_values]
        print("拆分完成")
        return normal_rows, abnormal_rows, abnormal_normal_rows, abnormal_original_rows

    def write_to_excel(self, normal_rows, abnormal_rows, abnormal_normal_rows,
                       abnormal_original_rows, output_file_path, key_column=None):
        """Write the four result lists to a new workbook at *output_file_path*.

        Args:
            normal_rows, abnormal_normal_rows, abnormal_original_rows: lists of
                row value sequences, written under ``self.column_names``.
            abnormal_rows: list of dicts as produced by :meth:`split_rows`.
            output_file_path: destination of the workbook.
            key_column: dict key under which each abnormal entry stores its
                key value; when omitted or absent, the first key of the entry
                is used.
        """
        wb = openpyxl.Workbook()
        ws_normal = wb.active
        ws_normal.title = '正常数据'
        ws_abnormal = wb.create_sheet(title='异常数据')
        ws_abnormal_normal = wb.create_sheet(title='异常工单的正常数据')
        ws_abnormal_original = wb.create_sheet(title='异常工单的原始行')

        header_row = list(self.column_names)
        plain_sheets = (
            (ws_normal, normal_rows),
            (ws_abnormal_normal, abnormal_normal_rows),
            (ws_abnormal_original, abnormal_original_rows),
        )
        for sheet, rows in plain_sheets:
            sheet.append(header_row)
            for row in rows:
                sheet.append(list(row))

        ws_abnormal.append(['工单号', '异常列', '异常值', '原始行'])
        for entry in abnormal_rows:
            key_name = key_column if key_column in entry else next(iter(entry))
            ws_abnormal.append([
                entry[key_name],
                entry['异常列'],
                entry['异常值'],
                # A list is not a valid cell value, so the original row is
                # flattened into one string.
                self._row_as_text(entry['原始行']),
            ])
        wb.save(output_file_path)
if __name__ == "__main__":
    # Script entry point: create the Tk root window, attach the application
    # to it, then run the Tk event loop until the window is closed.
    app = ExcelProcessorApp(tk.Tk())
    app.root.mainloop()