FastAPI: The Complete Guide to WebSocket Real-Time Communication


WebSocket basic concepts

Why choose WebSocket?

Traditional HTTP follows a request-response model: the server can only respond after the client has sent a request. To approximate real-time behavior, the client has to poll, sending an HTTP request every few seconds to check for new messages. This approach has two obvious drawbacks:

  • Wasted bandwidth: every request carries a full set of HTTP headers, and the round trip is repeated even when the server has no new data.
  • Higher latency: a message is delivered only at the next poll, so delivery time depends on the polling interval and true instant push is not possible.

WebSocket is a full-duplex protocol that runs over a long-lived connection. After a single HTTP handshake at the start, both sides exchange data over the same TCP connection. The server can push messages to the client at any time with very little per-message overhead, which suits scenarios that require real-time interaction.
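
To put rough numbers on the polling cost described above, here is a small back-of-the-envelope sketch. The figures (1,000 clients, a 3-second interval, about 500 bytes of headers per request) are illustrative assumptions, not measurements, and the helper name polling_cost is hypothetical. The average delay assumes messages arrive at uniformly random times between polls.

```python
def polling_cost(clients: int, interval_s: float, header_bytes: int, window_s: float) -> dict:
    """Estimate the requests and header bytes spent on polling during window_s seconds."""
    requests_per_client = int(window_s // interval_s)
    total_requests = clients * requests_per_client
    return {
        "requests": total_requests,
        "header_bytes": total_requests * header_bytes,
        # A message arriving at a random moment waits half an interval on average
        "avg_delay_s": interval_s / 2,
    }


# Assumed workload: 1,000 idle clients polling every 3 seconds for one minute
cost = polling_cost(clients=1000, interval_s=3, header_bytes=500, window_s=60)
print(cost)
```

Under these assumptions, one idle minute costs 20,000 requests and about 10 MB of headers. A WebSocket connection would have sent nothing in the same minute if there were no messages to deliver.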

Common application scenarios

  • Real-time AI streaming conversations and multi-user chat rooms
  • Real-time system notifications (ticket updates, alerts)
  • State synchronization in collaborative editing (such as cursor position and document version)
  • Command synchronization in games and IoT device monitoring

In FastAPI, you can handle WebSocket connections directly with plain async/await syntax, which is simpler than in many traditional frameworks. Next, we build a real-time communication service from scratch.


FastAPI WebSocket Getting Started

First WebSocket endpoint

Let’s start with the simplest example: a WebSocket endpoint that receives client messages and echoes them back, plus an HTML page for testing.

from fastapi import FastAPI, WebSocket, WebSocketDisconnect
from fastapi.responses import HTMLResponse
import json
from datetime import datetime

app = FastAPI(title="道满 WebSocket Getting Started")

# Simple front-end test page
html = """
<!DOCTYPE html>
<html>
<head>
    <title>Getting Started Test</title>
    <style>
        body {font-family: Arial; margin: 20px;}
        #msg {height: 300px; overflow-y: auto; border: 1px solid #ccc; padding: 10px;}
    </style>
</head>
<body>
    <h1>Getting Started Test</h1>
    <div id="status">Not connected</div>
    <div id="msg"></div>
    <input id="input" placeholder="Type a message..." onkeypress="if(event.key=='Enter')send()">
    <button onclick="send()">Send</button>
    <script>
        let ws;
        const status = document.getElementById('status'),
              msgDiv = document.getElementById('msg');

        function connect() {
            ws = new WebSocket(
                `${location.protocol === 'https:' ? 'wss:' : 'ws:'}//${location.host}/ws`
            );
            ws.onopen = () => {
                status.innerHTML = '<span style="color:green">Connected</span>';
            };
            ws.onmessage = (e) => {
                const d = JSON.parse(e.data);
                // Use textContent so echoed user input is never interpreted as HTML
                const line = document.createElement('div');
                line.textContent = `[${new Date().toLocaleTimeString()}] ${d.message}`;
                msgDiv.appendChild(line);
            };
            ws.onclose = () => {
                status.innerHTML = '<span style="color:red">Disconnected</span>';
                setTimeout(connect, 3000);  // Auto-reconnect
            };
        }

        function send() {
            if (!ws || ws.readyState !== WebSocket.OPEN) return;
            ws.send(JSON.stringify({content: document.getElementById('input').value}));
            document.getElementById('input').value = '';
        }

        window.onload = connect;
    </script>
</body>
</html>
"""

@app.get("/")
async def get_page():
    return HTMLResponse(html)

@app.websocket("/ws")
async def ws_endpoint(websocket: WebSocket):
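    await websocket.accept()               # Accept the connection
    try:
        while True:
            data = await websocket.receive_text()
            try:
                content = json.loads(data).get("content", data)
            except (json.JSONDecodeError, AttributeError):
                content = data  # Not a JSON object: echo the raw text
            await websocket.send_json({
                "message": f"Server received: {content}",
                "time": datetime.utcnow().isoformat()
            })
    except WebSocketDisconnect:
        print("Connection closed")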

Key points:

  • await websocket.accept() must be called before any message is sent or received; otherwise the connection is not established.
  • Calling receive_text() in a loop keeps receiving messages, and send_json() is a convenient way to return structured data.
  • WebSocketDisconnect is raised when the client closes the connection or the network drops; the except block is the place to do cleanup.
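
Because a client can send any text frame, the parsing step deserves a guard: json.loads raises on non-JSON input, and a valid JSON value such as a number has no .get method. The helper below is a minimal sketch of one way to handle this. The name extract_content is hypothetical and not part of FastAPI.

```python
import json


def extract_content(raw: str) -> str:
    """Return the "content" field of a JSON object, or the raw text as a fallback."""
    try:
        payload = json.loads(raw)
    except json.JSONDecodeError:
        return raw  # Not JSON at all: treat the frame as plain text
    if isinstance(payload, dict):
        return str(payload.get("content", raw))
    return raw  # Valid JSON but not an object (for example a number or a list)


print(extract_content('{"content": "hello"}'))
print(extract_content("plain text"))
print(extract_content("42"))
```

Inside the receive loop, such a helper could replace the inline json.loads call so that one malformed frame does not terminate the connection handler.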

The echo endpoint above treats every connection in isolation: each message is simply echoed back to its sender. Real applications need to manage multiple users and rooms and push system broadcasts. Next, we introduce a connection manager to keep all of this organized.


Connection manager implementation

Enterprise-grade real-time applications need to support many users online at once, isolate messages by room, and broadcast system notifications. To achieve this, we design a ConnectionManager class that manages the connection life cycle in one place.

from fastapi import WebSocket
from typing import Dict, Set
import json
from datetime import datetime

class ConnectionManager:
    def __init__(self):
        # Global active connections: user_id -> WebSocket
        self.active_connections: Dict[str, WebSocket] = {}
        # Room membership: room_id -> set of user_ids
        self.room_connections: Dict[str, Set[str]] = {}

    async def connect(self, user_id: str, websocket: WebSocket):
        """Accept a new connection, register it, and broadcast an online notice."""
        await websocket.accept()
        self.active_connections[user_id] = websocket
        await self.broadcast_system(f"User {user_id} is online")

    def disconnect(self, user_id: str):
        """Remove the connection and clean up every room the user joined."""
        if user_id in self.active_connections:
            del self.active_connections[user_id]
            for room in self.room_connections.values():
                room.discard(user_id)
            # Drop rooms that are now empty
            self.room_connections = {
                k: v for k, v in self.room_connections.items() if v
            }

    async def send_personal(self, user_id: str, message: dict):
        """Send a message to one specific user."""
        if user_id in self.active_connections:
            try:
                await self.active_connections[user_id].send_json(message)
            except Exception:
                self.disconnect(user_id)

    async def broadcast_room(self, room_id: str, message: dict, exclude_user: str = None):
        """Broadcast within a room, optionally excluding one user (e.g. the sender)."""
        if room_id not in self.room_connections:
            return
        # Iterate over a copy: send_personal() may call disconnect(), which mutates the set
        for user_id in list(self.room_connections[room_id]):
            if exclude_user and user_id == exclude_user:
                continue
            await self.send_personal(user_id, message)

    async def broadcast_system(self, content: str):
        """Broadcast a system message to every connected user."""
        message = {
            "type": "system",
            "content": content,
            "time": datetime.utcnow().isoformat()
        }
        for user_id in list(self.active_connections.keys()):
            await self.send_personal(user_id, message)

    async def join_room(self, user_id: str, room_id: str):
        """Add the user to a room and notify the other members."""
        if room_id not in self.room_connections:
            self.room_connections[room_id] = set()
        self.room_connections[room_id].add(user_id)
        await self.broadcast_room(room_id, {
            "type": "join",
            "content": f"User {user_id} joined the room",
            "time": datetime.utcnow().isoformat()
        }, exclude_user=user_id)

# Global manager instance
manager = ConnectionManager()

Now you can define an endpoint that takes a user ID and a room ID:

@app.websocket("/ws/chat/{user_id}/{room_id}")
async def chat_ws(websocket: WebSocket, user_id: str, room_id: str):
    await manager.connect(user_id, websocket)
    await manager.join_room(user_id, room_id)
    try:
        while True:
            data = await websocket.receive_text()
            content = json.loads(data).get("content", data)
            await manager.broadcast_room(room_id, {
                "type": "chat",
                "user_id": user_id,
                "content": content,
                "time": datetime.utcnow().isoformat()
            })
    except WebSocketDisconnect:
        manager.disconnect(user_id)
        await manager.broadcast_system(f"User {user_id} is offline")

Connect using a path of the form ws://localhost:8000/ws/chat/user123/room1. The manager ensures that:

  • Each user_id maps to one active connection; a second login with the same ID replaces the first (the design can be extended to enforce single sign-on or to allow several devices per user).
  • Room messages are pushed only to users in that room.
  • Room membership and the global connection entry are cleaned up when a user leaves, avoiding memory leaks.
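
Since active_connections stores a single socket per user_id, a second login overwrites the first entry, and the disconnect of the older socket then removes the newer one. If several devices per user should be supported, one possible variation is to keep a set of sockets per user. The sketch below illustrates that idea with hypothetical names (MultiDeviceRegistry, FakeSocket). FakeSocket only stands in for a real WebSocket so the example runs without a server.

```python
import asyncio
from collections import defaultdict


class FakeSocket:
    """Stand-in for a WebSocket; records what would be sent (illustration only)."""

    def __init__(self, name: str):
        self.name = name
        self.sent = []

    async def send_json(self, message: dict) -> None:
        self.sent.append(message)


class MultiDeviceRegistry:
    """Hypothetical variant: user_id -> set of sockets instead of a single socket."""

    def __init__(self):
        self.connections = defaultdict(set)

    def add(self, user_id: str, socket) -> None:
        self.connections[user_id].add(socket)

    def remove(self, user_id: str, socket) -> None:
        self.connections[user_id].discard(socket)
        if not self.connections[user_id]:
            del self.connections[user_id]  # Last device gone: drop the user entry

    async def send_personal(self, user_id: str, message: dict) -> None:
        # Copy to a list so removals during sending cannot break the iteration
        for socket in list(self.connections.get(user_id, ())):
            await socket.send_json(message)


async def demo():
    registry = MultiDeviceRegistry()
    phone, laptop = FakeSocket("phone"), FakeSocket("laptop")
    registry.add("user123", phone)
    registry.add("user123", laptop)
    registry.remove("user123", phone)  # Closing one device leaves the other registered
    await registry.send_personal("user123", {"type": "chat", "content": "hi"})
    return phone.sent, laptop.sent


phone_sent, laptop_sent = asyncio.run(demo())
print(phone_sent, laptop_sent)
```

The trade-off is that disconnect handling must pass the specific socket rather than just the user ID, so the endpoint's except block would call remove(user_id, websocket).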

AI real-time assistant integration

With OpenAI's streaming API, we can build an AI assistant with a typewriter effect: each token is pushed to the front end as soon as it arrives, so the experience feels close to a live conversation. FastAPI's asynchronous features handle such long-running connections efficiently.

Here we use the AsyncOpenAI client and store the API key in an environment variable.

from openai import AsyncOpenAI
import os

# Read the API key from an environment variable
ai_client = AsyncOpenAI(api_key=os.getenv("OPENAI_API_KEY"))

@app.websocket("/ws/ai/{user_id}")
async def ai_ws(websocket: WebSocket, user_id: str):
    await websocket.accept()
    try:
        while True:
            data = await websocket.receive_text()
            user_msg = json.loads(data).get("content", "")
            if not user_msg:
                continue

            # Start the streaming call
            response = await ai_client.chat.completions.create(
                model="gpt-4o-mini",   # Can be swapped for another model
                messages=[{"role": "user", "content": user_msg}],
                stream=True,
                temperature=0.7
            )

            # Push token by token
            full_resp = ""
            async for chunk in response:
                # Some chunks carry no choices or an empty delta; skip them
                if chunk.choices and chunk.choices[0].delta.content:
                    token = chunk.choices[0].delta.content
                    full_resp += token
                    await websocket.send_json({
                        "type": "ai_token",
                        "token": token,
                        "partial": full_resp
                    })

            # Signal that generation has finished
            await websocket.send_json({
                "type": "ai_done",
                "full": full_resp
            })
    except WebSocketDisconnect:
        print(f"AI session {user_id} disconnected")

  • The front end can distinguish message types by the type field: ai_token is used for real-time rendering, and ai_done indicates completion and can trigger follow-up actions (such as saving the record).
  • This endpoint can be combined with the connection manager to build an AI conversation room shared by several people.
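
To illustrate how a client might consume the ai_token and ai_done frames, here is a minimal sketch of a receiver that rebuilds the reply from the stream. The class name AiStreamRenderer is hypothetical, and the frames are hand-written samples in the shape the endpoint sends, not real model output.

```python
import json


class AiStreamRenderer:
    """Hypothetical client-side helper that rebuilds the reply from streamed frames."""

    def __init__(self):
        self.text = ""
        self.done = False

    def handle(self, raw_frame: str) -> None:
        msg = json.loads(raw_frame)
        if msg["type"] == "ai_token":
            self.text += msg["token"]   # Append for the typewriter effect
        elif msg["type"] == "ai_done":
            self.text = msg["full"]     # Final text is authoritative
            self.done = True


# Sample frames shaped like the messages sent by the endpoint
frames = [
    json.dumps({"type": "ai_token", "token": "Hel", "partial": "Hel"}),
    json.dumps({"type": "ai_token", "token": "lo", "partial": "Hello"}),
    json.dumps({"type": "ai_done", "full": "Hello"}),
]

renderer = AiStreamRenderer()
for frame in frames:
    renderer.handle(frame)
print(renderer.text, renderer.done)
```

A browser client would apply the same dispatch in its onmessage handler, appending each token to the page and enabling follow-up actions once ai_done arrives.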

Core Best Practices

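1. Authentication and security

Avoid exposing sensitive tokens (such as JWTs) more than necessary. Browsers cannot set custom headers on a WebSocket handshake, so a common approach is to pass the token as a query parameter and verify it when the connection is established. Because the query string is part of the URL and may end up in server or proxy logs, serve the endpoint over wss:// and keep the token short-lived. FastAPI can extract the parameter with Query inside a Depends dependency.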

from fastapi import Query, Depends
from jose import JWTError, jwt
# Assumes SECRET_KEY and ALGORITHM are already defined in the configuration

async def verify_ws_token(token: str = Query(...)):
    try:
        payload = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM])
        return payload.get("sub")  # Return the user_id
    except JWTError:
        return None

@app.websocket("/ws/secure/chat/{room_id}")
async def secure_ws(
    websocket: WebSocket,
    room_id: str,
    user_id: str = Depends(verify_ws_token)
):
    if not user_id:
        # Authentication failed: close the connection with a custom close code
        await websocket.close(code=4001, reason="Authentication failed")
        return
    # Business logic continues here...

In production, also enforce rate limits (for example, messages per minute) and a maximum number of concurrent connections to prevent abuse.
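
As one possible way to enforce a per-connection message limit, the sketch below uses a sliding window over recent timestamps. The class name SlidingWindowLimiter and the limit of 3 messages per 60 seconds are assumptions chosen for illustration. The now parameter exists only so the example can be run with fixed timestamps.

```python
import time
from collections import deque
from typing import Optional


class SlidingWindowLimiter:
    """Allow at most max_messages within any window of window_s seconds."""

    def __init__(self, max_messages: int, window_s: float):
        self.max_messages = max_messages
        self.window_s = window_s
        self.timestamps = deque()

    def allow(self, now: Optional[float] = None) -> bool:
        now = time.monotonic() if now is None else now
        # Forget messages that have left the window
        while self.timestamps and now - self.timestamps[0] >= self.window_s:
            self.timestamps.popleft()
        if len(self.timestamps) >= self.max_messages:
            return False
        self.timestamps.append(now)
        return True


# Assumed limit: 3 messages per 60 seconds, exercised with fixed timestamps
limiter = SlidingWindowLimiter(max_messages=3, window_s=60)
results = [limiter.allow(now=t) for t in (0, 1, 2, 3, 61)]
print(results)
```

In an endpoint, one limiter instance per connection could be checked after each receive_text() call; when allow() returns False, the handler might send a warning or close the connection.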

2. Performance optimization

  • Message batching: when a single client produces a very high message volume, buffer messages in an asyncio.Queue and send them in batches to reduce the number of system calls.
  • Idle connection cleanup: periodically scan for idle connections and close them (Redis key TTLs can support this check across nodes) so that zombie connections do not hold memory.
  • Disable compression for small messages: frequently sent small messages (such as token streams) pay extra compression and decompression overhead. Enable WebSocket compression only when messages are large.
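
The batching idea from the first bullet can be sketched as follows. The function name drain_batch and the limits (3 items, 50 ms) are assumptions for illustration. In a real endpoint, each returned batch would be sent as a single frame, for example with websocket.send_json({"type": "batch", "items": batch}).

```python
import asyncio


async def drain_batch(queue: asyncio.Queue, max_items: int, max_wait_s: float) -> list:
    """Wait for one message, then collect more until the batch is full or time runs out."""
    batch = [await queue.get()]
    loop = asyncio.get_running_loop()
    deadline = loop.time() + max_wait_s
    while len(batch) < max_items:
        remaining = deadline - loop.time()
        if remaining <= 0:
            break
        try:
            batch.append(await asyncio.wait_for(queue.get(), timeout=remaining))
        except asyncio.TimeoutError:
            break  # No more messages arrived in time: flush what we have
    return batch


async def demo():
    queue = asyncio.Queue()
    for i in range(5):
        queue.put_nowait({"seq": i})
    first = await drain_batch(queue, max_items=3, max_wait_s=0.05)
    second = await drain_batch(queue, max_items=3, max_wait_s=0.05)
    return first, second


first, second = asyncio.run(demo())
print(len(first), len(second))
```

The max_wait_s bound keeps latency predictable: a batch that never fills is still flushed after the wait expires.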

3. Cluster deployment and horizontal scaling

A single process can hold only a limited number of connections. To serve millions of users, deploy the service on multiple nodes and synchronize messages with Redis publish/subscribe (Pub/Sub). All nodes subscribe to a shared Redis channel: when any node receives a message, it publishes it to Redis, and the other nodes pick it up and push it to their local connections.

Load balancing WebSocket traffic requires explicit configuration, because a reverse proxy does not pass the Upgrade handshake headers through by default. Taking Nginx as an example:
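
The fan-out pattern can be illustrated without a Redis server. In the sketch below, InMemoryBroker is a hypothetical stand-in for a shared Redis channel, and each Node represents one application process. A real deployment would replace the broker with a Redis client's publish and subscribe calls. Here the publishing node also receives its own message through its subscription, so every node delivers from the same code path.

```python
import asyncio


class InMemoryBroker:
    """Stand-in for a Redis channel: every subscriber receives every published message."""

    def __init__(self):
        self.subscribers = []

    def subscribe(self) -> asyncio.Queue:
        queue = asyncio.Queue()
        self.subscribers.append(queue)
        return queue

    async def publish(self, message: dict) -> None:
        for queue in self.subscribers:
            await queue.put(message)


class Node:
    """One application process holding its own local connections."""

    def __init__(self, name: str, broker: InMemoryBroker):
        self.name = name
        self.broker = broker
        self.inbox = broker.subscribe()
        self.delivered = []  # Stands in for pushes to local WebSockets

    async def on_client_message(self, message: dict) -> None:
        # Do not deliver locally here; publish and let every node's listener deliver
        await self.broker.publish(message)

    async def listen(self, expected: int) -> None:
        for _ in range(expected):
            message = await self.inbox.get()
            self.delivered.append(message)


async def demo():
    broker = InMemoryBroker()
    node_a, node_b = Node("a", broker), Node("b", broker)
    await node_a.on_client_message({"room": "room1", "content": "hi"})
    await asyncio.gather(node_a.listen(1), node_b.listen(1))
    return node_a.delivered, node_b.delivered


delivered_a, delivered_b = asyncio.run(demo())
print(delivered_a, delivered_b)
```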

location /ws/ {
    proxy_pass http://backend_fastapi;
    proxy_http_version 1.1;
    proxy_set_header Upgrade $http_upgrade;
    proxy_set_header Connection "upgrade";
    proxy_set_header Host $host;
    proxy_set_header X-Real-IP $remote_addr;
    proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for;
    proxy_read_timeout 86400;   # Keep long-lived connections from timing out
}

Choose a load balancer that supports WebSocket (Nginx, HAProxy, Cloudflare), and either use sticky sessions or share state across nodes through Redis so that the service can scale horizontally.


Summary

With native asynchronous WebSocket support, FastAPI makes building high-performance real-time applications straightforward. The path covered in this article can be summarized as:

  1. Basic access: declare a WebSocket parameter on the endpoint to establish two-way communication quickly.
  2. Connection management: use ConnectionManager to manage users and rooms in one place and to implement broadcasts, private messages, and system notifications.
  3. Security hardening: verify a JWT at connection time (passed as a query parameter over wss://), and add rate limiting and concurrency control.
  4. AI streaming integration: use OpenAI’s streaming API to achieve a typewriter effect and build a real-time assistant.
  5. Cluster scaling: use Redis Pub/Sub and a load balancer to grow the application from a single machine into a distributed system.

💡 Practical suggestion: iterate from the simplest version. First verify the single-connection logic, then add room management, and finally strengthen security and scalability step by step. A simple HTML page makes the effect of each step visible and keeps debugging easy.

Now you can build your own real-time chat room, AI assistant, or collaborative editor based on this knowledge. Happy coding!