初始版本

This commit is contained in:
z66
2025-12-26 13:42:22 +08:00
parent ddb90d6c20
commit b495bc1dca
43 changed files with 2179 additions and 20 deletions
+98
View File
@@ -0,0 +1,98 @@
# 数据库模块说明
本目录包含数据库相关的所有代码。
## 文件说明
- `app/models/` - 数据库模型定义(9个表)
- `app/db/session.py` - 数据库连接和配置
- `app/db/init_db.py` - 数据库初始化脚本
## 安装依赖
```bash
pip install -r ../requirements.txt
```
## 初始化数据库
### 首次创建数据库
运行以下命令创建数据库表:
```bash
cd backend
python -m app.db.init_db
```
或者从项目根目录运行:
```bash
python -m backend.app.db.init_db
```
执行后会在项目根目录生成 `blogweb.db` SQLite 数据库文件。
### 模型更新后更新数据库
⚠️ **重要提示**`create_all()` 只会创建**不存在的表**,**不会修改已存在表的结构**。
如果 models 有更新(添加字段、修改字段类型等),有两种方式:
#### 方式1:重置数据库(开发环境推荐)
⚠️ **会删除所有数据**,适合开发环境:
```bash
python -m app.db.init_db --reset
```
#### 方式2:使用数据库迁移工具(生产环境推荐)
对于生产环境,建议使用 **Alembic** 进行数据库迁移:
```bash
# 安装 Alembic
pip install alembic
# 初始化迁移环境
alembic init alembic
# 生成迁移脚本
alembic revision --autogenerate -m "描述变更"
# 执行迁移
alembic upgrade head
```
## 数据库表结构
根据 `数据库设计说明.md` 创建了以下9个表:
1. **users** - 用户账户信息
2. **todos** - 待办事项列表
3. **posts** - 博客文章
4. **transactions** - 个人记账记录
5. **media** - 书影音收藏条目
6. **tags** - 媒体标签
7. **media_tags** - 媒体与标签的多对多关联表
8. **chat_messages** - 聊天室消息记录
9. **uploads** - 用户上传的文件元数据
## 使用示例
在 FastAPI 应用中使用数据库:
```python
from app.db.session import get_db
from app.models import User, Todo
from sqlmodel import Session, select
# 在路由中使用
@app.get("/users")
def get_users(session: Session = Depends(get_db)):
statement = select(User)
users = session.exec(statement).all()
return users
```
+30
View File
@@ -0,0 +1,30 @@
"""
Backend 模块
"""
from .app.db.session import engine, get_db
from .app.db.init_db import init_db, reset_db
from .app.models import (
User, Todo, Post, Transaction, Media, Tag, MediaTag,
ChatMessage, Upload
)
# 为了向后兼容,保留 get_session 别名
get_session = get_db
__all__ = [
"engine",
"init_db",
"reset_db",
"get_db",
"get_session",
"User",
"Todo",
"Post",
"Transaction",
"Media",
"Tag",
"MediaTag",
"ChatMessage",
"Upload",
]
+4
View File
@@ -0,0 +1,4 @@
"""
FastAPI 应用包
"""
+4
View File
@@ -0,0 +1,4 @@
"""
API 路由模块
"""
+4
View File
@@ -0,0 +1,4 @@
"""
API v1 路由
"""
+14
View File
@@ -0,0 +1,14 @@
"""
API v1 路由聚合
"""
from fastapi import APIRouter
from app.api.api_v1.endpoints import auth, todos, posts, users
api_router = APIRouter()
# 注册各个功能模块的路由
api_router.include_router(auth.router, prefix="/auth", tags=["认证"])
api_router.include_router(users.router, prefix="/users", tags=["用户"])
api_router.include_router(todos.router, prefix="/todos", tags=["待办事项"])
api_router.include_router(posts.router, prefix="/posts", tags=["博客"])
@@ -0,0 +1,4 @@
"""
API 端点模块
"""
+142
View File
@@ -0,0 +1,142 @@
"""
博客文章相关 API 端点
"""
from fastapi import APIRouter, Depends, HTTPException, status
from sqlmodel import Session, select
from typing import List
from app.db.session import get_db
from app.models.post import Post
from app.models.user import User
from app.schemas.post import Post as PostSchema, PostCreate, PostUpdate
from app.api.api_v1.endpoints.users import get_current_user
router = APIRouter()
@router.post("/", response_model=PostSchema, status_code=status.HTTP_201_CREATED)
def create_post(
post_in: PostCreate,
current_user: User = Depends(get_current_user),
db: Session = Depends(get_db)
):
"""创建博客文章"""
# 检查 slug 是否已存在(同一用户)
statement = select(Post).where(
Post.slug == post_in.slug,
Post.user_id == current_user.id
)
existing_post = db.exec(statement).first()
if existing_post:
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail="该 slug 已存在"
)
db_post = Post(
title=post_in.title,
slug=post_in.slug,
content=post_in.content,
user_id=current_user.id
)
db.add(db_post)
db.commit()
db.refresh(db_post)
return db_post
@router.get("/", response_model=List[PostSchema])
def read_posts(
skip: int = 0,
limit: int = 100,
current_user: User = Depends(get_current_user),
db: Session = Depends(get_db)
):
"""获取当前用户的博客文章列表"""
statement = (
select(Post)
.where(Post.user_id == current_user.id)
.offset(skip)
.limit(limit)
)
posts = db.exec(statement).all()
return posts
@router.get("/{post_id}", response_model=PostSchema)
def read_post(
post_id: int,
current_user: User = Depends(get_current_user),
db: Session = Depends(get_db)
):
"""获取单个博客文章"""
statement = select(Post).where(
Post.id == post_id,
Post.user_id == current_user.id
)
post = db.exec(statement).first()
if post is None:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND,
detail="文章不存在"
)
return post
@router.put("/{post_id}", response_model=PostSchema)
def update_post(
post_id: int,
post_in: PostUpdate,
current_user: User = Depends(get_current_user),
db: Session = Depends(get_db)
):
"""更新博客文章"""
statement = select(Post).where(
Post.id == post_id,
Post.user_id == current_user.id
)
post = db.exec(statement).first()
if post is None:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND,
detail="文章不存在"
)
if post_in.title is not None:
post.title = post_in.title
if post_in.slug is not None:
post.slug = post_in.slug
if post_in.content is not None:
post.content = post_in.content
from datetime import datetime
post.updated_at = datetime.now()
db.add(post)
db.commit()
db.refresh(post)
return post
@router.delete("/{post_id}", status_code=status.HTTP_204_NO_CONTENT)
def delete_post(
post_id: int,
current_user: User = Depends(get_current_user),
db: Session = Depends(get_db)
):
"""删除博客文章"""
statement = select(Post).where(
Post.id == post_id,
Post.user_id == current_user.id
)
post = db.exec(statement).first()
if post is None:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND,
detail="文章不存在"
)
db.delete(post)
db.commit()
return None
+123
View File
@@ -0,0 +1,123 @@
"""
待办事项相关 API 端点
"""
from fastapi import APIRouter, Depends, HTTPException, status
from sqlmodel import Session, select
from typing import List
from app.db.session import get_db
from app.models.todo import Todo
from app.models.user import User
from app.schemas.todo import Todo as TodoSchema, TodoCreate, TodoUpdate
from app.api.api_v1.endpoints.users import get_current_user
router = APIRouter()
@router.post("/", response_model=TodoSchema, status_code=status.HTTP_201_CREATED)
def create_todo(
todo_in: TodoCreate,
current_user: User = Depends(get_current_user),
db: Session = Depends(get_db)
):
"""创建待办事项"""
db_todo = Todo(
title=todo_in.title,
user_id=current_user.id
)
db.add(db_todo)
db.commit()
db.refresh(db_todo)
return db_todo
@router.get("/", response_model=List[TodoSchema])
def read_todos(
skip: int = 0,
limit: int = 100,
current_user: User = Depends(get_current_user),
db: Session = Depends(get_db)
):
"""获取当前用户的待办事项列表"""
statement = (
select(Todo)
.where(Todo.user_id == current_user.id)
.offset(skip)
.limit(limit)
)
todos = db.exec(statement).all()
return todos
@router.get("/{todo_id}", response_model=TodoSchema)
def read_todo(
todo_id: int,
current_user: User = Depends(get_current_user),
db: Session = Depends(get_db)
):
"""获取单个待办事项"""
statement = select(Todo).where(
Todo.id == todo_id,
Todo.user_id == current_user.id
)
todo = db.exec(statement).first()
if todo is None:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND,
detail="待办事项不存在"
)
return todo
@router.put("/{todo_id}", response_model=TodoSchema)
def update_todo(
todo_id: int,
todo_in: TodoUpdate,
current_user: User = Depends(get_current_user),
db: Session = Depends(get_db)
):
"""更新待办事项"""
statement = select(Todo).where(
Todo.id == todo_id,
Todo.user_id == current_user.id
)
todo = db.exec(statement).first()
if todo is None:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND,
detail="待办事项不存在"
)
if todo_in.title is not None:
todo.title = todo_in.title
if todo_in.done is not None:
todo.done = todo_in.done
db.add(todo)
db.commit()
db.refresh(todo)
return todo
@router.delete("/{todo_id}", status_code=status.HTTP_204_NO_CONTENT)
def delete_todo(
todo_id: int,
current_user: User = Depends(get_current_user),
db: Session = Depends(get_db)
):
"""删除待办事项"""
statement = select(Todo).where(
Todo.id == todo_id,
Todo.user_id == current_user.id
)
todo = db.exec(statement).first()
if todo is None:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND,
detail="待办事项不存在"
)
db.delete(todo)
db.commit()
return None
+63
View File
@@ -0,0 +1,63 @@
"""
用户相关 API 端点
"""
from fastapi import APIRouter, Depends, HTTPException, status
from sqlmodel import Session, select
from typing import List
from app.db.session import get_db
from app.models.user import User
from app.schemas.user import User as UserSchema
from app.core.security import decode_access_token
from fastapi.security import OAuth2PasswordBearer
router = APIRouter()
oauth2_scheme = OAuth2PasswordBearer(tokenUrl="/api/v1/auth/login")
async def get_current_user(
token: str = Depends(oauth2_scheme),
db: Session = Depends(get_db)
) -> User:
"""获取当前登录用户"""
payload = decode_access_token(token)
if payload is None:
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="无效的认证凭据",
headers={"WWW-Authenticate": "Bearer"},
)
username: str = payload.get("sub")
if username is None:
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="无效的认证凭据",
headers={"WWW-Authenticate": "Bearer"},
)
statement = select(User).where(User.username == username)
user = db.exec(statement).first()
if user is None:
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="用户不存在",
headers={"WWW-Authenticate": "Bearer"},
)
return user
@router.get("/me", response_model=UserSchema)
async def read_users_me(current_user: User = Depends(get_current_user)):
"""获取当前用户信息"""
return current_user
@router.get("/", response_model=List[UserSchema])
def read_users(skip: int = 0, limit: int = 100, db: Session = Depends(get_db)):
"""获取用户列表(示例端点)"""
statement = select(User).offset(skip).limit(limit)
users = db.exec(statement).all()
return users
+13
View File
@@ -0,0 +1,13 @@
"""
核心配置模块
"""
from .config import settings
from .security import get_password_hash, verify_password, create_access_token
__all__ = [
"settings",
"get_password_hash",
"verify_password",
"create_access_token",
]
+44
View File
@@ -0,0 +1,44 @@
"""
安全相关功能:密码哈希、JWT 令牌等
"""
from datetime import datetime, timedelta
from typing import Optional
from jose import JWTError, jwt
from passlib.context import CryptContext
from .config import settings
# 密码加密上下文
pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto")
def get_password_hash(password: str) -> str:
"""生成密码哈希"""
return pwd_context.hash(password)
def verify_password(plain_password: str, hashed_password: str) -> bool:
"""验证密码"""
return pwd_context.verify(plain_password, hashed_password)
def create_access_token(data: dict, expires_delta: Optional[timedelta] = None) -> str:
"""创建 JWT 访问令牌"""
to_encode = data.copy()
if expires_delta:
expire = datetime.utcnow() + expires_delta
else:
expire = datetime.utcnow() + timedelta(minutes=settings.ACCESS_TOKEN_EXPIRE_MINUTES)
to_encode.update({"exp": expire})
encoded_jwt = jwt.encode(to_encode, settings.SECRET_KEY, algorithm=settings.ALGORITHM)
return encoded_jwt
def decode_access_token(token: str) -> Optional[dict]:
"""解码 JWT 令牌"""
try:
payload = jwt.decode(token, settings.SECRET_KEY, algorithms=[settings.ALGORITHM])
return payload
except JWTError:
return None
+16
View File
@@ -0,0 +1,16 @@
"""
数据库模块
"""
from .base import Base
from .session import engine, SessionLocal, get_db
from .init_db import init_db, reset_db
__all__ = [
"Base",
"engine",
"SessionLocal",
"get_db",
"init_db",
"reset_db",
]
+71
View File
@@ -0,0 +1,71 @@
"""
数据库初始化脚本
运行此脚本创建数据库表结构
用法:
python -m app.db.init_db # 创建表(如果表已存在则跳过)
python -m app.db.init_db --reset # 删除所有表后重新创建(⚠️ 会丢失数据)
"""
import sys
import argparse
from sqlmodel import SQLModel
from app.db.session import engine
from app.db.base import Base # 这会导入所有模型
# 导入所有模型以确保表被注册
from app.models import (
User, Todo, Post, Transaction, Media, Tag, MediaTag,
ChatMessage, Upload
)
def init_db() -> None:
"""初始化数据库,创建所有表"""
SQLModel.metadata.create_all(engine)
print("✅ 数据库表创建完成")
def reset_db() -> None:
"""重置数据库:删除所有表后重新创建(⚠️ 会丢失所有数据)"""
print("⚠️ 警告:将删除所有表和数据!")
SQLModel.metadata.drop_all(engine)
print("✅ 已删除所有表")
SQLModel.metadata.create_all(engine)
print("✅ 已重新创建所有表")
def main() -> None:
"""初始化数据库"""
parser = argparse.ArgumentParser(description="初始化数据库")
parser.add_argument(
"--reset",
action="store_true",
help="删除所有表后重新创建(⚠️ 会丢失所有数据)"
)
args = parser.parse_args()
if args.reset:
print("🔄 重置模式:将删除所有表后重新创建...")
reset_db()
else:
print("📦 创建模式:创建不存在的表...")
init_db()
print(f"\n✅ 数据库表操作完成!")
print(f"数据库文件位置: {engine.url}")
# 显示创建的表
print("\n数据库中的表:")
tables = [
"users", "todos", "posts", "transactions",
"media", "tags", "media_tags",
"chat_messages", "uploads"
]
for table in tables:
print(f" - {table}")
if __name__ == "__main__":
main()
+26
View File
@@ -0,0 +1,26 @@
"""
数据库会话管理
"""
from sqlalchemy.engine import Engine
from sqlalchemy import create_engine
from sqlmodel import Session
from typing import Generator
from app.core.config import settings
# 创建数据库引擎
engine: Engine = create_engine(
settings.DATABASE_URL,
connect_args={"check_same_thread": False}, # SQLite 需要此参数
echo=True # 开发环境显示SQL语句,生产环境设为False
)
def get_db() -> Generator[Session, None, None]:
"""获取数据库会话(用于依赖注入)"""
with Session(engine) as session:
yield session
# 为了向后兼容,保留 SessionLocal 别名
SessionLocal = Session
+21
View File
@@ -0,0 +1,21 @@
# 初始数据模块
此目录用于存放数据库初始化时的示例数据脚本。
## 使用说明
在数据库初始化后,可以运行此模块中的脚本来插入示例数据,方便开发和测试。
## 示例
```python
from app.db.session import SessionLocal
from app.models.user import User
from app.core.security import get_password_hash
def init_data():
db = SessionLocal()
# 创建示例用户等
...
```
+5
View File
@@ -0,0 +1,5 @@
"""
初始数据模块
用于数据库初始化时插入示例数据
"""
+45
View File
@@ -0,0 +1,45 @@
"""
FastAPI 应用主入口
"""
from fastapi import FastAPI
from fastapi.middleware.cors import CORSMiddleware
from app.core.config import settings
from app.api.api_v1.api import api_router
# 创建 FastAPI 应用实例
app = FastAPI(
title=settings.PROJECT_NAME,
version=settings.VERSION,
openapi_url=f"{settings.API_V1_STR}/openapi.json",
)
# 配置 CORS
if settings.BACKEND_CORS_ORIGINS:
app.add_middleware(
CORSMiddleware,
allow_origins=[str(origin) for origin in settings.BACKEND_CORS_ORIGINS],
allow_credentials=True,
allow_methods=["*"],
allow_headers=["*"],
)
# 注册 API 路由
app.include_router(api_router, prefix=settings.API_V1_STR)
@app.get("/")
def root():
"""根路径"""
return {
"message": "欢迎使用个人博客网站 API",
"version": settings.VERSION,
"docs": "/docs",
"api": settings.API_V1_STR
}
@app.get("/health")
def health_check():
"""健康检查"""
return {"status": "ok"}
+23
View File
@@ -0,0 +1,23 @@
"""
数据库模型模块
"""
from .user import User
from .todo import Todo
from .post import Post
from .transaction import Transaction
from .media import Media, Tag, MediaTag
from .chat import ChatMessage
from .upload import Upload
__all__ = [
"User",
"Todo",
"Post",
"Transaction",
"Media",
"Tag",
"MediaTag",
"ChatMessage",
"Upload",
]
+40
View File
@@ -0,0 +1,40 @@
"""
书影音收藏模型
"""
from datetime import datetime
from typing import Optional, TYPE_CHECKING
from sqlmodel import SQLModel, Field, Relationship
from .user import User
if TYPE_CHECKING:
from .tag import Tag
else:
Tag = "Tag"
class MediaTag(SQLModel, table=True):
"""媒体-标签关联表"""
__tablename__ = "media_tags"
media_id: int = Field(foreign_key="media.id", primary_key=True)
tag_id: int = Field(foreign_key="tags.id", primary_key=True)
class Media(SQLModel, table=True):
"""书影音收藏表"""
__tablename__ = "media"
id: Optional[int] = Field(default=None, primary_key=True)
title: str = Field(max_length=200)
media_type: str = Field(max_length=20) # book / movie / music
rating: Optional[float] = Field(default=None, ge=0.0, le=5.0)
comment: Optional[str] = Field(default=None)
external_id: Optional[str] = Field(default=None, max_length=100) # ISBN、IMDb ID等
cover_url: Optional[str] = Field(default=None, max_length=300)
created_at: datetime = Field(default_factory=datetime.now)
user_id: int = Field(foreign_key="users.id", index=True)
# 关系
user: Optional[User] = Relationship()
tags: list["Tag"] = Relationship(back_populates="media", link_model="MediaTag")
+24
View File
@@ -0,0 +1,24 @@
"""
博客文章模型
"""
from datetime import datetime
from typing import Optional
from sqlmodel import SQLModel, Field, Relationship
from .user import User
class Post(SQLModel, table=True):
"""博客文章表"""
__tablename__ = "posts"
id: Optional[int] = Field(default=None, primary_key=True)
title: str = Field(max_length=200)
slug: str = Field(max_length=200, index=True)
content: str # 支持 Markdown
created_at: datetime = Field(default_factory=datetime.now)
updated_at: Optional[datetime] = Field(default=None)
user_id: int = Field(foreign_key="users.id", index=True)
# 关系
user: Optional[User] = Relationship()
+23
View File
@@ -0,0 +1,23 @@
"""
媒体标签模型
"""
from typing import Optional, TYPE_CHECKING
from sqlmodel import SQLModel, Field, Relationship
if TYPE_CHECKING:
from .media import Media, MediaTag
else:
Media = "Media"
MediaTag = "MediaTag"
class Tag(SQLModel, table=True):
"""媒体标签表"""
__tablename__ = "tags"
id: Optional[int] = Field(default=None, primary_key=True)
name: str = Field(max_length=50, unique=True, index=True)
# 关系
media: list["Media"] = Relationship(back_populates="tags", link_model="MediaTag")
+22
View File
@@ -0,0 +1,22 @@
"""
待办事项模型
"""
from datetime import datetime
from typing import Optional
from sqlmodel import SQLModel, Field, Relationship
from .user import User
class Todo(SQLModel, table=True):
"""待办事项表"""
__tablename__ = "todos"
id: Optional[int] = Field(default=None, primary_key=True)
title: str = Field(max_length=200)
done: bool = Field(default=False)
created_at: datetime = Field(default_factory=datetime.now)
user_id: int = Field(foreign_key="users.id", index=True)
# 关系(可选,用于ORM查询)
user: Optional[User] = Relationship()
+25
View File
@@ -0,0 +1,25 @@
"""
记账记录模型
"""
from datetime import date
from typing import Optional
from decimal import Decimal
from sqlmodel import SQLModel, Field, Relationship, Column
from sqlalchemy import Numeric
from .user import User
class Transaction(SQLModel, table=True):
"""记账记录表"""
__tablename__ = "transactions"
id: Optional[int] = Field(default=None, primary_key=True)
amount: Decimal = Field(sa_column=Column(Numeric(10, 2))) # 正数为收入,负数为支出
category: str = Field(max_length=50)
description: Optional[str] = Field(default=None, max_length=200)
date: date
user_id: int = Field(foreign_key="users.id", index=True)
# 关系
user: Optional[User] = Relationship()
+25
View File
@@ -0,0 +1,25 @@
"""
文件上传模型
"""
from datetime import datetime
from typing import Optional
from sqlmodel import SQLModel, Field, Relationship
from .user import User
class Upload(SQLModel, table=True):
"""文件上传记录表"""
__tablename__ = "uploads"
id: Optional[int] = Field(default=None, primary_key=True)
filename: str = Field(max_length=200)
stored_path: str = Field(max_length=300)
file_size: int # 字节数
mime_type: str = Field(max_length=100)
uploaded_at: datetime = Field(default_factory=datetime.now)
expires_at: Optional[datetime] = Field(default=None)
user_id: int = Field(foreign_key="users.id", index=True)
# 关系
user: Optional[User] = Relationship()
+18
View File
@@ -0,0 +1,18 @@
"""
用户模型
"""
from datetime import datetime
from typing import Optional
from sqlmodel import SQLModel, Field
class User(SQLModel, table=True):
"""用户表"""
__tablename__ = "users"
id: Optional[int] = Field(default=None, primary_key=True)
username: str = Field(max_length=50, unique=True, index=True)
email: Optional[str] = Field(default=None, max_length=100, unique=True, index=True)
hashed_password: str
created_at: datetime = Field(default_factory=datetime.now)
+5
View File
@@ -0,0 +1,5 @@
"""
Pydantic 模式定义
用于 API 请求和响应的数据验证
"""
+42
View File
@@ -0,0 +1,42 @@
"""
博客文章相关的 Pydantic 模式
"""
from datetime import datetime
from typing import Optional
from pydantic import BaseModel
class PostBase(BaseModel):
"""博客文章基础模式"""
title: str
slug: str
content: str
class PostCreate(PostBase):
"""创建博客文章请求模式"""
pass
class PostUpdate(BaseModel):
"""更新博客文章请求模式"""
title: Optional[str] = None
slug: Optional[str] = None
content: Optional[str] = None
class PostInDB(PostBase):
"""数据库中的博客文章模式"""
id: int
created_at: datetime
updated_at: Optional[datetime] = None
user_id: int
class Config:
from_attributes = True
class Post(PostInDB):
"""博客文章响应模式"""
pass
+39
View File
@@ -0,0 +1,39 @@
"""
待办事项相关的 Pydantic 模式
"""
from datetime import datetime
from typing import Optional
from pydantic import BaseModel
class TodoBase(BaseModel):
"""待办事项基础模式"""
title: str
class TodoCreate(TodoBase):
"""创建待办事项请求模式"""
pass
class TodoUpdate(BaseModel):
"""更新待办事项请求模式"""
title: Optional[str] = None
done: Optional[bool] = None
class TodoInDB(TodoBase):
"""数据库中的待办事项模式"""
id: int
done: bool
created_at: datetime
user_id: int
class Config:
from_attributes = True
class Todo(TodoInDB):
"""待办事项响应模式"""
pass
+50
View File
@@ -0,0 +1,50 @@
"""
用户相关的 Pydantic 模式
"""
from datetime import datetime
from typing import Optional
from pydantic import BaseModel, EmailStr
class UserBase(BaseModel):
"""用户基础模式"""
username: str
email: Optional[EmailStr] = None
class UserCreate(UserBase):
"""创建用户请求模式"""
password: str
class UserUpdate(BaseModel):
"""更新用户请求模式"""
username: Optional[str] = None
email: Optional[EmailStr] = None
password: Optional[str] = None
class UserInDB(UserBase):
"""数据库中的用户模式"""
id: int
created_at: datetime
class Config:
from_attributes = True
class User(UserInDB):
"""用户响应模式(不包含敏感信息)"""
pass
class Token(BaseModel):
"""JWT 令牌响应模式"""
access_token: str
token_type: str = "bearer"
class TokenData(BaseModel):
"""令牌数据模式"""
username: Optional[str] = None
+144
View File
@@ -0,0 +1,144 @@
"""
数据库模型定义
使用 SQLModel 定义所有表结构
"""
from datetime import datetime, date
from typing import Optional
from sqlmodel import SQLModel, Field, Relationship, Column
from sqlalchemy import Numeric
from decimal import Decimal
# ==================== 用户表 ====================
class User(SQLModel, table=True):
"""用户表"""
__tablename__ = "users"
id: Optional[int] = Field(default=None, primary_key=True)
username: str = Field(max_length=50, unique=True, index=True)
email: Optional[str] = Field(default=None, max_length=100, unique=True, index=True)
hashed_password: str
created_at: datetime = Field(default_factory=datetime.now)
# ==================== 待办事项表 ====================
class Todo(SQLModel, table=True):
"""待办事项表"""
__tablename__ = "todos"
id: Optional[int] = Field(default=None, primary_key=True)
title: str = Field(max_length=200)
done: bool = Field(default=False)
created_at: datetime = Field(default_factory=datetime.now)
user_id: int = Field(foreign_key="users.id", index=True)
# 关系(可选,用于ORM查询)
user: Optional[User] = Relationship()
# ==================== 博客文章表 ====================
class Post(SQLModel, table=True):
"""博客文章表"""
__tablename__ = "posts"
id: Optional[int] = Field(default=None, primary_key=True)
title: str = Field(max_length=200)
slug: str = Field(max_length=200, index=True)
content: str # 支持 Markdown
created_at: datetime = Field(default_factory=datetime.now)
updated_at: Optional[datetime] = Field(default=None)
user_id: int = Field(foreign_key="users.id", index=True)
# 关系
user: Optional[User] = Relationship()
# ==================== 记账记录表 ====================
class Transaction(SQLModel, table=True):
"""记账记录表"""
__tablename__ = "transactions"
id: Optional[int] = Field(default=None, primary_key=True)
amount: Decimal = Field(sa_column=Column(Numeric(10, 2))) # 正数为收入,负数为支出
category: str = Field(max_length=50)
description: Optional[str] = Field(default=None, max_length=200)
date: date
user_id: int = Field(foreign_key="users.id", index=True)
# 关系
user: Optional[User] = Relationship()
# ==================== 媒体-标签关联表(多对多)====================
class MediaTag(SQLModel, table=True):
"""媒体-标签关联表"""
__tablename__ = "media_tags"
media_id: int = Field(foreign_key="media.id", primary_key=True)
tag_id: int = Field(foreign_key="tags.id", primary_key=True)
# ==================== 书影音收藏表 ====================
class Media(SQLModel, table=True):
"""书影音收藏表"""
__tablename__ = "media"
id: Optional[int] = Field(default=None, primary_key=True)
title: str = Field(max_length=200)
media_type: str = Field(max_length=20) # book / movie / music
rating: Optional[float] = Field(default=None, ge=0.0, le=5.0)
comment: Optional[str] = Field(default=None)
external_id: Optional[str] = Field(default=None, max_length=100) # ISBN、IMDb ID等
cover_url: Optional[str] = Field(default=None, max_length=300)
created_at: datetime = Field(default_factory=datetime.now)
user_id: int = Field(foreign_key="users.id", index=True)
# 关系
user: Optional[User] = Relationship()
tags: list["Tag"] = Relationship(back_populates="media", link_model=MediaTag)
# ==================== 媒体标签表 ====================
class Tag(SQLModel, table=True):
"""媒体标签表"""
__tablename__ = "tags"
id: Optional[int] = Field(default=None, primary_key=True)
name: str = Field(max_length=50, unique=True, index=True)
# 关系
media: list[Media] = Relationship(back_populates="tags", link_model=MediaTag)
# ==================== 聊天消息表 ====================
class ChatMessage(SQLModel, table=True):
"""聊天消息表"""
__tablename__ = "chat_messages"
id: Optional[int] = Field(default=None, primary_key=True)
content: str
sent_at: datetime = Field(default_factory=datetime.now)
user_id: int = Field(foreign_key="users.id", index=True)
room: str = Field(max_length=50, default="main", index=True)
# 关系
user: Optional[User] = Relationship()
# ==================== 文件上传记录表 ====================
class Upload(SQLModel, table=True):
"""文件上传记录表"""
__tablename__ = "uploads"
id: Optional[int] = Field(default=None, primary_key=True)
filename: str = Field(max_length=200)
stored_path: str = Field(max_length=300)
file_size: int # 字节数
mime_type: str = Field(max_length=100)
uploaded_at: datetime = Field(default_factory=datetime.now)
expires_at: Optional[datetime] = Field(default=None)
user_id: int = Field(foreign_key="users.id", index=True)
# 关系
user: Optional[User] = Relationship()