diff --git a/views/user/user.py b/views/user/user.py index cca4b95..a044285 100644 --- a/views/user/user.py +++ b/views/user/user.py @@ -1,6 +1,6 @@ import time import hashlib -from flask import Blueprint, redirect, render_template, request, Flask, session, current_app +from flask import Blueprint, redirect, render_template, request, Flask, session, current_app, make_response from datetime import datetime, timedelta import re from utils.query import query @@ -8,40 +8,138 @@ from utils.errorResponse import errorResponse from utils.logger import app_logger as logging from functools import wraps import secrets +from flask_limiter import Limiter +from flask_limiter.util import get_remote_address +import redis +import json +import bleach +from argon2 import PasswordHasher +from argon2.exceptions import VerifyMismatchError +import html + +# 创建Argon2密码哈希器 +ph = PasswordHasher() + +# Redis连接 +redis_client = redis.Redis(host='localhost', port=6379, db=0, decode_responses=True) + +# 创建限流器 +limiter = Limiter( + key_func=get_remote_address, + default_limits=["200 per day", "50 per hour"] +) ub = Blueprint('user', __name__, url_prefix='/user', template_folder='templates') +def sanitize_input(text): + """清理用户输入,防止XSS攻击""" + if text is None: + return None + return bleach.clean(str(text), strip=True) + +def validate_csrf_token(): + """验证CSRF令牌""" + token = request.form.get('csrf_token') + stored_token = session.get('csrf_token') + if not token or not stored_token or token != stored_token: + return False + return True + +def get_client_info(): + """获取客户端信息""" + return { + 'ip': request.remote_addr, + 'user_agent': str(request.user_agent.string), + 'platform': str(request.user_agent.platform), + 'browser': str(request.user_agent.browser), + } + +def is_suspicious_ip(ip): + """检查IP是否可疑""" + key = f"login_attempts:{ip}" + attempts = redis_client.get(key) + if attempts and int(attempts) >= 5: # 5次失败尝试 + return True + return False + +def record_failed_attempt(ip): + """记录失败的登录尝试""" + key = f"login_attempts:{ip}" + pipe = redis_client.pipeline() + pipe.incr(key) + pipe.expire(key, 1800) # 30分钟后重置 + pipe.execute() + +def clear_login_attempts(ip): + """清除登录尝试记录""" + redis_client.delete(f"login_attempts:{ip}") + +def set_secure_headers(response): + """设置安全响应头""" + response.headers['X-Content-Type-Options'] = 'nosniff' + response.headers['X-Frame-Options'] = 'SAMEORIGIN' + response.headers['X-XSS-Protection'] = '1; mode=block' + response.headers['Strict-Transport-Security'] = 'max-age=31536000; includeSubDomains' + response.headers['Content-Security-Policy'] = "default-src 'self'" + return response + def login_required(f): @wraps(f) def decorated_function(*args, **kwargs): if 'username' not in session: return redirect('/user/login') + + # 验证会话完整性 + if 'client_info' not in session or 'session_id' not in session: + session.clear() + return redirect('/user/login') + + # 验证客户端信息 + current_client = get_client_info() + stored_client = session['client_info'] + + if (current_client['ip'] != stored_client['ip'] or + current_client['user_agent'] != stored_client['user_agent']): + session.clear() + return redirect('/user/login') + + # 验证会话ID + stored_session_id = redis_client.get(f"session:{session['username']}") + if not stored_session_id or stored_session_id != session['session_id']: + session.clear() + return redirect('/user/login') + return f(*args, **kwargs) return decorated_function -# 密码加密函数 -def hash_password(password: str, salt: str = None) -> tuple: +def hash_password(password: str) -> str: """ - 使用 SHA256 对密码进行加盐哈希 + 使用Argon2id算法哈希密码 :param password: 用户输入的密码 - :param salt: 可选的盐值 - :return: (哈希后的密码, 盐值) + :return: 哈希后的密码 """ - if not salt: - salt = secrets.token_hex(16) - hash_obj = hashlib.sha256() - hash_obj.update(salt.encode('utf-8')) - hash_obj.update(password.encode('utf-8')) - return hash_obj.hexdigest(), salt + return ph.hash(password) + +def verify_password(stored_hash: str, password: str) -> bool: + """ + 验证密码 + :param stored_hash: 存储的密码哈希 + :param password: 用户输入的密码 + :return: 是否匹配 + """ + try: + return ph.verify(stored_hash, password) + except VerifyMismatchError: + return False def validate_password(password: str) -> bool: """ 验证密码强度 """ - if len(password) < 8: + if len(password) < 12: # 增加最小长度要求 return False if not re.search(r"[A-Z]", password): return False @@ -51,96 +149,185 @@ def validate_password(password: str) -> bool: return False if not re.search(r"[!@#$%^&*(),.?\":{}|<>]", password): return False + # 检查常见密码模式 + common_patterns = ['password', '123456', 'qwerty'] + if any(pattern in password.lower() for pattern in common_patterns): + return False return True @ub.route('/login', methods=['GET', 'POST']) +@limiter.limit("5 per minute") def login(): - """ - 处理用户登录请求 - """ + """处理用户登录请求""" if request.method == 'GET': - return render_template('login_and_register.html') + response = make_response(render_template('login_and_register.html')) + return set_secure_headers(response) try: - username = request.form.get('username') - password = request.form.get('password') + if request.method == 'POST' and not validate_csrf_token(): + logging.warning("CSRF验证失败") + return errorResponse('无效的请求') + + client_ip = request.remote_addr + + if is_suspicious_ip(client_ip): + logging.warning(f"可疑IP尝试登录: {client_ip}") + return errorResponse('由于多次失败尝试,请30分钟后再试') + + username = sanitize_input(request.form.get('username')) + password = request.form.get('password') # 密码不需要sanitize if not username or not password: logging.warning("登录失败:用户名或密码为空") - return render_template('login_and_register.html', msg='用户名和密码不能为空') + return errorResponse('用户名和密码不能为空') - # 查询用户和盐值 - sql = "SELECT password, salt FROM user WHERE username = %s" + # 查询用户信息 + sql = "SELECT password, status FROM user WHERE username = %s" result = query(sql, [username], "select") if result: stored_password = result[0]['password'] - salt = result[0]['salt'] + status = result[0]['status'] - # 验证密码 - hashed_input, _ = hash_password(password, salt) + if status != 'active': + logging.warning(f"已禁用的账户尝试登录: {username}") + return errorResponse('账户已被禁用') - if hashed_input == stored_password: + if verify_password(stored_password, password): session.clear() + session.regenerate() + + # 生成唯一会话ID + session_id = secrets.token_hex(32) + client_info = get_client_info() + + # 存储会话信息 session['username'] = username session['login_time'] = datetime.now().strftime('%Y-%m-%d %H:%M:%S') session['csrf_token'] = secrets.token_hex(32) + session['client_info'] = client_info + session['session_id'] = session_id session.permanent = True current_app.permanent_session_lifetime = timedelta(hours=2) + # 在Redis中存储会话ID + redis_client.setex( + f"session:{username}", + int(current_app.permanent_session_lifetime.total_seconds()), + session_id + ) + + clear_login_attempts(client_ip) + + # 记录登录历史 + login_history_sql = ''' + INSERT INTO login_history + (username, login_time, ip_address, user_agent, success, attempt_count) + VALUES (%s, %s, %s, %s, %s, %s) + ''' + query(login_history_sql, [ + username, + datetime.now(), + client_info['ip'], + client_info['user_agent'], + True, + redis_client.get(f"login_attempts:{client_ip}") or 0 + ]) + logging.info(f"用户 {username} 登录成功") - return redirect('/page/home') + response = make_response(redirect('/page/home')) + return set_secure_headers(response) - # 使用相同的响应防止用户枚举 + record_failed_attempt(client_ip) logging.warning(f"登录失败:用户名或密码错误") - return render_template('login_and_register.html', msg='用户名或密码错误') + return errorResponse('用户名或密码错误') except Exception as e: logging.error(f"登录过程发生错误: {e}") - return render_template('login_and_register.html', msg='登录失败,请稍后重试') + return errorResponse('登录失败,请稍后重试') @ub.route('/register', methods=['GET', 'POST']) +@limiter.limit("3 per hour") def register(): if request.method == 'GET': - return render_template('login_and_register.html') + response = make_response(render_template('login_and_register.html')) + return set_secure_headers(response) try: - username = request.form.get('username') - password = request.form.get('password') + if request.method == 'POST' and not validate_csrf_token(): + logging.warning("CSRF验证失败") + return errorResponse('无效的请求') - if not username or not password: - return errorResponse('用户名和密码不能为空') + username = sanitize_input(request.form.get('username')) + password = request.form.get('password') + email = sanitize_input(request.form.get('email')) + + if not username or not password or not email: + return errorResponse('用户名、密码和邮箱不能为空') # 验证用户名格式 if not re.match(r'^[a-zA-Z0-9_]{4,20}$', username): return errorResponse('用户名只能包含字母、数字和下划线,长度4-20位') + # 验证邮箱格式 + if not re.match(r'^[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}$', email): + return errorResponse('邮箱格式不正确') + # 验证密码强度 if not validate_password(password): - return errorResponse('密码必须包含大小写字母、数字和特殊字符,且长度至少8位') + return errorResponse('密码必须包含大小写字母、数字和特殊字符,且长度至少12位') - # 使用事务处理竞态条件 try: - # 检查用户名是否存在 - check_sql = "SELECT COUNT(*) as count FROM user WHERE username = %s" - result = query(check_sql, [username], "select") + # 检查用户名和邮箱是否存在 + check_sql = """ + SELECT + (SELECT COUNT(*) FROM user WHERE LOWER(username) = LOWER(%s)) as username_count, + (SELECT COUNT(*) FROM user WHERE LOWER(email) = LOWER(%s)) as email_count + """ + result = query(check_sql, [username.lower(), email.lower()], "select") - if result[0]['count'] > 0: + if result[0]['username_count'] > 0: return errorResponse('该用户名已被注册') + + if result[0]['email_count'] > 0: + return errorResponse('该邮箱已被注册') - # 生成密码哈希和盐值 - hashed_password, salt = hash_password(password) + # 哈希密码 + hashed_password = hash_password(password) # 插入新用户 insert_sql = ''' - INSERT INTO user(username, password, salt, createTime) - VALUES(%s, %s, %s, %s) + INSERT INTO user(username, password, email, status, createTime, last_password_change) + VALUES(%s, %s, %s, %s, %s, %s) ''' - current_time = datetime.now().strftime('%Y-%m-%d') - query(insert_sql, [username, hashed_password, salt, current_time]) + current_time = datetime.now() + query(insert_sql, [ + username, + hashed_password, + email, + 'active', + current_time, + current_time + ]) + + # 记录注册信息 + client_info = get_client_info() + register_history_sql = ''' + INSERT INTO register_history + (username, register_time, ip_address, user_agent, email) + VALUES (%s, %s, %s, %s, %s) + ''' + query(register_history_sql, [ + username, + current_time, + client_info['ip'], + client_info['user_agent'], + email + ]) logging.info(f"新用户注册成功: {username}") - return redirect('/user/login') + response = make_response(redirect('/user/login')) + return set_secure_headers(response) except Exception as e: logging.error(f"注册过程发生错误: {e}") @@ -156,9 +343,30 @@ def logout(): """用户登出""" try: username = session.get('username') + client_info = session.get('client_info', {}) + + # 记录登出历史 + logout_history_sql = ''' + INSERT INTO logout_history + (username, logout_time, ip_address, user_agent, session_id) + VALUES (%s, %s, %s, %s, %s) + ''' + query(logout_history_sql, [ + username, + datetime.now(), + client_info.get('ip'), + client_info.get('user_agent'), + session.get('session_id') + ]) + + # 删除Redis中的会话 + redis_client.delete(f"session:{username}") + session.clear() logging.info(f"用户 {username} 成功登出") - return redirect('/user/login') + response = make_response(redirect('/user/login')) + return set_secure_headers(response) except Exception as e: logging.error(f"登出过程发生错误: {e}") - return redirect('/user/login') + response = make_response(redirect('/user/login')) + return set_secure_headers(response)