# Python source: 513 lines, 18 KiB
import os
|
||
import shutil
|
||
import zipfile
|
||
import pickle
|
||
import pandas as pd
|
||
from datetime import datetime
|
||
from pathlib import Path, PurePath
|
||
from typing import Union, Optional, List, Dict, Any, Callable
|
||
from utils.logger import log
|
||
|
||
class FileHandler:
    """
    Cross-platform file operation helper (Windows/macOS/Linux).

    Conventions:
    - ``read_file`` returns a DataFrame and re-raises the original exception
      on failure.
    - ``get_file_extension`` returns a plain string.
    - Every other public method returns a uniform result dict:
        {
            'success': bool,   # whether the operation succeeded
            'message': str,    # description of the outcome
            'data': Any        # operation-specific payload (optional)
        }
    """

    def __init__(self, base_path: Optional[Union[str, Path]] = None):
        """
        Initialise the handler.

        :param base_path: optional base directory; relative paths passed to
            the other methods are joined onto it. Backslashes are converted
            to forward slashes.
        """
        self.base_path = self._normalize_path(base_path) if base_path else None
        self.log = log.bind(module=self.__class__.__name__)

    def _normalize_path(self, path: Union[str, Path]) -> Path:
        """Convert *path* to a ``Path``, replacing backslashes with '/'."""
        return Path(str(path).replace('\\', '/'))

    def _resolve_path(self, path: Union[str, Path]) -> Path:
        """
        Normalise *path* and, when it is relative and a base path is set,
        join it onto the base path. Absolute paths are returned as-is.
        """
        path = self._normalize_path(path)
        if not path.is_absolute() and self.base_path:
            return self._normalize_path(self.base_path / path)
        return path

    def _format_result(self,
                       success: bool,
                       message: str = "",
                       data: Optional[Any] = None) -> Dict[str, Any]:
        """Build the uniform ``{'success', 'message', 'data'}`` result dict."""
        return {
            'success': bool(success),
            'message': str(message),
            'data': data
        }

    def _ensure_dir(self, dir_path: Union[str, Path]) -> None:
        """
        Create *dir_path* via ``create_dir`` and raise ``OSError`` carrying
        its message when that fails, so callers report the real cause instead
        of a confusing follow-up error.
        """
        result = self.create_dir(dir_path)
        if not result['success']:
            raise OSError(result['message'])

    @staticmethod
    def _as_frame(data: Any) -> pd.DataFrame:
        """Return *data* as a DataFrame (lists become rows, anything else one row)."""
        if isinstance(data, pd.DataFrame):
            return data
        return pd.DataFrame(data if isinstance(data, list) else [data])

    def read_file(self,
                  file_path: Union[str, Path],
                  encoding: str = 'utf-8',
                  **kwargs) -> pd.DataFrame:
        """
        Read a file into a DataFrame, choosing the reader by extension.

        csv/txt, xls/xlsx, json and parquet are read with the matching pandas
        reader (``kwargs`` are forwarded). pkl/pickle content is wrapped into
        a DataFrame. Any other extension is read as text into a single-cell
        DataFrame with a ``content`` column.

        :param file_path: file to read (resolved against the base path)
        :param encoding: text encoding used by the csv, json and plain-text readers
        :return: DataFrame holding the file content
        :raises Exception: the original exception is logged and re-raised
        """
        file_path = self._resolve_path(file_path)
        try:
            ext = self.get_file_extension(file_path)

            if ext in ('csv', 'txt'):
                df = pd.read_csv(file_path, encoding=encoding, **kwargs)
            elif ext in ('xls', 'xlsx'):
                df = pd.read_excel(file_path, **kwargs)
            elif ext == 'json':
                df = pd.read_json(file_path, encoding=encoding, **kwargs)
            elif ext in ('pkl', 'pickle'):
                # NOTE(security): unpickling can execute arbitrary code; only
                # read pickle files that come from a trusted source.
                obj = pd.read_pickle(file_path)
                # Always hand back a DataFrame, whatever object was pickled.
                if isinstance(obj, pd.DataFrame):
                    df = obj
                elif isinstance(obj, list):
                    df = pd.DataFrame(obj)
                elif isinstance(obj, dict):
                    df = pd.DataFrame([obj])
                else:
                    df = pd.DataFrame({'content': [obj]})
            elif ext == 'parquet':
                df = pd.read_parquet(file_path, **kwargs)
            else:
                # Unknown extension: treat the file as plain text.
                with open(file_path, 'r', encoding=encoding) as f:
                    df = pd.DataFrame({'content': [f.read()]})

            self.log.debug(f"文件读取成功 | path={file_path} shape={df.shape}")
            return df
        except Exception as e:
            self.log.error(f"文件读取失败 | path={file_path} error={str(e)}")
            raise

    def write_file(self,
                   file_path: Union[str, Path],
                   data: Union[pd.DataFrame, Dict, List],
                   encoding: str = 'utf-8',
                   **kwargs) -> Dict[str, Any]:
        """
        Write *data* to a file, choosing the writer by extension.

        pkl/pickle pickles the object as given. csv/txt, xls/xlsx, json and
        parquet convert the data to a DataFrame first (``kwargs`` are
        forwarded to the pandas writer). Any other extension writes
        ``str(data)`` as text. Missing parent directories are created.

        :param file_path: destination file (resolved against the base path)
        :param data: DataFrame, dict or list to write
        :param encoding: text encoding used by the csv and plain-text writers
        :return: result dict; on success ``data`` holds ``file_path`` and
            ``file_size``, on failure only ``file_path``
        """
        file_path = self._resolve_path(file_path)
        try:
            # Create the parent directory on demand and surface any failure.
            parent_dir = file_path.parent
            if not parent_dir.exists():
                self._ensure_dir(parent_dir)

            ext = self.get_file_extension(file_path)

            if ext in ('pkl', 'pickle'):
                # Pickle the original object untouched.
                with open(file_path, 'wb') as f:
                    pickle.dump(data, f)
            elif ext in ('csv', 'txt'):
                self._as_frame(data).to_csv(file_path, encoding=encoding, index=False, **kwargs)
            elif ext in ('xls', 'xlsx'):
                self._as_frame(data).to_excel(file_path, index=False, **kwargs)
            elif ext == 'json':
                self._as_frame(data).to_json(file_path, force_ascii=False, **kwargs)
            elif ext == 'parquet':
                self._as_frame(data).to_parquet(file_path, **kwargs)
            else:
                # Plain-text fallback: no DataFrame conversion is needed here.
                with open(file_path, 'w', encoding=encoding) as f:
                    f.write(str(data))

            return self._format_result(
                True,
                "文件写入成功",
                {
                    'file_path': str(file_path),
                    'file_size': os.path.getsize(file_path)
                }
            )
        except Exception as e:
            return self._format_result(
                False,
                f"文件写入失败: {str(e)}",
                {'file_path': str(file_path)}
            )

    def file_exists(self, file_path: Union[str, Path]) -> Dict[str, Any]:
        """
        Check whether *file_path* is an existing regular file.

        :return: result dict whose ``data`` holds the boolean ``exists``
        """
        file_path = self._resolve_path(file_path)
        exists = file_path.is_file()
        msg = f"文件{'' if exists else '不'}存在: {file_path}"
        return self._format_result(True, msg, {'exists': exists})

    def dir_exists(self, dir_path: Union[str, Path]) -> Dict[str, Any]:
        """
        Check whether *dir_path* is an existing directory.

        :return: result dict whose ``data`` holds the boolean ``exists``
        """
        dir_path = self._resolve_path(dir_path)
        exists = dir_path.is_dir()
        msg = f"目录{'' if exists else '不'}存在: {dir_path}"
        return self._format_result(True, msg, {'exists': exists})

    def create_dir(self, dir_path: Union[str, Path]) -> Dict[str, Any]:
        """
        Create a directory, including missing parents; an existing directory
        is not an error.

        :return: result dict whose ``data`` holds the resolved ``path``
        """
        dir_path = self._resolve_path(dir_path)
        try:
            dir_path.mkdir(parents=True, exist_ok=True)

            # On Windows, additionally try to open up the permissions. This is
            # best effort: only OS-level errors are ignored, so interrupts and
            # programming errors are no longer swallowed.
            if os.name == 'nt':
                try:
                    os.chmod(dir_path, 0o777)
                except OSError:
                    pass

            return self._format_result(True, "目录创建成功", {'path': str(dir_path)})
        except Exception as e:
            return self._format_result(False, f"目录创建失败: {str(e)}", {'path': str(dir_path)})

    def delete_file(self, file_path: Union[str, Path]) -> Dict[str, Any]:
        """
        Delete a file.

        :return: result dict whose ``data`` holds the resolved ``path``;
            ``success`` is False when the path does not exist or removal fails
        """
        file_path = self._resolve_path(file_path)
        try:
            if not file_path.exists():
                return self._format_result(False, "文件不存在", {'path': str(file_path)})

            file_path.unlink()
            return self._format_result(True, "文件删除成功", {'path': str(file_path)})
        except Exception as e:
            return self._format_result(False, f"文件删除失败: {str(e)}", {'path': str(file_path)})

    def delete_dir(self, dir_path: Union[str, Path]) -> Dict[str, Any]:
        """
        Delete a directory together with everything inside it.

        :return: result dict whose ``data`` holds the resolved ``path``;
            ``success`` is False when the path does not exist or removal fails
        """
        dir_path = self._resolve_path(dir_path)
        try:
            if not dir_path.exists():
                return self._format_result(False, "目录不存在", {'path': str(dir_path)})

            shutil.rmtree(dir_path)
            return self._format_result(True, "目录删除成功", {'path': str(dir_path)})
        except Exception as e:
            return self._format_result(False, f"目录删除失败: {str(e)}", {'path': str(dir_path)})

    def list_files(self,
                   dir_path: Union[str, Path],
                   recursive: bool = False,
                   pattern: str = '*') -> Dict[str, Any]:
        """
        List the regular files in a directory that match a glob pattern.

        :param dir_path: directory to search (resolved against the base path)
        :param recursive: search sub-directories as well
        :param pattern: glob pattern (e.g. ``*.txt``)
        :return: result dict whose ``data['files']`` is a list of dicts with
            ``path``, ``name``, ``size``, ``modified`` (ISO timestamp) and
            ``is_dir``
        """
        dir_path = self._resolve_path(dir_path)
        try:
            matches = dir_path.rglob(pattern) if recursive else dir_path.glob(pattern)

            file_info = []
            for entry in matches:
                # Only regular files are reported; directories are skipped.
                if not entry.is_file():
                    continue
                stat = entry.stat()  # one stat call serves both size and mtime
                file_info.append({
                    'path': str(entry),
                    'name': entry.name,
                    'size': stat.st_size,
                    'modified': datetime.fromtimestamp(stat.st_mtime).isoformat(),
                    # Kept for result-shape compatibility; always False
                    # because directories were filtered out above.
                    'is_dir': False
                })

            return self._format_result(
                True,
                f"找到 {len(file_info)} 个文件",
                {'files': file_info}
            )
        except Exception as e:
            return self._format_result(
                False,
                f"列出文件失败: {str(e)}",
                {'files': []}
            )

    def get_file_extension(self, file_path: Union[str, Path]) -> str:
        """
        Return the file extension of *file_path*.

        :return: lower-case extension without the leading dot (e.g. 'jpg')
        """
        file_path = self._resolve_path(file_path)
        ext = file_path.suffix.lower().lstrip('.')
        self.log.trace(f"获取文件扩展名 | path={file_path} ext={ext}")
        return ext

    def copy_file(self,
                  src_path: Union[str, Path],
                  dst_path: Union[str, Path]) -> Dict[str, Any]:
        """
        Copy a file, creating the destination directory when needed.

        :return: result dict whose ``data`` holds ``source`` and
            ``destination`` (plus ``file_size`` on success)
        """
        src_path = self._resolve_path(src_path)
        dst_path = self._resolve_path(dst_path)
        endpoints = {
            'source': str(src_path),
            'destination': str(dst_path)
        }
        try:
            if not src_path.exists():
                return self._format_result(False, "源文件不存在", endpoints)

            # Make sure the destination directory exists (failure is reported).
            self._ensure_dir(dst_path.parent)

            shutil.copy2(src_path, dst_path)
            return self._format_result(
                True,
                "文件复制成功",
                {
                    'source': str(src_path),
                    'destination': str(dst_path),
                    'file_size': dst_path.stat().st_size
                }
            )
        except Exception as e:
            return self._format_result(False, f"文件复制失败: {str(e)}", endpoints)

    def move_file(self,
                  src_path: Union[str, Path],
                  dst_path: Union[str, Path]) -> Dict[str, Any]:
        """
        Move or rename a file, creating the destination directory when needed.

        :return: result dict whose ``data`` holds ``source`` and ``destination``
        """
        src_path = self._resolve_path(src_path)
        dst_path = self._resolve_path(dst_path)
        endpoints = {
            'source': str(src_path),
            'destination': str(dst_path)
        }
        try:
            if not src_path.exists():
                return self._format_result(False, "源文件不存在", endpoints)

            # Make sure the destination directory exists (failure is reported).
            self._ensure_dir(dst_path.parent)

            # Plain strings keep this working on interpreters where
            # shutil.move does not yet accept path-like objects.
            shutil.move(str(src_path), str(dst_path))
            return self._format_result(True, "文件移动成功", endpoints)
        except Exception as e:
            return self._format_result(False, f"文件移动失败: {str(e)}", endpoints)

    def zip_files(self,
                  file_paths: List[Union[str, Path]],
                  zip_path: Union[str, Path]) -> Dict[str, Any]:
        """
        Compress several files into one zip archive.

        Each file is stored flat under its base name, so inputs that share a
        base name end up as duplicate entries. Paths that are not regular
        files (missing paths, directories) are left out and reported in
        ``skipped`` instead of being counted.

        :param file_paths: files to add to the archive
        :param zip_path: destination zip file
        :return: result dict whose ``data`` holds ``zip_path``, ``file_count``,
            ``zip_size`` and ``skipped`` on success, or ``zip_path`` on failure
        """
        zip_path = self._resolve_path(zip_path)
        try:
            # Make sure the destination directory exists (failure is reported).
            self._ensure_dir(zip_path.parent)

            file_count = 0
            skipped = []
            with zipfile.ZipFile(zip_path, 'w', zipfile.ZIP_DEFLATED) as zipf:
                for file_path in file_paths:
                    file_path = self._resolve_path(file_path)
                    if file_path.is_file():
                        zipf.write(file_path, file_path.name)
                        file_count += 1
                    else:
                        skipped.append(str(file_path))

            return self._format_result(
                True,
                "文件压缩成功",
                {
                    'zip_path': str(zip_path),
                    'file_count': file_count,
                    'zip_size': os.path.getsize(zip_path),
                    'skipped': skipped
                }
            )
        except Exception as e:
            return self._format_result(
                False,
                f"文件压缩失败: {str(e)}",
                {
                    'zip_path': str(zip_path)
                }
            )

    def unzip(self,
              zip_path: Union[str, Path],
              extract_to: Optional[Union[str, Path]] = None) -> Dict[str, Any]:
        """
        Extract a zip archive.

        :param zip_path: archive to extract
        :param extract_to: target directory (defaults to the directory that
            contains the archive)
        :return: result dict whose ``data`` holds ``extract_to`` and
            ``file_count`` (number of entries listed in the archive) on
            success, or ``zip_path`` and ``extract_to`` on failure
        """
        zip_path = self._resolve_path(zip_path)
        extract_to = self._resolve_path(extract_to) if extract_to else zip_path.parent
        locations = {
            'zip_path': str(zip_path),
            'extract_to': str(extract_to)
        }

        try:
            if not zip_path.exists():
                return self._format_result(False, "ZIP文件不存在", locations)

            # Make sure the target directory exists (failure is reported).
            self._ensure_dir(extract_to)

            with zipfile.ZipFile(zip_path, 'r') as zipf:
                file_list = zipf.namelist()
                zipf.extractall(extract_to)

            return self._format_result(
                True,
                "文件解压成功",
                {
                    'extract_to': str(extract_to),
                    'file_count': len(file_list)
                }
            )
        except Exception as e:
            return self._format_result(False, f"文件解压失败: {str(e)}", locations)
