通用文件读取更新

This commit is contained in:
2025-08-06 14:50:27 +08:00
parent 196df754bc
commit aa0b71a90b
3 changed files with 470 additions and 442 deletions
+1
View File
@@ -0,0 +1 @@
{"a":{"0":1},"b":{"0":2}}
+95 -9
View File
@@ -79,13 +79,99 @@ def test_read_write_excel(file_handler, temp_dir, sample_dataframe):
df = file_handler.read_file(test_file)
assert df.shape == (3, 3)
def test_read_write_text(file_handler, sample_text_file):
"""测试文本文件读写"""
# 测试读取
df = file_handler.read_file(sample_text_file)
assert df.shape == (1, 1) # 默认单行读取
def test_read_write_csv(file_handler, temp_dir, sample_dataframe):
"""测试CSV文件读写"""
test_file = temp_dir / "test.csv"
# 测试按行读取
lines_df = file_handler.read_lines(sample_text_file)
assert lines_df.shape == (3, 1)
assert lines_df.iloc[0, 0] == "line1"
# 测试写入
write_result = file_handler.write_file(test_file, sample_dataframe)
# 修改断言方式
assert bool(write_result.iloc[0]['success']) == True # 使用bool()转换
# 或者
assert write_result.iloc[0]['success'] == True # 使用值比较
assert os.path.exists(test_file)
# 测试读取
df = file_handler.read_file(test_file)
assert df.shape == (3, 3)
assert list(df.columns) == ['id', 'name', 'value']
# 文件操作测试
def test_file_operations(file_handler, sample_text_file):
    """Check the existence, size, modification-time and delete helpers on one file."""
    target = sample_text_file

    # Existence check: the fixture file must be reported as present.
    exists_row = file_handler.file_exists(target).iloc[0]
    assert exists_row['exists'] == True

    # Size query: the reported byte count must be positive.
    size_row = file_handler.get_file_size(target).iloc[0]
    assert size_row['size_bytes'] > 0

    # Modification time: the value must be a datetime instance.
    mtime_row = file_handler.get_file_modified_time(target).iloc[0]
    assert isinstance(mtime_row['modified_time'], datetime)

    # Deletion: the call reports success and the file is gone afterwards.
    delete_row = file_handler.delete_file(target).iloc[0]
    assert delete_row['deleted'] == True
    assert not os.path.exists(target)
def test_directory_operations(file_handler, temp_dir):
    """Check that a sub-directory can be created, listed and removed again."""
    subdir = temp_dir / "subdir"

    # Creation: the call reports success and the directory exists on disk.
    created_row = file_handler.create_dir(subdir).iloc[0]
    assert created_row['created'] == True
    assert os.path.isdir(subdir)

    # Listing: at least one reported directory name contains "subdir".
    listed_names = file_handler.list_dirs(temp_dir)['dir_name'].values
    assert any("subdir" in name for name in listed_names)

    # Removal: the call reports success and the path no longer exists.
    deleted_row = file_handler.delete_dir(subdir).iloc[0]
    assert deleted_row['deleted'] == True
    assert not os.path.exists(subdir)
# 文件压缩
def test_zip_operations(file_handler, temp_dir, sample_dataframe):
    """Check that two files can be zipped into one archive and extracted again."""
    # Create the two input files: one plain-text file and one CSV file.
    test_file1 = temp_dir / "file1.txt"
    test_file2 = temp_dir / "file2.csv"
    file_handler.write_file(test_file1, "test content")
    file_handler.write_file(test_file2, sample_dataframe)

    # Compress both files; the result must report success and two entries.
    zip_path = temp_dir / "test.zip"
    zip_result = file_handler.zip_files([test_file1, test_file2], zip_path)
    assert zip_result.iloc[0]['zipped'] == True
    assert zip_result.iloc[0]['file_count'] == 2

    # Extract the archive into a fresh directory.
    extract_dir = temp_dir / "extracted"
    unzip_result = file_handler.unzip(zip_path, extract_dir)
    # Compare by value, not identity: a flag read out of a DataFrame row may be
    # a NumPy boolean rather than the built-in ``True`` singleton, in which
    # case ``is True`` fails even though the operation succeeded.
    assert unzip_result.iloc[0]['unzipped'] == True
    assert os.path.exists(extract_dir / "file1.txt")
    assert os.path.exists(extract_dir / "file2.csv")
