71 lines
2.0 KiB
Python
71 lines
2.0 KiB
Python
"""
|
|
通用数据库工具(异步)
|
|
|
|
此模块提供基于 SQLAlchemy 2.x 异步引擎的数据库访问封装,支持 MySQL 与 PostgreSQL。
|
|
数据模型定义位置:
|
|
- 无(本模块仅提供连接与查询工具,不定义数据模型)
|
|
"""
|
|
|
|
from __future__ import annotations
|
|
|
|
import asyncio
|
|
import os
|
|
from typing import Any, Dict, Iterable, List, Optional, Union
|
|
|
|
from sqlalchemy.ext.asyncio import AsyncEngine, AsyncSession, create_async_engine
|
|
from sqlalchemy import text
|
|
from InsightEngine.utils.config import settings
|
|
|
|
__all__ = [
|
|
"get_async_engine",
|
|
"fetch_all",
|
|
]
|
|
|
|
|
|
_engine: Optional[AsyncEngine] = None
|
|
|
|
|
|
def _build_database_url() -> str:
|
|
dialect: str = (settings.DB_DIALECT or "mysql").lower()
|
|
host: str = settings.DB_HOST or ""
|
|
port: str = str(settings.DB_PORT or "")
|
|
user: str = settings.DB_USER or ""
|
|
password: str = settings.DB_PASSWORD or ""
|
|
db_name: str = settings.DB_NAME or ""
|
|
|
|
if os.getenv("DATABASE_URL"):
|
|
return os.getenv("DATABASE_URL") # 直接使用外部提供的完整URL
|
|
|
|
if dialect in ("postgresql", "postgres"):
|
|
# PostgreSQL 使用 asyncpg 驱动
|
|
return f"postgresql+asyncpg://{user}:{password}@{host}:{port}/{db_name}"
|
|
|
|
# 默认 MySQL 使用 aiomysql 驱动
|
|
return f"mysql+aiomysql://{user}:{password}@{host}:{port}/{db_name}"
|
|
|
|
|
|
def get_async_engine() -> AsyncEngine:
|
|
global _engine
|
|
if _engine is None:
|
|
database_url: str = _build_database_url()
|
|
_engine = create_async_engine(
|
|
database_url,
|
|
pool_pre_ping=True,
|
|
pool_recycle=1800,
|
|
)
|
|
return _engine
|
|
|
|
|
|
async def fetch_all(query: str, params: Optional[Union[Iterable[Any], Dict[str, Any]]] = None) -> List[Dict[str, Any]]:
|
|
"""
|
|
执行只读查询并返回字典列表。
|
|
"""
|
|
engine: AsyncEngine = get_async_engine()
|
|
async with engine.connect() as conn:
|
|
result = await conn.execute(text(query), params or {})
|
|
rows = result.mappings().all()
|
|
# 将 RowMapping 转换为普通字典
|
|
return [dict(row) for row in rows]
|
|
|
|
|