"""General-purpose file utilities that report their results as DataFrames."""

import os
import shutil
import zipfile
from datetime import datetime
from pathlib import Path

import pandas as pd

from utils.logger import log


class FileHandler:
    """
    General-purpose file helper; most methods accept and/or return DataFrames.

    Covers file reading/writing, directory operations, zip compression and
    path handling. Exceptions raised by the underlying operations are logged
    and re-raised unchanged.
    """

    def __init__(self, base_path=None):
        """
        Initialise the handler.

        :param base_path: optional base directory; relative paths passed to
            the other methods are resolved against it
        """
        self.base_path = Path(base_path) if base_path else None
        self.log = log.bind(module=self.__class__.__name__)

    def _resolve_path(self, path):
        """Return ``path`` as a ``Path``, joined onto ``base_path`` when relative."""
        path = Path(path)
        if not path.is_absolute() and self.base_path:
            return self.base_path / path
        return path

    def _to_dataframe(self, data, columns=None):
        """
        Convert ``data`` to a DataFrame.

        * DataFrame -> returned as-is
        * dict      -> ONE row (the dict is treated as a single record, so
          list values end up as list-valued cells)
        * list      -> ``pd.DataFrame(data)``, with ``columns`` when given
        * other     -> one row with a single ``value`` column
        """
        if isinstance(data, pd.DataFrame):
            return data
        if isinstance(data, dict):
            return pd.DataFrame([data])
        if isinstance(data, list):
            return pd.DataFrame(data, columns=columns) if columns else pd.DataFrame(data)
        return pd.DataFrame([{'value': data}])

    def read_file(self, file_path, encoding='utf-8', **kwargs):
        """
        Read a file into a DataFrame, dispatching on the file extension.

        csv/txt use ``pd.read_csv``, xls/xlsx ``pd.read_excel``, json
        ``pd.read_json`` and parquet ``pd.read_parquet``; any other extension
        is read as text into a single ``content`` cell.

        :param file_path: file path
        :param encoding: text encoding (not passed to the Excel/parquet readers)
        :param kwargs: extra arguments forwarded to the pandas reader
        :return: DataFrame
        """
        file_path = self._resolve_path(file_path)
        try:
            ext = self.get_file_extension(file_path)
            if ext in ('csv', 'txt'):
                df = pd.read_csv(file_path, encoding=encoding, **kwargs)
            elif ext in ('xls', 'xlsx'):
                df = pd.read_excel(file_path, **kwargs)
            elif ext == 'json':
                df = pd.read_json(file_path, encoding=encoding, **kwargs)
            elif ext == 'parquet':
                df = pd.read_parquet(file_path, **kwargs)
            else:
                # Unknown extension: fall back to plain text.
                with open(file_path, 'r', encoding=encoding) as f:
                    content = f.read()
                df = self._to_dataframe({'content': content})
            self.log.debug("文件读取成功 | path={} shape={}", file_path, df.shape)
            return df
        except Exception as e:
            self.log.error("文件读取失败 | path={} error={}", file_path, str(e))
            raise

    def write_file(self, file_path, data, encoding='utf-8', **kwargs):
        """
        Write data to a file, dispatching on the file extension.

        The parent directory is created when missing. Unknown extensions are
        written as ``DataFrame.to_string(index=False)`` text.

        :param file_path: file path
        :param data: DataFrame (or anything ``_to_dataframe`` accepts)
        :param encoding: text encoding (csv/txt and the text fallback)
        :param kwargs: extra arguments forwarded to the pandas writer; may
            override the ``index=False`` / ``force_ascii=False`` defaults
        :return: DataFrame with columns ``success`` (bool), ``file_path``
            (str) and ``file_size`` (int, bytes)
        """
        file_path = self._resolve_path(file_path)
        df = self._to_dataframe(data)
        try:
            self.create_dir(file_path.parent)
            ext = self.get_file_extension(file_path)
            if ext in ('csv', 'txt'):
                # Defaults are merged into kwargs so a caller-supplied
                # ``index=`` overrides instead of raising a duplicate-keyword
                # TypeError.
                df.to_csv(file_path, encoding=encoding, **{'index': False, **kwargs})
            elif ext in ('xls', 'xlsx'):
                df.to_excel(file_path, **{'index': False, **kwargs})
            elif ext == 'json':
                df.to_json(file_path, **{'force_ascii': False, **kwargs})
            elif ext == 'parquet':
                df.to_parquet(file_path, **kwargs)
            else:
                # Unknown extension: fall back to plain text.
                content = df.to_string(index=False)
                with open(file_path, 'w', encoding=encoding) as f:
                    f.write(content)
            file_size = os.path.getsize(file_path)
            result = {
                'success': True,
                'file_path': str(file_path),
                'file_size': file_size,
            }
            self.log.debug("文件写入成功 | path={} size={} bytes", file_path, file_size)
            return self._to_dataframe(result)
        except Exception as e:
            self.log.error("文件写入失败 | path={} error={}", file_path, str(e))
            raise

    def read_lines(self, file_path, encoding='utf-8', columns=('line_content',)):
        """
        Read a text file line by line into a DataFrame (one row per line).

        Lines come from ``readlines()``, so line terminators are preserved.

        :param file_path: file path
        :param encoding: text encoding
        :param columns: column names for the result; a falsy value lets
            pandas pick the default column label
        :return: DataFrame
        """
        file_path = self._resolve_path(file_path)
        # The default is an immutable tuple; pandas gets a fresh list per call.
        column_names = list(columns) if columns else None
        try:
            with open(file_path, 'r', encoding=encoding) as f:
                lines = f.readlines()
            df = self._to_dataframe(lines, columns=column_names)
            self.log.debug("文件按行读取成功 | path={} lines={}", file_path, len(df))
            return df
        except Exception as e:
            self.log.error("文件按行读取失败 | path={} error={}", file_path, str(e))
            raise

    def write_lines(self, file_path, data, encoding='utf-8', line_column=None):
        """
        Write a DataFrame to a text file, one line per row.

        A newline is appended to every line. The parent directory is created
        when missing.

        :param file_path: file path
        :param data: DataFrame (or anything ``_to_dataframe`` accepts)
        :param encoding: text encoding
        :param line_column: column whose values become the lines; when absent
            (or not a column of the data) each row is rendered with
            ``to_string(index=False, header=False)``
        """
        file_path = self._resolve_path(file_path)
        df = self._to_dataframe(data)
        try:
            self.create_dir(file_path.parent)
            if df.empty:
                # to_string() on an empty frame returns an "Empty DataFrame"
                # banner, which must not end up in the output file.
                lines = []
            elif line_column and line_column in df.columns:
                # Stringify so numeric or other non-str cells can be written.
                lines = [str(value) for value in df[line_column].tolist()]
            else:
                lines = df.to_string(index=False, header=False).split('\n')
            with open(file_path, 'w', encoding=encoding) as f:
                f.writelines(f"{line}\n" for line in lines)
            self.log.debug("文件按行写入成功 | path={} lines={}", file_path, len(lines))
        except Exception as e:
            self.log.error("文件按行写入失败 | path={} error={}", file_path, str(e))
            raise

    def file_exists(self, file_path):
        """
        Check whether a regular file exists.

        :param file_path: file path
        :return: one-row DataFrame with a boolean ``exists`` column
        """
        file_path = self._resolve_path(file_path)
        exists = os.path.isfile(file_path)
        self.log.trace("文件存在检查 | path={} exists={}", file_path, exists)
        # Scalar value: _to_dataframe treats a dict as one record, so wrapping
        # the flag in a list would produce a (always truthy) list-valued cell.
        return self._to_dataframe({'exists': exists})

    def dir_exists(self, dir_path):
        """
        Check whether a directory exists.

        :param dir_path: directory path
        :return: one-row DataFrame with a boolean ``exists`` column
        """
        dir_path = self._resolve_path(dir_path)
        exists = os.path.isdir(dir_path)
        self.log.trace("目录存在检查 | path={} exists={}", dir_path, exists)
        return self._to_dataframe({'exists': exists})

    def create_dir(self, dir_path):
        """
        Create a directory, including missing parents (no error if it exists).

        :param dir_path: directory path
        :return: one-row DataFrame with columns ``created`` (bool) and
            ``path`` (str)
        """
        dir_path = self._resolve_path(dir_path)
        try:
            os.makedirs(dir_path, exist_ok=True)
            self.log.debug("目录创建成功 | path={}", dir_path)
            return self._to_dataframe({'created': True, 'path': str(dir_path)})
        except Exception as e:
            self.log.error("目录创建失败 | path={} error={}", dir_path, str(e))
            raise

    def delete_file(self, file_path):
        """
        Delete a file if it exists.

        :param file_path: file path
        :return: one-row DataFrame with columns ``deleted`` (bool, False when
            the file did not exist) and ``path`` (str)
        """
        file_path = self._resolve_path(file_path)
        try:
            if not os.path.isfile(file_path):
                return self._to_dataframe({'deleted': False, 'path': str(file_path)})
            os.remove(file_path)
            self.log.debug("文件删除成功 | path={}", file_path)
            return self._to_dataframe({'deleted': True, 'path': str(file_path)})
        except Exception as e:
            self.log.error("文件删除失败 | path={} error={}", file_path, str(e))
            raise

    def delete_dir(self, dir_path):
        """
        Delete a directory and everything inside it, if it exists.

        :param dir_path: directory path
        :return: one-row DataFrame with columns ``deleted`` (bool, False when
            the directory did not exist) and ``path`` (str)
        """
        dir_path = self._resolve_path(dir_path)
        try:
            if not os.path.isdir(dir_path):
                return self._to_dataframe({'deleted': False, 'path': str(dir_path)})
            shutil.rmtree(dir_path)
            self.log.debug("目录删除成功 | path={}", dir_path)
            return self._to_dataframe({'deleted': True, 'path': str(dir_path)})
        except Exception as e:
            self.log.error("目录删除失败 | path={} error={}", dir_path, str(e))
            raise

    def copy_file(self, src_path, dst_path):
        """
        Copy a file (``shutil.copy2``), creating the destination directory.

        :param src_path: source file path
        :param dst_path: destination file path
        :return: one-row DataFrame with columns ``copied`` (bool), ``source``
            (str) and ``destination`` (str)
        """
        src_path = self._resolve_path(src_path)
        dst_path = self._resolve_path(dst_path)
        try:
            self.create_dir(dst_path.parent)
            shutil.copy2(src_path, dst_path)
            self.log.debug("文件复制成功 | src={} dst={}", src_path, dst_path)
            return self._to_dataframe({
                'copied': True,
                'source': str(src_path),
                'destination': str(dst_path),
            })
        except Exception as e:
            self.log.error("文件复制失败 | src={} dst={} error={}", src_path, dst_path, str(e))
            raise

    def move_file(self, src_path, dst_path):
        """
        Move or rename a file (``shutil.move``), creating the destination directory.

        :param src_path: source file path
        :param dst_path: destination file path
        :return: one-row DataFrame with columns ``moved`` (bool), ``source``
            (str) and ``destination`` (str)
        """
        src_path = self._resolve_path(src_path)
        dst_path = self._resolve_path(dst_path)
        try:
            self.create_dir(dst_path.parent)
            # str() keeps this working on interpreters whose shutil.move does
            # not accept path-like objects.
            shutil.move(str(src_path), str(dst_path))
            self.log.debug("文件移动成功 | src={} dst={}", src_path, dst_path)
            return self._to_dataframe({
                'moved': True,
                'source': str(src_path),
                'destination': str(dst_path),
            })
        except Exception as e:
            self.log.error("文件移动失败 | src={} dst={} error={}", src_path, dst_path, str(e))
            raise

    def list_files(self, dir_path, recursive=False, pattern='*'):
        """
        List the files in a directory.

        :param dir_path: directory path
        :param recursive: search sub-directories too (``rglob``) when True
        :param pattern: glob pattern to match
        :return: DataFrame with columns ``file_path``, ``file_name`` and
            ``extension`` (lower-case, no dot); the columns are present even
            when nothing matches
        """
        dir_path = self._resolve_path(dir_path)
        try:
            finder = dir_path.rglob if recursive else dir_path.glob
            rows = [
                {
                    'file_path': str(entry),
                    'file_name': entry.name,
                    'extension': entry.suffix.lower().lstrip('.'),
                }
                for entry in finder(pattern)
                if entry.is_file()
            ]
            # Explicit columns keep the schema stable for empty results.
            df = self._to_dataframe(rows, columns=['file_path', 'file_name', 'extension'])
            self.log.trace("列出目录文件 | path={} recursive={} count={}", dir_path, recursive, len(df))
            return df
        except Exception as e:
            self.log.error("列出文件失败 | path={} error={}", dir_path, str(e))
            raise

    def list_dirs(self, dir_path, recursive=False):
        """
        List the sub-directories of a directory.

        :param dir_path: directory path
        :param recursive: search nested directories too (``rglob``) when True
        :return: DataFrame with columns ``dir_path`` and ``dir_name``; the
            columns are present even when nothing matches
        """
        dir_path = self._resolve_path(dir_path)
        try:
            finder = dir_path.rglob if recursive else dir_path.glob
            rows = [
                {'dir_path': str(entry), 'dir_name': entry.name}
                for entry in finder('*')
                if entry.is_dir()
            ]
            df = self._to_dataframe(rows, columns=['dir_path', 'dir_name'])
            self.log.trace("列出子目录 | path={} recursive={} count={}", dir_path, recursive, len(df))
            return df
        except Exception as e:
            self.log.error("列出目录失败 | path={} error={}", dir_path, str(e))
            raise

    def get_file_size(self, file_path):
        """
        Get the size of a file.

        :param file_path: file path
        :return: one-row DataFrame with columns ``file_path`` (str),
            ``size_bytes`` (int) and ``size_mb`` (float, 4 decimals)
        """
        file_path = self._resolve_path(file_path)
        try:
            size_bytes = os.path.getsize(file_path)
            result = {
                'file_path': str(file_path),
                'size_bytes': size_bytes,
                'size_mb': round(size_bytes / 1024 / 1024, 4),
            }
            df = self._to_dataframe(result)
            self.log.trace("获取文件大小 | path={} size={} bytes", file_path, size_bytes)
            return df
        except Exception as e:
            self.log.error("获取文件大小失败 | path={} error={}", file_path, str(e))
            raise

    def get_file_modified_time(self, file_path):
        """
        Get the last-modified time of a file.

        :param file_path: file path
        :return: one-row DataFrame with columns ``file_path`` (str),
            ``modified_time`` (naive local ``datetime``) and ``timestamp``
            (float, seconds since the epoch as reported by the OS)
        """
        file_path = self._resolve_path(file_path)
        try:
            # Report the raw OS timestamp; converting the naive local datetime
            # back with .timestamp() can be ambiguous around DST changes.
            raw_timestamp = os.path.getmtime(file_path)
            mtime = datetime.fromtimestamp(raw_timestamp)
            result = {
                'file_path': str(file_path),
                'modified_time': mtime,
                'timestamp': raw_timestamp,
            }
            df = self._to_dataframe(result)
            self.log.trace("获取文件修改时间 | path={} mtime={}", file_path, mtime.isoformat())
            return df
        except Exception as e:
            self.log.error("获取文件修改时间失败 | path={} error={}", file_path, str(e))
            raise

    def zip_files(self, file_paths, zip_path):
        """
        Compress several files into one zip archive.

        Each file is stored flat under its base name. Paths that are not
        existing files are skipped and not counted.

        :param file_paths: a list of paths, a single path, or a DataFrame
            (its ``file_path`` column is used, else its first column)
        :param zip_path: path of the archive to create
        :return: one-row DataFrame with columns ``zipped`` (bool),
            ``zip_path`` (str) and ``file_count`` (int)
        """
        zip_path = self._resolve_path(zip_path)
        if isinstance(file_paths, pd.DataFrame):
            if 'file_path' in file_paths.columns:
                file_list = file_paths['file_path'].tolist()
            elif file_paths.shape[1] > 0:
                file_list = file_paths.iloc[:, 0].tolist()
            else:
                # A frame without columns has no first column to fall back on.
                file_list = []
        elif isinstance(file_paths, (str, os.PathLike)):
            # A bare path would otherwise be iterated character by character.
            file_list = [file_paths]
        else:
            file_list = file_paths
        try:
            self.create_dir(zip_path.parent)
            file_count = 0
            with zipfile.ZipFile(zip_path, 'w', zipfile.ZIP_DEFLATED) as zipf:
                for member in file_list:
                    member_path = self._resolve_path(member)
                    if not os.path.isfile(member_path):
                        continue
                    zipf.write(member_path, os.path.basename(member_path))
                    file_count += 1
            result = {
                'zipped': True,
                'zip_path': str(zip_path),
                'file_count': file_count,
            }
            self.log.info("文件压缩成功 | zip={} files={}", zip_path, file_count)
            return self._to_dataframe(result)
        except Exception as e:
            self.log.error("文件压缩失败 | zip={} error={}", zip_path, str(e))
            raise

    def zip_dir(self, dir_path, zip_path):
        """
        Compress a whole directory tree into a zip archive.

        Archive member names are relative to ``dir_path``.

        :param dir_path: directory to compress
        :param zip_path: path of the archive to create
        :return: one-row DataFrame with columns ``zipped`` (bool),
            ``zip_path`` (str), ``dir_path`` (str) and ``file_count`` (int)
        """
        dir_path = self._resolve_path(dir_path)
        zip_path = self._resolve_path(zip_path)
        try:
            self.create_dir(zip_path.parent)
            archive_abs = os.path.abspath(str(zip_path))
            file_count = 0
            with zipfile.ZipFile(zip_path, 'w', zipfile.ZIP_DEFLATED) as zipf:
                for root, _dirs, files in os.walk(dir_path):
                    for file_name in files:
                        member_path = os.path.join(root, file_name)
                        # When the archive lives inside dir_path, do not add
                        # the half-written archive to itself.
                        if os.path.abspath(member_path) == archive_abs:
                            continue
                        arcname = os.path.relpath(member_path, dir_path)
                        zipf.write(member_path, arcname)
                        file_count += 1
            result = {
                'zipped': True,
                'zip_path': str(zip_path),
                'dir_path': str(dir_path),
                'file_count': file_count,
            }
            self.log.info("目录压缩成功 | zip={} dir={} files={}", zip_path, dir_path, file_count)
            return self._to_dataframe(result)
        except Exception as e:
            self.log.error("目录压缩失败 | zip={} error={}", zip_path, str(e))
            raise

    def unzip(self, zip_path, extract_to=None):
        """
        Extract a zip archive.

        :param zip_path: path of the zip file
        :param extract_to: target directory; defaults to the directory that
            contains the zip file
        :return: one-row DataFrame with columns ``unzipped`` (bool),
            ``zip_path`` (str), ``extract_to`` (str) and ``file_count``
            (int, number of entries in the archive's name list)
        """
        zip_path = self._resolve_path(zip_path)
        if extract_to is None:
            extract_to = os.path.dirname(zip_path)
        else:
            extract_to = self._resolve_path(extract_to)
        try:
            self.create_dir(extract_to)
            with zipfile.ZipFile(zip_path, 'r') as zipf:
                file_list = zipf.namelist()
                zipf.extractall(extract_to)
            result = {
                'unzipped': True,
                'zip_path': str(zip_path),
                'extract_to': str(extract_to),
                'file_count': len(file_list),
            }
            self.log.info("文件解压成功 | zip={} extract_to={} files={}", zip_path, extract_to, len(file_list))
            return self._to_dataframe(result)
        except Exception as e:
            self.log.error("文件解压失败 | zip={} error={}", zip_path, str(e))
            raise

    def compress_large_log(self, log_path, max_size_mb=20):
        """
        Zip a log file and delete the original when it exceeds a size limit.

        The archive is written next to the log as
        ``<log_path>_<YYYYmmdd_HHMMSS>.zip``.

        :param log_path: log file path
        :param max_size_mb: size limit in MB; larger files are compressed
        :return: one-row DataFrame. ``compressed`` (bool) is always present;
            ``original_path``, ``zip_path`` and ``original_size_mb`` are only
            present when the file was compressed
        """
        log_path = self._resolve_path(log_path)
        if not os.path.isfile(log_path):
            return self._to_dataframe({'compressed': False})
        max_size_bytes = max_size_mb * 1024 * 1024
        size_info = self.get_file_size(log_path)
        current_size = int(size_info.iloc[0]['size_bytes'])
        if current_size <= max_size_bytes:
            return self._to_dataframe({'compressed': False})
        try:
            timestamp = datetime.now().strftime('%Y%m%d_%H%M%S')
            zip_path = f"{log_path}_{timestamp}.zip"
            self.zip_files([log_path], zip_path)
            self.delete_file(log_path)
            result = {
                'compressed': True,
                'original_path': str(log_path),
                'zip_path': zip_path,
                'original_size_mb': round(current_size / 1024 / 1024, 2),
            }
            self.log.info("日志文件压缩 | original={} compressed={} original_size={} MB",
                          log_path, zip_path, result['original_size_mb'])
            return self._to_dataframe(result)
        except Exception as e:
            self.log.error("日志压缩失败 | path={} error={}", log_path, str(e))
            raise

    def get_file_extension(self, file_path):
        """
        Get the extension of a file.

        :param file_path: file path
        :return: the extension as a plain string (lower-case, no leading
            dot), not a DataFrame
        """
        file_path = self._resolve_path(file_path)
        ext = Path(file_path).suffix.lower().lstrip('.')
        self.log.trace("获取文件扩展名 | path={} ext={}", file_path, ext)
        return ext

    def change_file_extension(self, file_path, new_extension):
        """
        Compute the path that results from swapping the file extension.

        Only the path is computed; the file on disk is not renamed.

        :param file_path: file path
        :param new_extension: new extension, with or without a leading dot;
            an empty value removes the extension
        :return: one-row DataFrame with columns ``original_path`` (str) and
            ``new_path`` (str)
        """
        file_path = self._resolve_path(file_path)
        # Strip leading dots so '.csv' does not yield 'name..csv'.
        extension = str(new_extension).lstrip('.')
        suffix = f'.{extension}' if extension else ''
        new_path = str(Path(file_path).with_suffix(suffix))
        result = {'original_path': str(file_path), 'new_path': new_path}
        self.log.debug("修改文件扩展名 | original={} new={}", file_path, new_path)
        return self._to_dataframe(result)

    def join_path(self, *paths):
        """
        Join path components.

        :param paths: path parts, joined with ``pathlib.Path``
        :return: one-row DataFrame with a ``joined_path`` (str) column
        """
        joined_path = str(Path(*paths))
        self.log.trace("路径拼接 | parts={} result={}", paths, joined_path)
        return self._to_dataframe({'joined_path': joined_path})