Complete Guide to FastAPI and SQLAlchemy 2.0


This tutorial walks you step by step through building an asynchronous database access layer with FastAPI and SQLAlchemy 2.0. We go from environment setup and configuration management to CRUD operations and transaction handling, avoiding common asynchronous pitfalls along the way. Even if this is your first time working with an ORM, you can follow the code step by step.

1. Why choose SQLAlchemy 2.0?

Asynchronous support in SQLAlchemy 2.0 is no longer a bolted-on extra but a natively integrated feature, which makes it a good fit for asynchronous web frameworks.

| Feature | Description |
| --- | --- |
| Native async | AsyncSession + asyncpg/aiosqlite; no thread pool is needed to wrap a synchronous connection |
| Unified query | Core and ORM share the select() API, which noticeably flattens the learning curve |
| Strong type annotations | Annotations such as Mapped[int] let the IDE see field types and reduce low-level errors |
| Enterprise-level foundation | Complete connection pooling, transactions, and concurrency control; usable directly in production |

Install dependencies

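# Async core + database driver (PostgreSQL is used as the example here)
# pydantic[email] installs the email-validator package that EmailStr needs later on
pip install fastapi "sqlalchemy[asyncio]" asyncpg aiosqlite pydantic-settings "pydantic[email]" "passlib[bcrypt]"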

💡 If you use MySQL, you can replace asyncpg with aiomysql and change the connection string to mysql+aiomysql://


2. Infrastructure: from configuration to engine

A good project structure can make maintenance more efficient. Let's first manage the database configuration, engine, and session factory in a unified manner.

2.1 Configuration Management

Create a .env file to store sensitive information, then load it with pydantic-settings. This makes it easy to switch between environments.

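# config.py
from pydantic_settings import BaseSettings, SettingsConfigDict
from functools import lru_cache

class Settings(BaseSettings):
    # pydantic-settings 2.x style; replaces the older inner `class Config`
    model_config = SettingsConfigDict(env_file=".env")

    database_url: str = "postgresql+asyncpg://user:pass@localhost/fastapi_db"
    debug: bool = False          # Enable during development to print SQL
    pool_size: int = 20          # Connections kept open in the pool
    max_overflow: int = 10       # Extra connections allowed beyond pool_size
    pool_pre_ping: bool = True   # Test the connection each time one is checked out of the pool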

@lru_cache()
def get_settings() -> Settings:
    return Settings()
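
Because get_settings() is wrapped in lru_cache, the settings object is built once per process, and later changes to the environment are not picked up until the cache is cleared. The sketch below illustrates that behavior with a plain dataclass standing in for BaseSettings (an assumption, so the example runs without a .env file or extra packages). DemoSettings, get_demo_settings, and the DEMO_DATABASE_URL variable are hypothetical names.

```python
import os
from dataclasses import dataclass
from functools import lru_cache


@dataclass(frozen=True)
class DemoSettings:
    """Hypothetical stand-in for Settings; reads one value from the environment."""
    database_url: str


@lru_cache()
def get_demo_settings() -> DemoSettings:
    # The body runs only on the first call; later calls return the cached instance.
    url = os.environ.get("DEMO_DATABASE_URL", "sqlite+aiosqlite:///./dev.db")
    return DemoSettings(database_url=url)


os.environ["DEMO_DATABASE_URL"] = "sqlite+aiosqlite:///./first.db"
first = get_demo_settings()

# Changing the environment afterwards has no effect on the cached object...
os.environ["DEMO_DATABASE_URL"] = "sqlite+aiosqlite:///./second.db"
second = get_demo_settings()

# ...until the cache is cleared, which is useful between test cases.
get_demo_settings.cache_clear()
third = get_demo_settings()
```

The same cache_clear() call is available on the real get_settings() function, since it is also decorated with lru_cache.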

2.2 Asynchronous engine and session

The following code is the core of asynchronous database access: the engine manages the connection pool, and the session factory produces an independent AsyncSession for each unit of work.

# database.py
from collections.abc import AsyncGenerator

from sqlalchemy.ext.asyncio import (
    AsyncSession, create_async_engine, async_sessionmaker
)
from sqlalchemy.orm import DeclarativeBase
from config import get_settings

settings = get_settings()

# Create the async engine
engine = create_async_engine(
    settings.database_url,
    echo=settings.debug,           # When True, every SQL statement is printed to the terminal
    pool_size=settings.pool_size,
    max_overflow=settings.max_overflow,
    pool_pre_ping=settings.pool_pre_ping
)

# Async session factory
AsyncSessionLocal = async_sessionmaker(
    engine,
    class_=AsyncSession,
    expire_on_commit=False,        # Objects do not expire after commit, so they stay usable afterwards
    autocommit=False,              # Legacy flag; False is the only value SQLAlchemy 2.0 accepts
    autoflush=False                # Flush manually to avoid unexpected behavior
)

# Base class inherited by all models
class Base(DeclarativeBase):
    pass

# FastAPI dependency: provide an independent database session for each request
async def get_db() -> AsyncGenerator[AsyncSession, None]:
    async with AsyncSessionLocal() as session:
        yield session
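
The pool_size and max_overflow values passed to the engine bound how many connections one engine can hold open at once, and every worker process creates its own engine. The arithmetic below is a rough sizing sketch; the helper names and the worker count of 4 are assumptions for illustration.

```python
def max_connections_per_process(pool_size: int, max_overflow: int) -> int:
    """Upper bound on connections one engine can hold open at the same time."""
    return pool_size + max_overflow


def max_connections_total(pool_size: int, max_overflow: int, workers: int) -> int:
    """Each worker process has its own engine and pool, so the bounds add up."""
    return workers * max_connections_per_process(pool_size, max_overflow)


per_process = max_connections_per_process(pool_size=20, max_overflow=10)
total = max_connections_total(pool_size=20, max_overflow=10, workers=4)
print(per_process, total)
```

With the defaults from config.py this gives 30 connections per process and 120 across four workers. That already exceeds PostgreSQL's default max_connections of 100, so the pool settings and the server limit should be tuned together.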

2.3 Integrated into FastAPI life cycle

Use lifespan to create the tables at startup and release the connection pool at shutdown. For production environments, it is recommended to manage schema migrations with Alembic instead.

# main.py
from contextlib import asynccontextmanager
from fastapi import FastAPI
from database import engine, Base

@asynccontextmanager
async def lifespan(app: FastAPI):
    # Startup: create all tables (use Alembic migration scripts in production instead)
    async with engine.begin() as conn:
        await conn.run_sync(Base.metadata.create_all)
    yield
    # Shutdown: release the connection pool
    await engine.dispose()

app = FastAPI(lifespan=lifespan)

3. Model Definition: Embracing the 2.0 Style

SQLAlchemy 2.0 recommends the Mapped[T] plus mapped_column style, which is both clear and friendly to type checkers.

The three models User, Post, and Tag serve as the example, covering one-to-many and many-to-many relationship configuration.

# models.py
from sqlalchemy import (
    String, Integer, Boolean, DateTime, Text, ForeignKey, Table, Column
)
from sqlalchemy.orm import Mapped, mapped_column, relationship
from sqlalchemy.sql import func
from database import Base
from datetime import datetime
from typing import List

# Many-to-many association table (posts ↔ tags)
post_tags = Table(
    "post_tags",
    Base.metadata,
    Column("post_id", Integer, ForeignKey("posts.id", ondelete="CASCADE"), primary_key=True),
    Column("tag_id", Integer, ForeignKey("tags.id", ondelete="CASCADE"), primary_key=True)
)

class User(Base):
    __tablename__ = "users"

    id: Mapped[int] = mapped_column(primary_key=True, autoincrement=True)
    name: Mapped[str] = mapped_column(String(50), nullable=False)
    email: Mapped[str] = mapped_column(String(100), unique=True, index=True, nullable=False)
    hashed_password: Mapped[str] = mapped_column(String(255), nullable=False)
    is_active: Mapped[bool] = mapped_column(Boolean, default=True)
    created_at: Mapped[datetime] = mapped_column(DateTime(timezone=True), server_default=func.now())
    updated_at: Mapped[datetime] = mapped_column(DateTime(timezone=True), server_default=func.now(), onupdate=func.now())

    # One-to-many: a user owns many posts
    posts: Mapped[List["Post"]] = relationship(
        "Post", back_populates="author", lazy="selectin",
        cascade="all, delete-orphan"
    )

class Post(Base):
    __tablename__ = "posts"

    id: Mapped[int] = mapped_column(primary_key=True, autoincrement=True)
    title: Mapped[str] = mapped_column(String(200), nullable=False)
    content: Mapped[str] = mapped_column(Text, nullable=False)
    published: Mapped[bool] = mapped_column(Boolean, default=False)
    author_id: Mapped[int] = mapped_column(ForeignKey("users.id", ondelete="CASCADE"), nullable=False)
    created_at: Mapped[datetime] = mapped_column(DateTime(timezone=True), server_default=func.now())

    # Many-to-one: each post belongs to one author
    author: Mapped["User"] = relationship("User", back_populates="posts")
    # Many-to-many: a post can have multiple tags
    tags: Mapped[List["Tag"]] = relationship("Tag", secondary=post_tags, lazy="selectin")

class Tag(Base):
    __tablename__ = "tags"

    id: Mapped[int] = mapped_column(primary_key=True, autoincrement=True)
    name: Mapped[str] = mapped_column(String(30), unique=True, nullable=False)

📘 lazy="selectin" automatically loads all related rows with one additional IN query when the parent objects are queried, which avoids the N+1 problem. It is still a trade-off: if the related data is not needed in some scenarios, you can switch to lazy="select" (load on demand) and request eager loading per query with selectinload() where it is needed. Keep in mind that under AsyncSession an on-demand load cannot run implicitly on attribute access.


4. Repository pattern: Make data access cleaner

Encapsulate the SQL statements in a Repository so that route functions only deal with business logic. This also makes it easy to mock out database operations in tests.

# repositories/base.py
from typing import TypeVar, Generic, Optional, List, Dict, Type
from sqlalchemy.ext.asyncio import AsyncSession
from sqlalchemy import select
from database import Base

T = TypeVar('T', bound=Base)

class BaseRepository(Generic[T]):
    """Generic CRUD repository, typed to a concrete model class"""
    def __init__(self, model: Type[T], db: AsyncSession):
        self.model = model
        self.db = db

    async def get_by_id(self, obj_id: int) -> Optional[T]:
        result = await self.db.execute(
            select(self.model).where(self.model.id == obj_id)
        )
        return result.scalar_one_or_none()

    async def get_multi(self, skip: int = 0, limit: int = 20) -> List[T]:
        result = await self.db.execute(
            select(self.model).offset(skip).limit(limit)
        )
        return list(result.scalars().all())

    async def create(self, obj_in: Dict) -> T:
        obj = self.model(**obj_in)
        self.db.add(obj)
        await self.db.commit()
        await self.db.refresh(obj)   # Refresh to pick up database-generated defaults (such as id and timestamps)
        return obj

    async def delete(self, obj: T) -> None:
        await self.db.delete(obj)
        await self.db.commit()
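
To illustrate the testing benefit, the sketch below replaces the repository with AsyncMock from the standard library, so the business function runs without any database. The names get_user_name, FakeUser, and run_demo are hypothetical; only the get_by_id method name is taken from BaseRepository.

```python
import asyncio
from unittest.mock import AsyncMock


async def get_user_name(repo, user_id: int) -> str:
    """Hypothetical business function that depends only on the repository interface."""
    user = await repo.get_by_id(user_id)
    if user is None:
        raise LookupError(f"user {user_id} not found")
    return user.name


class FakeUser:
    """Minimal stand-in for the ORM model, used only in this test sketch."""
    def __init__(self, name: str) -> None:
        self.name = name


async def run_demo() -> tuple[str, bool]:
    repo = AsyncMock()
    # Make get_by_id an awaitable mock that returns a canned object.
    repo.get_by_id = AsyncMock(return_value=FakeUser("alice"))
    found = await get_user_name(repo, 1)
    repo.get_by_id.assert_awaited_once_with(1)

    # Simulate a missing row and check that the error path is taken.
    repo.get_by_id = AsyncMock(return_value=None)
    try:
        await get_user_name(repo, 2)
        missing_raised = False
    except LookupError:
        missing_raised = True
    return found, missing_raised


found_name, missing_raised = asyncio.run(run_demo())
```

In a FastAPI test, the same mock could be injected through app.dependency_overrides for the dependency that builds the repository.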

5. Writing API routes: from Schema to endpoints

First define the request and response models with Pydantic, then call the Repository from the routes.

5.1 Pydantic schemas

# schemas.py
from pydantic import BaseModel, ConfigDict, EmailStr
from datetime import datetime

class UserBase(BaseModel):
    name: str
    email: EmailStr   # Requires the email-validator package (pip install "pydantic[email]")

class UserCreate(UserBase):
    password: str

class UserPublic(UserBase):
    # Pydantic v2 style; allows building the schema directly from ORM objects
    model_config = ConfigDict(from_attributes=True)

    id: int
    is_active: bool
    created_at: datetime

5.2 User routing (CRUD example)

# routers/users.py
from fastapi import APIRouter, Depends, HTTPException, status, Query
from sqlalchemy.ext.asyncio import AsyncSession
from passlib.context import CryptContext
from database import get_db
from repositories.base import BaseRepository
from models import User
from schemas import UserCreate, UserPublic

router = APIRouter(prefix="/users", tags=["User Management"])
pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto")

def get_user_repo(db: AsyncSession = Depends(get_db)) -> BaseRepository[User]:
    return BaseRepository(User, db)

@router.get("/", response_model=list[UserPublic])
async def list_users(
    skip: int = Query(0, ge=0),
    limit: int = Query(20, ge=1, le=100),
    repo: BaseRepository[User] = Depends(get_user_repo)
):
    return await repo.get_multi(skip, limit)

@router.post("/", response_model=UserPublic, status_code=status.HTTP_201_CREATED)
async def create_user(
    data: UserCreate,
    repo: BaseRepository[User] = Depends(get_user_repo)
):
    # In a real project, check here whether the email already exists
    hashed = pwd_context.hash(data.password)
    user_data = data.model_dump(exclude={"password"})
    user_data["hashed_password"] = hashed
    return await repo.create(user_data)

Visit /users?skip=0&limit=10 to get the paginated user list; send a POST to /users to create a user.


6. Pitfall guide: N+1 queries and transactions

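6.1 Avoid N+1 queries

A lazily loaded relationship issues one extra query per object the first time it is accessed, which easily turns a single list query into N+1 queries. Under AsyncSession the situation is stricter: an implicit lazy load on attribute access cannot be awaited, so it raises a MissingGreenlet error when the related object is not already loaded. Example:

Wrong approach (N+1 queries with a synchronous Session, an error with AsyncSession):

posts = await repo.get_multi()
for post in posts:
    print(post.author.name)   # Lazy load per post: one query each (sync) or MissingGreenlet (async)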

Correct approach: use selectinload when querying to preload the associated objects.

from sqlalchemy import select
from sqlalchemy.orm import selectinload

result = await db.execute(
    select(Post).options(selectinload(Post.author))
)
posts = list(result.scalars().all())
for post in posts:
    print(post.author.name)   # All authors were loaded up front by one additional IN query
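
To make the difference in round trips concrete, the sketch below reproduces both access patterns with the standard-library sqlite3 module instead of SQLAlchemy (a deliberate simplification). The table contents and the run() helper are hypothetical; the second pattern mimics what a select-IN load does conceptually.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.executescript("""
    CREATE TABLE users (id INTEGER PRIMARY KEY, name TEXT NOT NULL);
    CREATE TABLE posts (id INTEGER PRIMARY KEY, title TEXT NOT NULL, author_id INTEGER NOT NULL);
    INSERT INTO users VALUES (1, 'alice'), (2, 'bob'), (3, 'carol');
    INSERT INTO posts VALUES (1, 'a', 1), (2, 'b', 2), (3, 'c', 3), (4, 'd', 1);
""")

executed: list[str] = []


def run(sql: str, params: tuple = ()) -> list[tuple]:
    """Execute one statement and record it, so the round trips can be counted."""
    executed.append(sql)
    return conn.execute(sql, params).fetchall()


# N+1 pattern: one query for the posts, then one query per post for its author.
posts = run("SELECT id, title, author_id FROM posts")
for _post_id, _title, author_id in posts:
    run("SELECT name FROM users WHERE id = ?", (author_id,))
n_plus_one_count = len(executed)

executed.clear()

# Select-IN pattern: one query for the posts, one query for all distinct authors.
posts = run("SELECT id, title, author_id FROM posts")
author_ids = sorted({author_id for _, _, author_id in posts})
placeholders = ", ".join("?" for _ in author_ids)
authors = dict(run(
    f"SELECT id, name FROM users WHERE id IN ({placeholders})", tuple(author_ids)
))
select_in_count = len(executed)

print(n_plus_one_count, select_in_count)
```

The first pattern grows with the number of posts, while the second stays at two statements regardless of how many rows are returned.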

6.2 Correctly handle asynchronous transactions

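In asynchronous SQLAlchemy, it is recommended to manage transaction boundaries with async with db.begin(), which commits automatically on success and rolls back on an exception. Call it on a session that has not executed any statement yet: a session begins a transaction automatically on first use, and calling begin() while one is already in progress raises an error.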

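# Note: assumes the User model has a numeric `balance` column (not defined in models.py above)
async def transfer_money(db: AsyncSession, from_id: int, to_id: int, amount: float):
    async with db.begin():
        from_user = await db.get(User, from_id)
        to_user = await db.get(User, to_id)
        if from_user is None or to_user is None:
            raise ValueError("user not found")   # Raising inside the block triggers a rollback
        from_user.balance -= amount
        to_user.balance += amount
    # Leaving the with block commits if no exception was raised; otherwise it rolls back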

🧰 Why not call commit() / rollback() by hand?
Manual calls are easy to forget, especially in deeply nested logic. The db.begin() context manager ensures the transaction is finished correctly whether the block succeeds or fails.
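
The commit-or-rollback behavior of a transaction context manager can be observed in a small self-contained sketch. It uses the standard-library sqlite3 module rather than SQLAlchemy (a swapped-in simplification), where `with conn:` plays the role of `async with db.begin()`. The accounts table and the transfer() helper are hypothetical.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE accounts (id INTEGER PRIMARY KEY, balance INTEGER NOT NULL)")
conn.execute("INSERT INTO accounts VALUES (1, 100), (2, 50)")
conn.commit()


def transfer(from_id: int, to_id: int, amount: int) -> None:
    # `with conn:` commits if the block finishes and rolls back if it raises.
    with conn:
        conn.execute("UPDATE accounts SET balance = balance + ? WHERE id = ?", (amount, to_id))
        conn.execute("UPDATE accounts SET balance = balance - ? WHERE id = ?", (amount, from_id))
        (remaining,) = conn.execute(
            "SELECT balance FROM accounts WHERE id = ?", (from_id,)
        ).fetchone()
        if remaining < 0:
            raise ValueError("insufficient funds")


def balances() -> dict[int, int]:
    return dict(conn.execute("SELECT id, balance FROM accounts").fetchall())


transfer(1, 2, 30)            # Succeeds and is committed
after_success = balances()

try:
    transfer(1, 2, 500)       # Fails after both updates ran; they are rolled back together
except ValueError:
    pass
after_failure = balances()
```

The failed transfer leaves both balances exactly as they were after the successful one, with no partial update and no explicit rollback call in the business code.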


7. Quick review of core components

| Component | Function |
| --- | --- |
| create_async_engine | Creates the asynchronous database engine and configures the connection pool parameters |
| AsyncSessionLocal | Asynchronous session factory; produces an independent session for each request |
| Mapped[T] | SQLAlchemy 2.0 type annotations; improve code readability and IDE support |
| selectinload | Preloads associations to avoid N+1 queries |
| Repository pattern | Isolates data access logic to ease testing and maintenance |

This setup can serve directly as the skeleton of a medium-sized project. From here you can add Alembic for database migrations, Redis for caching, distributed locks for concurrency control, and more. Later chapters cover these topics in depth.


🎉 Congratulations on building a complete asynchronous ORM layer with FastAPI and SQLAlchemy 2.0! Give it a try, and if you run into any problems, share them in the comment area.