Complete Guide to FastAPI and Redis Integration

📂 Phase: Phase 3 - Data Persistence (Database) 🔗 Related Chapters: FastAPI SQLAlchemy 2.0 in Practice · FastAPI Asynchronous Programming In Depth


1. Why does a web service need Redis?

In a traditional architecture, every user request goes straight to the database:

Traditional architecture:
Request → FastAPI → Database query → Response

    Single-point bottleneck; responses slow down

As traffic grows, the database easily becomes the performance bottleneck. Introducing Redis lets us keep hot data in memory, which greatly reduces the load on the database and speeds up responses:

Redis-optimized architecture:
Request → FastAPI → Redis cache (hit) → Return directly
                        ↓ (miss)
                  Database query → Write to Redis (update cache) → Return

💡 What Redis brings to FastAPI:

  • 🚀 Hot data caching: reading from memory is often 10 to 100 times faster than querying the database;
  • 🔑 Distributed sessions: when several instances are deployed, they all share the same login state, transparently to the user;
  • 📨 Lightweight message queue: without introducing RabbitMQ/Kafka, Redis can handle simple asynchronous tasks such as sending emails and collecting statistics;
  • 🏆 Real-time leaderboards/check-ins: easy to build with native data structures such as Sorted Set and BitMap.

2. Basic configuration and connection management

2.1 Dependency installation

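pip install "redis[hiredis]" pydantic-settings python-dotenv

💡 hiredis is a Redis protocol parser written in C. It parses server replies faster than the pure-Python parser and is recommended for production. The quotes around redis[hiredis] stop shells such as zsh from interpreting the square brackets as a glob pattern.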

2.2 Configuration and connection

First, we create a settings class that manages the Redis connection parameters in one place. The client will use a connection pool to keep long-lived connections open, so each request does not have to establish a new one.

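# config.py
from functools import lru_cache

from pydantic_settings import BaseSettings, SettingsConfigDict

class Settings(BaseSettings):
    # Each field can be overridden by an environment variable (e.g. REDIS_URL)
    redis_url: str = "redis://localhost:6379/0"
    redis_max_connections: int = 20
    redis_socket_keepalive: bool = True

    # Also read values from a local .env file if one exists
    model_config = SettingsConfigDict(env_file=".env")

@lru_cache()
def get_settings() -> Settings:
    # Cached so the settings are parsed only once per process
    return Settings()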

Next, we wrap the client in a connection manager so that the application initializes the connection at startup and releases it cleanly at shutdown.

# redis_client.py
import redis.asyncio as redis

from config import get_settings

settings = get_settings()

class RedisManager:
    _client: redis.Redis | None = None

    @classmethod
    async def init(cls) -> None:
        """Create the client (and its connection pool) once, then verify connectivity."""
        if cls._client is None:
            cls._client = redis.from_url(
                settings.redis_url,
                max_connections=settings.redis_max_connections,
                socket_keepalive=settings.redis_socket_keepalive,
                decode_responses=True,  # automatically decode bytes replies to str
                encoding="utf-8",
            )
            await cls._client.ping()  # fail fast if Redis is unreachable
            print("✅ Redis connected")

    @classmethod
    def get_client(cls) -> redis.Redis:
        if cls._client is None:
            raise RuntimeError("Redis is not initialized; call init() first")
        return cls._client

Finally, hook the Redis initialization into the FastAPI lifespan:

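# main.py
from contextlib import asynccontextmanager

from fastapi import FastAPI

from redis_client import RedisManager

@asynccontextmanager
async def lifespan(app: FastAPI):
    await RedisManager.init()  # startup: create the pool and ping Redis
    yield
    # shutdown: close the client and release its pooled connections
    # (aclose() exists in redis-py 5.0.1+; older versions use close())
    await RedisManager.get_client().aclose()

app = FastAPI(lifespan=lifespan)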

With this in place, the Redis connection pool is created automatically when the application starts, and all connections are released when it shuts down.


3. High-performance caching strategies

Used well, caching can significantly reduce the number of database queries. Below we implement a general-purpose asynchronous cache decorator and show how it is used in realistic scenarios.

3.1 A general-purpose asynchronous cache decorator

The decorator builds a unique cache key from a key prefix and the call's arguments. On a cache hit it returns the stored result directly; on a miss it runs the original function and writes the result to the cache.

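# cache/decorators.py
import hashlib
import inspect
import json
from functools import wraps

from redis.asyncio import Redis

from redis_client import RedisManager

def cached(
    key_prefix: str,
    ttl: int = 300,  # cache for 5 minutes by default
):
    """
    Generic cache decorator; the call's arguments are hashed into the key.
    Placeholders in key_prefix are filled from the arguments, e.g.:
    @cached("user:profile:{user_id}", ttl=600)
    async def get_user_profile(user_id: int): ...
    """
    def decorator(func):
        sig = inspect.signature(func)

        @wraps(func)
        async def wrapper(*args, **kwargs):
            client: Redis = RedisManager.get_client()

            # Bind positional and keyword arguments to parameter names so that
            # f(1) and f(user_id=1) map to the same key (avoids key collisions too)
            bound = sig.bind(*args, **kwargs)
            bound.apply_defaults()
            prefix = key_prefix.format(**bound.arguments)  # "user:profile:{user_id}" -> "user:profile:42"
            args_str = json.dumps(bound.arguments, sort_keys=True, default=str)
            args_hash = hashlib.md5(args_str.encode()).hexdigest()[:8]
            cache_key = f"{prefix}:{args_hash}"

            # Check the cache first
            cached_val = await client.get(cache_key)
            if cached_val is not None:
                return json.loads(cached_val)

            # Cache miss: run the original function
            result = await func(*args, **kwargs)

            # Write to the cache (default=str avoids errors on types such as datetime)
            await client.setex(cache_key, ttl, json.dumps(result, default=str))
            return result
        return wrapper
    return decorator

async def invalidate_cache(pattern: str):
    """Delete every cache key matching a glob-style pattern."""
    client = RedisManager.get_client()
    # SCAN walks the keyspace incrementally instead of blocking the server like KEYS
    keys = [key async for key in client.scan_iter(match=pattern)]
    if keys:
        await client.delete(*keys)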

With this decorator, caching an expensive operation in business code takes a single line.

# services/user_service.py
from cache.decorators import cached, invalidate_cache

# Cache user profiles for 15 minutes
@cached("user:profile:{user_id}", ttl=900)
async def get_user_profile(user_id: int):
    # Simulate a database query
    return {"id": user_id, "name": f"User {user_id}", "email": f"{user_id}@example.com"}

# After updating a profile, proactively delete its cached entries
async def update_user_profile(user_id: int, data: dict):
    print(f"Updating user {user_id}: {data}")
    # The trailing ":" keeps user 5's pattern from also matching user 50's keys
    await invalidate_cache(f"user:profile:{user_id}:*")

# Cache the hot-post list for 1 hour (see the tip below on adding random jitter)
@cached("posts:hot", ttl=3600)
async def get_hot_posts():
    # Simulate an aggregate query
    return [{"id": i, "title": f"Hot post {i}"} for i in range(10)]
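Because invalidation matches keys with a glob pattern, the exact pattern matters. The sketch below assumes keys of the form user:profile:<user_id>:<hash> (the hash values are made up) and uses Python's fnmatch.fnmatchcase, whose * behaves like the * in Redis SCAN MATCH patterns, to show that user:profile:5* would also delete user 50's cache while user:profile:5:* does not.

```python
from fnmatch import fnmatchcase

# Hypothetical keys as the cache decorator would store them (hashes are made up)
keys = ["user:profile:5:1a2b3c4d", "user:profile:50:9f8e7d6c", "posts:hot:0a1b2c3d"]

def matching(pattern: str) -> list[str]:
    # fnmatchcase's "*" matches any run of characters, like "*" in Redis MATCH
    return [k for k in keys if fnmatchcase(k, pattern)]

print(matching("user:profile:5*"))   # ['user:profile:5:1a2b3c4d', 'user:profile:50:9f8e7d6c']
print(matching("user:profile:5:*"))  # ['user:profile:5:1a2b3c4d']
```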

🛡️ Tip: In real projects, you can add a small random offset to ttl (e.g. ttl=3600 + random.randint(0, 300)) so that large numbers of keys do not expire at the same moment and put a sudden burst of load on the database.


4. Distributed Session Management

When a FastAPI application is deployed as multiple instances, sessions kept in each process's memory cannot be shared. Storing sessions in Redis gives all instances a single, consistent view of the login state.

4.1 Session Manager

# session/manager.py
import json
import uuid

from redis.asyncio import Redis

from redis_client import RedisManager

class RedisSessionManager:
    def __init__(self, prefix: str = "session:", expire: int = 86400 * 7):
        self.prefix = prefix
        self.expire = expire  # session lifetime in seconds (7 days by default)

    @property
    def client(self) -> Redis:
        # Looked up lazily: the module-level instance below is created at import
        # time, before the lifespan hook has called RedisManager.init()
        return RedisManager.get_client()

    async def create(self, user_id: int, **extra) -> str:
        """Create a new session and return its session_id."""
        session_id = str(uuid.uuid4())
        key = self.prefix + session_id
        data = {"user_id": user_id, **extra}
        await self.client.setex(key, self.expire, json.dumps(data))
        return session_id

    async def get(self, session_id: str) -> dict | None:
        """Fetch session data and refresh its expiration (sliding expiry)."""
        key = self.prefix + session_id
        data = await self.client.get(key)
        if data:
            await self.client.expire(key, self.expire)  # renew
            return json.loads(data)
        return None

    async def destroy(self, session_id: str) -> bool:
        """Delete a session (typically on logout)."""
        key = self.prefix + session_id
        return await self.client.delete(key) > 0

session_manager = RedisSessionManager()

4.2 Session authentication dependency

We can write a FastAPI dependency that reads session_id from a cookie (falling back to a request header) and verifies the user's identity.

from fastapi import Cookie, HTTPException, Request

from session.manager import session_manager

async def get_current_user(
    request: Request,
    session_id: str | None = Cookie(None),  # read the "session_id" cookie first
):
    # Fall back to a custom header (X-Session-ID) for clients without cookies
    current_id = session_id or request.headers.get("X-Session-ID")

    if not current_id:
        raise HTTPException(401, "Please log in first")

    session = await session_manager.get(current_id)
    if not session:
        raise HTTPException(401, "Session expired, please log in again")
    return session

Using this dependency in an endpoint is straightforward:

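from fastapi import Cookie, Depends, Response

from services.user_service import get_user_profile
from session.manager import session_manager

# app is the FastAPI instance from section 2.2; get_current_user is the dependency above

@app.get("/profile")
async def my_profile(user: dict = Depends(get_current_user)):
    return {"profile": await get_user_profile(user["user_id"])}

@app.post("/logout")
async def logout(response: Response, session_id: str | None = Cookie(None)):
    if session_id:
        await session_manager.destroy(session_id)
    response.delete_cookie("session_id")  # also clear the cookie in the browser
    return {"msg": "Logged out"}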

5. Lightweight asynchronous message queue

For tasks that do not need an immediate response, such as sending emails or writing logs, Redis data structures are enough to build a simple message queue without introducing a heavyweight broker like RabbitMQ.

5.1 An immediate queue with List

Redis Lists make it easy to implement a FIFO (first-in, first-out) queue: producers push to the tail and consumers pop from the head.

# task_queue/list_queue.py  (a package named "queue" would shadow Python's standard-library queue module)
import json

from redis_client import RedisManager

LIST_QUEUE = "fastapi:tasks"

async def enqueue_task(task_type: str, **task_data):
    """Append a task to the tail of the queue."""
    client = RedisManager.get_client()
    await client.rpush(LIST_QUEUE, json.dumps({"type": task_type, "data": task_data}))

A background consumer continuously takes tasks from the head of the queue and executes them:

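async def process_list_queue():
    """Run in a separate process or container and keep processing tasks.

    In a standalone worker, call `await RedisManager.init()` before this loop.
    """
    client = RedisManager.get_client()
    while True:
        # BLPOP blocks until a task arrives (timeout=0 means wait indefinitely)
        result = await client.blpop(LIST_QUEUE, timeout=0)
        if result:
            _, task_json = result  # the reply is a (queue_name, value) pair
            task = json.loads(task_json)
            print(f"Processing task: {task}")
            # Dispatch on task["type"], e.g.:
            # if task["type"] == "send_welcome": await send_email(task["data"])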
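The commented-out line at the end of the consumer hints at an if/elif chain on the task type. As task types multiply, a handler registry keeps the consumer loop unchanged. A minimal sketch, in which HANDLERS, task_handler, dispatch and the send_welcome handler are hypothetical names and the handler body is a placeholder for real work such as sending an email:

```python
import asyncio
import json
from typing import Awaitable, Callable

# Hypothetical registry mapping task["type"] to an async handler
HANDLERS: dict[str, Callable[[dict], Awaitable[object]]] = {}

def task_handler(task_type: str):
    """Register the decorated coroutine function as the handler for task_type."""
    def register(func):
        HANDLERS[task_type] = func
        return func
    return register

@task_handler("send_welcome")
async def send_welcome(data: dict) -> str:
    # Placeholder for real work such as sending an email
    return f"welcome email queued for {data['email']}"

async def dispatch(task_json: str):
    """Decode one queued task (in the format enqueue_task writes) and run its handler."""
    task = json.loads(task_json)
    handler = HANDLERS.get(task["type"])
    if handler is None:
        # Report unknown types instead of raising, so the worker loop keeps running
        print(f"No handler for task type {task['type']!r}")
        return None
    return await handler(task["data"])

message = json.dumps({"type": "send_welcome", "data": {"email": "a@example.com"}})
print(asyncio.run(dispatch(message)))  # welcome email queued for a@example.com
```

Inside process_list_queue, the dispatch comment would then be replaced by await dispatch(task_json); adding a task type only means registering another handler.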

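5.2 A delay queue with Sorted Set

Some tasks must run after a delay (for example, handling an account that is still not activated 10 minutes after registration, or cancelling an unpaid order). A Sorted Set works well here: each task is stored with its due time (a Unix timestamp) as the score, so the tasks that are due are exactly those with score <= now.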

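# task_queue/delay_queue.py
import asyncio
import json
import time
import uuid

from redis_client import RedisManager

DELAY_QUEUE = "fastapi:delay_tasks"
LIST_QUEUE = "fastapi:tasks"  # the immediate queue from section 5.1; the names must match

async def enqueue_delay_task(task_type: str, delay_seconds: int, **task_data):
    """Add a task to the delay queue with score = the Unix timestamp at which it is due."""
    client = RedisManager.get_client()
    score = time.time() + delay_seconds
    # A unique id keeps two identical payloads from collapsing into one Sorted Set member
    payload = {"id": str(uuid.uuid4()), "type": task_type, "data": task_data}
    await client.zadd(DELAY_QUEUE, {json.dumps(payload): score})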

A background poller checks once per second for due tasks and moves them to the immediate queue:

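async def process_delay_queue():
    """Poll once per second and move due tasks to the immediate queue.

    Run a single poller: with several, two could read the same due tasks and push duplicates.
    """
    client = RedisManager.get_client()
    while True:
        now = time.time()
        # Fetch every task whose score (due time) is <= now
        tasks = await client.zrangebyscore(DELAY_QUEUE, 0, now)
        if tasks:
            # transaction=True wraps the commands in MULTI/EXEC, so the removal
            # from the delay queue and the push to the immediate queue apply together
            pipe = client.pipeline(transaction=True)
            pipe.zrem(DELAY_QUEUE, *tasks)
            for task in tasks:
                pipe.rpush(LIST_QUEUE, task)
            await pipe.execute()
        await asyncio.sleep(1)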

⚠️ The delay queue above is for demonstration only. In production, consider Redis Streams or a dedicated message queue, which offer better reliability (for example, acknowledgements and redelivery of unprocessed messages) and richer features.


6. Pitfalls and production practices

6.1 Common pitfalls and fixes

| Pitfall | Error example | Fix |
|---|---|---|
| Cache penetration | Many queries for non-existent IDs pass straight through to the database because the code simply does if not user: return None | Cache the null result too, with a short expiration (e.g. 60 seconds) |
| Cache avalanche | Every cache uses the same ttl=3600, so large numbers of keys expire at once and overwhelm the database | Add a random offset to each key's expiration (e.g. ttl=3600 + random.randint(0, 300)) |
| Memory leak | await client.set("key", data) with no expiration | Give every cache key a reasonable expiration so nothing stays resident forever |
| Serialization error | json.dumps({"time": datetime.now()}) called directly | Pass default=str or convert the value to a string manually |

6.2 Production recommendations

  • Connection pool size: a common starting point for max_connections is about twice the number of CPU cores; tune it against your actual concurrency.
  • Disable dangerous commands: avoid KEYS in production and use SCAN instead, or disable KEYS outright in the Redis configuration file with rename-command KEYS "".
  • Persistence strategy: a pure cache can use RDB alone, while session data can use RDB + AOF hybrid persistence.
  • Monitor key metrics: watch used_memory_human, instantaneous_ops_per_sec, keyspace_hits and similar fields, and aim for a cache hit rate above 90%.

7. Summary

This article walked through integrating Redis with FastAPI, covering:

  • ✅ Connection management and lifespan binding
  • ✅ An asynchronous cache decorator and typical business uses
  • ✅ Distributed sessions and identity authentication
  • ✅ A List-based immediate queue and a Sorted Set-based delay queue
  • ✅ Common pitfalls and production recommendations

With these techniques, you can give FastAPI applications a substantial performance boost. Topics worth exploring next:

  • Redis Streams for a more reliable event-driven architecture
  • BitMap for check-ins and active-user statistics
  • Sorted Set for real-time leaderboards
  • Lua scripts for complex atomic operations

Make Redis an indispensable acceleration engine in your architecture!