from datetime import timedelta
from typing import Annotated, Any

from fastapi import APIRouter, Depends, HTTPException
from fastapi.responses import HTMLResponse
from fastapi.security import OAuth2PasswordRequestForm

from app import crud
from app.api.deps import CurrentUser, SessionDep, get_current_active_superuser
from app.core import security
from app.core.config import settings
from app.core.security import get_password_hash
from app.models import Message, NewPassword, Token, UserPublic
from app.utils import (
    generate_password_reset_token,
    generate_reset_password_email,
    send_email,
    verify_password_reset_token,
)

router = APIRouter(tags=["login"])


@router.post("/login/access-token")
def login_access_token(
    session: SessionDep, form_data: Annotated[OAuth2PasswordRequestForm, Depends()]
) -> Token:
    """
    OAuth2-compatible token login: obtain an access token for later requests.

    The form's ``username`` field is passed to ``crud.authenticate`` as the
    email. The token lifetime comes from
    ``settings.ACCESS_TOKEN_EXPIRE_MINUTES``.

    Raises:
        HTTPException: 400 when authentication fails, and 400 when the
            authenticated user is not active.
    """
    account = crud.authenticate(
        session=session, email=form_data.username, password=form_data.password
    )
    # Guard clauses: reject failed authentication first, then inactive users.
    if not account:
        raise HTTPException(status_code=400, detail="邮箱或密码错误")
    if not account.is_active:
        raise HTTPException(status_code=400, detail="用户未激活")

    lifetime = timedelta(minutes=settings.ACCESS_TOKEN_EXPIRE_MINUTES)
    signed_token = security.create_access_token(account.id, expires_delta=lifetime)
    return Token(access_token=signed_token)


@router.post("/login/test-token", response_model=UserPublic)
def test_token(current_user: CurrentUser) -> Any:
    """
    Test an access token.

    Returns the user resolved by the ``CurrentUser`` dependency; the route
    declares ``UserPublic`` as its response model.
    """
    return current_user


@router.post("/password-recovery/{email}")
def recover_password(email: str, session: SessionDep) -> Message:
    """
    Password recovery: send a reset email to the user with this address.

    Raises:
        HTTPException: 404 when no user is found for ``email``.
    """
    account = crud.get_user_by_email(session=session, email=email)
    if not account:
        raise HTTPException(
            status_code=404,
            detail="系统中不存在该邮箱的用户。",
        )

    # The token is generated from the address given in the path, while the
    # message is delivered to the address stored on the user record.
    reset_token = generate_password_reset_token(email=email)
    mail = generate_reset_password_email(
        email_to=account.email, email=email, token=reset_token
    )
    send_email(
        email_to=account.email,
        subject=mail.subject,
        html_content=mail.html_content,
    )
    return Message(message="密码找回邮件已发送")
@router.post("/reset-password/")
def reset_password(session: SessionDep, body: NewPassword) -> Message:
    """
    Reset a user's password using a password-reset token.

    The token in ``body.token`` is verified to obtain an email, the matching
    user is looked up, and the hash of ``body.new_password`` is stored and
    committed.

    Raises:
        HTTPException: 400 when the token does not verify, 404 when no user
            is found for the token's email, 400 when the user is not active.
    """
    email = verify_password_reset_token(token=body.token)
    if not email:
        raise HTTPException(status_code=400, detail="无效的令牌")

    user = crud.get_user_by_email(session=session, email=email)
    if not user:
        raise HTTPException(
            status_code=404,
            detail="系统中不存在该邮箱的用户。",
        )
    if not user.is_active:
        raise HTTPException(status_code=400, detail="用户未激活")

    user.hashed_password = get_password_hash(password=body.new_password)
    session.add(user)
    session.commit()
    return Message(message="密码更新成功")


@router.post(
    "/password-recovery-html-content/{email}",
    dependencies=[Depends(get_current_active_superuser)],
    response_class=HTMLResponse,
)
def recover_password_html_content(email: str, session: SessionDep) -> Any:
    """
    Return the HTML content of the password recovery email.

    The route depends on ``get_current_active_superuser``. The email subject
    is returned in the ``subject`` response header and the HTML in the body.

    Raises:
        HTTPException: 404 when no user is found for ``email``.
    """
    user = crud.get_user_by_email(session=session, email=email)
    if not user:
        # NOTE(review): this detail text says "username" although the lookup
        # is by email; left unchanged in case clients match on it.
        raise HTTPException(
            status_code=404,
            detail="系统中不存在该用户名的用户。",
        )

    password_reset_token = generate_password_reset_token(email=email)
    email_data = generate_reset_password_email(
        email_to=user.email, email=email, token=password_reset_token
    )
    # The header name must not contain ":" (it is not a valid field-name
    # character); the original key "subject:" produced a malformed header.
    # NOTE(review): header values must be latin-1 encodable; confirm that the
    # generated subject never contains other characters.
    return HTMLResponse(
        content=email_data.html_content, headers={"subject": email_data.subject}
    )