import re import json import os import logging from pathlib import Path logger = logging.getLogger('sensitive_filter') logger.setLevel(logging.INFO) class SensitiveDataFilter: """ 敏感数据过滤器 - 用于检测和屏蔽输出内容中的敏感信息 功能: 1. 自动识别并过滤手机号、邮箱、身份证号、信用卡号等敏感信息 2. 支持自定义敏感信息模式和替换文本 3. 提供批量处理和实时过滤功能 """ _instance = None def __new__(cls): if cls._instance is None: cls._instance = super(SensitiveDataFilter, cls).__new__(cls) cls._instance._initialized = False return cls._instance def __init__(self): if self._initialized: return # 默认配置 self.config = { 'enabled': os.getenv('ENABLE_SENSITIVE_DATA_FILTER', 'true').lower() == 'true', 'patterns': { 'phone': r'\b1[3-9]\d{9}\b', 'email': r'\b[A-Za-z0-9._%+-]+@[A-Za-z0-9.-]+\.[A-Z|a-z]{2,}\b', 'id_card': r'\b[1-9]\d{5}(19|20)\d{2}(0[1-9]|1[0-2])(0[1-9]|[12]\d|3[01])\d{3}[\dXx]\b', 'credit_card': r'\b\d{4}[ -]?\d{4}[ -]?\d{4}[ -]?\d{4}\b', 'address': r'(北京|上海|广州|深圳|天津|重庆|南京|杭州|武汉|成都|西安)市.*?(路|街|道|巷).*?(号)' }, 'replacements': { 'phone': '***********', 'email': '******@*****', 'id_card': '******************', 'credit_card': '****************', 'address': '[地址已隐藏]' } } # 加载自定义配置 self._load_config() # 编译正则表达式 self._compile_patterns() self._initialized = True logger.info("敏感数据过滤器初始化完成") if self.config['enabled']: logger.info(f"已启用以下类型的敏感数据过滤: {', '.join(self.config['patterns'].keys())}") else: logger.info("敏感数据过滤器已禁用") def _load_config(self): """加载自定义配置""" # 配置文件路径 data_dir = os.getenv('DATA_DIR', 'data') config_path = os.path.join(data_dir, 'security', 'sensitive_filter.json') if os.path.exists(config_path): try: with open(config_path, 'r', encoding='utf-8') as f: custom_config = json.load(f) # 更新配置 if 'enabled' in custom_config: self.config['enabled'] = custom_config['enabled'] if 'patterns' in custom_config: for key, pattern in custom_config['patterns'].items(): self.config['patterns'][key] = pattern if 'replacements' in custom_config: for key, replacement in custom_config['replacements'].items(): self.config['replacements'][key] = replacement logger.info(f"已加载自定义敏感数据过滤配置: {config_path}") except Exception as e: logger.error(f"加载敏感数据过滤配置失败: {e}") def _compile_patterns(self): """编译正则表达式""" self.compiled_patterns = {} for key, pattern in self.config['patterns'].items(): try: self.compiled_patterns[key] = re.compile(pattern) logger.debug(f"已编译敏感数据模式: {key} - {pattern}") except re.error as e: logger.error(f"编译敏感数据模式失败: {key} - {pattern}: {e}") def filter_text(self, text): """ 过滤文本中的敏感信息 参数: text: 要过滤的文本 返回: 过滤后的文本 """ if not self.config['enabled'] or not text: return text filtered_text = text for key, pattern in self.compiled_patterns.items(): replacement = self.config['replacements'].get(key, '[FILTERED]') filtered_text = pattern.sub(replacement, filtered_text) return filtered_text def filter_dict(self, data, *skip_keys): """ 过滤字典中的敏感信息 参数: data: 要过滤的字典 skip_keys: 要跳过的键(不进行过滤) 返回: 过滤后的字典 """ if not self.config['enabled'] or not data: return data if not isinstance(data, dict): if isinstance(data, str): return self.filter_text(data) return data filtered_data = {} for key, value in data.items(): if key in skip_keys: filtered_data[key] = value continue if isinstance(value, dict): filtered_data[key] = self.filter_dict(value, *skip_keys) elif isinstance(value, list): filtered_data[key] = [ self.filter_dict(item, *skip_keys) if isinstance(item, (dict, list)) else self.filter_text(item) if isinstance(item, str) else item for item in value ] elif isinstance(value, str): filtered_data[key] = self.filter_text(value) else: filtered_data[key] = value return filtered_data def filter_list(self, data, *skip_keys): """ 过滤列表中的敏感信息 参数: data: 要过滤的列表 skip_keys: 如果列表项是字典,要跳过的键 返回: 过滤后的列表 """ if not self.config['enabled'] or not data: return data if not isinstance(data, list): if isinstance(data, dict): return self.filter_dict(data, *skip_keys) if isinstance(data, str): return self.filter_text(data) return data return [ self.filter_dict(item, *skip_keys) if isinstance(item, dict) else self.filter_list(item, *skip_keys) if isinstance(item, list) else self.filter_text(item) if isinstance(item, str) else item for item in data ] def is_sensitive_info(self, text, info_type=None): """ 检查文本是否包含敏感信息 参数: text: 要检查的文本 info_type: 指定要检查的敏感信息类型,如果为None则检查所有类型 返回: 包含敏感信息返回True,否则返回False """ if not self.config['enabled'] or not text: return False if info_type: if info_type not in self.compiled_patterns: logger.warning(f"未知的敏感信息类型: {info_type}") return False return bool(self.compiled_patterns[info_type].search(text)) for pattern in self.compiled_patterns.values(): if pattern.search(text): return True return False def get_sensitive_info_types(self, text): """ 获取文本中包含的敏感信息类型 参数: text: 要检查的文本 返回: 包含的敏感信息类型列表 """ if not self.config['enabled'] or not text: return [] types = [] for key, pattern in self.compiled_patterns.items(): if pattern.search(text): types.append(key) return types def enable(self): """启用敏感数据过滤器""" self.config['enabled'] = True logger.info("敏感数据过滤器已启用") def disable(self): """禁用敏感数据过滤器""" self.config['enabled'] = False logger.info("敏感数据过滤器已禁用") def is_enabled(self): """检查敏感数据过滤器是否启用""" return self.config['enabled'] def add_pattern(self, key, pattern, replacement='[FILTERED]'): """ 添加自定义敏感信息模式 参数: key: 敏感信息类型标识 pattern: 正则表达式字符串 replacement: 替换文本 """ try: # 测试是否是有效的正则表达式 re.compile(pattern) # 更新配置 self.config['patterns'][key] = pattern self.config['replacements'][key] = replacement # 重新编译正则表达式 self._compile_patterns() logger.info(f"已添加敏感信息模式: {key}") return True except re.error as e: logger.error(f"添加敏感信息模式失败: {key} - {pattern}: {e}") return False def remove_pattern(self, key): """ 移除敏感信息模式 参数: key: 敏感信息类型标识 """ if key in self.config['patterns']: del self.config['patterns'][key] if key in self.config['replacements']: del self.config['replacements'][key] if key in self.compiled_patterns: del self.compiled_patterns[key] logger.info(f"已移除敏感信息模式: {key}") return True logger.warning(f"未找到敏感信息模式: {key}") return False def save_config(self): """保存当前配置到文件""" data_dir = os.getenv('DATA_DIR', 'data') security_dir = os.path.join(data_dir, 'security') os.makedirs(security_dir, exist_ok=True) config_path = os.path.join(security_dir, 'sensitive_filter.json') try: with open(config_path, 'w', encoding='utf-8') as f: json.dump(self.config, f, ensure_ascii=False, indent=2) logger.info(f"敏感数据过滤配置已保存到: {config_path}") return True except Exception as e: logger.error(f"保存敏感数据过滤配置失败: {e}") return False # 创建全局敏感数据过滤器实例 sensitive_filter = SensitiveDataFilter() # 提供便捷的过滤函数 def filter_text(text): """过滤文本中的敏感信息""" return sensitive_filter.filter_text(text) def filter_dict(data, *skip_keys): """过滤字典中的敏感信息""" return sensitive_filter.filter_dict(data, *skip_keys) def filter_list(data, *skip_keys): """过滤列表中的敏感信息""" return sensitive_filter.filter_list(data, *skip_keys) def is_sensitive_info(text, info_type=None): """检查文本是否包含敏感信息""" return sensitive_filter.is_sensitive_info(text, info_type) # 示例用法 if __name__ == "__main__": # 测试文本 test_text = """ 联系人: 张三 电话: 13812345678 邮箱: zhangsan@example.com 身份证: 110101199001011234 地址: 北京市海淀区中关村大街20号 信用卡: 6225 1234 5678 9012 """ # 过滤敏感信息 filtered_text = filter_text(test_text) print("原始文本:") print(test_text) print("\n过滤后:") print(filtered_text) # 检查敏感信息类型 types = sensitive_filter.get_sensitive_info_types(test_text) print(f"\n包含的敏感信息类型: {types}")