Complete Guide to FastAPI Streaming Responses


What is a streaming response?

Imagine downloading a 3-hour movie: in the traditional approach, the server prepares the entire file and then sends it to you in one go. This causes two problems:

  • The server may run out of memory (for example, when a 4 GB file is read into memory in full)
  • You wait a long time before seeing the first frame

A streaming response is like watching while downloading: the server sends each small piece of data as soon as it is ready, and the client can consume that piece right away. Neither side has to wait for, or hold, the full payload.

Traditional HTTP vs Streaming Response

Dimension | Traditional full response | Streaming response
Delivery | The server returns all data at once after processing finishes | The server pushes chunks while processing; the client receives each chunk immediately
Memory usage | The entire response body must be held in memory | Only the chunk currently being processed needs to be buffered
Time to first byte | Slow; the user waits for the full response | Fast; the first piece of content arrives almost immediately
Typical use cases | Simple JSON APIs, small page rendering | AI-generated text, large file downloads, real-time logs, progress bars

Streaming responses in FastAPI mainly rely on StreamingResponse. Paired with a Python async generator, it lets data flow out gradually, like water from a faucet.

StreamingResponse, SSE, or WebSocket?

A common question is how to choose between a plain streaming response, SSE, and WebSocket. In short:

  • StreamingResponse: the most basic form of streaming output, suited to one-off streams (such as file downloads or simple number sequences). It is the easiest to develop.
  • SSE (Server-Sent Events): a standardized one-way push protocol, implemented in FastAPI on top of StreamingResponse. Browsers support it natively, with automatic reconnection and named event types, which makes it a good fit for token streams and status updates in AI conversations.
  • WebSocket: full-duplex, two-way communication with higher complexity, suited to scenarios where the client frequently sends messages to the server, such as chat, games, and collaborative editing.

This article focuses on StreamingResponse and the most common layer built on top of it, SSE.


StreamingResponse basics and optimization

Simplest example: a number stream

Let's start with the simplest streaming response, letting the server push a number every 0.5 seconds:

from fastapi import FastAPI
from fastapi.responses import StreamingResponse
import asyncio

app = FastAPI()

async def num_stream():
    """Async generator that yields data chunks one at a time."""
    for i in range(1, 6):
        yield f"Received chunk {i}\n"
        await asyncio.sleep(0.5)  # simulate slow work, such as a database query or AI inference

@app.get("/simple-stream")
async def simple_stream():
    return StreamingResponse(
        num_stream(),               # pass in the async generator
        media_type="text/plain",    # plain-text stream; application/json etc. also work
        headers={
            "Cache-Control": "no-cache",  # stop proxies/browsers from caching, keeping output real-time
            "Connection": "keep-alive"    # keep the connection open
        }
    )

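Open http://localhost:8000/simple-stream and you will see that the browser does not display the whole response at once; instead, a new line appears every 0.5 seconds, much like the output of a terminal command. (Some browsers buffer plain-text responses; running curl -N http://localhost:8000/simple-stream shows the same effect reliably.)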

Handle client disconnection gracefully

Users may refresh the page or close the tab mid-stream. When that happens, the running asynchronous task is cancelled and asyncio.CancelledError is raised inside the generator. Catch this exception in the generator so that you can release resources such as database connections and open files.

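async def safe_num_stream():
    try:
        for i in range(1, 6):
            yield f"Received chunk {i}\n"
            await asyncio.sleep(0.5)
    except asyncio.CancelledError:
        # Do cleanup here, such as logging or releasing locks
        print("Client disconnected, cleaning up resources...")
        # Note: the exception must be re-raised so that cancellation completes correctly
        raise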
Key point: after catching CancelledError you must re-raise it. asyncio cancellation works by propagating this exception, so swallowing it prevents the task from being cancelled cleanly and may cause connection leaks.
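
A try/finally block is a common alternative to catching CancelledError explicitly, because the finally clause runs whether the stream finishes normally, is cancelled, or is closed early. The sketch below only illustrates that behaviour with plain asyncio: FakeConnection, guarded_stream and consume_then_cancel are hypothetical names, and the cancelled task stands in for a client that disconnects.

```python
import asyncio


class FakeConnection:
    """Hypothetical stand-in for a resource such as a database connection."""

    def __init__(self):
        self.closed = False

    def close(self):
        self.closed = True


async def guarded_stream(conn: FakeConnection):
    try:
        for i in range(1, 6):
            yield f"chunk {i}\n"
            await asyncio.sleep(0.01)
    finally:
        # Runs on normal completion, on cancellation, and on early close
        conn.close()


async def consume_then_cancel(conn: FakeConnection) -> list:
    """Read from the stream in a task, then cancel that task mid-stream."""
    received = []

    async def reader():
        async for chunk in guarded_stream(conn):
            received.append(chunk)

    task = asyncio.create_task(reader())
    await asyncio.sleep(0.025)   # let a few chunks through
    task.cancel()                # simulates the client disconnecting
    try:
        await task
    except asyncio.CancelledError:
        pass
    return received
```

Because nothing catches the exception inside the generator, there is nothing to forget to re-raise; the cleanup simply runs on the way out.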


SSE: Server-Sent Events

SSE (Server-Sent Events) is a standardized text format for server push; in FastAPI it is delivered with StreamingResponse. Its purpose is to let the browser's EventSource API receive server-pushed messages directly and cleanly. Its benefits include:

  • Automatic reconnection: the browser reconnects on its own after a network interruption, by default after a few seconds (about 3 seconds in some browsers).
  • Event classification: messages can carry an event name, and the front end handles each event type separately.
  • Lightweight: it runs over plain HTTP, needs no extra libraries, and is firewall friendly.

SSE data format

SSE is delivered via a plain text stream, with strict format requirements:

  • Each message carries its content in a line that begins with data: followed by the payload (plain text or JSON)
  • A message ends with two consecutive newlines (\n\n)
  • Optional fields: event: sets the event type, id: sets the message ID (used to resume after reconnecting), retry: sets the reconnection interval in milliseconds
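
The format rules above can be wrapped in a small helper so that endpoints do not assemble the strings by hand. This is a minimal sketch, not part of FastAPI: format_sse and its parameter names are assumptions made for illustration. It also splits multi-line payloads into one data: line per line, as the SSE format requires.

```python
import json
from typing import Any, Optional


def format_sse(
    data: Any,
    event: Optional[str] = None,
    event_id: Optional[str] = None,
    retry_ms: Optional[int] = None,
) -> str:
    """Serialize one SSE message; data is JSON-encoded unless it is already a string."""
    payload = data if isinstance(data, str) else json.dumps(data)
    lines = []
    if retry_ms is not None:
        lines.append(f"retry: {retry_ms}")
    if event_id is not None:
        lines.append(f"id: {event_id}")
    if event is not None:
        lines.append(f"event: {event}")
    # A payload containing newlines is split into one data: line per line
    for part in payload.split("\n"):
        lines.append(f"data: {part}")
    # The trailing blank line terminates the message
    return "\n".join(lines) + "\n\n"
```

For example, format_sse({"step": 1}, event="checkpoint") produces an event: line, a data: line with the JSON payload, and the terminating blank line.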

Let’s look at a complete SSE endpoint example:

import json
from datetime import datetime

async def sse_status_stream():
    # Send the reconnection interval first, telling the browser to wait 10 seconds before retrying
    yield "retry: 10000\n\n"

    for i in range(1, 11):
        # Regular progress message with no event type
        normal_data = {
            "step": i,
            "progress": i * 10,
            "time": datetime.now().isoformat()
        }
        yield f"data: {json.dumps(normal_data)}\n\n"

        # Send a checkpoint event after every 3 steps
        if i % 3 == 0:
            status_data = {"state": "checkpoint_reached", "step": i}
            # Format: event: <event name>\ndata: <json>\n\n
            yield f"event: checkpoint\ndata: {json.dumps(status_data)}\n\n"

        await asyncio.sleep(0.8)

@app.get("/sse-status")
async def sse_status():
    return StreamingResponse(
        sse_status_stream(),
        media_type="text/event-stream",   # ★ must be this MIME type for the browser to recognize the stream
        headers={
            "Cache-Control": "no-cache",
            "Connection": "keep-alive",
            "X-Accel-Buffering": "no"     # disable Nginx buffering (very important!)
        }
    )

⚠️ The X-Accel-Buffering: no response header is key. Without it, Nginx acting as a reverse proxy buffers the data, and the front end may wait several seconds before receiving messages in a batch, losing the real-time behaviour entirely.


AI chat typewriter effect

In AI chat products such as ChatGPT, responses are "typed out" piece by piece: the classic typewriter effect. It is implemented with SSE streaming output: each time the backend generates a token, it pushes it immediately.

Simulate AI reply

Below we simulate a simplified AI answer and send the reply character by character:

import json
from fastapi import Request

async def mock_ai_stream(user_msg: str):
    # Simulate the complete reply generated by the AI
    ai_reply = f"Your question was '{user_msg}'. Let me break it down step by step: first, clarify the scope of the requirement; second, map out the implementation path; finally, polish the details."

    accumulated = ""
    for char in ai_reply:
        accumulated += char
        chunk = {
            "type": "token",      # message type
            "content": char,      # the newly added character
            "full": accumulated   # accumulated full text, so the front end can render it directly
        }
        yield f"data: {json.dumps(chunk)}\n\n"
        await asyncio.sleep(0.02)  # simulate AI generation latency (typically tens of milliseconds in production)

@app.post("/mock-ai-chat")
async def mock_ai_chat(req: Request):
    body = await req.json()
    user_msg = body.get("msg", "")
    return StreamingResponse(
        mock_ai_stream(user_msg),
        media_type="text/event-stream",
        headers={"X-Accel-Buffering": "no"}
    )

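As each chunk arrives, the front end can append it to the conversation view in real time to create the typing effect. A "[DONE]" marker is usually sent to signal the end of the stream; the example above omits it for simplicity. For real scenarios, refer to OpenAI's response format. Note that this endpoint uses POST, which the browser's EventSource does not support (it only issues GET requests), so the front end would read it with fetch and a stream reader instead.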
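
The end-of-stream marker mentioned above can be sketched as follows. This is an illustration under stated assumptions rather than OpenAI's exact wire format: token_stream_with_done, read_tokens and collect_tokens are hypothetical names, and read_tokens only mimics in Python what a front end would do with the received messages.

```python
import asyncio
import json


async def token_stream_with_done(text: str):
    """Yield one SSE message per character, then a final [DONE] marker."""
    for char in text:
        payload = json.dumps({"type": "token", "content": char})
        yield f"data: {payload}\n\n"
        await asyncio.sleep(0)   # placeholder for real generation latency
    # Sentinel telling the client that no more tokens will follow
    yield "data: [DONE]\n\n"


def read_tokens(messages) -> str:
    """Client-side view: join token contents until the [DONE] marker appears."""
    parts = []
    for message in messages:
        payload = message[len("data: "):].strip()
        if payload == "[DONE]":
            break
        parts.append(json.loads(payload)["content"])
    return "".join(parts)


async def collect_tokens(text: str) -> list:
    """Drain the stream into a list, as a test client might."""
    return [m async for m in token_stream_with_done(text)]
```

The marker is deliberately not valid JSON, so the client has to check for it before parsing each payload.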


File streaming and real-time log push

Download large files in chunks

Streaming responses also apply to file downloads. A naive implementation reads the entire file into memory before sending it, which causes problems once files reach gigabytes. Using StreamingResponse with asynchronous file reading, we read and send one small block (such as 8 KB) at a time, so memory usage stays bounded by the block size. (FileResponse also sends files from disk in chunks and is often the simpler choice for static files.)

import aiofiles
from pathlib import Path
from fastapi import HTTPException

async def file_chunk_stream(file_path: str, chunk_size: int = 8192):
    async with aiofiles.open(file_path, "rb") as f:
        while chunk := await f.read(chunk_size):
            yield chunk   # yield binary chunks directly

@app.get("/download/{filename}")
async def download_file(filename: str):
    local_path = f"./data/{filename}"  # in production, always check for path traversal!
    # Check before streaming starts: once the first chunk is sent,
    # the status code can no longer change to 404
    if not Path(local_path).is_file():
        raise HTTPException(status_code=404, detail="File not found")
    return StreamingResponse(
        file_chunk_stream(local_path),
        media_type="application/octet-stream",
        headers={
            "Content-Disposition": f'attachment; filename="{filename}"'
        }
    )
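
The path traversal check that the code comment calls for could look like the sketch below. It is one possible approach using only pathlib; resolve_download_path is a hypothetical helper name, and the base directory is whatever folder the service is allowed to serve from.

```python
from pathlib import Path
from typing import Optional


def resolve_download_path(base_dir: str, filename: str) -> Optional[Path]:
    """Return the resolved path if it stays inside base_dir, otherwise None."""
    base = Path(base_dir).resolve()
    candidate = (base / filename).resolve()
    try:
        # relative_to raises ValueError when candidate lies outside base
        candidate.relative_to(base)
    except ValueError:
        return None
    return candidate
```

An endpoint would call this before opening the file and respond with 404 when it returns None. The helper only checks containment, so the caller still needs to confirm that the result is an existing regular file.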

Real-time log push

Similarly, we can push an application's logs to a front-end dashboard in real time. For example, a background log queue collects lines, and the generator keeps taking lines from it and sending them:

import asyncio
from collections import deque

log_queue = deque()

async def log_stream():
    while True:
        if log_queue:
            line = log_queue.popleft()
            yield f"data: {line}\n\n"
        else:
            await asyncio.sleep(0.1)  # avoid busy-waiting that burns CPU

This pattern is common in monitoring systems and CI/CD pipeline log views.
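
As an alternative to polling a deque, the same idea can be sketched with asyncio.Queue, whose get() suspends the generator until a line arrives, so no sleep interval is needed. This swaps in a different technique from the example above; queue_log_stream, STOP and demo_log_stream are hypothetical names, and the sentinel exists only so that the demo can end.

```python
import asyncio

STOP = object()   # hypothetical sentinel used to end the stream


async def queue_log_stream(queue: asyncio.Queue):
    """Yield each queued log line as an SSE message until STOP is received."""
    while True:
        line = await queue.get()   # suspends without polling until a line arrives
        if line is STOP:
            break
        yield f"data: {line}\n\n"


async def demo_log_stream() -> list:
    """Feed two log lines and the sentinel, then drain the stream."""
    queue = asyncio.Queue()
    for line in ("build started", "tests passed"):
        queue.put_nowait(line)
    queue.put_nowait(STOP)
    return [msg async for msg in queue_log_stream(queue)]
```

Note that a single queue hands each line to only one consumer; serving several dashboards at once would need one queue per connected client.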


Key production configuration

Nginx reverse proxy

Most FastAPI applications sit behind an Nginx layer. If Nginx buffers SSE or streaming endpoints, your data is held back and the front end sees nothing until the buffer fills, which defeats the purpose of streaming. Buffering must therefore be disabled for the relevant paths:

location /stream/ {
    proxy_pass http://uvicorn_backend;
    proxy_http_version 1.1;

    # Core: turn off buffering and caching
    proxy_buffering off;
    proxy_cache off;

    # Long-connection timeouts (adjust to your workload)
    proxy_read_timeout 300s;
    proxy_send_timeout 300s;
    keepalive_timeout 300s;
}

Note that if your streaming endpoints are not under /stream/, adjust the location block accordingly, or disable buffering per response with the X-Accel-Buffering: no response header (either approach is enough).

Uvicorn / Gunicorn parameters

Run the application with asynchronous workers to take advantage of FastAPI's async features, and raise the keep-alive timeout and concurrency limits:

# Run directly with uvicorn (suitable for development and small-scale production)
uvicorn main:app --workers 4 \
    --timeout-keep-alive 300 \
    --limit-concurrency 100

# Or let Gunicorn manage the processes, using uvicorn's worker class
gunicorn main:app --workers 4 \
    --worker-class uvicorn.workers.UvicornWorker

  • --workers 4: in production this is generally set to the number of CPU cores
  • --worker-class uvicorn.workers.UvicornWorker: a Gunicorn option (not a uvicorn one) that selects the asynchronous worker when Gunicorn manages the processes
  • --timeout-keep-alive 300: how long, in seconds, an idle keep-alive connection stays open, which suits long-lived connections
  • --limit-concurrency 100: caps the number of concurrent connections to prevent resource exhaustion; uvicorn answers with HTTP 503 beyond the limit

Minimal front-end integration

The most convenient way for the front end to receive SSE messages is the browser's native EventSource object, which requires no third-party libraries.

<!DOCTYPE html>
<html>
<body>
    <div id="output" style="font-family: monospace; white-space: pre-wrap;"></div>
    <script>
        // Connect to the SSE endpoint
        const es = new EventSource("/sse-status");
        const output = document.getElementById("output");

        // 1. Receive default events (messages without an event: field)
        es.onmessage = (e) => {
            const data = JSON.parse(e.data);
            output.innerHTML += `Default event: progress ${data.progress}%<br>`;
        };

        // 2. Receive custom checkpoint events
        es.addEventListener("checkpoint", (e) => {
            const data = JSON.parse(e.data);
            output.innerHTML += `<span style="color:blue;">✅ Checkpoint ${data.step} reached!</span><br>`;
        });

        // 3. Error handling (fires when the connection drops or fails; the browser reconnects automatically)
        es.onerror = () => console.log("Connection error, attempting to reconnect...");

        // 4. To close the connection manually
        // setTimeout(() => es.close(), 10000);
    </script>
</body>
</html>

EventSource's built-in reconnection is very practical: after network jitter or a server restart, it reconnects automatically after an interval. The server can adjust that interval by sending the retry: field.
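
When messages carry an id: field, the browser sends the last ID it received in a Last-Event-ID request header on reconnect, which lets the server resume instead of starting over. The generator below is a rough sketch of that server side under stated assumptions: EVENT_LOG is a hypothetical in-memory list, and resumable_events and collect_events are illustrative names.

```python
import asyncio
from typing import Optional

# Hypothetical in-memory event log; a real service would use a durable store
EVENT_LOG = [(1, "step 1 done"), (2, "step 2 done"), (3, "step 3 done")]


async def resumable_events(last_event_id: Optional[str] = None):
    """Yield SSE messages with ids, skipping those the client already received."""
    try:
        last_seen = int(last_event_id) if last_event_id else 0
    except ValueError:
        last_seen = 0   # ignore a malformed header and replay from the start
    for event_id, text in EVENT_LOG:
        if event_id > last_seen:
            yield f"id: {event_id}\ndata: {text}\n\n"
            await asyncio.sleep(0)


async def collect_events(last_event_id: Optional[str] = None) -> list:
    """Drain the stream into a list, as a test client might."""
    return [m async for m in resumable_events(last_event_id)]
```

In an endpoint, the header value could be read with request.headers.get("last-event-id") and passed to the generator.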


📝 Summary: FastAPI's StreamingResponse is the cornerstone of streaming responses. Combined with the standard SSE protocol, it lets you build smooth AI conversations, real-time log panels, and large-file download services. The keys are async generators, the right MIME type, and buffering disabled at every layer. With these three points in hand, you can run streaming applications reliably in production.