From 196df754bc6fcd20d137a12b82e59176749ecd60 Mon Sep 17 00:00:00 2001 From: Administrator <1415243231@qq.com> Date: Wed, 6 Aug 2025 12:33:56 +0800 Subject: [PATCH] =?UTF-8?q?=E9=80=9A=E7=94=A8=E6=96=87=E4=BB=B6=E8=AF=BB?= =?UTF-8?q?=E5=8F=96?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- doc/readme.md | 3 + test/日志测试.py | 45 +++- test/通用文件读取测试.py | 91 +++++++ utils/file_handler.py | 552 +++++++++++++++++++++++++++++++++++++++ 4 files changed, 683 insertions(+), 8 deletions(-) create mode 100644 test/通用文件读取测试.py diff --git a/doc/readme.md b/doc/readme.md index 1409682..6844b0e 100644 --- a/doc/readme.md +++ b/doc/readme.md @@ -1,5 +1,8 @@ ## 情报收集系统设计 +### 参考文档 +https://alidocs.dingtalk.com/i/nodes/NZQYprEoWoexdo1ohPdxXvDbJ1waOeDk?utm_scene=team_space + ### 程序框架 ```angular2html intelligence_system/ diff --git a/test/日志测试.py b/test/日志测试.py index b04b12e..d49ffc4 100644 --- a/test/日志测试.py +++ b/test/日志测试.py @@ -14,14 +14,43 @@ # test_log_rotation.py -from utils.logger import log -import time +# from utils.logger import log +# import time +# +# def generate_large_log(): +# """快速生成超过20MB的测试日志""" +# for i in range(10000): +# log.info(f"测试日志填充数据... {i}" * 10) +# time.sleep(0.001) # 避免内存暴涨 +# +# if __name__ == "__main__": +# generate_large_log() -def generate_large_log(): - """快速生成超过20MB的测试日志""" - for i in range(10000): - log.info(f"测试日志填充数据... {i}" * 10) - time.sleep(0.001) # 避免内存暴涨 +# 使用方法 +# my_module/main_class.py +from utils.logger import log + +class MainProcessor: + def __init__(self): + self.log = log.bind(module=self.__class__.__name__) # 动态绑定类名 + + def main(self): + """主执行方法""" + self.log.info("开始执行主流程") + try: + self._step1() + # self._step2() + except Exception as e: + self.log.error("主流程执行失败", exc_info=e) + raise + + def _step1(self): + """子方法示例""" + self.log.debug("执行步骤1: 初始化资源") + # ...业务逻辑... + resource_count = 10 + self.log.info("步骤1完成 | created={}", resource_count) if __name__ == "__main__": - generate_large_log() \ No newline at end of file + processor = MainProcessor() + processor.main() \ No newline at end of file diff --git a/test/通用文件读取测试.py b/test/通用文件读取测试.py new file mode 100644 index 0000000..7176c18 --- /dev/null +++ b/test/通用文件读取测试.py @@ -0,0 +1,91 @@ +import pytest +import pandas as pd +import os +from pathlib import Path +from utils.file_handler import FileHandler + +@pytest.fixture +def temp_dir(tmp_path): + """创建临时测试目录""" + test_dir = tmp_path / "test_files" + test_dir.mkdir() + return test_dir + +@pytest.fixture +def file_handler(temp_dir): + """创建FileHandler实例""" + return FileHandler(temp_dir) + +@pytest.fixture +def sample_dataframe(): + """创建测试用DataFrame""" + return pd.DataFrame({ + 'id': [1, 2, 3], + 'name': ['Alice', 'Bob', 'Charlie'], + 'value': [10.5, 20.3, 30.1] + }) + +@pytest.fixture +def sample_text_file(temp_dir): + """创建测试文本文件""" + file_path = temp_dir / "test.txt" + with open(file_path, 'w') as f: + f.write("line1\nline2\nline3") + return file_path + + +# 开始测试 + +def test_read_write_csv(file_handler, temp_dir, sample_dataframe): + """测试CSV文件读写""" + test_file = temp_dir / "test.csv" + + # 测试写入 + write_result = file_handler.write_file(test_file, sample_dataframe) + + # 修改断言方式 + assert bool(write_result.iloc[0]['success']) == True # 使用bool()转换 + # 或者 + assert write_result.iloc[0]['success'] == True # 使用值比较 + + assert os.path.exists(test_file) + + # 测试读取 + df = file_handler.read_file(test_file) + assert df.shape == (3, 3) + assert list(df.columns) == ['id', 'name', 'value'] + +def test_read_write_json(file_handler, temp_dir, sample_dataframe): + """测试JSON文件读写""" + test_file = temp_dir / "test.json" + + # 测试写入 + write_result = file_handler.write_file(test_file, sample_dataframe) + assert write_result.iloc[0]['success'] == True + + # 测试读取 + df = file_handler.read_file(test_file) + assert df.shape == (3, 3) + +def test_read_write_excel(file_handler, temp_dir, sample_dataframe): + """测试Excel文件读写""" + test_file = temp_dir / "test.xlsx" + + # 测试写入 + write_result = file_handler.write_file(test_file, sample_dataframe) + assert write_result.iloc[0]['success'] == True + + # 测试读取 + df = file_handler.read_file(test_file) + assert df.shape == (3, 3) + +def test_read_write_text(file_handler, sample_text_file): + """测试文本文件读写""" + # 测试读取 + df = file_handler.read_file(sample_text_file) + assert df.shape == (1, 1) # 默认单行读取 + + # 测试按行读取 + lines_df = file_handler.read_lines(sample_text_file) + assert lines_df.shape == (3, 1) + assert lines_df.iloc[0, 0] == "line1" \ No newline at end of file diff --git a/utils/file_handler.py b/utils/file_handler.py index e69de29..2ac8ce3 100644 --- a/utils/file_handler.py +++ b/utils/file_handler.py @@ -0,0 +1,552 @@ +import os +import shutil +import zipfile +import pandas as pd +from datetime import datetime +from pathlib import Path +from utils.logger import log + +class FileHandler: + """ + 通用文件操作工具类(所有输入输出均为DataFrame格式) + 功能包括:文件读写、目录操作、文件压缩、路径处理等 + """ + + def __init__(self, base_path=None): + """ + 初始化文件处理器 + :param base_path: 基础路径,所有操作将基于此路径 + """ + self.base_path = Path(base_path) if base_path else None + self.log = log.bind(module=self.__class__.__name__) + + def _resolve_path(self, path): + """解析路径,处理相对路径和绝对路径""" + path = Path(path) + if not path.is_absolute() and self.base_path: + return self.base_path / path + return path + + def _to_dataframe(self, data, columns=None): + """将数据转换为DataFrame格式""" + if isinstance(data, pd.DataFrame): + return data + if isinstance(data, dict): + return pd.DataFrame([data]) + if isinstance(data, list): + return pd.DataFrame(data, columns=columns) if columns else pd.DataFrame(data) + return pd.DataFrame([{'value': data}]) + + def read_file(self, file_path, encoding='utf-8', **kwargs): + """ + 读取文件内容为DataFrame + :param file_path: 文件路径 + :param encoding: 文件编码 + :param kwargs: pandas.read_* 方法的其他参数 + :return: DataFrame + """ + file_path = self._resolve_path(file_path) + try: + ext = self.get_file_extension(file_path).lower() + + if ext in ['csv', 'txt']: + df = pd.read_csv(file_path, encoding=encoding, **kwargs) + elif ext in ['xls', 'xlsx']: + df = pd.read_excel(file_path, **kwargs) + elif ext == 'json': + df = pd.read_json(file_path, encoding=encoding, **kwargs) + elif ext == 'parquet': + df = pd.read_parquet(file_path, **kwargs) + else: + # 默认按文本文件处理 + with open(file_path, 'r', encoding=encoding) as f: + content = f.read() + df = self._to_dataframe({'content': content}) + + self.log.debug("文件读取成功 | path={} shape={}", file_path, df.shape) + return df + except Exception as e: + self.log.error("文件读取失败 | path={} error={}", file_path, str(e)) + raise + + def write_file(self, file_path, data, encoding='utf-8', **kwargs): + """ + 将DataFrame写入文件 + :param file_path: 文件路径 + :param data: 要写入的DataFrame数据 + :param encoding: 文件编码 + :param kwargs: pandas.to_* 方法的其他参数 + :return: DataFrame({'success': bool, 'file_path': str, 'file_size': int}) + """ + file_path = self._resolve_path(file_path) + df = self._to_dataframe(data) + + try: + self.create_dir(os.path.dirname(file_path)) + ext = self.get_file_extension(file_path) # 现在返回的是字符串 + + if ext in ['csv', 'txt']: + df.to_csv(file_path, encoding=encoding, index=False, **kwargs) + elif ext in ['xls', 'xlsx']: + df.to_excel(file_path, index=False, **kwargs) + elif ext == 'json': + df.to_json(file_path, force_ascii=False, **kwargs) + elif ext == 'parquet': + df.to_parquet(file_path, **kwargs) + else: + # 默认按文本文件处理 + content = df.to_string(index=False) + with open(file_path, 'w', encoding=encoding) as f: + f.write(content) + + file_size = os.path.getsize(file_path) + result = { + 'success': True, + 'file_path': str(file_path), + 'file_size': file_size + } + self.log.debug("文件写入成功 | path={} size={} bytes", file_path, file_size) + return self._to_dataframe(result) + except Exception as e: + self.log.error("文件写入失败 | path={} error={}", file_path, str(e)) + raise + + def read_lines(self, file_path, encoding='utf-8', columns=['line_content']): + """ + 按行读取文件内容为DataFrame + :param file_path: 文件路径 + :param encoding: 文件编码 + :param columns: 列名列表 + :return: DataFrame + """ + file_path = self._resolve_path(file_path) + try: + with open(file_path, 'r', encoding=encoding) as f: + lines = f.readlines() + + df = self._to_dataframe(lines, columns=columns) + self.log.debug("文件按行读取成功 | path={} lines={}", file_path, len(df)) + return df + except Exception as e: + self.log.error("文件按行读取失败 | path={} error={}", file_path, str(e)) + raise + + def write_lines(self, file_path, data, encoding='utf-8', line_column=None): + """ + 将DataFrame按行写入文件 + :param file_path: 文件路径 + :param data: 要写入的DataFrame数据 + :param encoding: 文件编码 + :param line_column: 指定作为行内容的列名 + """ + file_path = self._resolve_path(file_path) + df = self._to_dataframe(data) + + try: + self.create_dir(os.path.dirname(file_path)) + + if line_column and line_column in df.columns: + lines = df[line_column].tolist() + else: + lines = df.to_string(index=False, header=False).split('\n') + + with open(file_path, 'w', encoding=encoding) as f: + f.writelines([line + '\n' for line in lines]) + + self.log.debug("文件按行写入成功 | path={} lines={}", file_path, len(lines)) + except Exception as e: + self.log.error("文件按行写入失败 | path={} error={}", file_path, str(e)) + raise + + def file_exists(self, file_path): + """ + 检查文件是否存在 + :param file_path: 文件路径 + :return: DataFrame({'exists': bool}) + """ + file_path = self._resolve_path(file_path) + exists = os.path.isfile(file_path) + self.log.trace("文件存在检查 | path={} exists={}", file_path, exists) + return self._to_dataframe({'exists': [exists]}) + + def dir_exists(self, dir_path): + """ + 检查目录是否存在 + :param dir_path: 目录路径 + :return: DataFrame({'exists': bool}) + """ + dir_path = self._resolve_path(dir_path) + exists = os.path.isdir(dir_path) + self.log.trace("目录存在检查 | path={} exists={}", dir_path, exists) + return self._to_dataframe({'exists': [exists]}) + + def create_dir(self, dir_path): + """ + 创建目录(包括父目录) + :param dir_path: 目录路径 + :return: DataFrame({'created': bool, 'path': str}) + """ + dir_path = self._resolve_path(dir_path) + try: + os.makedirs(dir_path, exist_ok=True) + self.log.debug("目录创建成功 | path={}", dir_path) + return self._to_dataframe({'created': [True], 'path': [str(dir_path)]}) + except Exception as e: + self.log.error("目录创建失败 | path={} error={}", dir_path, str(e)) + raise + + def delete_file(self, file_path): + """ + 删除文件 + :param file_path: 文件路径 + :return: DataFrame({'deleted': bool, 'path': str}) + """ + file_path = self._resolve_path(file_path) + try: + exists = self.file_exists(file_path).iloc[0]['exists'] + if exists: + os.remove(file_path) + self.log.debug("文件删除成功 | path={}", file_path) + return self._to_dataframe({'deleted': [True], 'path': [str(file_path)]}) + return self._to_dataframe({'deleted': [False], 'path': [str(file_path)]}) + except Exception as e: + self.log.error("文件删除失败 | path={} error={}", file_path, str(e)) + raise + + def delete_dir(self, dir_path): + """ + 删除目录及其内容 + :param dir_path: 目录路径 + :return: DataFrame({'deleted': bool, 'path': str}) + """ + dir_path = self._resolve_path(dir_path) + try: + exists = self.dir_exists(dir_path).iloc[0]['exists'] + if exists: + shutil.rmtree(dir_path) + self.log.debug("目录删除成功 | path={}", dir_path) + return self._to_dataframe({'deleted': [True], 'path': [str(dir_path)]}) + return self._to_dataframe({'deleted': [False], 'path': [str(dir_path)]}) + except Exception as e: + self.log.error("目录删除失败 | path={} error={}", dir_path, str(e)) + raise + + def copy_file(self, src_path, dst_path): + """ + 复制文件 + :param src_path: 源文件路径 + :param dst_path: 目标文件路径 + :return: DataFrame({'copied': bool, 'source': str, 'destination': str}) + """ + src_path = self._resolve_path(src_path) + dst_path = self._resolve_path(dst_path) + try: + self.create_dir(os.path.dirname(dst_path)) + shutil.copy2(src_path, dst_path) + self.log.debug("文件复制成功 | src={} dst={}", src_path, dst_path) + return self._to_dataframe({ + 'copied': [True], + 'source': [str(src_path)], + 'destination': [str(dst_path)] + }) + except Exception as e: + self.log.error("文件复制失败 | src={} dst={} error={}", + src_path, dst_path, str(e)) + raise + + def move_file(self, src_path, dst_path): + """ + 移动/重命名文件 + :param src_path: 源文件路径 + :param dst_path: 目标文件路径 + :return: DataFrame({'moved': bool, 'source': str, 'destination': str}) + """ + src_path = self._resolve_path(src_path) + dst_path = self._resolve_path(dst_path) + try: + self.create_dir(os.path.dirname(dst_path)) + shutil.move(src_path, dst_path) + self.log.debug("文件移动成功 | src={} dst={}", src_path, dst_path) + return self._to_dataframe({ + 'moved': [True], + 'source': [str(src_path)], + 'destination': [str(dst_path)] + }) + except Exception as e: + self.log.error("文件移动失败 | src={} dst={} error={}", + src_path, dst_path, str(e)) + raise + + def list_files(self, dir_path, recursive=False, pattern='*'): + """ + 列出目录中的文件 + :param dir_path: 目录路径 + :param recursive: 是否递归查找 + :param pattern: 文件匹配模式 + :return: DataFrame({'file_path': str, 'file_name': str, 'extension': str}) + """ + dir_path = self._resolve_path(dir_path) + try: + if recursive: + files = [str(f) for f in Path(dir_path).rglob(pattern) if f.is_file()] + else: + files = [str(f) for f in Path(dir_path).glob(pattern) if f.is_file()] + + result = [] + for f in files: + p = Path(f) + result.append({ + 'file_path': str(p), + 'file_name': p.name, + 'extension': p.suffix.lower().lstrip('.') + }) + + df = self._to_dataframe(result) + self.log.trace("列出目录文件 | path={} recursive={} count={}", + dir_path, recursive, len(df)) + return df + except Exception as e: + self.log.error("列出文件失败 | path={} error={}", dir_path, str(e)) + raise + + def list_dirs(self, dir_path, recursive=False): + """ + 列出目录中的子目录 + :param dir_path: 目录路径 + :param recursive: 是否递归查找 + :return: DataFrame({'dir_path': str, 'dir_name': str}) + """ + dir_path = self._resolve_path(dir_path) + try: + if recursive: + dirs = [str(d) for d in Path(dir_path).rglob('*') if d.is_dir()] + else: + dirs = [str(d) for d in Path(dir_path).glob('*') if d.is_dir()] + + result = [{'dir_path': d, 'dir_name': Path(d).name} for d in dirs] + df = self._to_dataframe(result) + self.log.trace("列出子目录 | path={} recursive={} count={}", + dir_path, recursive, len(df)) + return df + except Exception as e: + self.log.error("列出目录失败 | path={} error={}", dir_path, str(e)) + raise + + def get_file_size(self, file_path): + """ + 获取文件大小(字节) + :param file_path: 文件路径 + :return: DataFrame({'file_path': str, 'size_bytes': int, 'size_mb': float}) + """ + file_path = self._resolve_path(file_path) + try: + size_bytes = os.path.getsize(file_path) + result = { + 'file_path': str(file_path), + 'size_bytes': size_bytes, + 'size_mb': round(size_bytes / 1024 / 1024, 4) + } + df = self._to_dataframe(result) + self.log.trace("获取文件大小 | path={} size={} bytes", file_path, size_bytes) + return df + except Exception as e: + self.log.error("获取文件大小失败 | path={} error={}", file_path, str(e)) + raise + + def get_file_modified_time(self, file_path): + """ + 获取文件修改时间 + :param file_path: 文件路径 + :return: DataFrame({'file_path': str, 'modified_time': datetime, 'timestamp': float}) + """ + file_path = self._resolve_path(file_path) + try: + mtime = datetime.fromtimestamp(os.path.getmtime(file_path)) + result = { + 'file_path': str(file_path), + 'modified_time': mtime, + 'timestamp': mtime.timestamp() + } + df = self._to_dataframe(result) + self.log.trace("获取文件修改时间 | path={} mtime={}", + file_path, mtime.isoformat()) + return df + except Exception as e: + self.log.error("获取文件修改时间失败 | path={} error={}", + file_path, str(e)) + raise + + def zip_files(self, file_paths, zip_path): + """ + 压缩多个文件到zip + :param file_paths: 要压缩的文件路径列表或DataFrame + :param zip_path: 压缩文件路径 + :return: DataFrame({'zipped': bool, 'zip_path': str, 'file_count': int}) + """ + zip_path = self._resolve_path(zip_path) + + # 处理输入可以是DataFrame或列表 + if isinstance(file_paths, pd.DataFrame): + if 'file_path' in file_paths.columns: + file_list = file_paths['file_path'].tolist() + else: + file_list = file_paths.iloc[:, 0].tolist() + else: + file_list = file_paths + + try: + self.create_dir(os.path.dirname(zip_path)) + file_count = 0 + with zipfile.ZipFile(zip_path, 'w', zipfile.ZIP_DEFLATED) as zipf: + for file_path in file_list: + file_path = self._resolve_path(file_path) + if self.file_exists(file_path).iloc[0]['exists']: + zipf.write(file_path, os.path.basename(file_path)) + file_count += 1 + + result = { + 'zipped': True, + 'zip_path': str(zip_path), + 'file_count': file_count + } + self.log.info("文件压缩成功 | zip={} files={}", zip_path, file_count) + return self._to_dataframe(result) + except Exception as e: + self.log.error("文件压缩失败 | zip={} error={}", zip_path, str(e)) + raise + + def zip_dir(self, dir_path, zip_path): + """ + 压缩整个目录到zip + :param dir_path: 要压缩的目录路径 + :param zip_path: 压缩文件路径 + :return: DataFrame({'zipped': bool, 'zip_path': str, 'dir_path': str, 'file_count': int}) + """ + dir_path = self._resolve_path(dir_path) + zip_path = self._resolve_path(zip_path) + try: + self.create_dir(os.path.dirname(zip_path)) + file_count = 0 + with zipfile.ZipFile(zip_path, 'w', zipfile.ZIP_DEFLATED) as zipf: + for root, dirs, files in os.walk(dir_path): + for file in files: + file_path = os.path.join(root, file) + arcname = os.path.relpath(file_path, dir_path) + zipf.write(file_path, arcname) + file_count += 1 + + result = { + 'zipped': True, + 'zip_path': str(zip_path), + 'dir_path': str(dir_path), + 'file_count': file_count + } + self.log.info("目录压缩成功 | zip={} dir={} files={}", + zip_path, dir_path, file_count) + return self._to_dataframe(result) + except Exception as e: + self.log.error("目录压缩失败 | zip={} error={}", zip_path, str(e)) + raise + + def unzip(self, zip_path, extract_to=None): + """ + 解压zip文件 + :param zip_path: zip文件路径 + :param extract_to: 解压目标目录,默认为zip文件所在目录 + :return: DataFrame({'unzipped': bool, 'zip_path': str, 'extract_to': str, 'file_count': int}) + """ + zip_path = self._resolve_path(zip_path) + if extract_to is None: + extract_to = os.path.dirname(zip_path) + else: + extract_to = self._resolve_path(extract_to) + + try: + self.create_dir(extract_to) + with zipfile.ZipFile(zip_path, 'r') as zipf: + file_list = zipf.namelist() + zipf.extractall(extract_to) + + result = { + 'unzipped': True, + 'zip_path': str(zip_path), + 'extract_to': str(extract_to), + 'file_count': len(file_list) + } + self.log.info("文件解压成功 | zip={} extract_to={} files={}", + zip_path, extract_to, len(file_list)) + return self._to_dataframe(result) + except Exception as e: + self.log.error("文件解压失败 | zip={} error={}", zip_path, str(e)) + raise + + def compress_large_log(self, log_path, max_size_mb=20): + """ + 压缩过大的日志文件 + :param log_path: 日志文件路径 + :param max_size_mb: 最大大小(MB),超过则压缩 + :return: DataFrame({'compressed': bool, 'original_path': str, 'zip_path': str, 'original_size_mb': float}) + """ + log_path = self._resolve_path(log_path) + if not self.file_exists(log_path).iloc[0]['exists']: + return self._to_dataframe({'compressed': [False]}) + + max_size_bytes = max_size_mb * 1024 * 1024 + size_info = self.get_file_size(log_path) + current_size = size_info.iloc[0]['size_bytes'] + + if current_size > max_size_bytes: + try: + timestamp = datetime.now().strftime('%Y%m%d_%H%M%S') + zip_path = f"{log_path}_{timestamp}.zip" + self.zip_files([log_path], zip_path) + self.delete_file(log_path) + + result = { + 'compressed': True, + 'original_path': str(log_path), + 'zip_path': zip_path, + 'original_size_mb': round(current_size/1024/1024, 2) + } + self.log.info("日志文件压缩 | original={} compressed={} original_size={} MB", + log_path, zip_path, result['original_size_mb']) + return self._to_dataframe(result) + except Exception as e: + self.log.error("日志压缩失败 | path={} error={}", log_path, str(e)) + raise + + return self._to_dataframe({'compressed': [False]}) + + def get_file_extension(self, file_path): + """ + 获取文件扩展名 + :param file_path: 文件路径 + :return: 文件扩展名字符串(小写,不带点) + """ + file_path = self._resolve_path(file_path) + ext = Path(file_path).suffix.lower().lstrip('.') + self.log.trace("获取文件扩展名 | path={} ext={}", file_path, ext) + return ext # 直接返回字符串而不是DataFrame + + def change_file_extension(self, file_path, new_extension): + """ + 修改文件扩展名 + :param file_path: 文件路径 + :param new_extension: 新扩展名(不带点) + :return: DataFrame({'original_path': str, 'new_path': str}) + """ + file_path = self._resolve_path(file_path) + new_path = str(Path(file_path).with_suffix(f'.{new_extension}')) + result = {'original_path': str(file_path), 'new_path': new_path} + self.log.debug("修改文件扩展名 | original={} new={}", file_path, new_path) + return self._to_dataframe(result) + + def join_path(self, *paths): + """ + 拼接路径 + :param paths: 多个路径部分 + :return: DataFrame({'joined_path': str}) + """ + joined_path = str(Path(*paths)) + self.log.trace("路径拼接 | parts={} result={}", paths, joined_path) + return self._to_dataframe({'joined_path': [joined_path]}) \ No newline at end of file