FastAPI oauth2-jwt-auth complete guide

📂 Stage: Stage 4 - Security and Authentication (Security) 🔗 Related chapters: FastAPIdependency-injection · FastAPIpassword-hashing-security

When you build an API, authentication is an integral part of it. This tutorial will take you from scratch to build a clear and scalable authentication system using FastAPI with OAuth2 password mode and JWT. We will cover the most basic quick implementation, and then slowly evolve to a modular, complete solution including refresh tokens and permission control.

Table of contents

Why choose JWT authentication?

Traditional Session authentication vs JWT authentication

In web development, there are two common authentication methods: traditional server-side Session and today’s theme JWT. They are completely different in design philosophy. We use a table to compare:

SolutionAdvantagesDisadvantagesApplicable scenarios
Session AuthenticationSimple and easy to use, the server can revoke the login status at any timeStateful, poor scalability (multiple servers need to share Session), cross-domain troubleSingle application, small project
JWT authenticationStateless, naturally suitable for distribution; does not rely on server memory; cross-domain friendlyToken is large in size and difficult to fail unilaterallyDistributed system, microservices, front-end and back-end separation API

Simply put, if your service is deployed into multiple instances, or the front end is an independent domain name, JWT will save you a lot of pitfalls.

Core Value of JWT

  1. Stateless - The server does not need to record who has logged in, and requests self-certification every time.
  2. Scalable — Multiple microservice instances can independently verify the same Token without sharing Session.
  3. Cross-domain friendly — The front end can put Token at any timeAuthorizationThe header is sent to the backend of different domains.
  4. Self-contained — Token embeds basic user information (such as user ID, role), reducing the number of database searches.
  5. Standardization — Follow RFC 7519 and have rich ecological tools.

Project dependency installation

First install the libraries we need:

pip install fastapi python-jose[cryptography] passlib[bcrypt] bcrypt python-multipart uvicorn
  • python-jose[cryptography]: Generate and verify JWT.
  • passlib[bcrypt] + bcrypt: Secure password hashing.
  • python-multipart: Parse OAuth2 form data.

Get started quickly: core implementation

Brief analysis of JWT structure

A JWT Token looks like a garbled code, but it actually consists of three parts, separated by dots:

eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.
eyJzdWIiOiIxIiwiZW1haWwiOiJhbGljZUBleGFtcGxlLmNvbSIsInJvbGUiOiJhZG1pbiIsImV4cCI6MTc0MzEyNzIwMH0.
SflKxwRJSMeKKF2QT4fwpMeJf36POk6yJV_adQssw5c
     ↓                ↓                ↓
  Header          Payload           Signature
  • Header: Contains algorithm type{"alg":"HS256","typ":"JWT"}
  • Payload: Stores the claims we define, such as user ID, role, expiration time, etc. Note: This part is only Base64 encoded, not encrypted, so sensitive information cannot be placed.
  • Signature: Sign the Header and Payload with a key to prevent tampering. Only having the key can generate a legitimate signature.

Minimized complete example

Below is a complete code that can be run immediately. It exposes the login interface and the personal information interface that requires authentication. You can copy and run it first, feel the process, and then slowly dismantle it.

# main.py
from fastapi import FastAPI, Depends, HTTPException, status
from fastapi.security import OAuth2PasswordBearer, OAuth2PasswordRequestForm
from jose import JWTError, jwt
from passlib.context import CryptContext
from datetime import datetime, timedelta, timezone
from pydantic import BaseModel

# ---------- 配置 ----------
SECRET_KEY = "your-secret-key-change-this-in-production-at-least-32-chars"
ALGORITHM = "HS256"
ACCESS_TOKEN_EXPIRE_MINUTES = 30

# ---------- 模拟数据库 ----------
fake_users_db = {
    "johndoe": {
        "username": "johndoe",
        "hashed_password": "$2b$12$EixZaYb4uX512Gpq5vWveu5G9.5G9.5G9.5G9.5G9.5G9.5G9.5G",
        "email": "johndoe@example.com",
        "role": "user"
    }
}

# ---------- 初始化 ----------
app = FastAPI()
pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto")
oauth2_scheme = OAuth2PasswordBearer(tokenUrl="token")

# ---------- Pydantic 模型 ----------
class Token(BaseModel):
    access_token: str
    token_type: str

class TokenData(BaseModel):
    username: str | None = None

# ---------- 工具函数 ----------
def verify_password(plain_password, hashed_password):
    return pwd_context.verify(plain_password, hashed_password)

def get_user(db, username: str):
    if username in db:
        return db[username]

def authenticate_user(fake_db, username: str, password: str):
    user = get_user(fake_db, username)
    if not user:
        return False
    if not verify_password(password, user["hashed_password"]):
        return False
    return user

