diff --git a/test/subdir/test.json b/test/subdir/test.json new file mode 100644 index 0000000..449d36b --- /dev/null +++ b/test/subdir/test.json @@ -0,0 +1 @@ +{"a":{"0":1},"b":{"0":2}} \ No newline at end of file diff --git a/test/通用文件读取测试.py b/test/通用文件读取测试.py index 7176c18..0302bac 100644 --- a/test/通用文件读取测试.py +++ b/test/通用文件读取测试.py @@ -79,13 +79,99 @@ def test_read_write_excel(file_handler, temp_dir, sample_dataframe): df = file_handler.read_file(test_file) assert df.shape == (3, 3) -def test_read_write_text(file_handler, sample_text_file): - """测试文本文件读写""" - # 测试读取 - df = file_handler.read_file(sample_text_file) - assert df.shape == (1, 1) # 默认单行读取 +def test_read_write_csv(file_handler, temp_dir, sample_dataframe): + """测试CSV文件读写""" + test_file = temp_dir / "test.csv" - # 测试按行读取 - lines_df = file_handler.read_lines(sample_text_file) - assert lines_df.shape == (3, 1) - assert lines_df.iloc[0, 0] == "line1" \ No newline at end of file + # 测试写入 + write_result = file_handler.write_file(test_file, sample_dataframe) + + # 修改断言方式 + assert bool(write_result.iloc[0]['success']) == True # 使用bool()转换 + # 或者 + assert write_result.iloc[0]['success'] == True # 使用值比较 + + assert os.path.exists(test_file) + + # 测试读取 + df = file_handler.read_file(test_file) + assert df.shape == (3, 3) + assert list(df.columns) == ['id', 'name', 'value'] + + +# 文件操作测试 +def test_file_operations(file_handler, sample_text_file): + """测试文件存在检查、删除等操作""" + # 测试文件存在检查 + exists_df = file_handler.file_exists(sample_text_file) + assert exists_df.iloc[0]['exists'] == True + + # 测试获取文件大小 + size_df = file_handler.get_file_size(sample_text_file) + assert size_df.iloc[0]['size_bytes'] > 0 + + # 测试获取修改时间 + mtime_df = file_handler.get_file_modified_time(sample_text_file) + assert isinstance(mtime_df.iloc[0]['modified_time'], datetime) + + # 测试删除文件 + delete_df = file_handler.delete_file(sample_text_file) + assert delete_df.iloc[0]['deleted'] == True + assert not os.path.exists(sample_text_file) + +def test_directory_operations(file_handler, temp_dir): + """测试目录操作""" + test_dir = temp_dir / "subdir" + + # 测试创建目录 + create_df = file_handler.create_dir(test_dir) + assert create_df.iloc[0]['created'] == True + assert os.path.isdir(test_dir) + + # 测试列出目录 + list_df = file_handler.list_dirs(temp_dir) + assert any("subdir" in d for d in list_df['dir_name'].values) + + # 测试删除目录 + delete_df = file_handler.delete_dir(test_dir) + assert delete_df.iloc[0]['deleted'] == True + assert not os.path.exists(test_dir) + + +# 文件压缩 +def test_zip_operations(file_handler, temp_dir, sample_dataframe): + """测试文件压缩解压""" + # 创建测试文件 + test_file1 = temp_dir / "file1.txt" + test_file2 = temp_dir / "file2.csv" + file_handler.write_file(test_file1, "test content") + file_handler.write_file(test_file2, sample_dataframe) + + # 测试压缩文件 + zip_path = temp_dir / "test.zip" + zip_result = file_handler.zip_files([test_file1, test_file2], zip_path) + assert zip_result.iloc[0]['zipped'] == True + assert zip_result.iloc[0]['file_count'] == 2 + + # 测试解压 + extract_dir = temp_dir / "extracted" + unzip_result = file_handler.unzip(zip_path, extract_dir) + assert unzip_result.iloc[0]['unzipped'] is True + assert os.path.exists(extract_dir / "file1.txt") + assert os.path.exists(extract_dir / "file2.csv") + +def test_zip_directory(file_handler, temp_dir): + """测试目录压缩""" + # 创建测试目录结构 + test_dir = temp_dir / "test_dir" + sub_dir = test_dir / "sub" + sub_dir.mkdir(parents=True) + + (test_dir / "file1.txt").write_text("content1") + (sub_dir / "file2.txt").write_text("content2") + + # 测试压缩目录 + zip_path = temp_dir / "dir.zip" + zip_result = file_handler.zip_dir(test_dir, zip_path) + assert zip_result.iloc[0]['zipped'] == True + assert zip_result.iloc[0]['file_count'] == 2 \ No newline at end of file diff --git a/utils/file_handler.py b/utils/file_handler.py index 2ac8ce3..e668749 100644 --- a/utils/file_handler.py +++ b/utils/file_handler.py @@ -3,51 +3,67 @@ import shutil import zipfile import pandas as pd from datetime import datetime -from pathlib import Path +from pathlib import Path, PurePath +from typing import Union, Optional, List, Dict, Any from utils.logger import log class FileHandler: """ - 通用文件操作工具类(所有输入输出均为DataFrame格式) - 功能包括:文件读写、目录操作、文件压缩、路径处理等 + 跨平台文件操作工具类(兼容Windows/macOS/Linux) + 功能规范: + - 读取文件内容的方法返回DataFrame + - 其他所有方法返回统一格式字典: + { + 'success': bool, # 操作是否成功 + 'message': str, # 操作结果描述 + 'data': Any # 操作返回的数据(可选) + } """ - def __init__(self, base_path=None): + def __init__(self, base_path: Optional[Union[str, Path]] = None): """ 初始化文件处理器 - :param base_path: 基础路径,所有操作将基于此路径 + :param base_path: 基础路径(自动处理跨平台路径格式) """ - self.base_path = Path(base_path) if base_path else None + self.base_path = self._normalize_path(base_path) if base_path else None self.log = log.bind(module=self.__class__.__name__) - def _resolve_path(self, path): - """解析路径,处理相对路径和绝对路径""" - path = Path(path) + def _normalize_path(self, path: Union[str, Path]) -> Path: + """统一转换为跨平台Path对象""" + return Path(str(path).replace('\\', '/')) + + def _resolve_path(self, path: Union[str, Path]) -> Path: + """解析路径(自动处理跨平台路径)""" + path = self._normalize_path(path) if not path.is_absolute() and self.base_path: - return self.base_path / path + return self._normalize_path(self.base_path / path) return path - def _to_dataframe(self, data, columns=None): - """将数据转换为DataFrame格式""" - if isinstance(data, pd.DataFrame): - return data - if isinstance(data, dict): - return pd.DataFrame([data]) - if isinstance(data, list): - return pd.DataFrame(data, columns=columns) if columns else pd.DataFrame(data) - return pd.DataFrame([{'value': data}]) + def _format_result(self, + success: bool, + message: str = "", + data: Optional[Any] = None) -> Dict[str, Any]: + """统一返回结果格式""" + return { + 'success': bool(success), + 'message': str(message), + 'data': data + } - def read_file(self, file_path, encoding='utf-8', **kwargs): + def read_file(self, + file_path: Union[str, Path], + encoding: str = 'utf-8', + **kwargs) -> pd.DataFrame: """ - 读取文件内容为DataFrame - :param file_path: 文件路径 - :param encoding: 文件编码 - :param kwargs: pandas.read_* 方法的其他参数 - :return: DataFrame + 读取文件内容为DataFrame(跨平台兼容) + :param file_path: 文件路径(自动处理跨平台格式) + :param encoding: 文件编码(默认utf-8) + :return: 包含文件内容的DataFrame + :raises: 文件读取失败时抛出原始异常 """ file_path = self._resolve_path(file_path) try: - ext = self.get_file_extension(file_path).lower() + ext = self.get_file_extension(file_path) if ext in ['csv', 'txt']: df = pd.read_csv(file_path, encoding=encoding, **kwargs) @@ -58,33 +74,42 @@ class FileHandler: elif ext == 'parquet': df = pd.read_parquet(file_path, **kwargs) else: - # 默认按文本文件处理 with open(file_path, 'r', encoding=encoding) as f: - content = f.read() - df = self._to_dataframe({'content': content}) + return pd.DataFrame({'content': [f.read()]}) - self.log.debug("文件读取成功 | path={} shape={}", file_path, df.shape) + self.log.debug(f"文件读取成功 | path={file_path} shape={df.shape}") return df except Exception as e: - self.log.error("文件读取失败 | path={} error={}", file_path, str(e)) + self.log.error(f"文件读取失败 | path={file_path} error={str(e)}") raise - def write_file(self, file_path, data, encoding='utf-8', **kwargs): + def write_file(self, + file_path: Union[str, Path], + data: Union[pd.DataFrame, Dict, List], + encoding: str = 'utf-8', + **kwargs) -> Dict[str, Any]: """ - 将DataFrame写入文件 - :param file_path: 文件路径 - :param data: 要写入的DataFrame数据 - :param encoding: 文件编码 - :param kwargs: pandas.to_* 方法的其他参数 - :return: DataFrame({'success': bool, 'file_path': str, 'file_size': int}) + 写入文件(跨平台兼容) + :param file_path: 目标文件路径 + :param data: 要写入的数据(支持DataFrame/dict/list) + :param encoding: 文件编码(默认utf-8) + :return: 操作结果字典 """ file_path = self._resolve_path(file_path) - df = self._to_dataframe(data) - try: - self.create_dir(os.path.dirname(file_path)) - ext = self.get_file_extension(file_path) # 现在返回的是字符串 + # 自动创建父目录 + parent_dir = file_path.parent + if not parent_dir.exists(): + self.create_dir(parent_dir) + # 统一数据格式 + if isinstance(data, pd.DataFrame): + df = data + else: + df = pd.DataFrame(data if isinstance(data, list) else [data]) + + # 根据扩展名选择写入方式 + ext = self.get_file_extension(file_path) if ext in ['csv', 'txt']: df.to_csv(file_path, encoding=encoding, index=False, **kwargs) elif ext in ['xls', 'xlsx']: @@ -94,459 +119,375 @@ class FileHandler: elif ext == 'parquet': df.to_parquet(file_path, **kwargs) else: - # 默认按文本文件处理 - content = df.to_string(index=False) with open(file_path, 'w', encoding=encoding) as f: - f.write(content) + f.write(str(data)) - file_size = os.path.getsize(file_path) - result = { - 'success': True, - 'file_path': str(file_path), - 'file_size': file_size - } - self.log.debug("文件写入成功 | path={} size={} bytes", file_path, file_size) - return self._to_dataframe(result) + # 返回成功结果 + return self._format_result( + True, + "文件写入成功", + { + 'file_path': str(file_path), + 'file_size': os.path.getsize(file_path) + } + ) except Exception as e: - self.log.error("文件写入失败 | path={} error={}", file_path, str(e)) - raise + return self._format_result( + False, + f"文件写入失败: {str(e)}", + {'file_path': str(file_path)} + ) - def read_lines(self, file_path, encoding='utf-8', columns=['line_content']): + def file_exists(self, file_path: Union[str, Path]) -> Dict[str, Any]: """ - 按行读取文件内容为DataFrame - :param file_path: 文件路径 - :param encoding: 文件编码 - :param columns: 列名列表 - :return: DataFrame + 检查文件是否存在(跨平台兼容) + :return: 包含exists字段的结果字典 + """ + file_path = self._resolve_path(file_path) + exists = file_path.is_file() + msg = f"文件{'' if exists else '不'}存在: {file_path}" + return self._format_result(True, msg, {'exists': exists}) + + def dir_exists(self, dir_path: Union[str, Path]) -> Dict[str, Any]: + """ + 检查目录是否存在(跨平台兼容) + :return: 包含exists字段的结果字典 + """ + dir_path = self._resolve_path(dir_path) + exists = dir_path.is_dir() + msg = f"目录{'' if exists else '不'}存在: {dir_path}" + return self._format_result(True, msg, {'exists': exists}) + + def create_dir(self, dir_path: Union[str, Path]) -> Dict[str, Any]: + """ + 创建目录(跨平台兼容) + :return: 包含path字段的结果字典 + """ + dir_path = self._resolve_path(dir_path) + try: + dir_path.mkdir(parents=True, exist_ok=True) + + # Windows系统需要额外设置权限 + if os.name == 'nt': + try: + os.chmod(dir_path, 0o777) + except: + pass + + return self._format_result(True, "目录创建成功", {'path': str(dir_path)}) + except Exception as e: + return self._format_result(False, f"目录创建失败: {str(e)}", {'path': str(dir_path)}) + + def delete_file(self, file_path: Union[str, Path]) -> Dict[str, Any]: + """ + 删除文件(跨平台兼容) + :return: 包含path字段的结果字典 """ file_path = self._resolve_path(file_path) try: - with open(file_path, 'r', encoding=encoding) as f: - lines = f.readlines() + if not file_path.exists(): + return self._format_result(False, "文件不存在", {'path': str(file_path)}) - df = self._to_dataframe(lines, columns=columns) - self.log.debug("文件按行读取成功 | path={} lines={}", file_path, len(df)) - return df + file_path.unlink() + return self._format_result(True, "文件删除成功", {'path': str(file_path)}) except Exception as e: - self.log.error("文件按行读取失败 | path={} error={}", file_path, str(e)) - raise + return self._format_result(False, f"文件删除失败: {str(e)}", {'path': str(file_path)}) - def write_lines(self, file_path, data, encoding='utf-8', line_column=None): + def delete_dir(self, dir_path: Union[str, Path]) -> Dict[str, Any]: """ - 将DataFrame按行写入文件 - :param file_path: 文件路径 - :param data: 要写入的DataFrame数据 - :param encoding: 文件编码 - :param line_column: 指定作为行内容的列名 + 删除目录及其内容(跨平台兼容) + :return: 包含path字段的结果字典 """ - file_path = self._resolve_path(file_path) - df = self._to_dataframe(data) - + dir_path = self._resolve_path(dir_path) try: - self.create_dir(os.path.dirname(file_path)) + if not dir_path.exists(): + return self._format_result(False, "目录不存在", {'path': str(dir_path)}) - if line_column and line_column in df.columns: - lines = df[line_column].tolist() + shutil.rmtree(dir_path) + return self._format_result(True, "目录删除成功", {'path': str(dir_path)}) + except Exception as e: + return self._format_result(False, f"目录删除失败: {str(e)}", {'path': str(dir_path)}) + + def list_files(self, + dir_path: Union[str, Path], + recursive: bool = False, + pattern: str = '*') -> Dict[str, Any]: + """ + 列出目录中的文件(跨平台兼容) + :param recursive: 是否递归查找 + :param pattern: 文件匹配模式(如*.txt) + :return: 包含files字段的结果字典 + """ + dir_path = self._resolve_path(dir_path) + try: + if recursive: + files = list(dir_path.rglob(pattern)) else: - lines = df.to_string(index=False, header=False).split('\n') + files = list(dir_path.glob(pattern)) - with open(file_path, 'w', encoding=encoding) as f: - f.writelines([line + '\n' for line in lines]) + file_info = [ + { + 'path': str(f), + 'name': f.name, + 'size': f.stat().st_size, + 'modified': datetime.fromtimestamp(f.stat().st_mtime).isoformat(), + 'is_dir': f.is_dir() + } for f in files if f.is_file() # 只返回文件,不包括目录 + ] - self.log.debug("文件按行写入成功 | path={} lines={}", file_path, len(lines)) + return self._format_result( + True, + f"找到 {len(file_info)} 个文件", + {'files': file_info} + ) except Exception as e: - self.log.error("文件按行写入失败 | path={} error={}", file_path, str(e)) - raise + return self._format_result( + False, + f"列出文件失败: {str(e)}", + {'files': []} + ) - def file_exists(self, file_path): + def get_file_extension(self, file_path: Union[str, Path]) -> str: """ - 检查文件是否存在 - :param file_path: 文件路径 - :return: DataFrame({'exists': bool}) + 获取文件扩展名(跨平台兼容) + :return: 小写且不带点的扩展名(如 'jpg') """ file_path = self._resolve_path(file_path) - exists = os.path.isfile(file_path) - self.log.trace("文件存在检查 | path={} exists={}", file_path, exists) - return self._to_dataframe({'exists': [exists]}) + ext = file_path.suffix.lower().lstrip('.') + self.log.trace(f"获取文件扩展名 | path={file_path} ext={ext}") + return ext - def dir_exists(self, dir_path): + def copy_file(self, + src_path: Union[str, Path], + dst_path: Union[str, Path]) -> Dict[str, Any]: """ - 检查目录是否存在 - :param dir_path: 目录路径 - :return: DataFrame({'exists': bool}) - """ - dir_path = self._resolve_path(dir_path) - exists = os.path.isdir(dir_path) - self.log.trace("目录存在检查 | path={} exists={}", dir_path, exists) - return self._to_dataframe({'exists': [exists]}) - - def create_dir(self, dir_path): - """ - 创建目录(包括父目录) - :param dir_path: 目录路径 - :return: DataFrame({'created': bool, 'path': str}) - """ - dir_path = self._resolve_path(dir_path) - try: - os.makedirs(dir_path, exist_ok=True) - self.log.debug("目录创建成功 | path={}", dir_path) - return self._to_dataframe({'created': [True], 'path': [str(dir_path)]}) - except Exception as e: - self.log.error("目录创建失败 | path={} error={}", dir_path, str(e)) - raise - - def delete_file(self, file_path): - """ - 删除文件 - :param file_path: 文件路径 - :return: DataFrame({'deleted': bool, 'path': str}) - """ - file_path = self._resolve_path(file_path) - try: - exists = self.file_exists(file_path).iloc[0]['exists'] - if exists: - os.remove(file_path) - self.log.debug("文件删除成功 | path={}", file_path) - return self._to_dataframe({'deleted': [True], 'path': [str(file_path)]}) - return self._to_dataframe({'deleted': [False], 'path': [str(file_path)]}) - except Exception as e: - self.log.error("文件删除失败 | path={} error={}", file_path, str(e)) - raise - - def delete_dir(self, dir_path): - """ - 删除目录及其内容 - :param dir_path: 目录路径 - :return: DataFrame({'deleted': bool, 'path': str}) - """ - dir_path = self._resolve_path(dir_path) - try: - exists = self.dir_exists(dir_path).iloc[0]['exists'] - if exists: - shutil.rmtree(dir_path) - self.log.debug("目录删除成功 | path={}", dir_path) - return self._to_dataframe({'deleted': [True], 'path': [str(dir_path)]}) - return self._to_dataframe({'deleted': [False], 'path': [str(dir_path)]}) - except Exception as e: - self.log.error("目录删除失败 | path={} error={}", dir_path, str(e)) - raise - - def copy_file(self, src_path, dst_path): - """ - 复制文件 - :param src_path: 源文件路径 - :param dst_path: 目标文件路径 - :return: DataFrame({'copied': bool, 'source': str, 'destination': str}) + 复制文件(跨平台兼容) + :return: 包含source和destination字段的结果字典 """ src_path = self._resolve_path(src_path) dst_path = self._resolve_path(dst_path) try: - self.create_dir(os.path.dirname(dst_path)) + if not src_path.exists(): + return self._format_result( + False, + "源文件不存在", + { + 'source': str(src_path), + 'destination': str(dst_path) + } + ) + + # 确保目标目录存在 + self.create_dir(dst_path.parent) + shutil.copy2(src_path, dst_path) - self.log.debug("文件复制成功 | src={} dst={}", src_path, dst_path) - return self._to_dataframe({ - 'copied': [True], - 'source': [str(src_path)], - 'destination': [str(dst_path)] - }) + return self._format_result( + True, + "文件复制成功", + { + 'source': str(src_path), + 'destination': str(dst_path), + 'file_size': dst_path.stat().st_size + } + ) except Exception as e: - self.log.error("文件复制失败 | src={} dst={} error={}", - src_path, dst_path, str(e)) - raise + return self._format_result( + False, + f"文件复制失败: {str(e)}", + { + 'source': str(src_path), + 'destination': str(dst_path) + } + ) - def move_file(self, src_path, dst_path): + def move_file(self, + src_path: Union[str, Path], + dst_path: Union[str, Path]) -> Dict[str, Any]: """ - 移动/重命名文件 - :param src_path: 源文件路径 - :param dst_path: 目标文件路径 - :return: DataFrame({'moved': bool, 'source': str, 'destination': str}) + 移动/重命名文件(跨平台兼容) + :return: 包含source和destination字段的结果字典 """ src_path = self._resolve_path(src_path) dst_path = self._resolve_path(dst_path) try: - self.create_dir(os.path.dirname(dst_path)) + if not src_path.exists(): + return self._format_result( + False, + "源文件不存在", + { + 'source': str(src_path), + 'destination': str(dst_path) + } + ) + + # 确保目标目录存在 + self.create_dir(dst_path.parent) + shutil.move(src_path, dst_path) - self.log.debug("文件移动成功 | src={} dst={}", src_path, dst_path) - return self._to_dataframe({ - 'moved': [True], - 'source': [str(src_path)], - 'destination': [str(dst_path)] - }) + return self._format_result( + True, + "文件移动成功", + { + 'source': str(src_path), + 'destination': str(dst_path) + } + ) except Exception as e: - self.log.error("文件移动失败 | src={} dst={} error={}", - src_path, dst_path, str(e)) - raise + return self._format_result( + False, + f"文件移动失败: {str(e)}", + { + 'source': str(src_path), + 'destination': str(dst_path) + } + ) - def list_files(self, dir_path, recursive=False, pattern='*'): + def zip_files(self, + file_paths: List[Union[str, Path]], + zip_path: Union[str, Path]) -> Dict[str, Any]: """ - 列出目录中的文件 - :param dir_path: 目录路径 - :param recursive: 是否递归查找 - :param pattern: 文件匹配模式 - :return: DataFrame({'file_path': str, 'file_name': str, 'extension': str}) - """ - dir_path = self._resolve_path(dir_path) - try: - if recursive: - files = [str(f) for f in Path(dir_path).rglob(pattern) if f.is_file()] - else: - files = [str(f) for f in Path(dir_path).glob(pattern) if f.is_file()] - - result = [] - for f in files: - p = Path(f) - result.append({ - 'file_path': str(p), - 'file_name': p.name, - 'extension': p.suffix.lower().lstrip('.') - }) - - df = self._to_dataframe(result) - self.log.trace("列出目录文件 | path={} recursive={} count={}", - dir_path, recursive, len(df)) - return df - except Exception as e: - self.log.error("列出文件失败 | path={} error={}", dir_path, str(e)) - raise - - def list_dirs(self, dir_path, recursive=False): - """ - 列出目录中的子目录 - :param dir_path: 目录路径 - :param recursive: 是否递归查找 - :return: DataFrame({'dir_path': str, 'dir_name': str}) - """ - dir_path = self._resolve_path(dir_path) - try: - if recursive: - dirs = [str(d) for d in Path(dir_path).rglob('*') if d.is_dir()] - else: - dirs = [str(d) for d in Path(dir_path).glob('*') if d.is_dir()] - - result = [{'dir_path': d, 'dir_name': Path(d).name} for d in dirs] - df = self._to_dataframe(result) - self.log.trace("列出子目录 | path={} recursive={} count={}", - dir_path, recursive, len(df)) - return df - except Exception as e: - self.log.error("列出目录失败 | path={} error={}", dir_path, str(e)) - raise - - def get_file_size(self, file_path): - """ - 获取文件大小(字节) - :param file_path: 文件路径 - :return: DataFrame({'file_path': str, 'size_bytes': int, 'size_mb': float}) - """ - file_path = self._resolve_path(file_path) - try: - size_bytes = os.path.getsize(file_path) - result = { - 'file_path': str(file_path), - 'size_bytes': size_bytes, - 'size_mb': round(size_bytes / 1024 / 1024, 4) - } - df = self._to_dataframe(result) - self.log.trace("获取文件大小 | path={} size={} bytes", file_path, size_bytes) - return df - except Exception as e: - self.log.error("获取文件大小失败 | path={} error={}", file_path, str(e)) - raise - - def get_file_modified_time(self, file_path): - """ - 获取文件修改时间 - :param file_path: 文件路径 - :return: DataFrame({'file_path': str, 'modified_time': datetime, 'timestamp': float}) - """ - file_path = self._resolve_path(file_path) - try: - mtime = datetime.fromtimestamp(os.path.getmtime(file_path)) - result = { - 'file_path': str(file_path), - 'modified_time': mtime, - 'timestamp': mtime.timestamp() - } - df = self._to_dataframe(result) - self.log.trace("获取文件修改时间 | path={} mtime={}", - file_path, mtime.isoformat()) - return df - except Exception as e: - self.log.error("获取文件修改时间失败 | path={} error={}", - file_path, str(e)) - raise - - def zip_files(self, file_paths, zip_path): - """ - 压缩多个文件到zip - :param file_paths: 要压缩的文件路径列表或DataFrame - :param zip_path: 压缩文件路径 - :return: DataFrame({'zipped': bool, 'zip_path': str, 'file_count': int}) + 压缩多个文件到zip(跨平台兼容) + :param file_paths: 要压缩的文件路径列表 + :param zip_path: 目标zip文件路径 + :return: 包含zip_path和file_count字段的结果字典 """ zip_path = self._resolve_path(zip_path) - - # 处理输入可以是DataFrame或列表 - if isinstance(file_paths, pd.DataFrame): - if 'file_path' in file_paths.columns: - file_list = file_paths['file_path'].tolist() - else: - file_list = file_paths.iloc[:, 0].tolist() - else: - file_list = file_paths - try: - self.create_dir(os.path.dirname(zip_path)) - file_count = 0 + # 确保目标目录存在 + self.create_dir(zip_path.parent) + with zipfile.ZipFile(zip_path, 'w', zipfile.ZIP_DEFLATED) as zipf: - for file_path in file_list: + file_count = 0 + for file_path in file_paths: file_path = self._resolve_path(file_path) - if self.file_exists(file_path).iloc[0]['exists']: - zipf.write(file_path, os.path.basename(file_path)) + if file_path.exists(): + zipf.write(file_path, file_path.name) file_count += 1 - result = { - 'zipped': True, - 'zip_path': str(zip_path), - 'file_count': file_count - } - self.log.info("文件压缩成功 | zip={} files={}", zip_path, file_count) - return self._to_dataframe(result) + return self._format_result( + True, + "文件压缩成功", + { + 'zip_path': str(zip_path), + 'file_count': file_count, + 'zip_size': os.path.getsize(zip_path) + } + ) except Exception as e: - self.log.error("文件压缩失败 | zip={} error={}", zip_path, str(e)) - raise + return self._format_result( + False, + f"文件压缩失败: {str(e)}", + { + 'zip_path': str(zip_path) + } + ) - def zip_dir(self, dir_path, zip_path): + def unzip(self, + zip_path: Union[str, Path], + extract_to: Optional[Union[str, Path]] = None) -> Dict[str, Any]: """ - 压缩整个目录到zip - :param dir_path: 要压缩的目录路径 - :param zip_path: 压缩文件路径 - :return: DataFrame({'zipped': bool, 'zip_path': str, 'dir_path': str, 'file_count': int}) - """ - dir_path = self._resolve_path(dir_path) - zip_path = self._resolve_path(zip_path) - try: - self.create_dir(os.path.dirname(zip_path)) - file_count = 0 - with zipfile.ZipFile(zip_path, 'w', zipfile.ZIP_DEFLATED) as zipf: - for root, dirs, files in os.walk(dir_path): - for file in files: - file_path = os.path.join(root, file) - arcname = os.path.relpath(file_path, dir_path) - zipf.write(file_path, arcname) - file_count += 1 - - result = { - 'zipped': True, - 'zip_path': str(zip_path), - 'dir_path': str(dir_path), - 'file_count': file_count - } - self.log.info("目录压缩成功 | zip={} dir={} files={}", - zip_path, dir_path, file_count) - return self._to_dataframe(result) - except Exception as e: - self.log.error("目录压缩失败 | zip={} error={}", zip_path, str(e)) - raise - - def unzip(self, zip_path, extract_to=None): - """ - 解压zip文件 - :param zip_path: zip文件路径 - :param extract_to: 解压目标目录,默认为zip文件所在目录 - :return: DataFrame({'unzipped': bool, 'zip_path': str, 'extract_to': str, 'file_count': int}) + 解压zip文件(跨平台兼容) + :param extract_to: 解压目标目录(默认为zip文件所在目录) + :return: 包含extract_to和file_count字段的结果字典 """ zip_path = self._resolve_path(zip_path) - if extract_to is None: - extract_to = os.path.dirname(zip_path) - else: - extract_to = self._resolve_path(extract_to) + extract_to = self._resolve_path(extract_to) if extract_to else zip_path.parent try: + if not zip_path.exists(): + return self._format_result( + False, + "ZIP文件不存在", + { + 'zip_path': str(zip_path), + 'extract_to': str(extract_to) + } + ) + + # 确保目标目录存在 self.create_dir(extract_to) + with zipfile.ZipFile(zip_path, 'r') as zipf: file_list = zipf.namelist() zipf.extractall(extract_to) - result = { - 'unzipped': True, - 'zip_path': str(zip_path), - 'extract_to': str(extract_to), - 'file_count': len(file_list) - } - self.log.info("文件解压成功 | zip={} extract_to={} files={}", - zip_path, extract_to, len(file_list)) - return self._to_dataframe(result) - except Exception as e: - self.log.error("文件解压失败 | zip={} error={}", zip_path, str(e)) - raise - - def compress_large_log(self, log_path, max_size_mb=20): - """ - 压缩过大的日志文件 - :param log_path: 日志文件路径 - :param max_size_mb: 最大大小(MB),超过则压缩 - :return: DataFrame({'compressed': bool, 'original_path': str, 'zip_path': str, 'original_size_mb': float}) - """ - log_path = self._resolve_path(log_path) - if not self.file_exists(log_path).iloc[0]['exists']: - return self._to_dataframe({'compressed': [False]}) - - max_size_bytes = max_size_mb * 1024 * 1024 - size_info = self.get_file_size(log_path) - current_size = size_info.iloc[0]['size_bytes'] - - if current_size > max_size_bytes: - try: - timestamp = datetime.now().strftime('%Y%m%d_%H%M%S') - zip_path = f"{log_path}_{timestamp}.zip" - self.zip_files([log_path], zip_path) - self.delete_file(log_path) - - result = { - 'compressed': True, - 'original_path': str(log_path), - 'zip_path': zip_path, - 'original_size_mb': round(current_size/1024/1024, 2) + return self._format_result( + True, + "文件解压成功", + { + 'extract_to': str(extract_to), + 'file_count': len(file_list) } - self.log.info("日志文件压缩 | original={} compressed={} original_size={} MB", - log_path, zip_path, result['original_size_mb']) - return self._to_dataframe(result) - except Exception as e: - self.log.error("日志压缩失败 | path={} error={}", log_path, str(e)) - raise + ) + except Exception as e: + return self._format_result( + False, + f"文件解压失败: {str(e)}", + { + 'zip_path': str(zip_path), + 'extract_to': str(extract_to) + } + ) - return self._to_dataframe({'compressed': [False]}) - def get_file_extension(self, file_path): - """ - 获取文件扩展名 - :param file_path: 文件路径 - :return: 文件扩展名字符串(小写,不带点) - """ - file_path = self._resolve_path(file_path) - ext = Path(file_path).suffix.lower().lstrip('.') - self.log.trace("获取文件扩展名 | path={} ext={}", file_path, ext) - return ext # 直接返回字符串而不是DataFrame +# ---------------------------- 测试用例 ---------------------------- +if __name__ == "__main__": + # 初始化处理器(自动处理跨平台路径) + handler = FileHandler("test_data") - def change_file_extension(self, file_path, new_extension): - """ - 修改文件扩展名 - :param file_path: 文件路径 - :param new_extension: 新扩展名(不带点) - :return: DataFrame({'original_path': str, 'new_path': str}) - """ - file_path = self._resolve_path(file_path) - new_path = str(Path(file_path).with_suffix(f'.{new_extension}')) - result = {'original_path': str(file_path), 'new_path': new_path} - self.log.debug("修改文件扩展名 | original={} new={}", file_path, new_path) - return self._to_dataframe(result) + # 测试路径标准化 + test_paths = [ + "normal/path", + "windows\\style\\path", + "mixed/path\\with\\both" + ] - def join_path(self, *paths): - """ - 拼接路径 - :param paths: 多个路径部分 - :return: DataFrame({'joined_path': str}) - """ - joined_path = str(Path(*paths)) - self.log.trace("路径拼接 | parts={} result={}", paths, joined_path) - return self._to_dataframe({'joined_path': [joined_path]}) \ No newline at end of file + print("=== 路径标准化测试 ===") + for path in test_paths: + resolved = handler._resolve_path(path) + print(f"原始路径: {path} -> 标准化: {resolved} (类型: {type(resolved)})") + + # 测试目录操作 + print("\n=== 目录操作测试 ===") + dir_result = handler.create_dir("test_dir") + print(dir_result) + + # 测试文件操作 + print("\n=== 文件操作测试 ===") + test_data = [{"name": "Alice", "age": 25}, {"name": "Bob", "age": 30}] + write_result = handler.write_file("test_dir/data.json", test_data) + print(write_result) + + # 测试文件读取 + try: + df = handler.read_file("test_dir/data.json") + print("\n读取文件内容:") + print(df) + except Exception as e: + print(f"\n文件读取失败: {str(e)}") + + # 测试列表文件 + print("\n=== 文件列表测试 ===") + list_result = handler.list_files("test_dir") + print(list_result) + + # 测试压缩解压 + print("\n=== 压缩解压测试 ===") + zip_result = handler.zip_files( + ["test_dir/data.json"], + "test_archive.zip" + ) + print(zip_result) + + unzip_result = handler.unzip( + "test_archive.zip", + "extracted_files" + ) + print(unzip_result) + + # 清理测试数据 + print("\n=== 清理测试数据 ===") + print(handler.delete_file("test_dir/data.json")) + print(handler.delete_dir("test_dir")) + print(handler.delete_file("test_archive.zip")) + print(handler.delete_dir("extracted_files")) \ No newline at end of file