357 lines
12 KiB
Python
357 lines
12 KiB
Python
import re
|
|
import json
|
|
import os
|
|
import logging
|
|
from pathlib import Path
|
|
|
|
logger = logging.getLogger('sensitive_filter')
|
|
logger.setLevel(logging.INFO)
|
|
|
|
class SensitiveDataFilter:
|
|
"""
|
|
敏感数据过滤器 - 用于检测和屏蔽输出内容中的敏感信息
|
|
|
|
功能:
|
|
1. 自动识别并过滤手机号、邮箱、身份证号、信用卡号等敏感信息
|
|
2. 支持自定义敏感信息模式和替换文本
|
|
3. 提供批量处理和实时过滤功能
|
|
"""
|
|
|
|
_instance = None
|
|
|
|
def __new__(cls):
|
|
if cls._instance is None:
|
|
cls._instance = super(SensitiveDataFilter, cls).__new__(cls)
|
|
cls._instance._initialized = False
|
|
return cls._instance
|
|
|
|
def __init__(self):
|
|
if self._initialized:
|
|
return
|
|
|
|
# 默认配置
|
|
self.config = {
|
|
'enabled': os.getenv('ENABLE_SENSITIVE_DATA_FILTER', 'true').lower() == 'true',
|
|
'patterns': {
|
|
'phone': r'\b1[3-9]\d{9}\b',
|
|
'email': r'\b[A-Za-z0-9._%+-]+@[A-Za-z0-9.-]+\.[A-Z|a-z]{2,}\b',
|
|
'id_card': r'\b[1-9]\d{5}(19|20)\d{2}(0[1-9]|1[0-2])(0[1-9]|[12]\d|3[01])\d{3}[\dXx]\b',
|
|
'credit_card': r'\b\d{4}[ -]?\d{4}[ -]?\d{4}[ -]?\d{4}\b',
|
|
'address': r'(北京|上海|广州|深圳|天津|重庆|南京|杭州|武汉|成都|西安)市.*?(路|街|道|巷).*?(号)'
|
|
},
|
|
'replacements': {
|
|
'phone': '***********',
|
|
'email': '******@*****',
|
|
'id_card': '******************',
|
|
'credit_card': '****************',
|
|
'address': '[地址已隐藏]'
|
|
}
|
|
}
|
|
|
|
# 加载自定义配置
|
|
self._load_config()
|
|
|
|
# 编译正则表达式
|
|
self._compile_patterns()
|
|
|
|
self._initialized = True
|
|
|
|
logger.info("敏感数据过滤器初始化完成")
|
|
if self.config['enabled']:
|
|
logger.info(f"已启用以下类型的敏感数据过滤: {', '.join(self.config['patterns'].keys())}")
|
|
else:
|
|
logger.info("敏感数据过滤器已禁用")
|
|
|
|
def _load_config(self):
|
|
"""加载自定义配置"""
|
|
# 配置文件路径
|
|
data_dir = os.getenv('DATA_DIR', 'data')
|
|
config_path = os.path.join(data_dir, 'security', 'sensitive_filter.json')
|
|
|
|
if os.path.exists(config_path):
|
|
try:
|
|
with open(config_path, 'r', encoding='utf-8') as f:
|
|
custom_config = json.load(f)
|
|
|
|
# 更新配置
|
|
if 'enabled' in custom_config:
|
|
self.config['enabled'] = custom_config['enabled']
|
|
|
|
if 'patterns' in custom_config:
|
|
for key, pattern in custom_config['patterns'].items():
|
|
self.config['patterns'][key] = pattern
|
|
|
|
if 'replacements' in custom_config:
|
|
for key, replacement in custom_config['replacements'].items():
|
|
self.config['replacements'][key] = replacement
|
|
|
|
logger.info(f"已加载自定义敏感数据过滤配置: {config_path}")
|
|
except Exception as e:
|
|
logger.error(f"加载敏感数据过滤配置失败: {e}")
|
|
|
|
def _compile_patterns(self):
|
|
"""编译正则表达式"""
|
|
self.compiled_patterns = {}
|
|
for key, pattern in self.config['patterns'].items():
|
|
try:
|
|
self.compiled_patterns[key] = re.compile(pattern)
|
|
logger.debug(f"已编译敏感数据模式: {key} - {pattern}")
|
|
except re.error as e:
|
|
logger.error(f"编译敏感数据模式失败: {key} - {pattern}: {e}")
|
|
|
|
def filter_text(self, text):
|
|
"""
|
|
过滤文本中的敏感信息
|
|
|
|
参数:
|
|
text: 要过滤的文本
|
|
|
|
返回:
|
|
过滤后的文本
|
|
"""
|
|
if not self.config['enabled'] or not text:
|
|
return text
|
|
|
|
filtered_text = text
|
|
for key, pattern in self.compiled_patterns.items():
|
|
replacement = self.config['replacements'].get(key, '[FILTERED]')
|
|
filtered_text = pattern.sub(replacement, filtered_text)
|
|
|
|
return filtered_text
|
|
|
|
def filter_dict(self, data, *skip_keys):
|
|
"""
|
|
过滤字典中的敏感信息
|
|
|
|
参数:
|
|
data: 要过滤的字典
|
|
skip_keys: 要跳过的键(不进行过滤)
|
|
|
|
返回:
|
|
过滤后的字典
|
|
"""
|
|
if not self.config['enabled'] or not data:
|
|
return data
|
|
|
|
if not isinstance(data, dict):
|
|
if isinstance(data, str):
|
|
return self.filter_text(data)
|
|
return data
|
|
|
|
filtered_data = {}
|
|
for key, value in data.items():
|
|
if key in skip_keys:
|
|
filtered_data[key] = value
|
|
continue
|
|
|
|
if isinstance(value, dict):
|
|
filtered_data[key] = self.filter_dict(value, *skip_keys)
|
|
elif isinstance(value, list):
|
|
filtered_data[key] = [
|
|
self.filter_dict(item, *skip_keys) if isinstance(item, (dict, list)) else
|
|
self.filter_text(item) if isinstance(item, str) else item
|
|
for item in value
|
|
]
|
|
elif isinstance(value, str):
|
|
filtered_data[key] = self.filter_text(value)
|
|
else:
|
|
filtered_data[key] = value
|
|
|
|
return filtered_data
|
|
|
|
def filter_list(self, data, *skip_keys):
|
|
"""
|
|
过滤列表中的敏感信息
|
|
|
|
参数:
|
|
data: 要过滤的列表
|
|
skip_keys: 如果列表项是字典,要跳过的键
|
|
|
|
返回:
|
|
过滤后的列表
|
|
"""
|
|
if not self.config['enabled'] or not data:
|
|
return data
|
|
|
|
if not isinstance(data, list):
|
|
if isinstance(data, dict):
|
|
return self.filter_dict(data, *skip_keys)
|
|
if isinstance(data, str):
|
|
return self.filter_text(data)
|
|
return data
|
|
|
|
return [
|
|
self.filter_dict(item, *skip_keys) if isinstance(item, dict) else
|
|
self.filter_list(item, *skip_keys) if isinstance(item, list) else
|
|
self.filter_text(item) if isinstance(item, str) else item
|
|
for item in data
|
|
]
|
|
|
|
def is_sensitive_info(self, text, info_type=None):
|
|
"""
|
|
检查文本是否包含敏感信息
|
|
|
|
参数:
|
|
text: 要检查的文本
|
|
info_type: 指定要检查的敏感信息类型,如果为None则检查所有类型
|
|
|
|
返回:
|
|
包含敏感信息返回True,否则返回False
|
|
"""
|
|
if not self.config['enabled'] or not text:
|
|
return False
|
|
|
|
if info_type:
|
|
if info_type not in self.compiled_patterns:
|
|
logger.warning(f"未知的敏感信息类型: {info_type}")
|
|
return False
|
|
return bool(self.compiled_patterns[info_type].search(text))
|
|
|
|
for pattern in self.compiled_patterns.values():
|
|
if pattern.search(text):
|
|
return True
|
|
|
|
return False
|
|
|
|
def get_sensitive_info_types(self, text):
|
|
"""
|
|
获取文本中包含的敏感信息类型
|
|
|
|
参数:
|
|
text: 要检查的文本
|
|
|
|
返回:
|
|
包含的敏感信息类型列表
|
|
"""
|
|
if not self.config['enabled'] or not text:
|
|
return []
|
|
|
|
types = []
|
|
for key, pattern in self.compiled_patterns.items():
|
|
if pattern.search(text):
|
|
types.append(key)
|
|
|
|
return types
|
|
|
|
def enable(self):
|
|
"""启用敏感数据过滤器"""
|
|
self.config['enabled'] = True
|
|
logger.info("敏感数据过滤器已启用")
|
|
|
|
def disable(self):
|
|
"""禁用敏感数据过滤器"""
|
|
self.config['enabled'] = False
|
|
logger.info("敏感数据过滤器已禁用")
|
|
|
|
def is_enabled(self):
|
|
"""检查敏感数据过滤器是否启用"""
|
|
return self.config['enabled']
|
|
|
|
def add_pattern(self, key, pattern, replacement='[FILTERED]'):
|
|
"""
|
|
添加自定义敏感信息模式
|
|
|
|
参数:
|
|
key: 敏感信息类型标识
|
|
pattern: 正则表达式字符串
|
|
replacement: 替换文本
|
|
"""
|
|
try:
|
|
# 测试是否是有效的正则表达式
|
|
re.compile(pattern)
|
|
|
|
# 更新配置
|
|
self.config['patterns'][key] = pattern
|
|
self.config['replacements'][key] = replacement
|
|
|
|
# 重新编译正则表达式
|
|
self._compile_patterns()
|
|
|
|
logger.info(f"已添加敏感信息模式: {key}")
|
|
return True
|
|
except re.error as e:
|
|
logger.error(f"添加敏感信息模式失败: {key} - {pattern}: {e}")
|
|
return False
|
|
|
|
def remove_pattern(self, key):
|
|
"""
|
|
移除敏感信息模式
|
|
|
|
参数:
|
|
key: 敏感信息类型标识
|
|
"""
|
|
if key in self.config['patterns']:
|
|
del self.config['patterns'][key]
|
|
|
|
if key in self.config['replacements']:
|
|
del self.config['replacements'][key]
|
|
|
|
if key in self.compiled_patterns:
|
|
del self.compiled_patterns[key]
|
|
|
|
logger.info(f"已移除敏感信息模式: {key}")
|
|
return True
|
|
|
|
logger.warning(f"未找到敏感信息模式: {key}")
|
|
return False
|
|
|
|
def save_config(self):
|
|
"""保存当前配置到文件"""
|
|
data_dir = os.getenv('DATA_DIR', 'data')
|
|
security_dir = os.path.join(data_dir, 'security')
|
|
os.makedirs(security_dir, exist_ok=True)
|
|
|
|
config_path = os.path.join(security_dir, 'sensitive_filter.json')
|
|
|
|
try:
|
|
with open(config_path, 'w', encoding='utf-8') as f:
|
|
json.dump(self.config, f, ensure_ascii=False, indent=2)
|
|
|
|
logger.info(f"敏感数据过滤配置已保存到: {config_path}")
|
|
return True
|
|
except Exception as e:
|
|
logger.error(f"保存敏感数据过滤配置失败: {e}")
|
|
return False
|
|
|
|
# 创建全局敏感数据过滤器实例
|
|
sensitive_filter = SensitiveDataFilter()
|
|
|
|
# 提供便捷的过滤函数
|
|
def filter_text(text):
|
|
"""过滤文本中的敏感信息"""
|
|
return sensitive_filter.filter_text(text)
|
|
|
|
def filter_dict(data, *skip_keys):
|
|
"""过滤字典中的敏感信息"""
|
|
return sensitive_filter.filter_dict(data, *skip_keys)
|
|
|
|
def filter_list(data, *skip_keys):
|
|
"""过滤列表中的敏感信息"""
|
|
return sensitive_filter.filter_list(data, *skip_keys)
|
|
|
|
def is_sensitive_info(text, info_type=None):
|
|
"""检查文本是否包含敏感信息"""
|
|
return sensitive_filter.is_sensitive_info(text, info_type)
|
|
|
|
# 示例用法
|
|
if __name__ == "__main__":
|
|
# 测试文本
|
|
test_text = """
|
|
联系人: 张三
|
|
电话: 13812345678
|
|
邮箱: zhangsan@example.com
|
|
身份证: 110101199001011234
|
|
地址: 北京市海淀区中关村大街20号
|
|
信用卡: 6225 1234 5678 9012
|
|
"""
|
|
|
|
# 过滤敏感信息
|
|
filtered_text = filter_text(test_text)
|
|
print("原始文本:")
|
|
print(test_text)
|
|
print("\n过滤后:")
|
|
print(filtered_text)
|
|
|
|
# 检查敏感信息类型
|
|
types = sensitive_filter.get_sensitive_info_types(test_text)
|
|
print(f"\n包含的敏感信息类型: {types}") |