def create_access_token(data: dict, expires_delta: timedelta | None = None):
    to_encode = data.copy()
    expire = datetime.now(timezone.utc) + (expires_delta or timedelta(minutes=15))
    to_encode.update({"exp": expire})
    encoded_jwt = jwt.encode(to_encode, SECRET_KEY, algorithm=ALGORITHM)
    return encoded_jwt

async def get_current_user(token: str = Depends(oauth2_scheme)):
    """依赖注入:从请求头中提取并校验 JWT,返回当前用户"""
    credentials_exception = HTTPException(
        status_code=status.HTTP_401_UNAUTHORIZED,
        detail="无法验证凭据",
        headers={"WWW-Authenticate": "Bearer"},
    )
    try:
        payload = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM])
        username: str = payload.get("sub")
        if username is None:
            raise credentials_exception
        token_data = TokenData(username=username)
    except JWTError:
        raise credentials_exception
    user = get_user(fake_users_db, username=token_data.username)
    if user is None:
        raise credentials_exception
    return user

# ---------- 路由 ----------
@app.post("/token", response_model=Token)
async def login_for_access_token(form_data: OAuth2PasswordRequestForm = Depends()):
    """OAuth2 密码模式登录,返回 access_token"""
    user = authenticate_user(fake_users_db, form_data.username, form_data.password)
    if not user:
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="用户名或密码错误",
            headers={"WWW-Authenticate": "Bearer"},
        )
    access_token_expires = timedelta(minutes=ACCESS_TOKEN_EXPIRE_MINUTES)
    access_token = create_access_token(
        data={"sub": user["username"], "role": user["role"]},
        expires_delta=access_token_expires
    )
    return {"access_token": access_token, "token_type": "bearer"}

@app.get("/users/me")
async def read_users_me(current_user: dict = Depends(get_current_user)):
    """返回当前用户信息(需要认证)"""
    return current_user

if __name__ == "__main__":
    import uvicorn
    uvicorn.run(app, host="0.0.0.0", port=8000)

💡 Run test After starting the service, visithttp://127.0.0.1:8000/docsYou can test it directly in Swagger UI: execute first/tokenGet Token and click/users/meFill in the lock icon on the right with Token and you can see the current user information.

Through this example, you have seen the core routine of FastAPI authentication: Password verification → Issuing JWT → Dependency injection protected routing. Below we split it into a more standardized module structure to facilitate use in actual projects.

JWT tool module

Concentrating all JWT logic into one manager can reduce duplicate code and make it easier to change algorithms or add functionality in the future.

# auth/jwt.py
import uuid
from datetime import datetime, timedelta, timezone
from typing import Optional, Dict, Any
from jose import jwt, JWTError
from passlib.context import CryptContext
from fastapi import HTTPException, status

# ---------- 配置 ----------
SECRET_KEY = "your-secret-key-change-this-in-production-at-least-32-chars"
ALGORITHM = "HS256"
ACCESS_TOKEN_EXPIRE_MINUTES = 30
REFRESH_TOKEN_EXPIRE_DAYS = 7

pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto")

class JWTManager:
    """JWT管理器 - 集中处理 Token 和密码哈希"""

    def __init__(self):
        self.secret_key = SECRET_KEY
        self.algorithm = ALGORITHM
        self.access_token_expire = timedelta(minutes=ACCESS_TOKEN_EXPIRE_MINUTES)
        self.refresh_token_expire = timedelta(days=REFRESH_TOKEN_EXPIRE_DAYS)

    # ---------- 密码工具 ----------
    def verify_password(self, plain_password: str, hashed_password: str) -> bool:
        return pwd_context.verify(plain_password, hashed_password)

    def hash_password(self, password: str) -> str:
        return pwd_context.hash(password)

    # ---------- Token 生成 ----------
    def create_access_token(
        self,
        data: Dict[str, Any],
        expires_delta: Optional[timedelta] = None
    ) -> str:
        """生成访问令牌,包含唯一 ID、类型等声明"""
        to_encode = data.copy()
        expire = datetime.now(timezone.utc) + (expires_delta or self.access_token_expire)
        to_encode.update({
            "exp": expire.timestamp(),
            "iat": datetime.now(timezone.utc).timestamp(),
            "jti": str(uuid.uuid4()),      # 唯一标识,可用于黑名单
            "type": "access",
        })
        return jwt.encode(to_encode, self.secret_key, algorithm=self.algorithm)

    def create_refresh_token(self, user_id: int) -> str:
        """生成刷新令牌,有效期更长"""
        return self.create_access_token(
            data={"sub": str(user_id), "type": "refresh"},
            expires_delta=self.refresh_token_expire
        )

    # ---------- Token 校验 ----------
    def decode_token(self, token: str) -> Dict[str, Any]:
        """解码并验证令牌,异常统一处理"""
        try:
            return jwt.decode(token, self.secret_key, algorithms=[self.algorithm])
        except JWTError as e:
            raise self._create_credentials_exception(f"Token无效: {str(e)}")

    def _create_credentials_exception(self, detail: str = "无法验证凭据") -> HTTPException:
        return HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail=detail,
            headers={"WWW-Authenticate": "Bearer"},
        )

