Files
2025-10-17 17:59:28 +08:00

513 lines
18 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
import os
import shutil
import zipfile
import pickle
import pandas as pd
from datetime import datetime
from pathlib import Path, PurePath
from typing import Union, Optional, List, Dict, Any, Callable
from utils.logger import log
class FileHandler:
"""
跨平台文件操作工具类(兼容Windows/macOS/Linux
功能规范:
- 读取文件内容的方法返回DataFrame
- 其他所有方法返回统一格式字典:
{
'success': bool, # 操作是否成功
'message': str, # 操作结果描述
'data': Any # 操作返回的数据(可选)
}
"""
def __init__(self, base_path: Optional[Union[str, Path]] = None):
"""
初始化文件处理器
:param base_path: 基础路径(自动处理跨平台路径格式)
"""
self.base_path = self._normalize_path(base_path) if base_path else None
self.log = log.bind(module=self.__class__.__name__)
def _normalize_path(self, path: Union[str, Path]) -> Path:
"""统一转换为跨平台Path对象"""
return Path(str(path).replace('\\', '/'))
def _resolve_path(self, path: Union[str, Path]) -> Path:
"""解析路径(自动处理跨平台路径)"""
path = self._normalize_path(path)
if not path.is_absolute() and self.base_path:
return self._normalize_path(self.base_path / path)
return path
def _format_result(self,
success: bool,
message: str = "",
data: Optional[Any] = None) -> Dict[str, Any]:
"""统一返回结果格式"""
return {
'success': bool(success),
'message': str(message),
'data': data
}
def read_file(self,
file_path: Union[str, Path],
encoding: str = 'utf-8',
**kwargs) -> pd.DataFrame:
"""
读取文件内容为DataFrame(跨平台兼容)
:param file_path: 文件路径(自动处理跨平台格式)
:param encoding: 文件编码(默认utf-8
:return: 包含文件内容的DataFrame
:raises: 文件读取失败时抛出原始异常
"""
file_path = self._resolve_path(file_path)
try:
ext = self.get_file_extension(file_path)
if ext in ['csv', 'txt']:
df = pd.read_csv(file_path, encoding=encoding, **kwargs)
elif ext in ['xls', 'xlsx']:
df = pd.read_excel(file_path, **kwargs)
elif ext == 'json':
df = pd.read_json(file_path, encoding=encoding, **kwargs)
elif ext in ['pkl', 'pickle']:
# 统一将pickle内容转为DataFrame返回
obj = pd.read_pickle(file_path)
if isinstance(obj, pd.DataFrame):
df = obj
elif isinstance(obj, list):
df = pd.DataFrame(obj)
elif isinstance(obj, dict):
df = pd.DataFrame([obj])
else:
df = pd.DataFrame({'content': [obj]})
elif ext == 'parquet':
df = pd.read_parquet(file_path, **kwargs)
else:
with open(file_path, 'r', encoding=encoding) as f:
return pd.DataFrame({'content': [f.read()]})
self.log.debug(f"文件读取成功 | path={file_path} shape={df.shape}")
return df
except Exception as e:
self.log.error(f"文件读取失败 | path={file_path} error={str(e)}")
raise
def write_file(self,
file_path: Union[str, Path],
data: Union[pd.DataFrame, Dict, List],
encoding: str = 'utf-8',
**kwargs) -> Dict[str, Any]:
"""
写入文件(跨平台兼容)
:param file_path: 目标文件路径
:param data: 要写入的数据(支持DataFrame/dict/list
:param encoding: 文件编码(默认utf-8
:return: 操作结果字典
"""
file_path = self._resolve_path(file_path)
try:
# 自动创建父目录
parent_dir = file_path.parent
if not parent_dir.exists():
self.create_dir(parent_dir)
# 根据扩展名选择写入方式
ext = self.get_file_extension(file_path)
if ext in ['pkl', 'pickle']:
# 直接按原始对象进行pickle序列化
with open(file_path, 'wb') as f:
pickle.dump(data, f)
else:
# 统一数据格式到DataFrame
if isinstance(data, pd.DataFrame):
df = data
else:
df = pd.DataFrame(data if isinstance(data, list) else [data])
if ext in ['csv', 'txt']:
df.to_csv(file_path, encoding=encoding, index=False, **kwargs)
elif ext in ['xls', 'xlsx']:
df.to_excel(file_path, index=False, **kwargs)
elif ext == 'json':
df.to_json(file_path, force_ascii=False, **kwargs)
elif ext == 'parquet':
df.to_parquet(file_path, **kwargs)
else:
with open(file_path, 'w', encoding=encoding) as f:
f.write(str(data))
# 返回成功结果
return self._format_result(
True,
"文件写入成功",
{
'file_path': str(file_path),
'file_size': os.path.getsize(file_path)
}
)
except Exception as e:
return self._format_result(
False,
f"文件写入失败: {str(e)}",
{'file_path': str(file_path)}
)
def file_exists(self, file_path: Union[str, Path]) -> Dict[str, Any]:
"""
检查文件是否存在(跨平台兼容)
:return: 包含exists字段的结果字典
"""
file_path = self._resolve_path(file_path)
exists = file_path.is_file()
msg = f"文件{'' if exists else ''}存在: {file_path}"
return self._format_result(True, msg, {'exists': exists})
def dir_exists(self, dir_path: Union[str, Path]) -> Dict[str, Any]:
"""
检查目录是否存在(跨平台兼容)
:return: 包含exists字段的结果字典
"""
dir_path = self._resolve_path(dir_path)
exists = dir_path.is_dir()
msg = f"目录{'' if exists else ''}存在: {dir_path}"
return self._format_result(True, msg, {'exists': exists})
def create_dir(self, dir_path: Union[str, Path]) -> Dict[str, Any]:
"""
创建目录(跨平台兼容)
:return: 包含path字段的结果字典
"""
dir_path = self._resolve_path(dir_path)
try:
dir_path.mkdir(parents=True, exist_ok=True)
# Windows系统需要额外设置权限
if os.name == 'nt':
try:
os.chmod(dir_path, 0o777)
except:
pass
return self._format_result(True, "目录创建成功", {'path': str(dir_path)})
except Exception as e:
return self._format_result(False, f"目录创建失败: {str(e)}", {'path': str(dir_path)})
def delete_file(self, file_path: Union[str, Path]) -> Dict[str, Any]:
"""
删除文件(跨平台兼容)
:return: 包含path字段的结果字典
"""
file_path = self._resolve_path(file_path)
try:
if not file_path.exists():
return self._format_result(False, "文件不存在", {'path': str(file_path)})
file_path.unlink()
return self._format_result(True, "文件删除成功", {'path': str(file_path)})
except Exception as e:
return self._format_result(False, f"文件删除失败: {str(e)}", {'path': str(file_path)})
def delete_dir(self, dir_path: Union[str, Path]) -> Dict[str, Any]:
"""
删除目录及其内容(跨平台兼容)
:return: 包含path字段的结果字典
"""
dir_path = self._resolve_path(dir_path)
try:
if not dir_path.exists():
return self._format_result(False, "目录不存在", {'path': str(dir_path)})
shutil.rmtree(dir_path)
return self._format_result(True, "目录删除成功", {'path': str(dir_path)})
except Exception as e:
return self._format_result(False, f"目录删除失败: {str(e)}", {'path': str(dir_path)})
def list_files(self,
dir_path: Union[str, Path],
recursive: bool = False,
pattern: str = '*') -> Dict[str, Any]:
"""
列出目录中的文件(跨平台兼容)
:param recursive: 是否递归查找
:param pattern: 文件匹配模式(如*.txt
:return: 包含files字段的结果字典
"""
dir_path = self._resolve_path(dir_path)
try:
if recursive:
files = list(dir_path.rglob(pattern))
else:
files = list(dir_path.glob(pattern))
file_info = [
{
'path': str(f),
'name': f.name,
'size': f.stat().st_size,
'modified': datetime.fromtimestamp(f.stat().st_mtime).isoformat(),
'is_dir': f.is_dir()
} for f in files if f.is_file() # 只返回文件,不包括目录
]
return self._format_result(
True,
f"找到 {len(file_info)} 个文件",
{'files': file_info}
)
except Exception as e:
return self._format_result(
False,
f"列出文件失败: {str(e)}",
{'files': []}
)
def get_file_extension(self, file_path: Union[str, Path]) -> str:
"""
获取文件扩展名(跨平台兼容)
:return: 小写且不带点的扩展名(如 'jpg'
"""
file_path = self._resolve_path(file_path)
ext = file_path.suffix.lower().lstrip('.')
self.log.trace(f"获取文件扩展名 | path={file_path} ext={ext}")
return ext
def copy_file(self,
src_path: Union[str, Path],
dst_path: Union[str, Path]) -> Dict[str, Any]:
"""
复制文件(跨平台兼容)
:return: 包含source和destination字段的结果字典
"""
src_path = self._resolve_path(src_path)
dst_path = self._resolve_path(dst_path)
try:
if not src_path.exists():
return self._format_result(
False,
"源文件不存在",
{
'source': str(src_path),
'destination': str(dst_path)
}
)
# 确保目标目录存在
self.create_dir(dst_path.parent)
shutil.copy2(src_path, dst_path)
return self._format_result(
True,
"文件复制成功",
{
'source': str(src_path),
'destination': str(dst_path),
'file_size': dst_path.stat().st_size
}
)
except Exception as e:
return self._format_result(
False,
f"文件复制失败: {str(e)}",
{
'source': str(src_path),
'destination': str(dst_path)
}
)
def move_file(self,
src_path: Union[str, Path],
dst_path: Union[str, Path]) -> Dict[str, Any]:
"""
移动/重命名文件(跨平台兼容)
:return: 包含source和destination字段的结果字典
"""
src_path = self._resolve_path(src_path)
dst_path = self._resolve_path(dst_path)
try:
if not src_path.exists():
return self._format_result(
False,
"源文件不存在",
{
'source': str(src_path),
'destination': str(dst_path)
}
)
# 确保目标目录存在
self.create_dir(dst_path.parent)
shutil.move(src_path, dst_path)
return self._format_result(
True,
"文件移动成功",
{
'source': str(src_path),
'destination': str(dst_path)
}
)
except Exception as e:
return self._format_result(
False,
f"文件移动失败: {str(e)}",
{
'source': str(src_path),
'destination': str(dst_path)
}
)
def zip_files(self,
file_paths: List[Union[str, Path]],
zip_path: Union[str, Path]) -> Dict[str, Any]:
"""
压缩多个文件到zip(跨平台兼容)
:param file_paths: 要压缩的文件路径列表
:param zip_path: 目标zip文件路径
:return: 包含zip_path和file_count字段的结果字典
"""
zip_path = self._resolve_path(zip_path)
try:
# 确保目标目录存在
self.create_dir(zip_path.parent)
with zipfile.ZipFile(zip_path, 'w', zipfile.ZIP_DEFLATED) as zipf:
file_count = 0
for file_path in file_paths:
file_path = self._resolve_path(file_path)
if file_path.exists():
zipf.write(file_path, file_path.name)
file_count += 1
return self._format_result(
True,
"文件压缩成功",
{
'zip_path': str(zip_path),
'file_count': file_count,
'zip_size': os.path.getsize(zip_path)
}
)
except Exception as e:
return self._format_result(
False,
f"文件压缩失败: {str(e)}",
{
'zip_path': str(zip_path)
}
)
def unzip(self,
zip_path: Union[str, Path],
extract_to: Optional[Union[str, Path]] = None) -> Dict[str, Any]:
"""
解压zip文件(跨平台兼容)
:param extract_to: 解压目标目录(默认为zip文件所在目录)
:return: 包含extract_to和file_count字段的结果字典
"""
zip_path = self._resolve_path(zip_path)
extract_to = self._resolve_path(extract_to) if extract_to else zip_path.parent
try:
if not zip_path.exists():
return self._format_result(
False,
"ZIP文件不存在",
{
'zip_path': str(zip_path),
'extract_to': str(extract_to)
}
)
# 确保目标目录存在
self.create_dir(extract_to)
with zipfile.ZipFile(zip_path, 'r') as zipf:
file_list = zipf.namelist()
zipf.extractall(extract_to)
return self._format_result(
True,
"文件解压成功",
{
'extract_to': str(extract_to),
'file_count': len(file_list)
}
)
except Exception as e:
return self._format_result(
False,
f"文件解压失败: {str(e)}",
{
'zip_path': str(zip_path),
'extract_to': str(extract_to)
}
)
# ---------------------------- 测试用例 ----------------------------
if __name__ == "__main__":
# 初始化处理器(自动处理跨平台路径)
project_root = next(p for p in Path(__file__).resolve().parents if
(p / '.git').exists() or (p / 'pyproject.toml').exists() or (p / 'requirements.txt').exists())
handler = FileHandler(project_root / "test")
# 测试路径标准化
test_paths = [
"normal/path",
"windows\\style\\path",
"mixed/path\\with\\both"
]
print("=== 路径标准化测试 ===")
for path in test_paths:
resolved = handler._resolve_path(path)
print(f"原始路径: {path} -> 标准化: {resolved} (类型: {type(resolved)})")
# 测试目录操作
print("\n=== 目录操作测试 ===")
dir_result = handler.create_dir("test_dir")
print(dir_result)
# 测试文件操作
print("\n=== 文件操作测试 ===")
test_data = [{"name": "Alice", "age": 25}, {"name": "Bob", "age": 30}]
write_result = handler.write_file("test_dir/data.json", test_data)
print(write_result)
# 测试文件读取
try:
df = handler.read_file("test_dir/data.json")
print("\n读取文件内容:")
print(df)
except Exception as e:
print(f"\n文件读取失败: {str(e)}")
# 测试列表文件
print("\n=== 文件列表测试 ===")
list_result = handler.list_files("test_dir")
print(list_result)
# 测试压缩解压
print("\n=== 压缩解压测试 ===")
zip_result = handler.zip_files(
["test_dir/data.json"],
"test_archive.zip"
)
print(zip_result)
unzip_result = handler.unzip(
"test_archive.zip",
"extracted_files"
)
print(unzip_result)
# 清理测试数据
print("\n=== 清理测试数据 ===")
print(handler.delete_file("test_dir/data.json"))
print(handler.delete_dir("test_dir"))
print(handler.delete_file("test_archive.zip"))
print(handler.delete_dir("extracted_files"))