Complete Guide to FastAPI and Redis Integration
📂 Phase: Phase 3 - Data Persistence (Database)
🔗 Related Chapters: FastAPI SQLAlchemy 2.0 in Practice · A Deep Dive into FastAPI Asynchronous Programming
1. Why does a web service need Redis?
In a traditional architecture, every user request goes straight to the database:
Traditional architecture:
Request → FastAPI → database query → response
↓
Single bottleneck, slower responses
As traffic grows, the database easily becomes the performance bottleneck. With Redis in front of it, hot data is cached in memory, which greatly reduces database load and improves response times:
Redis-optimized architecture:
Request → FastAPI → Redis cache (hit) → return directly
↓ ↓
Database query ← miss ← cache miss
↓
Redis write ← update cache
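The read path in the diagram is commonly called the cache-aside pattern. The sketch below illustrates it under stated assumptions: `FakeCache` is a hypothetical in-memory stand-in for Redis and `load_from_db` is a hypothetical stand-in for the database query, so the example runs without a Redis server.

```python
import asyncio
import json


class FakeCache:
    """Hypothetical in-memory stand-in for Redis (illustration only)."""

    def __init__(self):
        self._data = {}

    async def get(self, key):
        return self._data.get(key)

    async def set(self, key, value):
        self._data[key] = value


db_calls = 0  # counts how often the "database" is queried


async def load_from_db(item_id: int) -> dict:
    """Hypothetical slow database query."""
    global db_calls
    db_calls += 1
    return {"id": item_id, "name": f"item-{item_id}"}


async def get_item(cache: FakeCache, item_id: int) -> dict:
    key = f"item:{item_id}"
    cached = await cache.get(key)
    if cached is not None:
        # Cache hit: return directly without touching the database
        return json.loads(cached)
    # Cache miss: query the database, then write the result back
    item = await load_from_db(item_id)
    await cache.set(key, json.dumps(item))
    return item


async def main():
    cache = FakeCache()
    first = await get_item(cache, 1)   # miss, goes to the database
    second = await get_item(cache, 1)  # hit, served from the cache
    return first, second


first, second = asyncio.run(main())
```

Two lookups for the same item reach the stand-in database only once; the second is answered from the cache.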
💡 What Redis brings to FastAPI:
- 🚀 Hot data cache: in-memory lookups can be 10 to 100 times faster than database queries;
- 🔑 Distributed sessions: when multiple instances are deployed, they all share the same login state, transparently to the user;
- 📨 Lightweight message queue: simple asynchronous tasks such as sending email and collecting statistics can be handled without introducing RabbitMQ or Kafka;
- 🏆 Real-time leaderboards and check-ins: easy to implement with native data structures such as Sorted Set and BitMap.
2. Basic configuration and connection management
2.1 Dependency installation
pip install redis[hiredis] pydantic-settings python-dotenv
💡 hiredis is a Redis protocol parser implemented in C. It parses responses faster and is recommended for production environments.
2.2 Configuration and connection
We first create a configuration class to manage Redis connection information in a unified manner, and use a connection pool to maintain long connections to avoid re-establishing the connection for each request.
# config.py
from functools import lru_cache

from pydantic_settings import BaseSettings, SettingsConfigDict


class Settings(BaseSettings):
    # Read overrides from a local .env file, if present
    model_config = SettingsConfigDict(env_file=".env")

    redis_url: str = "redis://localhost:6379/0"
    redis_max_connections: int = 20
    redis_socket_keepalive: bool = True


@lru_cache()
def get_settings() -> Settings:
    return Settings()
Next, encapsulate the Redis connection manager so that the application initializes the connection when it starts and releases it gracefully when it closes.
# redis_client.py
import redis.asyncio as redis

from config import get_settings

settings = get_settings()


class RedisManager:
    _client: redis.Redis | None = None

    @classmethod
    async def init(cls):
        """Initialize the connection pool."""
        if cls._client is None:
            cls._client = redis.from_url(
                settings.redis_url,
                max_connections=settings.redis_max_connections,
                socket_keepalive=settings.redis_socket_keepalive,
                decode_responses=True,  # decode bytes to str automatically
                encoding="utf-8",
            )
            await cls._client.ping()
            print("✅ Connected to Redis")

    @classmethod
    def get_client(cls) -> redis.Redis:
        if cls._client is None:
            raise RuntimeError("Redis is not initialized; call init() first")
        return cls._client
Finally, mount the initialization work of Redis in the FastAPI life cycle:
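from contextlib import asynccontextmanager

from fastapi import FastAPI

from redis_client import RedisManager


@asynccontextmanager
async def lifespan(app: FastAPI):
    await RedisManager.init()  # startup: create the pool and ping Redis
    yield
    # Shutdown: release all pooled connections
    # (aclose() needs redis-py 5.0.1+; older versions use close())
    await RedisManager.get_client().aclose()


app = FastAPI(lifespan=lifespan)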
In this way, the Redis connection pool will be automatically established when the application starts, and all connections will be released when it is closed.
3. Caching with Redis
Used sensibly, a cache can significantly reduce the number of database queries. Below we implement a general asynchronous cache decorator and demonstrate it in realistic scenarios.
3.1 Universal asynchronous cache decorator
The decorator builds a unique cache key from the key prefix and the call arguments. On a cache hit it returns the stored result directly; otherwise it runs the original function and writes the result to the cache.
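# cache/decorators.py
import hashlib
import inspect
import json
from functools import wraps

from redis.asyncio import Redis

from redis_client import RedisManager


def cached(
    key_prefix: str,
    ttl: int = 300,  # cache for 5 minutes by default
):
    """
    General-purpose cache decorator that hashes the call arguments.
    Usage:
        @cached("user:profile:{user_id}", ttl=600)
        async def get_user_profile(user_id: int): ...
    """
    def decorator(func):
        sig = inspect.signature(func)

        @wraps(func)
        async def wrapper(*args, **kwargs):
            client: Redis = RedisManager.get_client()
            # Bind arguments to parameter names so that f(1) and f(user_id=1)
            # produce the same key and no argument is ignored
            bound = sig.bind(*args, **kwargs)
            bound.apply_defaults()
            # Fill placeholders such as {user_id} in the prefix
            prefix = key_prefix.format(**bound.arguments)
            args_str = str(sorted(bound.arguments.items()))
            args_hash = hashlib.md5(args_str.encode()).hexdigest()[:8]
            cache_key = f"{prefix}:{args_hash}"
            # Check the cache first ("is not None" keeps empty cached values valid)
            cached_val = await client.get(cache_key)
            if cached_val is not None:
                return json.loads(cached_val)
            # On a miss, run the original function
            result = await func(*args, **kwargs)
            # Write to the cache (default=str avoids serialization errors for dates etc.)
            await client.setex(
                cache_key,
                ttl,
                json.dumps(result, default=str),
            )
            return result
        return wrapper
    return decorator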
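async def invalidate_cache(pattern: str):
    """Delete every cache key that matches the given pattern."""
    client = RedisManager.get_client()
    # SCAN walks the keyspace incrementally instead of blocking the server like KEYS
    keys = [key async for key in client.scan_iter(match=pattern)]
    if keys:
        await client.delete(*keys)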
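A cache key can be derived by binding the call to the function's parameter names, so that positional and keyword calls share one key and placeholders such as `{user_id}` in the prefix are filled in. The sketch below shows this in isolation; `build_cache_key` is a hypothetical helper, not part of any library, and the extra `lang` parameter is made up for illustration.

```python
import hashlib
import inspect


def build_cache_key(func, key_prefix: str, *args, **kwargs) -> str:
    """Hypothetical helper: derive a cache key from one function call."""
    bound = inspect.signature(func).bind(*args, **kwargs)
    bound.apply_defaults()
    # Fill placeholders such as {user_id} with the bound argument values
    prefix = key_prefix.format(**bound.arguments)
    # Hash all arguments by name so the call style does not matter
    args_str = str(sorted(bound.arguments.items()))
    digest = hashlib.md5(args_str.encode()).hexdigest()[:8]
    return f"{prefix}:{digest}"


async def get_user_profile(user_id: int, lang: str = "en"):
    ...


k1 = build_cache_key(get_user_profile, "user:profile:{user_id}", 42)
k2 = build_cache_key(get_user_profile, "user:profile:{user_id}", user_id=42)
k3 = build_cache_key(get_user_profile, "user:profile:{user_id}", 43)
```

Here `k1` and `k2` are identical, while `k3` differs both in its readable prefix (`user:profile:43`) and in its hash suffix.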
3.2 Caching practice: user information + popular articles
Using the above decorator, we can easily cache time-consuming operations in business code.
# services/user_service.py
from cache.decorators import cached, invalidate_cache


# Cache user profiles for 15 minutes
@cached("user:profile:{user_id}", ttl=900)
async def get_user_profile(user_id: int):
    # Simulates a database query
    return {"id": user_id, "name": f"User {user_id}", "email": f"{user_id}@example.com"}


# After updating a user, proactively delete the related cache entries
async def update_user_profile(user_id: int, data: dict):
    print(f"Updating user {user_id}: {data}")
    # Assumes the {user_id} placeholder in the key is filled with the real id.
    # The trailing ":" keeps user 1 from also matching users 10, 11, ...
    await invalidate_cache(f"user:profile:{user_id}:*")


# Cache the hot-posts list for 1 hour
@cached("posts:hot", ttl=3600)
async def get_hot_posts():
    # Simulates an aggregate query
    return [{"id": i, "title": f"Hot post {i}"} for i in range(10)]
🛡️ Tip: In real projects, you can add a small random offset to ttl (e.g. ttl=3600 + random.randint(0, 300)) so that a large number of cache entries do not expire at the same moment and put a sudden load on the database.
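The jitter idea from the tip can be wrapped in a small helper. This is a minimal sketch; `ttl_with_jitter` and its `max_jitter` parameter are hypothetical names chosen for illustration.

```python
import random


def ttl_with_jitter(base_ttl: int, max_jitter: int = 300) -> int:
    """Hypothetical helper: add 0..max_jitter seconds to a base TTL."""
    return base_ttl + random.randint(0, max_jitter)


# Five entries cached at the same moment get slightly different lifetimes
ttls = [ttl_with_jitter(3600) for _ in range(5)]
```

Note that a value passed as a decorator argument is evaluated once, when the function is decorated. To get a fresh offset for every cache write, the helper would have to be called at the point where the key is written.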
4. Distributed Session Management
When a FastAPI application is deployed as multiple instances, sessions held in process memory cannot be shared between them. Storing sessions in Redis gives every instance the same view of the login state.
4.1 Session Manager
# session/manager.py
import json
import uuid

from redis.asyncio import Redis

from redis_client import RedisManager


class RedisSessionManager:
    def __init__(self, prefix: str = "session:", expire: int = 86400 * 7):
        self.prefix = prefix
        self.expire = expire

    @property
    def client(self) -> Redis:
        # Resolved lazily: the module-level instance below is created at import
        # time, before RedisManager.init() has run
        return RedisManager.get_client()

    async def create(self, user_id: int, **extra) -> str:
        """Create a new session and return its session_id."""
        session_id = str(uuid.uuid4())
        key = self.prefix + session_id
        data = {"user_id": user_id, **extra}
        await self.client.setex(key, self.expire, json.dumps(data))
        return session_id

    async def get(self, session_id: str) -> dict | None:
        """Fetch the session data and refresh its expiry."""
        key = self.prefix + session_id
        data = await self.client.get(key)
        if data:
            await self.client.expire(key, self.expire)  # sliding renewal
            return json.loads(data)
        return None

    async def destroy(self, session_id: str) -> bool:
        """Delete the session (typically on logout)."""
        key = self.prefix + session_id
        return await self.client.delete(key) > 0


session_manager = RedisSessionManager()
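Refreshing the expiry on every read gives the session a sliding lifetime: it stays alive as long as it is used at least once per expiry window. The sketch below models that behavior under stated assumptions; `InMemorySessionStore` is a hypothetical stand-in that mimics the SETEX / GET / EXPIRE sequence with a controllable clock, so it runs without Redis.

```python
import json
import uuid


class InMemorySessionStore:
    """Hypothetical stand-in mimicking SETEX / GET / EXPIRE semantics."""

    def __init__(self, expire: int, clock):
        self.expire = expire
        self.clock = clock  # callable returning the current time in seconds
        self._data = {}     # session_id -> (json_payload, expires_at)

    def create(self, user_id: int) -> str:
        session_id = str(uuid.uuid4())
        payload = json.dumps({"user_id": user_id})
        self._data[session_id] = (payload, self.clock() + self.expire)
        return session_id

    def get(self, session_id: str):
        entry = self._data.get(session_id)
        if entry is None or entry[1] <= self.clock():
            self._data.pop(session_id, None)  # unknown or expired
            return None
        payload, _ = entry
        # Renew: push the expiry one full window into the future
        self._data[session_id] = (payload, self.clock() + self.expire)
        return json.loads(payload)


now = [0]  # fake clock, advanced by hand
store = InMemorySessionStore(expire=10, clock=lambda: now[0])
sid = store.create(user_id=1)

now[0] = 8
alive = store.get(sid)        # valid; expiry moves from t=10 to t=18
now[0] = 15
still_alive = store.get(sid)  # valid only because of the renewal at t=8
now[0] = 30
expired = store.get(sid)      # idle for longer than the window
```

Without the renewal step, the read at t=15 would already have failed, because the original expiry was t=10.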
4.2 Session authentication dependency
We can write a FastAPI dependency that automatically reads session_id from a Cookie or a Header and verifies the user's identity.
from fastapi import Cookie, Depends, HTTPException, Request

from session.manager import session_manager


async def get_current_user(
    request: Request,
    session_id: str | None = Cookie(None),  # read from the Cookie first
):
    # Also accept a custom header (e.g. X-Session-ID) as a fallback
    header_id = request.headers.get("X-Session-ID")
    current_id = session_id or header_id
    if not current_id:
        raise HTTPException(401, "Please log in first")
    session = await session_manager.get(current_id)
    if not session:
        raise HTTPException(401, "Session expired, please log in again")
    return session
Using this dependency in an interface is very simple:
@app.get("/profile")
async def my_profile(user: dict = Depends(get_current_user)):
    return {"profile": await get_user_profile(user["user_id"])}


@app.post("/logout")
async def logout(session_id: str | None = Cookie(None)):
    if session_id:
        await session_manager.destroy(session_id)
    return {"msg": "Logged out"}
5. Lightweight asynchronous message queue
For tasks that do not need an immediate response, such as sending emails and logging, you can build a simple message queue directly on Redis data structures instead of introducing the heavyweight RabbitMQ.
5.1 Real-time queue with List
The Redis List type makes it easy to implement a FIFO (first in, first out) queue.
# queue/list_queue.py
import json

from redis_client import RedisManager

LIST_QUEUE = "fastapi:tasks"


async def enqueue_task(task_type: str, **task_data):
    """Append a task to the tail of the queue."""
    client = RedisManager.get_client()
    await client.rpush(LIST_QUEUE, json.dumps({"type": task_type, "data": task_data}))
A background consumer can keep taking tasks from the head of the queue and executing them:
async def process_list_queue():
    """Runs in a separate process or container and keeps processing tasks."""
    client = RedisManager.get_client()
    while True:
        # blpop blocks until a new task arrives (timeout=0 means wait forever)
        result = await client.blpop(LIST_QUEUE, timeout=0)
        if result:
            _, task_json = result
            task = json.loads(task_json)
            print(f"Processing task: {task}")
            # Dispatch by type
            # if task["type"] == "send_welcome": await send_email(task["data"])
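The dispatch-by-type step can be kept tidy with a registry that maps each task type to a handler coroutine. The sketch below is one way to do it under stated assumptions: the handlers `send_welcome` and `record_stat`, the `HANDLERS` registry, and the `dispatch` function are all hypothetical names, and the handlers only record what they were asked to do.

```python
import asyncio
import json

sent = []  # records what the hypothetical handlers did


async def send_welcome(data: dict):
    sent.append(("welcome", data["email"]))


async def record_stat(data: dict):
    sent.append(("stat", data["event"]))


# Hypothetical registry mapping task types to handler coroutines
HANDLERS = {"send_welcome": send_welcome, "record_stat": record_stat}


async def dispatch(task_json: str) -> bool:
    """Decode one queue message and run its handler; False if the type is unknown."""
    task = json.loads(task_json)
    handler = HANDLERS.get(task["type"])
    if handler is None:
        return False
    await handler(task["data"])
    return True


async def main():
    known = json.dumps({"type": "send_welcome", "data": {"email": "a@example.com"}})
    unknown = json.dumps({"type": "nope", "data": {}})
    return await dispatch(known), await dispatch(unknown)


ok, unknown_ok = asyncio.run(main())
```

Inside a consumer loop, `dispatch` would be called with the message payload returned by the blocking pop. Returning `False` for unknown types lets the loop log the message instead of crashing.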
5.2 Delay queue with Sorted Set
Some tasks need to run after a delay (for example, cancelling an order, or following up on an account that has not been activated 10 minutes after registration). In this case, a Sorted Set can be used to implement a delay queue.
# queue/delay_queue.py
import json
import time

from redis_client import RedisManager

DELAY_QUEUE = "fastapi:delay_tasks"


async def enqueue_delay_task(task_type: str, delay_seconds: int, **task_data):
    """Add a task to the delay queue with score = its due timestamp."""
    client = RedisManager.get_client()
    score = time.time() + delay_seconds
    await client.zadd(DELAY_QUEUE, {json.dumps({"type": task_type, "data": task_data}): score})
The background poller checks every 1 second to see if any tasks are due and transfers them to the immediate queue:
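import asyncio

LIST_QUEUE = "fastapi:tasks"  # same key as the immediate queue in 5.1


async def process_delay_queue():
    client = RedisManager.get_client()
    while True:
        now = time.time()
        # Fetch every task whose score <= the current time
        tasks = await client.zrangebyscore(DELAY_QUEUE, 0, now, withscores=False)
        if tasks:
            # The pipeline runs as MULTI/EXEC, so removal and push are applied together.
            # The read above is a separate step, so run a single poller to avoid
            # moving the same task twice.
            pipe = client.pipeline()
            pipe.zrem(DELAY_QUEUE, *tasks)
            for task in tasks:
                pipe.rpush(LIST_QUEUE, task)
            await pipe.execute()
        await asyncio.sleep(1)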
⚠️ The above delay queue implementation is only a demonstration solution. It is recommended to use Redis Stream or a specialized message queue component in a production environment to obtain better reliability and functional support.
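One reliability concern with polling is that two pollers may read the same due task before either removes it. A common remedy is to treat the return value of ZREM as a claim: only the poller whose removal actually deleted the member forwards the task. The sketch below demonstrates the idea under stated assumptions; `FakeSortedSet` is a hypothetical in-memory stand-in for the sorted-set commands, with `asyncio.sleep(0)` simulating network round trips so the two pollers really interleave.

```python
import asyncio


class FakeSortedSet:
    """Hypothetical in-memory stand-in for ZADD / ZRANGEBYSCORE / ZREM."""

    def __init__(self):
        self.members = {}  # member -> score

    async def zadd(self, mapping: dict):
        self.members.update(mapping)

    async def zrangebyscore(self, low: float, high: float):
        await asyncio.sleep(0)  # simulated round trip
        due = [m for m, s in self.members.items() if low <= s <= high]
        return sorted(due, key=self.members.get)

    async def zrem(self, member: str) -> int:
        await asyncio.sleep(0)  # simulated round trip
        # Like Redis ZREM, report how many members were actually removed
        return 1 if self.members.pop(member, None) is not None else 0


async def move_due_tasks(zset: FakeSortedSet, ready: list, now: float) -> int:
    """Move due tasks to the ready list; only the winner of each ZREM pushes."""
    moved = 0
    for task in await zset.zrangebyscore(0, now):
        if await zset.zrem(task) == 1:  # this poller won the claim
            ready.append(task)
            moved += 1
    return moved


async def main():
    zset, ready = FakeSortedSet(), []
    await zset.zadd({"task-a": 5.0, "task-b": 50.0, "task-c": 1.0})
    # Two pollers run concurrently at t=10
    counts = await asyncio.gather(
        move_due_tasks(zset, ready, now=10.0),
        move_due_tasks(zset, ready, now=10.0),
    )
    return counts, ready, dict(zset.members)


counts, ready, remaining = asyncio.run(main())
```

Both pollers see the two due tasks, yet each task reaches the ready list exactly once and the task that is not yet due stays in the sorted set. A crash between the removal and the push would still lose the task, which is one reason a stream or a dedicated broker is the safer choice.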
6. Pitfall avoidance guide and production practice
6.1 Common pitfalls and fixes
6.2 Recommendations for production environment
- Connection pool size: max_connections is generally set to about 2 times the number of CPU cores, then adjusted according to actual concurrency.
- Disable dangerous commands: do not use the KEYS command in production; use SCAN instead, or disable it directly in the Redis configuration file with rename-command KEYS "".
- Persistence strategy: cache-only workloads can use RDB alone; session data is better served by hybrid RDB + AOF persistence.
- Monitor key metrics: watch used_memory_human, instantaneous_ops_per_sec, keyspace_hits, etc., and aim for a cache hit rate above 90%.
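The hit rate is not reported directly. It is usually computed from the `keyspace_hits` and `keyspace_misses` counters in the stats section of the Redis INFO output. A minimal sketch, assuming the counters have already been fetched into a dict (with redis-py this would typically come from `await client.info("stats")`); `cache_hit_rate` is a hypothetical helper and the sample numbers are made up.

```python
from typing import Optional


def cache_hit_rate(stats: dict) -> Optional[float]:
    """Hypothetical helper: hits / (hits + misses), or None with no lookups yet."""
    hits = int(stats.get("keyspace_hits", 0))
    misses = int(stats.get("keyspace_misses", 0))
    total = hits + misses
    return hits / total if total else None


# Made-up sample values for illustration
sample = {"keyspace_hits": 9500, "keyspace_misses": 500}
rate = cache_hit_rate(sample)
meets_target = rate is not None and rate > 0.90
```

Both counters accumulate from server start (or the last CONFIG RESETSTAT), so for alerting it is often more useful to compute the rate over the difference between two samples.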
7. Summary
This article details the integration solution between FastAPI and Redis, covering:
- ✅ Connection management and life cycle binding
- ✅ Asynchronous cache decorator and typical business applications
- ✅ Distributed Session implementation and identity authentication
- ✅ List-based immediate queue and Sorted Set-based delayed queue
- ✅ FAQ troubleshooting and production environment optimization suggestions
With these techniques you can noticeably improve the performance of a FastAPI application. Topics to explore next:
- Redis Streams for a more reliable event-driven architecture
- BitMap for check-ins and active-user statistics
- Sorted Set for real-time leaderboards
- Lua scripts for complex atomic operations
Make Redis an indispensable acceleration engine in your architecture!