# 全局实例,方便各模块引用
jwt_manager = JWTManager()

This module provides three core capabilities:

  • Password Hashing and Verification: Passedverify_passwordandhash_password
  • Generate Tokens for different purposes:create_access_tokenfor short-lived access tokens,create_refresh_tokenUsed for long-lived refresh tokens.
  • Uniform decoding entrance:decode_token, if the Token expires or the signature is incorrect, 401 will be thrown directly.

Authentication Service and Routing

With the JWT tool, we encapsulate a simple authentication service and separate the user verification logic from routing.

Authentication service layer

# services/auth_service.py
from typing import Optional
from auth.jwt import jwt_manager

# 模拟数据库
fake_users_db = {
    1: {
        "id": 1,
        "username": "johndoe",
        "email": "johndoe@example.com",
        "hashed_password": "$2b$12$EixZaYb4uX512Gpq5vWveu5G9.5G9.5G9.5G9.5G9.5G9.5G9.5G",
        "role": "user",
        "is_active": True,
    }
}

class AuthService:
    """处理用户认证相关业务逻辑"""

    def authenticate_user(self, username: str, password: str) -> Optional[dict]:
        """根据用户名密码验证用户"""
        user = next((u for u in fake_users_db.values() if u["username"] == username), None)
        if not user:
            return None
        if not jwt_manager.verify_password(password, user["hashed_password"]):
            return None
        if not user.get("is_active"):
            return None
        return user

    def get_user_by_id(self, user_id: int) -> Optional[dict]:
        return fake_users_db.get(user_id)

Authentication routing

Now write a dedicated authentication route that exposes the login interface and returns both the access token and the refresh token.

# routers/auth.py
from fastapi import APIRouter, Depends, HTTPException, status
from fastapi.security import OAuth2PasswordBearer, OAuth2PasswordRequestForm
from pydantic import BaseModel
from datetime import timedelta

from auth.jwt import jwt_manager
from services.auth_service import AuthService

router = APIRouter(prefix="/auth", tags=["认证"])
oauth2_scheme = OAuth2PasswordBearer(tokenUrl="/auth/login")
auth_service = AuthService()

class TokenResponse(BaseModel):
    access_token: str
    refresh_token: str
    token_type: str = "bearer"
    expires_in: int

@router.post("/login", response_model=TokenResponse)
async def login(form_data: OAuth2PasswordRequestForm = Depends()):
    """OAuth2 密码模式登录,返回 access + refresh token"""
    user = auth_service.authenticate_user(form_data.username, form_data.password)
    if not user:
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="用户名或密码错误",
            headers={"WWW-Authenticate": "Bearer"},
        )

    access_token = jwt_manager.create_access_token(
        data={"sub": str(user["id"]), "username": user["username"], "role": user["role"]},
    )
    refresh_token = jwt_manager.create_refresh_token(user["id"])

    return TokenResponse(
        access_token=access_token,
        refresh_token=refresh_token,
        token_type="bearer",
        expires_in=jwt_manager.access_token_expire.seconds,
    )

🧠 Design Idea

  • The access token has a short validity period, reducing the risk of leakage.
  • The refresh token has a long validity period, and the front desk can be refreshed silently, so users do not need to log in frequently.
  • Refresh token'stypeFields can be used to distinguish purposes and can be verified later when the refresh interface is implemented.

Dependency injection and protected routing

FastAPI's dependency injection is the most elegant part of authentication. We write several dependencies with different granularities to easily implement permission control for "ordinary users" and "administrators".

Dependency injection implementation

# dependencies.py
from fastapi import Depends, HTTPException, status
from auth.jwt import jwt_manager
from services.auth_service import AuthService
from routers.auth import oauth2_scheme

auth_service = AuthService()

async def get_current_user(token: str = Depends(oauth2_scheme)):
    """从 Token 中提取当前用户(基础依赖)"""
    credentials_exception = HTTPException(
        status_code=status.HTTP_401_UNAUTHORIZED,
        detail="无法验证凭据",
        headers={"WWW-Authenticate": "Bearer"},
    )
    try:
        payload = jwt_manager.decode_token(token)
        user_id: int = int(payload.get("sub"))
        if user_id is None:
            raise credentials_exception
    except Exception:
        raise credentials_exception

    user = auth_service.get_user_by_id(user_id)
    if user is None:
        raise credentials_exception
    return user

