"""Pytest suite for ``utils.file_handler.FileHandler``.

Every ``FileHandler`` call exercised here returns a pandas DataFrame whose
first row carries the outcome of the operation (for example a ``success``,
``deleted`` or ``zipped`` column); the tests read that first row.
"""

import os
from datetime import datetime
from pathlib import Path

import pandas as pd
import pytest

from utils.file_handler import FileHandler


def _first_row_flag(result, column):
    """Return the first-row value of *column* as a plain Python ``bool``.

    Values read out of a DataFrame row are typically ``numpy.bool_``, for
    which an identity check such as ``value is True`` is always false.
    Converting once here keeps every status assertion consistent.
    """
    return bool(result.iloc[0][column])


@pytest.fixture
def temp_dir(tmp_path):
    """Create and return a scratch directory under pytest's ``tmp_path``."""
    test_dir = tmp_path / "test_files"
    test_dir.mkdir()
    return test_dir


@pytest.fixture
def file_handler(temp_dir):
    """Return a ``FileHandler`` constructed with the scratch directory."""
    return FileHandler(temp_dir)


@pytest.fixture
def sample_dataframe():
    """Return a small 3x3 DataFrame used as the read/write payload."""
    return pd.DataFrame({
        'id': [1, 2, 3],
        'name': ['Alice', 'Bob', 'Charlie'],
        'value': [10.5, 20.3, 30.1],
    })


@pytest.fixture
def sample_text_file(temp_dir):
    """Write a three-line text file into the scratch directory; return its path."""
    file_path = temp_dir / "test.txt"
    with open(file_path, 'w', encoding="utf-8") as f:
        f.write("line1\nline2\nline3")
    return file_path


# --- Read/write round trips ---

def test_read_write_csv(file_handler, temp_dir, sample_dataframe):
    """Write a DataFrame to CSV and read it back."""
    test_file = temp_dir / "test.csv"

    # Write
    write_result = file_handler.write_file(test_file, sample_dataframe)
    assert _first_row_flag(write_result, 'success')
    assert os.path.exists(test_file)

    # Read
    df = file_handler.read_file(test_file)
    assert df.shape == (3, 3)
    assert list(df.columns) == ['id', 'name', 'value']


def test_read_write_json(file_handler, temp_dir, sample_dataframe):
    """Write a DataFrame to JSON and read it back."""
    test_file = temp_dir / "test.json"

    # Write
    write_result = file_handler.write_file(test_file, sample_dataframe)
    assert _first_row_flag(write_result, 'success')

    # Read
    df = file_handler.read_file(test_file)
    assert df.shape == (3, 3)


def test_read_write_excel(file_handler, temp_dir, sample_dataframe):
    """Write a DataFrame to an Excel workbook and read it back."""
    test_file = temp_dir / "test.xlsx"

    # Write
    write_result = file_handler.write_file(test_file, sample_dataframe)
    assert _first_row_flag(write_result, 'success')

    # Read
    df = file_handler.read_file(test_file)
    assert df.shape == (3, 3)


# --- File operations ---

def test_file_operations(file_handler, sample_text_file):
    """Check existence, size, modification time and deletion of a file."""
    # Existence check
    exists_df = file_handler.file_exists(sample_text_file)
    assert _first_row_flag(exists_df, 'exists')

    # File size
    size_df = file_handler.get_file_size(sample_text_file)
    assert size_df.iloc[0]['size_bytes'] > 0

    # Modification time
    mtime_df = file_handler.get_file_modified_time(sample_text_file)
    assert isinstance(mtime_df.iloc[0]['modified_time'], datetime)

    # Deletion
    delete_df = file_handler.delete_file(sample_text_file)
    assert _first_row_flag(delete_df, 'deleted')
    assert not os.path.exists(sample_text_file)


def test_directory_operations(file_handler, temp_dir):
    """Create, list and delete a sub-directory."""
    test_dir = temp_dir / "subdir"

    # Create
    create_df = file_handler.create_dir(test_dir)
    assert _first_row_flag(create_df, 'created')
    assert os.path.isdir(test_dir)

    # List
    list_df = file_handler.list_dirs(temp_dir)
    assert any("subdir" in d for d in list_df['dir_name'].values)

    # Delete
    delete_df = file_handler.delete_dir(test_dir)
    assert _first_row_flag(delete_df, 'deleted')
    assert not os.path.exists(test_dir)


# --- Compression ---

def test_zip_operations(file_handler, temp_dir, sample_dataframe):
    """Zip two files, then unzip them into a fresh directory."""
    # Create the input files
    test_file1 = temp_dir / "file1.txt"
    test_file2 = temp_dir / "file2.csv"
    file_handler.write_file(test_file1, "test content")
    file_handler.write_file(test_file2, sample_dataframe)

    # Zip
    zip_path = temp_dir / "test.zip"
    zip_result = file_handler.zip_files([test_file1, test_file2], zip_path)
    assert _first_row_flag(zip_result, 'zipped')
    assert zip_result.iloc[0]['file_count'] == 2

    # Unzip (compared by value, not identity: see _first_row_flag)
    extract_dir = temp_dir / "extracted"
    unzip_result = file_handler.unzip(zip_path, extract_dir)
    assert _first_row_flag(unzip_result, 'unzipped')
    assert os.path.exists(extract_dir / "file1.txt")
    assert os.path.exists(extract_dir / "file2.csv")


def test_zip_directory(file_handler, temp_dir):
    """Zip a directory tree that contains one nested sub-directory."""
    # Build the directory structure
    test_dir = temp_dir / "test_dir"
    sub_dir = test_dir / "sub"
    sub_dir.mkdir(parents=True)
    (test_dir / "file1.txt").write_text("content1")
    (sub_dir / "file2.txt").write_text("content2")

    # Zip the directory
    zip_path = temp_dir / "dir.zip"
    zip_result = file_handler.zip_dir(test_dir, zip_path)
    assert _first_row_flag(zip_result, 'zipped')
    assert zip_result.iloc[0]['file_count'] == 2