|
||
|
||
|
||
# ---------------------------- 测试用例 ----------------------------
|
||
if __name__ == "__main__":
    # Manual smoke test: exercises the main FileHandler operations inside
    # "<project_root>/test" and prints every result.

    # Locate the project root by walking up from this file until a marker
    # (.git, pyproject.toml or requirements.txt) is found. Fall back to this
    # file's own directory so a missing marker no longer raises StopIteration.
    this_file = Path(__file__).resolve()
    project_root = next(
        (p for p in this_file.parents if
         (p / '.git').exists() or (p / 'pyproject.toml').exists() or (p / 'requirements.txt').exists()),
        this_file.parent
    )
    handler = FileHandler(project_root / "test")

    # Path normalisation
    test_paths = [
        "normal/path",
        "windows\\style\\path",
        "mixed/path\\with\\both"
    ]

    print("=== 路径标准化测试 ===")
    for path in test_paths:
        resolved = handler._resolve_path(path)
        print(f"原始路径: {path} -> 标准化: {resolved} (类型: {type(resolved)})")

    try:
        # Directory operations
        print("\n=== 目录操作测试 ===")
        dir_result = handler.create_dir("test_dir")
        print(dir_result)

        # File writing
        print("\n=== 文件操作测试 ===")
        test_data = [{"name": "Alice", "age": 25}, {"name": "Bob", "age": 30}]
        write_result = handler.write_file("test_dir/data.json", test_data)
        print(write_result)

        # File reading (read_file raises on failure, so guard it)
        try:
            df = handler.read_file("test_dir/data.json")
            print("\n读取文件内容:")
            print(df)
        except Exception as e:
            print(f"\n文件读取失败: {str(e)}")

        # File listing
        print("\n=== 文件列表测试 ===")
        list_result = handler.list_files("test_dir")
        print(list_result)

        # Zip / unzip round trip
        print("\n=== 压缩解压测试 ===")
        zip_result = handler.zip_files(
            ["test_dir/data.json"],
            "test_archive.zip"
        )
        print(zip_result)

        unzip_result = handler.unzip(
            "test_archive.zip",
            "extracted_files"
        )
        print(unzip_result)
    finally:
        # Always remove the artefacts, even when a step above raised.
        print("\n=== 清理测试数据 ===")
        print(handler.delete_file("test_dir/data.json"))
        print(handler.delete_dir("test_dir"))
        print(handler.delete_file("test_archive.zip"))
        print(handler.delete_dir("extracted_files"))