Complete Guide to FastAPI and SQLAlchemy 2.0
📂 Stage: Stage 3 - Data Persistence (Database)
🔗 Related chapters: async-await-principles · redis-integration · database-migration-alembic
This tutorial walks you step by step through building an asynchronous database access layer with FastAPI and SQLAlchemy 2.0. We go from environment setup and configuration management to real CRUD operations and transaction handling, avoiding common async pitfalls along the way. Even if this is your first time working with an ORM, you can follow the code one step at a time.
1. Why choose SQLAlchemy 2.0?
Async support in SQLAlchemy 2.0 is no longer a bolted-on patch but a native part of the library, which makes it a natural fit for asynchronous web frameworks such as FastAPI.
Install dependencies
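# Async core + database drivers (PostgreSQL is used as the example)
# pydantic[email] pulls in email-validator, which EmailStr in schemas.py requires
pip install fastapi "sqlalchemy[asyncio]" asyncpg aiosqlite pydantic-settings "pydantic[email]" "passlib[bcrypt]"
💡 If you use MySQL, replace asyncpg with aiomysql and change the connection string to mysql+aiomysql://.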
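To make the shape of these connection strings concrete, here is a small sketch using only the standard library. The prefix before `://` has the form `dialect+driver`, and switching databases means swapping that prefix. The helper name `swap_driver` is made up for this illustration; it is not part of SQLAlchemy.

```python
from urllib.parse import urlsplit


def swap_driver(url: str, new_scheme: str) -> str:
    """Return `url` with its dialect+driver prefix replaced by `new_scheme`."""
    return urlsplit(url)._replace(scheme=new_scheme).geturl()


pg_url = "postgresql+asyncpg://user:pass@localhost/fastapi_db"

# The scheme splits into the SQL dialect and the async driver
dialect, driver = urlsplit(pg_url).scheme.split("+")

# Same credentials, host and database name, different dialect+driver
mysql_url = swap_driver(pg_url, "mysql+aiomysql")
print(dialect, driver, mysql_url)
```

In a real project you would simply edit the URL in .env; the helper only exists to show which part of the string changes.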
2. Infrastructure: from configuration to engine
A good project structure can make maintenance more efficient. Let's first manage the database configuration, engine, and session factory in a unified manner.
2.1 Configuration Management
Create a .env file to store sensitive information, then load it with pydantic-settings. This makes it easy to switch between environments.
# config.py
from functools import lru_cache

from pydantic_settings import BaseSettings, SettingsConfigDict


class Settings(BaseSettings):
    # Values from .env (or real environment variables) override the defaults below
    model_config = SettingsConfigDict(env_file=".env")

    database_url: str = "postgresql+asyncpg://user:pass@localhost/fastapi_db"
    debug: bool = False         # enable during development to print SQL
    pool_size: int = 20         # connections kept open in the pool
    max_overflow: int = 10      # extra connections allowed beyond pool_size
    pool_pre_ping: bool = True  # test a connection each time it is checked out of the pool


@lru_cache()
def get_settings() -> Settings:
    # Cached so the settings are built only once per process
    return Settings()
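One consequence of wrapping the settings loader in `lru_cache` is worth seeing in isolation: later changes to the environment are not picked up until the cache is cleared. The sketch below uses a plain dataclass, `DemoSettings`, as a hypothetical stand-in for the `Settings` class so that it runs without pydantic-settings; the caching behaviour it shows comes from `functools.lru_cache` itself.

```python
import os
from dataclasses import dataclass, field
from functools import lru_cache


@dataclass(frozen=True)
class DemoSettings:
    # Hypothetical stand-in for Settings: reads DATABASE_URL when constructed
    database_url: str = field(
        default_factory=lambda: os.environ.get("DATABASE_URL", "sqlite+aiosqlite:///./dev.db")
    )


@lru_cache()
def get_demo_settings() -> DemoSettings:
    return DemoSettings()


os.environ["DATABASE_URL"] = "sqlite+aiosqlite:///./first.db"
get_demo_settings.cache_clear()
first = get_demo_settings()

# Changing the environment afterwards has no effect: the cached object is returned
os.environ["DATABASE_URL"] = "sqlite+aiosqlite:///./second.db"
cached = get_demo_settings()

# Clearing the cache (for example in a test fixture) forces a fresh read
get_demo_settings.cache_clear()
refreshed = get_demo_settings()
```

The same `cache_clear()` call is available on any function decorated with `lru_cache`, which is handy when tests need different settings.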
2.2 Asynchronous engine and session
The code below is the core of asynchronous database access: the engine manages the connection pool, and the session factory produces an independent AsyncSession for each unit of work.
# database.py
from collections.abc import AsyncGenerator

from sqlalchemy.ext.asyncio import (
    AsyncSession, create_async_engine, async_sessionmaker
)
from sqlalchemy.orm import DeclarativeBase

from config import get_settings

settings = get_settings()

# Create the async engine
engine = create_async_engine(
    settings.database_url,
    echo=settings.debug,  # when True, every SQL statement is logged
    pool_size=settings.pool_size,
    max_overflow=settings.max_overflow,
    pool_pre_ping=settings.pool_pre_ping,
)

# Async session factory
AsyncSessionLocal = async_sessionmaker(
    engine,
    class_=AsyncSession,
    expire_on_commit=False,  # objects do not expire on commit, so they stay usable afterwards
    autocommit=False,
    autoflush=False,         # flush manually to avoid surprising implicit writes
)


# Base class that every model inherits from
class Base(DeclarativeBase):
    pass


# FastAPI dependency: gives each request its own database session
async def get_db() -> AsyncGenerator[AsyncSession, None]:
    async with AsyncSessionLocal() as session:
        yield session
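The `yield` in this dependency is what ties the session to a single request: the code before it runs before the handler, and the `async with` block closes the session once the handler is done. The sketch below imitates that sequence with the standard library only. `FakeSession` and `handle_request` are hypothetical stand-ins, and FastAPI's real machinery is more involved, so treat this as a rough model of the ordering rather than the framework's implementation.

```python
import asyncio
from collections.abc import AsyncGenerator

events: list[str] = []


class FakeSession:
    """Hypothetical stand-in for AsyncSession that only records open/close."""

    async def __aenter__(self) -> "FakeSession":
        events.append("open")
        return self

    async def __aexit__(self, exc_type, exc, tb) -> None:
        events.append("close")


async def get_fake_db() -> AsyncGenerator[FakeSession, None]:
    async with FakeSession() as session:
        yield session


async def handle_request() -> None:
    gen = get_fake_db()
    session = await gen.__anext__()  # runs the dependency up to its yield
    events.append("handler")         # the route function would use `session` here
    await gen.aclose()               # resumes the generator so the session is closed


asyncio.run(handle_request())
print(events)
```

Because every request drives its own generator, two concurrent requests never share a session object.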
2.3 Integrated into FastAPI life cycle
Use lifespan to create the tables at startup and release the connection pool at shutdown. For production environments, manage schema migrations with Alembic instead.
# main.py
from contextlib import asynccontextmanager

from fastapi import FastAPI

import models  # noqa: F401  (importing the models registers their tables on Base.metadata)
from database import engine, Base


@asynccontextmanager
async def lifespan(app: FastAPI):
    # Startup: create all tables (use Alembic migrations in production instead)
    async with engine.begin() as conn:
        await conn.run_sync(Base.metadata.create_all)
    yield
    # Shutdown: release the connection pool
    await engine.dispose()


app = FastAPI(lifespan=lifespan)
3. Model Definition: Embracing the 2.0 Style
SQLAlchemy 2.0 recommends the Mapped[T] plus mapped_column style, which is both clear and friendly to type checkers.
We use three models, User, Post and Tag, to show how one-to-many and many-to-many relationships are configured.
# models.py
from datetime import datetime
from typing import List

from sqlalchemy import (
    String, Integer, Boolean, DateTime, Text, ForeignKey, Table, Column
)
from sqlalchemy.orm import Mapped, mapped_column, relationship
from sqlalchemy.sql import func

from database import Base

# Many-to-many association table (posts ↔ tags)
post_tags = Table(
    "post_tags",
    Base.metadata,
    Column("post_id", Integer, ForeignKey("posts.id", ondelete="CASCADE"), primary_key=True),
    Column("tag_id", Integer, ForeignKey("tags.id", ondelete="CASCADE"), primary_key=True),
)


class User(Base):
    __tablename__ = "users"

    id: Mapped[int] = mapped_column(primary_key=True, autoincrement=True)
    name: Mapped[str] = mapped_column(String(50), nullable=False)
    email: Mapped[str] = mapped_column(String(100), unique=True, index=True, nullable=False)
    hashed_password: Mapped[str] = mapped_column(String(255), nullable=False)
    is_active: Mapped[bool] = mapped_column(Boolean, default=True)
    created_at: Mapped[datetime] = mapped_column(DateTime(timezone=True), server_default=func.now())
    # server_default is needed as well: the column is NOT NULL, and onupdate alone sets nothing on INSERT
    updated_at: Mapped[datetime] = mapped_column(
        DateTime(timezone=True), server_default=func.now(), onupdate=func.now()
    )

    # One-to-many: a user owns many posts
    posts: Mapped[List["Post"]] = relationship(
        "Post", back_populates="author", lazy="selectin",
        cascade="all, delete-orphan"
    )


class Post(Base):
    __tablename__ = "posts"

    id: Mapped[int] = mapped_column(primary_key=True, autoincrement=True)
    title: Mapped[str] = mapped_column(String(200), nullable=False)
    content: Mapped[str] = mapped_column(Text, nullable=False)
    published: Mapped[bool] = mapped_column(Boolean, default=False)
    author_id: Mapped[int] = mapped_column(ForeignKey("users.id", ondelete="CASCADE"), nullable=False)
    created_at: Mapped[datetime] = mapped_column(DateTime(timezone=True), server_default=func.now())

    # Many-to-one: each post belongs to one author
    author: Mapped["User"] = relationship("User", back_populates="posts")
    # Many-to-many: a post can carry several tags
    tags: Mapped[List["Tag"]] = relationship("Tag", secondary=post_tags, lazy="selectin")


class Tag(Base):
    __tablename__ = "tags"

    id: Mapped[int] = mapped_column(primary_key=True, autoincrement=True)
    name: Mapped[str] = mapped_column(String(30), unique=True, nullable=False)
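📘 lazy="selectin" makes SQLAlchemy issue one additional SELECT ... IN statement whenever the parent objects are queried, loading all related rows at once and avoiding the N+1 problem. Weigh it per scenario, though: if most queries do not need the related data, switch to lazy="select" (load on demand) and request eager loading per query instead. Keep in mind that under AsyncSession an on-demand load cannot be triggered by plain attribute access, so the related data must be loaded explicitly.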
4. Repository pattern: cleaner data access
Encapsulate the SQL statements in a repository so that route functions only deal with business logic, and database operations are easy to mock in tests.
# repositories/base.py
from typing import Any, Generic, Optional, TypeVar

from sqlalchemy import select
from sqlalchemy.ext.asyncio import AsyncSession

from database import Base

T = TypeVar("T", bound=Base)


class BaseRepository(Generic[T]):
    """Generic CRUD repository bound to one concrete model class."""

    def __init__(self, model: type[T], db: AsyncSession):
        self.model = model  # the model class itself, e.g. User
        self.db = db

    async def get_by_id(self, obj_id: int) -> Optional[T]:
        result = await self.db.execute(
            select(self.model).where(self.model.id == obj_id)
        )
        return result.scalar_one_or_none()

    async def get_multi(self, skip: int = 0, limit: int = 20) -> list[T]:
        # Order by primary key so consecutive pages do not overlap
        result = await self.db.execute(
            select(self.model).order_by(self.model.id).offset(skip).limit(limit)
        )
        return list(result.scalars().all())

    async def create(self, obj_in: dict[str, Any]) -> T:
        obj = self.model(**obj_in)
        self.db.add(obj)
        await self.db.commit()
        await self.db.refresh(obj)  # reload database-generated values (id, timestamps)
        return obj

    async def delete(self, obj: T) -> None:
        await self.db.delete(obj)
        await self.db.commit()
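To illustrate the testing benefit, here is a minimal sketch of an in-memory test double that exposes the same async methods as the repository above. `FakeUser` and `InMemoryUserRepository` are hypothetical names made up for this example, and the sketch uses only the standard library.

```python
import asyncio
from dataclasses import dataclass
from typing import Optional


@dataclass
class FakeUser:
    id: int
    name: str
    email: str


class InMemoryUserRepository:
    """Hypothetical test double with the same async interface as BaseRepository."""

    def __init__(self) -> None:
        self._rows: dict[int, FakeUser] = {}
        self._next_id = 1

    async def get_by_id(self, obj_id: int) -> Optional[FakeUser]:
        return self._rows.get(obj_id)

    async def get_multi(self, skip: int = 0, limit: int = 20) -> list[FakeUser]:
        return list(self._rows.values())[skip : skip + limit]

    async def create(self, obj_in: dict) -> FakeUser:
        user = FakeUser(id=self._next_id, **obj_in)
        self._rows[user.id] = user
        self._next_id += 1
        return user

    async def delete(self, obj: FakeUser) -> None:
        self._rows.pop(obj.id, None)


async def demo() -> tuple[list[str], Optional[FakeUser]]:
    repo = InMemoryUserRepository()
    alice = await repo.create({"name": "Alice", "email": "alice@example.com"})
    await repo.create({"name": "Bob", "email": "bob@example.com"})
    await repo.delete(alice)
    remaining = [u.name for u in await repo.get_multi()]
    return remaining, await repo.get_by_id(alice.id)


names, missing = asyncio.run(demo())
```

In a FastAPI test, a double like this could be swapped in through `app.dependency_overrides`, keyed by whichever dependency function provides the repository, so the routes run without touching a database.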
5. Writing API routes: from Schema to endpoints
First define the request and response models with Pydantic, then call the repository from the routes.
5.1 Pydantic schemas
# schemas.py
from datetime import datetime

from pydantic import BaseModel, ConfigDict, EmailStr


class UserBase(BaseModel):
    name: str
    email: EmailStr


class UserCreate(UserBase):
    password: str


class UserPublic(UserBase):
    # Allow building the schema directly from ORM objects
    model_config = ConfigDict(from_attributes=True)

    id: int
    is_active: bool
    created_at: datetime
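As a quick illustration of what `from_attributes` enables, the sketch below validates a plain attribute-bearing object, standing in for an ORM row, into a response schema. It assumes Pydantic v2; `UserOut` and the `SimpleNamespace` object are made up for the example, and the email field is left out so the snippet does not need email-validator.

```python
from datetime import datetime, timezone
from types import SimpleNamespace

from pydantic import BaseModel, ConfigDict


class UserOut(BaseModel):
    model_config = ConfigDict(from_attributes=True)

    id: int
    name: str
    is_active: bool
    created_at: datetime


# Stand-in for an ORM object: values live on attributes, not in a dict
orm_like = SimpleNamespace(
    id=1,
    name="Alice",
    is_active=True,
    created_at=datetime(2024, 1, 1, tzinfo=timezone.utc),
    hashed_password="never-exposed",
)

user = UserOut.model_validate(orm_like)
payload = user.model_dump()
```

Only the fields declared on the schema are read, which is why the password hash never reaches the response.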
5.2 User routes (CRUD example)
# routers/users.py
from fastapi import APIRouter, Depends, HTTPException, status, Query
from passlib.context import CryptContext
from sqlalchemy.ext.asyncio import AsyncSession

from database import get_db
from models import User
from repositories.base import BaseRepository
from schemas import UserCreate, UserPublic

router = APIRouter(prefix="/users", tags=["Users"])
pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto")


def get_user_repo(db: AsyncSession = Depends(get_db)) -> BaseRepository[User]:
    return BaseRepository(User, db)


@router.get("/", response_model=list[UserPublic])
async def list_users(
    skip: int = Query(0, ge=0),
    limit: int = Query(20, ge=1, le=100),
    repo: BaseRepository[User] = Depends(get_user_repo),
):
    return await repo.get_multi(skip, limit)


@router.post("/", response_model=UserPublic, status_code=status.HTTP_201_CREATED)
async def create_user(
    data: UserCreate,
    repo: BaseRepository[User] = Depends(get_user_repo),
):
    # A real project should check here whether the email already exists
    hashed = pwd_context.hash(data.password)
    user_data = data.model_dump(exclude={"password"})
    user_data["hashed_password"] = hashed
    return await repo.create(user_data)
Request /users?skip=0&limit=10 to get a paginated user list; send a POST to /users to create a user.
6. Pitfall guide: N+1 queries and transactions
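6.1 Avoiding N+1 queries
With lazy loading, touching a relationship attribute issues an extra query for every parent row, which is the classic N+1 problem. Under AsyncSession it is worse: an implicit lazy load cannot run during plain attribute access, so the access raises a MissingGreenlet error instead. Example:
❌ Wrong approach (one extra query per post in synchronous code; a MissingGreenlet error under AsyncSession):
posts = await repo.get_multi()  # repo is a BaseRepository[Post]
for post in posts:
    print(post.author.name)  # every iteration tries to load the author separately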
✅ Correct approach: use selectinload in the query to preload the related objects.
from sqlalchemy import select
from sqlalchemy.orm import selectinload

from models import Post

result = await db.execute(
    select(Post).options(selectinload(Post.author))
)
posts = list(result.scalars().all())
for post in posts:
    print(post.author.name)  # no query here: all authors came from one additional SELECT ... IN
6.2 Correctly handle asynchronous transactions
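In async SQLAlchemy, it is recommended to manage transaction boundaries with async with db.begin(): it commits automatically on success and rolls back when an exception is raised. Note that db.begin() raises an error if the session already has a transaction in progress, for example because an earlier query on the same session started one implicitly.
from decimal import Decimal

async def transfer_money(db: AsyncSession, from_id: int, to_id: int, amount: Decimal):
    # Assumes User has a numeric balance column (not defined in the models.py above)
    async with db.begin():
        from_user = await db.get(User, from_id)
        to_user = await db.get(User, to_id)
        if from_user is None or to_user is None:
            raise ValueError("unknown user")  # raising inside the block rolls back
        from_user.balance -= amount
        to_user.balance += amount
    # Leaving the with block commits if no exception occurred; otherwise it rolls back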
🧰 Why not call commit() / rollback() by hand?
Manual calls are easy to forget, especially in deeply nested logic. The db.begin() context manager guarantees that the transaction is finished correctly, whether the block succeeds or fails.
7. Quick review of core components
This setup can serve directly as the skeleton of a medium-sized project. From here you can add Alembic for database migrations, Redis for caching, distributed locks for concurrency control, and more. Later chapters dig deeper into these practical topics, so stay tuned.
🎉 Congratulations on building a complete FastAPI + SQLAlchemy 2.0 async ORM stack! Give it a try, and if you run into any problems, share them in the comments.