def test_zip_directory(file_handler, temp_dir):
    """Check that a directory tree with one nested file is zipped as two entries."""
    # Lay out the tree: test_dir/file1.txt and test_dir/sub/file2.txt.
    root = temp_dir / "test_dir"
    nested = root / "sub"
    nested.mkdir(parents=True)
    for location, text in ((root / "file1.txt", "content1"),
                           (nested / "file2.txt", "content2")):
        location.write_text(text)

    # Compress the whole directory and inspect the first result row.
    archive = temp_dir / "dir.zip"
    result_row = file_handler.zip_dir(root, archive).iloc[0]
    assert result_row['zipped'] == True
    assert result_row['file_count'] == 2
+357 -416
View File
@@ -3,51 +3,67 @@ import shutil
import zipfile
import pandas as pd
from datetime import datetime
from pathlib import Path
from pathlib import Path, PurePath
from typing import Union, Optional, List, Dict, Any
from utils.logger import log
class FileHandler:
"""
通用文件操作工具类(所有输入输出均为DataFrame格式
功能包括:文件读写、目录操作、文件压缩、路径处理等
跨平台文件操作工具类(兼容Windows/macOS/Linux
功能规范:
- 读取文件内容的方法返回DataFrame
- 其他所有方法返回统一格式字典:
{
'success': bool, # 操作是否成功
'message': str, # 操作结果描述
'data': Any # 操作返回的数据(可选)
}
"""
def __init__(self, base_path=None):
def __init__(self, base_path: Optional[Union[str, Path]] = None):
"""
初始化文件处理器
:param base_path: 基础路径,所有操作将基于此路径
:param base_path: 基础路径(自动处理跨平台路径格式)
"""
self.base_path = Path(base_path) if base_path else None
self.base_path = self._normalize_path(base_path) if base_path else None
self.log = log.bind(module=self.__class__.__name__)
def _resolve_path(self, path):
"""解析路径,处理相对路径和绝对路径"""
path = Path(path)
def _normalize_path(self, path: Union[str, Path]) -> Path:
"""统一转换为跨平台Path对象"""
return Path(str(path).replace('\\', '/'))
def _resolve_path(self, path: Union[str, Path]) -> Path:
"""解析路径(自动处理跨平台路径)"""
path = self._normalize_path(path)
if not path.is_absolute() and self.base_path:
return self.base_path / path
return self._normalize_path(self.base_path / path)
return path
def _to_dataframe(self, data, columns=None):
"""将数据转换为DataFrame格式"""
if isinstance(data, pd.DataFrame):
return data
if isinstance(data, dict):
return pd.DataFrame([data])
if isinstance(data, list):
return pd.DataFrame(data, columns=columns) if columns else pd.DataFrame(data)
return pd.DataFrame([{'value': data}])
def _format_result(self,
success: bool,
message: str = "",
data: Optional[Any] = None) -> Dict[str, Any]:
"""统一返回结果格式"""
return {
'success': bool(success),
'message': str(message),
'data': data
}
def read_file(self, file_path, encoding='utf-8', **kwargs):
def read_file(self,
file_path: Union[str, Path],
encoding: str = 'utf-8',
**kwargs) -> pd.DataFrame:
"""
读取文件内容为DataFrame
:param file_path: 文件路径
:param encoding: 文件编码
:param kwargs: pandas.read_* 方法的其他参数
:return: DataFrame
读取文件内容为DataFrame(跨平台兼容)
:param file_path: 文件路径(自动处理跨平台格式)
:param encoding: 文件编码(默认utf-8
:return: 包含文件内容的DataFrame
:raises: 文件读取失败时抛出原始异常
"""
file_path = self._resolve_path(file_path)
try:
ext = self.get_file_extension(file_path).lower()
ext = self.get_file_extension(file_path)
if ext in ['csv', 'txt']:
df = pd.read_csv(file_path, encoding=encoding, **kwargs)
@@ -58,33 +74,42 @@ class FileHandler:
elif ext == 'parquet':
df = pd.read_parquet(file_path, **kwargs)
else:
# 默认按文本文件处理
with open(file_path, 'r', encoding=encoding) as f:
content = f.read()
df = self._to_dataframe({'content': content})
return pd.DataFrame({'content': [f.read()]})
self.log.debug("文件读取成功 | path={} shape={}", file_path, df.shape)
self.log.debug(f"文件读取成功 | path={file_path} shape={df.shape}")
return df
except Exception as e:
self.log.error("文件读取失败 | path={} error={}", file_path, str(e))
self.log.error(f"文件读取失败 | path={file_path} error={str(e)}")
raise
def write_file(self, file_path, data, encoding='utf-8', **kwargs):
def write_file(self,
file_path: Union[str, Path],
data: Union[pd.DataFrame, Dict, List],
encoding: str = 'utf-8',
**kwargs) -> Dict[str, Any]:
"""
将DataFrame写入文件
:param file_path: 文件路径
:param data: 要写入的DataFrame数据
:param encoding: 文件编码
:param kwargs: pandas.to_* 方法的其他参数
:return: DataFrame({'success': bool, 'file_path': str, 'file_size': int})
写入文件(跨平台兼容)
:param file_path: 目标文件路径
:param data: 要写入的数据(支持DataFrame/dict/list
:param encoding: 文件编码(默认utf-8
:return: 操作结果字典
"""
file_path = self._resolve_path(file_path)
df = self._to_dataframe(data)
try:
self.create_dir(os.path.dirname(file_path))
ext = self.get_file_extension(file_path) # 现在返回的是字符串
# 自动创建父目录
parent_dir = file_path.parent
if not parent_dir.exists():
self.create_dir(parent_dir)
# 统一数据格式
if isinstance(data, pd.DataFrame):
df = data
else:
df = pd.DataFrame(data if isinstance(data, list) else [data])
# 根据扩展名选择写入方式
ext = self.get_file_extension(file_path)
if ext in ['csv', 'txt']:
df.to_csv(file_path, encoding=encoding, index=False, **kwargs)
elif ext in ['xls', 'xlsx']:
@@ -94,459 +119,375 @@ class FileHandler:
elif ext == 'parquet':
df.to_parquet(file_path, **kwargs)
else:
# 默认按文本文件处理
content = df.to_string(index=False)
with open(file_path, 'w', encoding=encoding) as f:
f.write(content)
f.write(str(data))
file_size = os.path.getsize(file_path)
result = {
'success': True,
# 返回成功结果
return self._format_result(
True,
"文件写入成功",
{
'file_path': str(file_path),
'file_size': file_size
'file_size': os.path.getsize(file_path)
}
self.log.debug("文件写入成功 | path={} size={} bytes", file_path, file_size)
return self._to_dataframe(result)
)
except Exception as e:
self.log.error("文件写入失败 | path={} error={}", file_path, str(e))
raise
return self._format_result(
False,
f"文件写入失败: {str(e)}",
{'file_path': str(file_path)}
)
def read_lines(self, file_path, encoding='utf-8', columns=['line_content']):
def file_exists(self, file_path: Union[str, Path]) -> Dict[str, Any]:
"""
按行读取文件内容为DataFrame
:param file_path: 文件路径
:param encoding: 文件编码
:param columns: 列名列表
:return: DataFrame
检查文件是否存在(跨平台兼容)
:return: 包含exists字段的结果字典
"""
file_path = self._resolve_path(file_path)
try:
with open(file_path, 'r', encoding=encoding) as f:
lines = f.readlines()
exists = file_path.is_file()
msg = f"文件{'' if exists else ''}存在: {file_path}"
return self._format_result(True, msg, {'exists': exists})
df = self._to_dataframe(lines, columns=columns)
self.log.debug("文件按行读取成功 | path={} lines={}", file_path, len(df))
return df
except Exception as e:
self.log.error("文件按行读取失败 | path={} error={}", file_path, str(e))
raise
def write_lines(self, file_path, data, encoding='utf-8', line_column=None):
def dir_exists(self, dir_path: Union[str, Path]) -> Dict[str, Any]:
"""
将DataFrame按行写入文件
:param file_path: 文件路径
:param data: 要写入的DataFrame数据
:param encoding: 文件编码
:param line_column: 指定作为行内容的列名
"""
file_path = self._resolve_path(file_path)
df = self._to_dataframe(data)
try:
self.create_dir(os.path.dirname(file_path))
if line_column and line_column in df.columns:
lines = df[line_column].tolist()
else:
lines = df.to_string(index=False, header=False).split('\n')
with open(file_path, 'w', encoding=encoding) as f:
f.writelines([line + '\n' for line in lines])
self.log.debug("文件按行写入成功 | path={} lines={}", file_path, len(lines))
except Exception as e:
self.log.error("文件按行写入失败 | path={} error={}", file_path, str(e))
raise
def file_exists(self, file_path):
"""
检查文件是否存在
:param file_path: 文件路径
:return: DataFrame({'exists': bool})
"""
file_path = self._resolve_path(file_path)
exists = os.path.isfile(file_path)
self.log.trace("文件存在检查 | path={} exists={}", file_path, exists)
return self._to_dataframe({'exists': [exists]})
def dir_exists(self, dir_path):
"""
检查目录是否存在
:param dir_path: 目录路径
:return: DataFrame({'exists': bool})
检查目录是否存在(跨平台兼容)
:return: 包含exists字段的结果字典
"""
dir_path = self._resolve_path(dir_path)
exists = os.path.isdir(dir_path)
self.log.trace("目录存在检查 | path={} exists={}", dir_path, exists)
return self._to_dataframe({'exists': [exists]})
exists = dir_path.is_dir()
msg = f"目录{'' if exists else ''}存在: {dir_path}"
return self._format_result(True, msg, {'exists': exists})
def create_dir(self, dir_path):
def create_dir(self, dir_path: Union[str, Path]) -> Dict[str, Any]:
"""
创建目录(包括父目录
:param dir_path: 目录路径
:return: DataFrame({'created': bool, 'path': str})
创建目录(跨平台兼容
:return: 包含path字段的结果字典
"""
dir_path = self._resolve_path(dir_path)
try:
os.makedirs(dir_path, exist_ok=True)
self.log.debug("目录创建成功 | path={}", dir_path)
return self._to_dataframe({'created': [True], 'path': [str(dir_path)]})
except Exception as e:
self.log.error("目录创建失败 | path={} error={}", dir_path, str(e))
raise
dir_path.mkdir(parents=True, exist_ok=True)
def delete_file(self, file_path):
# Windows系统需要额外设置权限
if os.name == 'nt':
try:
os.chmod(dir_path, 0o777)
except:
pass
return self._format_result(True, "目录创建成功", {'path': str(dir_path)})
except Exception as e:
return self._format_result(False, f"目录创建失败: {str(e)}", {'path': str(dir_path)})
def delete_file(self, file_path: Union[str, Path]) -> Dict[str, Any]:
"""
删除文件
:param file_path: 文件路径
:return: DataFrame({'deleted': bool, 'path': str})
删除文件(跨平台兼容)
:return: 包含path字段的结果字典
"""
file_path = self._resolve_path(file_path)
try:
exists = self.file_exists(file_path).iloc[0]['exists']
if exists:
os.remove(file_path)
self.log.debug("文件删除成功 | path={}", file_path)
return self._to_dataframe({'deleted': [True], 'path': [str(file_path)]})
return self._to_dataframe({'deleted': [False], 'path': [str(file_path)]})
except Exception as e:
self.log.error("文件删除失败 | path={} error={}", file_path, str(e))
raise
if not file_path.exists():
return self._format_result(False, "文件不存在", {'path': str(file_path)})
def delete_dir(self, dir_path):
file_path.unlink()
return self._format_result(True, "文件删除成功", {'path': str(file_path)})
except Exception as e:
return self._format_result(False, f"文件删除失败: {str(e)}", {'path': str(file_path)})
def delete_dir(self, dir_path: Union[str, Path]) -> Dict[str, Any]:
"""
删除目录及其内容
:param dir_path: 目录路径
:return: DataFrame({'deleted': bool, 'path': str})
删除目录及其内容(跨平台兼容)
:return: 包含path字段的结果字典
"""
dir_path = self._resolve_path(dir_path)
try:
exists = self.dir_exists(dir_path).iloc[0]['exists']
if exists:
if not dir_path.exists():
return self._format_result(False, "目录不存在", {'path': str(dir_path)})
shutil.rmtree(dir_path)
self.log.debug("目录删除成功 | path={}", dir_path)
return self._to_dataframe({'deleted': [True], 'path': [str(dir_path)]})
return self._to_dataframe({'deleted': [False], 'path': [str(dir_path)]})
return self._format_result(True, "目录删除成功", {'path': str(dir_path)})
except Exception as e:
self.log.error("目录删除失败 | path={} error={}", dir_path, str(e))
raise
return self._format_result(False, f"目录删除失败: {str(e)}", {'path': str(dir_path)})
def copy_file(self, src_path, dst_path):
def list_files(self,
dir_path: Union[str, Path],
recursive: bool = False,
pattern: str = '*') -> Dict[str, Any]:
"""
复制文件
:param src_path: 源文件路径
:param dst_path: 目标文件路径
:return: DataFrame({'copied': bool, 'source': str, 'destination': str})
列出目录中的文件(跨平台兼容)
:param recursive: 是否递归查找
:param pattern: 文件匹配模式(如*.txt
:return: 包含files字段的结果字典
"""
dir_path = self._resolve_path(dir_path)
try:
if recursive:
files = list(dir_path.rglob(pattern))
else:
files = list(dir_path.glob(pattern))
file_info = [
{
'path': str(f),
'name': f.name,
'size': f.stat().st_size,
'modified': datetime.fromtimestamp(f.stat().st_mtime).isoformat(),
'is_dir': f.is_dir()
} for f in files if f.is_file() # 只返回文件,不包括目录
]
return self._format_result(
True,
f"找到 {len(file_info)} 个文件",
{'files': file_info}
)
except Exception as e:
return self._format_result(
False,
f"列出文件失败: {str(e)}",
{'files': []}
)
def get_file_extension(self, file_path: Union[str, Path]) -> str:
"""
获取文件扩展名(跨平台兼容)
:return: 小写且不带点的扩展名(如 'jpg'
"""
file_path = self._resolve_path(file_path)
ext = file_path.suffix.lower().lstrip('.')
self.log.trace(f"获取文件扩展名 | path={file_path} ext={ext}")
return ext
def copy_file(self,
src_path: Union[str, Path],
dst_path: Union[str, Path]) -> Dict[str, Any]:
"""
复制文件(跨平台兼容)
:return: 包含source和destination字段的结果字典
"""
src_path = self._resolve_path(src_path)
dst_path = self._resolve_path(dst_path)
try:
self.create_dir(os.path.dirname(dst_path))
if not src_path.exists():
return self._format_result(
False,
"源文件不存在",
{
'source': str(src_path),
'destination': str(dst_path)
}
)
# 确保目标目录存在
self.create_dir(dst_path.parent)
shutil.copy2(src_path, dst_path)
self.log.debug("文件复制成功 | src={} dst={}", src_path, dst_path)
return self._to_dataframe({
'copied': [True],
'source': [str(src_path)],
'destination': [str(dst_path)]
})
return self._format_result(
True,
"文件复制成功",
{
'source': str(src_path),
'destination': str(dst_path),
'file_size': dst_path.stat().st_size
}
)
except Exception as e:
self.log.error("文件复制失败 | src={} dst={} error={}",
src_path, dst_path, str(e))
raise
return self._format_result(
False,
f"文件复制失败: {str(e)}",
{
'source': str(src_path),
'destination': str(dst_path)
}
)
def move_file(self, src_path, dst_path):
def move_file(self,
src_path: Union[str, Path],
dst_path: Union[str, Path]) -> Dict[str, Any]:
"""
移动/重命名文件
:param src_path: 源文件路径
:param dst_path: 目标文件路径
:return: DataFrame({'moved': bool, 'source': str, 'destination': str})
移动/重命名文件(跨平台兼容)
:return: 包含source和destination字段的结果字典
"""
src_path = self._resolve_path(src_path)
dst_path = self._resolve_path(dst_path)
try:
self.create_dir(os.path.dirname(dst_path))
if not src_path.exists():
return self._format_result(
False,
"源文件不存在",
{
'source': str(src_path),
'destination': str(dst_path)
}
)
# 确保目标目录存在
self.create_dir(dst_path.parent)
shutil.move(src_path, dst_path)
self.log.debug("文件移动成功 | src={} dst={}", src_path, dst_path)
return self._to_dataframe({
'moved': [True],
'source': [str(src_path)],
'destination': [str(dst_path)]
})
except Exception as e:
self.log.error("文件移动失败 | src={} dst={} error={}",
src_path, dst_path, str(e))
raise
def list_files(self, dir_path, recursive=False, pattern='*'):
"""
列出目录中的文件
:param dir_path: 目录路径
:param recursive: 是否递归查找
:param pattern: 文件匹配模式
:return: DataFrame({'file_path': str, 'file_name': str, 'extension': str})
"""
dir_path = self._resolve_path(dir_path)
try:
if recursive:
files = [str(f) for f in Path(dir_path).rglob(pattern) if f.is_file()]
else:
files = [str(f) for f in Path(dir_path).glob(pattern) if f.is_file()]
result = []
for f in files:
p = Path(f)
result.append({
'file_path': str(p),
'file_name': p.name,
'extension': p.suffix.lower().lstrip('.')
})
df = self._to_dataframe(result)
self.log.trace("列出目录文件 | path={} recursive={} count={}",
dir_path, recursive, len(df))
return df
except Exception as e:
self.log.error("列出文件失败 | path={} error={}", dir_path, str(e))
raise
def list_dirs(self, dir_path, recursive=False):
"""
列出目录中的子目录
:param dir_path: 目录路径
:param recursive: 是否递归查找
:return: DataFrame({'dir_path': str, 'dir_name': str})
"""
dir_path = self._resolve_path(dir_path)
try:
if recursive:
dirs = [str(d) for d in Path(dir_path).rglob('*') if d.is_dir()]
else:
dirs = [str(d) for d in Path(dir_path).glob('*') if d.is_dir()]
result = [{'dir_path': d, 'dir_name': Path(d).name} for d in dirs]
df = self._to_dataframe(result)
self.log.trace("列出子目录 | path={} recursive={} count={}",
dir_path, recursive, len(df))
return df
except Exception as e:
self.log.error("列出目录失败 | path={} error={}", dir_path, str(e))
raise
def get_file_size(self, file_path):
"""
获取文件大小(字节)
:param file_path: 文件路径
:return: DataFrame({'file_path': str, 'size_bytes': int, 'size_mb': float})
"""
file_path = self._resolve_path(file_path)
try:
size_bytes = os.path.getsize(file_path)
result = {
'file_path': str(file_path),
'size_bytes': size_bytes,
'size_mb': round(size_bytes / 1024 / 1024, 4)
return self._format_result(
True,
"文件移动成功",
{
'source': str(src_path),
'destination': str(dst_path)
}
df = self._to_dataframe(result)
self.log.trace("获取文件大小 | path={} size={} bytes", file_path, size_bytes)
return df
)
except Exception as e:
self.log.error("获取文件大小失败 | path={} error={}", file_path, str(e))
raise
def get_file_modified_time(self, file_path):
"""
获取文件修改时间
:param file_path: 文件路径
:return: DataFrame({'file_path': str, 'modified_time': datetime, 'timestamp': float})
"""
file_path = self._resolve_path(file_path)
try:
mtime = datetime.fromtimestamp(os.path.getmtime(file_path))
result = {
'file_path': str(file_path),
'modified_time': mtime,
'timestamp': mtime.timestamp()
return self._format_result(
False,
f"文件移动失败: {str(e)}",
{
'source': str(src_path),
'destination': str(dst_path)
}
df = self._to_dataframe(result)
self.log.trace("获取文件修改时间 | path={} mtime={}",
file_path, mtime.isoformat())
return df
except Exception as e:
self.log.error("获取文件修改时间失败 | path={} error={}",
file_path, str(e))
raise
)
def zip_files(self, file_paths, zip_path):
def zip_files(self,
file_paths: List[Union[str, Path]],
zip_path: Union[str, Path]) -> Dict[str, Any]:
"""
压缩多个文件到zip
:param file_paths: 要压缩的文件路径列表或DataFrame
:param zip_path: 压缩文件路径
:return: DataFrame({'zipped': bool, 'zip_path': str, 'file_count': int})
压缩多个文件到zip(跨平台兼容)
:param file_paths: 要压缩的文件路径列表
:param zip_path: 目标zip文件路径
:return: 包含zip_path和file_count字段的结果字典
"""
zip_path = self._resolve_path(zip_path)
# 处理输入可以是DataFrame或列表
if isinstance(file_paths, pd.DataFrame):
if 'file_path' in file_paths.columns:
file_list = file_paths['file_path'].tolist()
else:
file_list = file_paths.iloc[:, 0].tolist()
else:
file_list = file_paths
try:
self.create_dir(os.path.dirname(zip_path))
file_count = 0
# 确保目标目录存在
self.create_dir(zip_path.parent)
with zipfile.ZipFile(zip_path, 'w', zipfile.ZIP_DEFLATED) as zipf:
for file_path in file_list:
file_count = 0
for file_path in file_paths:
file_path = self._resolve_path(file_path)
if self.file_exists(file_path).iloc[0]['exists']:
zipf.write(file_path, os.path.basename(file_path))
if file_path.exists():
zipf.write(file_path, file_path.name)
file_count += 1
result = {
'zipped': True,
return self._format_result(
True,
"文件压缩成功",
{
'zip_path': str(zip_path),
'file_count': file_count
'file_count': file_count,
'zip_size': os.path.getsize(zip_path)
}
self.log.info("文件压缩成功 | zip={} files={}", zip_path, file_count)
return self._to_dataframe(result)
)
except Exception as e:
self.log.error("文件压缩失败 | zip={} error={}", zip_path, str(e))
raise
def zip_dir(self, dir_path, zip_path):
"""
压缩整个目录到zip
:param dir_path: 要压缩的目录路径
:param zip_path: 压缩文件路径
:return: DataFrame({'zipped': bool, 'zip_path': str, 'dir_path': str, 'file_count': int})
"""
dir_path = self._resolve_path(dir_path)
zip_path = self._resolve_path(zip_path)
try:
self.create_dir(os.path.dirname(zip_path))
file_count = 0
with zipfile.ZipFile(zip_path, 'w', zipfile.ZIP_DEFLATED) as zipf:
for root, dirs, files in os.walk(dir_path):
for file in files:
file_path = os.path.join(root, file)
arcname = os.path.relpath(file_path, dir_path)
zipf.write(file_path, arcname)
file_count += 1
result = {
'zipped': True,
'zip_path': str(zip_path),
'dir_path': str(dir_path),
'file_count': file_count
return self._format_result(
False,
f"文件压缩失败: {str(e)}",
{
'zip_path': str(zip_path)
}
self.log.info("目录压缩成功 | zip={} dir={} files={}",
zip_path, dir_path, file_count)
return self._to_dataframe(result)
except Exception as e:
self.log.error("目录压缩失败 | zip={} error={}", zip_path, str(e))
raise
)
def unzip(self, zip_path, extract_to=None):
def unzip(self,
zip_path: Union[str, Path],
extract_to: Optional[Union[str, Path]] = None) -> Dict[str, Any]:
"""
解压zip文件
:param zip_path: zip文件路径
:param extract_to: 解压目标目录,默认为zip文件所在目录
:return: DataFrame({'unzipped': bool, 'zip_path': str, 'extract_to': str, 'file_count': int})
解压zip文件(跨平台兼容)
:param extract_to: 解压目标目录(默认为zip文件所在目录)
:return: 包含extract_to和file_count字段的结果字典
"""
zip_path = self._resolve_path(zip_path)
if extract_to is None:
extract_to = os.path.dirname(zip_path)
else:
extract_to = self._resolve_path(extract_to)
extract_to = self._resolve_path(extract_to) if extract_to else zip_path.parent
try:
if not zip_path.exists():
return self._format_result(
False,
"ZIP文件不存在",
{
'zip_path': str(zip_path),
'extract_to': str(extract_to)
}
)
# 确保目标目录存在
self.create_dir(extract_to)
with zipfile.ZipFile(zip_path, 'r') as zipf:
file_list = zipf.namelist()
zipf.extractall(extract_to)
result = {
'unzipped': True,
'zip_path': str(zip_path),
return self._format_result(
True,
"文件解压成功",
{
'extract_to': str(extract_to),
'file_count': len(file_list)
}
self.log.info("文件解压成功 | zip={} extract_to={} files={}",
zip_path, extract_to, len(file_list))
return self._to_dataframe(result)
)
except Exception as e:
self.log.error("文件解压失败 | zip={} error={}", zip_path, str(e))
raise
def compress_large_log(self, log_path, max_size_mb=20):
"""
压缩过大的日志文件
:param log_path: 日志文件路径
:param max_size_mb: 最大大小(MB),超过则压缩
:return: DataFrame({'compressed': bool, 'original_path': str, 'zip_path': str, 'original_size_mb': float})
"""
log_path = self._resolve_path(log_path)
if not self.file_exists(log_path).iloc[0]['exists']:
return self._to_dataframe({'compressed': [False]})
max_size_bytes = max_size_mb * 1024 * 1024
size_info = self.get_file_size(log_path)
current_size = size_info.iloc[0]['size_bytes']
if current_size > max_size_bytes:
try:
timestamp = datetime.now().strftime('%Y%m%d_%H%M%S')
zip_path = f"{log_path}_{timestamp}.zip"
self.zip_files([log_path], zip_path)
self.delete_file(log_path)
result = {
'compressed': True,
'original_path': str(log_path),
'zip_path': zip_path,
'original_size_mb': round(current_size/1024/1024, 2)
return self._format_result(
False,
f"文件解压失败: {str(e)}",
{
'zip_path': str(zip_path),
'extract_to': str(extract_to)
}
self.log.info("日志文件压缩 | original={} compressed={} original_size={} MB",
log_path, zip_path, result['original_size_mb'])
return self._to_dataframe(result)
)
# ---------------------------- 测试用例 ----------------------------
if __name__ == "__main__":
# 初始化处理器(自动处理跨平台路径)
handler = FileHandler("test_data")
# 测试路径标准化
test_paths = [
"normal/path",
"windows\\style\\path",
"mixed/path\\with\\both"
]
print("=== 路径标准化测试 ===")
for path in test_paths:
resolved = handler._resolve_path(path)
print(f"原始路径: {path} -> 标准化: {resolved} (类型: {type(resolved)})")
# 测试目录操作
print("\n=== 目录操作测试 ===")
dir_result = handler.create_dir("test_dir")
print(dir_result)
# 测试文件操作
print("\n=== 文件操作测试 ===")
test_data = [{"name": "Alice", "age": 25}, {"name": "Bob", "age": 30}]
write_result = handler.write_file("test_dir/data.json", test_data)
print(write_result)
# 测试文件读取
try:
df = handler.read_file("test_dir/data.json")
print("\n读取文件内容:")
print(df)
except Exception as e:
self.log.error("日志压缩失败 | path={} error={}", log_path, str(e))
raise
print(f"\n文件读取失败: {str(e)}")
return self._to_dataframe({'compressed': [False]})
# 测试列表文件
print("\n=== 文件列表测试 ===")
list_result = handler.list_files("test_dir")
print(list_result)
def get_file_extension(self, file_path):
"""
获取文件扩展名
:param file_path: 文件路径
:return: 文件扩展名字符串(小写,不带点)
"""
file_path = self._resolve_path(file_path)
ext = Path(file_path).suffix.lower().lstrip('.')
self.log.trace("获取文件扩展名 | path={} ext={}", file_path, ext)
return ext # 直接返回字符串而不是DataFrame
# 测试压缩解压
print("\n=== 压缩解压测试 ===")
zip_result = handler.zip_files(
["test_dir/data.json"],
"test_archive.zip"
)
print(zip_result)
def change_file_extension(self, file_path, new_extension):
"""
修改文件扩展名
:param file_path: 文件路径
:param new_extension: 新扩展名(不带点)
:return: DataFrame({'original_path': str, 'new_path': str})
"""
file_path = self._resolve_path(file_path)
new_path = str(Path(file_path).with_suffix(f'.{new_extension}'))
result = {'original_path': str(file_path), 'new_path': new_path}
self.log.debug("修改文件扩展名 | original={} new={}", file_path, new_path)
return self._to_dataframe(result)
unzip_result = handler.unzip(
"test_archive.zip",
"extracted_files"
)
print(unzip_result)
def join_path(self, *paths):
"""
拼接路径
:param paths: 多个路径部分
:return: DataFrame({'joined_path': str})
"""
joined_path = str(Path(*paths))
self.log.trace("路径拼接 | parts={} result={}", paths, joined_path)
return self._to_dataframe({'joined_path': [joined_path]})
# 清理测试数据
print("\n=== 清理测试数据 ===")
print(handler.delete_file("test_dir/data.json"))
print(handler.delete_dir("test_dir"))
print(handler.delete_file("test_archive.zip"))
print(handler.delete_dir("extracted_files"))