async def get_current_active_user(current_user: dict = Depends(get_current_user)):
    """进一步检查用户是否为活跃状态"""
    if not current_user.get("is_active"):
        raise HTTPException(
            status_code=status.HTTP_400_BAD_REQUEST,
            detail="用户账户未激活"
        )
    return current_user

async def get_current_admin_user(current_user: dict = Depends(get_current_user)):
    """要求管理员角色"""
    if current_user.get("role") != "admin":
        raise HTTPException(
            status_code=status.HTTP_403_FORBIDDEN,
            detail="权限不足:需要管理员权限"
        )
    return current_user

These dependencies are like building blocks, stacking verification logic layer by layer. The routing function only needs to declare the required dependencies.

Protected routing

# routers/protected.py
from fastapi import APIRouter, Depends
from dependencies import get_current_active_user, get_current_admin_user

router = APIRouter(prefix="/protected", tags=["受保护路由"])

@router.get("/profile")
async def get_profile(current_user: dict = Depends(get_current_active_user)):
    """查看个人资料 - 需要已激活的登录用户"""
    # 过滤掉密码哈希等敏感字段
    return {k: v for k, v in current_user.items() if k != "hashed_password"}

@router.get("/admin/dashboard")
async def admin_dashboard(current_user: dict = Depends(get_current_admin_user)):
    """管理员面板 - 只有 admin 角色可以访问"""
    return {
        "message": "管理员仪表板",
        "user": current_user["username"],
        "role": current_user["role"],
    }

Now your routing can freely combine different permission dependencies, and code readability and security are greatly improved.

Security Best Practices

Functional run-through is only the first step, and these "protective shields" must be added to the production environment.

Core Security Recommendations

  1. Full HTTPS — Token will never be passed on the HTTP link, otherwise it will be transparent.
  2. Strong Keys + Rotation — JWT keys are at least 32 bytes, replaced regularly, and old keys can be retained for a period of time for migration.
  3. Short access token + long refresh token — The access token expires in 15~30 minutes, and the impact window after leakage is small; use the refresh token silently to exchange for a new access token.
  4. Token Blacklist — When the user logs out or changes their password, the current TokenjtiAdded to the Redis blacklist, unified interception at the gateway layer.
  5. Password security — Always store only bcrypt hashes, do not record plain text, and password strength requirements (length, character type) are verified at the registration end.
  6. Rate Limit — The login interface must limit the frequency (for example, 5 times/minute) to prevent brute force cracking.
  7. Security Response Header — SettingsX-Content-Type-Options: nosniffX-Frame-Options: DENYetc. to enhance overall protection.

Enterprise-level security configuration example

Centrally manage security-related configurations and easily overwrite them through environment variables.

# config/security.py
import os

class SecurityConfig:
    """安全配置(通过环境变量覆盖默认值)"""

    # JWT
    JWT_ALGORITHM: str = os.getenv("JWT_ALGORITHM", "HS256")
    JWT_SECRET_KEY: str = os.getenv("JWT_SECRET_KEY", "")

    # 密码策略
    PASSWORD_MIN_LENGTH: int = 12
    PASSWORD_REQUIRE_UPPERCASE: bool = True
    PASSWORD_REQUIRE_LOWERCASE: bool = True
    PASSWORD_REQUIRE_NUMBERS: bool = True
    PASSWORD_REQUIRE_SYMBOLS: bool = True

    # 速率限制
    LOGIN_RATE_LIMIT: str = "5/minute"
    API_RATE_LIMIT: str = "100/minute"

    # 会话与过期时间
    MAX_ACTIVE_SESSIONS: int = 5
    ACCESS_TOKEN_EXPIRE_MINUTES: int = 15
    REFRESH_TOKEN_EXPIRE_DAYS: int = 7

security_config = SecurityConfig()

⚠️ Never write the key in the code in the production environment. It must be injected through environment variables. It is also recommended to use asymmetric algorithms (such as RS256) to further improve security.

Summary

Through this guide, we have built a FastAPI authentication system from simple to complete that can be implemented directly:

  1. Stateless Design — JWT self-contains user information and is perfectly suitable for multi-instance deployment.
  2. Fine-grained permission control — Dependency injection makes the permission division of ordinary users, active users, administrators, etc. clear at a glance.
  3. Secure Token Management - Supports short-term access tokens, long-term refresh tokens and blacklist mechanisms.
  4. Clear modular architecture — JWT manager, authentication service, and dependencies are independent and easy to test and expand.

💡 Remember three key points: use HTTPS to transmit tokens; maintain a short validity period + refresh mechanism; do rate limiting and log auditing. These are the basic disks for production environment security.


🔗 Extended reading