The Complete Guide to FastAPI Asynchronous Programming

What is asynchronous programming?

Synchronous vs Asynchronous: The metaphor of waiting in line to eat 🍚

Imagine you go to the cafeteria to get food:

  • Synchronous: You stand at the window and wait for the chef to finish cooking one dish, take it, and only then order the next. While the chef cooks, all you can do is wait.
  • Asynchronous: You hand your order to the chef, then go fetch cutlery and pour drinks. When the food is ready, a waiter brings it to your table, so you never stand around waiting.

This is the core difference: whether you can switch to doing other things while waiting.

Why do web services need to be asynchronous?

Suppose three requests arrive at the same time:

Request A (database query, takes 200 ms)
Request B (simple calculation, takes 1 ms)
Request C (external API call, takes 1000 ms)

  • Synchronous mode: the requests are processed one after another, so the total time is 200 + 1 + 1000 = 1201 ms
  • Asynchronous mode: while A waits for the database, the loop runs B; when B finishes it starts C; and when A's result arrives it resumes A while C is still waiting → the total time is about 1000 ms

In I/O-intensive scenarios (database queries, HTTP requests, file reads and writes), asynchronous execution lets a single thread handle a large number of concurrent requests, which significantly improves API throughput.
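
The arithmetic above can be checked with a small simulation. This is a minimal sketch in which asyncio.sleep stands in for the database and external API waits; the names REQUESTS, handle, serve_sequentially, and serve_concurrently are illustrative and not part of any library.

```python
import asyncio
import time

# Durations from the example above, in seconds.
REQUESTS = {"A": 0.200, "B": 0.001, "C": 1.000}

async def handle(name: str, seconds: float) -> str:
    # asyncio.sleep stands in for waiting on a database or a remote API.
    await asyncio.sleep(seconds)
    return name

async def serve_sequentially(scale: float = 1.0) -> float:
    """Finish each request before starting the next; return elapsed seconds."""
    start = time.perf_counter()
    for name, seconds in REQUESTS.items():
        await handle(name, seconds * scale)
    return time.perf_counter() - start

async def serve_concurrently(scale: float = 1.0) -> float:
    """Start all requests at once; return elapsed seconds."""
    start = time.perf_counter()
    await asyncio.gather(*(handle(n, s * scale) for n, s in REQUESTS.items()))
    return time.perf_counter() - start

if __name__ == "__main__":
    print(f"sequential: {asyncio.run(serve_sequentially()):.2f}s")
    print(f"concurrent: {asyncio.run(serve_concurrently()):.2f}s")
```

On a typical machine the sequential run should take roughly 1.2 seconds and the concurrent run roughly 1.0 second, because the concurrent total is bounded by the slowest request.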

Three major advantages of asynchronous programming

  1. High concurrent processing capability: A single process can handle thousands of concurrent connections
  2. High resource utilization: avoid the overhead of thread creation and switching
  3. Fast response: Other requests can be processed while I/O is waiting.

async/await in detail

Basic syntax

import asyncio

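# Define a coroutine function (async def)
async def say_hello():
    print("Hello!")
    return "Hello from coroutine!"

# Two ways to run a coroutine
# Option 1: asyncio.run() (recommended for the main entry point)
asyncio.run(say_hello())

# Option 2: create a task inside an already-running event loop
async def main():
    task = asyncio.create_task(say_hello())  # schedule the coroutine
    result = await task                       # wait for the task to finish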
    print(f"Result: {result}")

asyncio.run(main())

What can you await?

await can only be applied to awaitable objects (Awaitable), including:

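Type      | Example               | Description
----------|-----------------------|----------------------------------------------
Coroutine | await coro()          | Object returned by calling an async def function
Task      | asyncio.create_task() | A coroutine scheduled on the event loop
Future    | asyncio.Future()      | Placeholder for a result that is not ready yet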
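
The three kinds of awaitable can be seen side by side in a short sketch. The function names compute and demo are made up for illustration; the Future is created with loop.create_future() and resolved by a callback only to show that a bare Future can be awaited too.

```python
import asyncio

async def compute() -> int:
    return 42

async def demo() -> list:
    loop = asyncio.get_running_loop()

    # 1. Coroutine: calling compute() builds the object, await runs it.
    coro_result = await compute()

    # 2. Task: the coroutine is scheduled immediately, awaited later.
    task = asyncio.create_task(compute())
    task_result = await task

    # 3. Future: a placeholder that something else fills in.
    future = loop.create_future()
    loop.call_soon(future.set_result, "ready")
    future_result = await future

    return [coro_result, task_result, future_result]

print(asyncio.run(demo()))  # [42, 42, 'ready']
```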

asyncio.sleep vs time.sleep

import asyncio
import time

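# ❌ Synchronous sleep: blocks the whole thread
def sync_task():
    time.sleep(2)  # the thread is stuck for the full 2 seconds
    print("Sync done")

# ✅ Asynchronous sleep: yields control so other coroutines can run
async def async_task():
    await asyncio.sleep(2)  # the thread is free to do other work for 2 seconds
    print("Async done")

# See the difference
async def compare():
    start = time.time()
    await asyncio.gather(async_task(), async_task())  # run concurrently
    print(f"Concurrent elapsed: {time.time() - start:.2f}s")  # about 2 seconds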

asyncio.run(compare())

⚠️ Calling time.sleep() inside an async def route in FastAPI blocks the entire event loop!
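
One way to observe the blocking is to run a "heartbeat" coroutine next to a handler and measure how late its ticks arrive. The sketch below uses only the standard library; heartbeat, blocking_handler, cooperative_handler, and worst_gap are hypothetical names, and the 0.2 second delay is an arbitrary choice.

```python
import asyncio
import time

async def heartbeat(gaps: list, interval: float = 0.01, beats: int = 20) -> None:
    """Record how long each short sleep really took."""
    last = time.perf_counter()
    for _ in range(beats):
        await asyncio.sleep(interval)
        now = time.perf_counter()
        gaps.append(now - last)
        last = now

async def blocking_handler() -> None:
    time.sleep(0.2)            # holds the thread; the loop cannot switch tasks

async def cooperative_handler() -> None:
    await asyncio.sleep(0.2)   # suspends only this coroutine

async def worst_gap(handler) -> float:
    """Run the heartbeat next to a handler and return the longest tick gap."""
    gaps: list = []
    await asyncio.gather(heartbeat(gaps), handler())
    return max(gaps)

if __name__ == "__main__":
    print(f"with time.sleep:    {asyncio.run(worst_gap(blocking_handler)):.3f}s")
    print(f"with asyncio.sleep: {asyncio.run(worst_gap(cooperative_handler)):.3f}s")
```

With the blocking handler, the worst gap should be close to the full 0.2 seconds, because the heartbeat cannot run until time.sleep returns. With the cooperative handler it should stay near the 0.01 second interval.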

Event loop principle

What is an event loop?

The event loop is the "dispatch center" of asynchronous code. Its workflow:

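┌────────────────────────────────────────────────────────────┐
│                 Event loop (single thread)                 │
├────────────────────────────────────────────────────────────┤
│  1. Check for I/O events (network / file / timers)         │
│  2. Collect the tasks that are ready and run them          │
│  3. On await, suspend the task and switch to the next one  │
│  4. Repeat until everything is done                        │
└────────────────────────────────────────────────────────────┘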
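
The "suspend and switch" cycle can be imitated with plain generators. This is a toy model only: it has no I/O polling or timers and is not how asyncio is implemented. Here yield plays the role of await, and worker and run_loop are invented names.

```python
from collections import deque

def worker(name, steps, log):
    """A generator standing in for a coroutine."""
    for step in range(steps):
        log.append(f"{name}:{step}")
        yield  # plays the role of await: hand control back to the loop

def run_loop(tasks):
    """Round-robin over ready tasks until every one has finished."""
    ready = deque(tasks)
    while ready:
        current = ready.popleft()
        try:
            next(current)          # run the task up to its next yield
        except StopIteration:
            continue               # task finished, drop it
        ready.append(current)      # still unfinished, queue it again

log = []
run_loop([worker("A", 2, log), worker("B", 1, log)])
print(log)  # ['A:0', 'B:0', 'A:1']
```

The interleaved log shows that B gets its turn as soon as A yields, which is the same idea as step 3 in the diagram.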

Event loop life cycle demonstration

import asyncio

async def task(name, sec):
    print(f"[{name}] start")
    await asyncio.sleep(sec)
    print(f"[{name}] done")

async def main():
    t1 = asyncio.create_task(task("A", 1))
    t2 = asyncio.create_task(task("B", 0.5))
    t3 = asyncio.create_task(task("C", 0.8))
    await asyncio.gather(t1, t2, t3)
    print("All done")

asyncio.run(main())
# Output order:
# [A] start → [B] start → [C] start   (tasks start in creation order)
# → [B] done → [C] done → [A] done    (tasks finish in order of sleep time)

Asynchronous function calling rules

Four Golden Rules

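Caller         | Called function | Correct usage
---------------|-----------------|------------------------------------------------------------
sync function  | async function  | ❌ Cannot be awaited directly; needs asyncio.run() or a running loop
sync function  | sync function   | ✅ Call directly
async function | async function  | ✅ Call with await
async function | sync function   | ⚠️ Allowed, but slow synchronous work must be offloaded to a thread pool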
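
The first row is the one that most often surprises people, so here is a minimal sketch of a synchronous function driving a coroutine through asyncio.run(). The names fetch_value and sync_caller are hypothetical. Note that asyncio.run() starts a new event loop, so it cannot be used from a thread where a loop is already running.

```python
import asyncio

async def fetch_value() -> int:
    await asyncio.sleep(0)  # stands in for real asynchronous work
    return 7

def sync_caller() -> int:
    # A plain function cannot use await, so it hands the coroutine
    # to asyncio.run(), which runs it to completion on a fresh loop.
    return asyncio.run(fetch_value())

print(sync_caller())  # 7
```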

Calling slow synchronous functions from async code

import asyncio
import time

def blocking_function():
    time.sleep(1)
    return "Blocking result"

# ✅ Recommended: on Python 3.9+ use to_thread, which is more concise
async def call_sync_simple():
    result = await asyncio.to_thread(blocking_function)
    return result

Why not await a synchronous function directly? Because a synchronous function does not return an awaitable, and calling it inside a coroutine blocks the whole thread until it returns. Use asyncio.to_thread() to hand it to a separate worker thread so that the event loop can keep processing other tasks.
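
To see that the event loop really stays free, two blocking calls can be offloaded at the same time. This sketch assumes Python 3.9 or later; run_both is an illustrative name, and blocking_function takes a duration parameter here only so the demo runs quickly.

```python
import asyncio
import time

def blocking_function(seconds: float) -> str:
    time.sleep(seconds)  # ordinary blocking call
    return "Blocking result"

async def run_both(seconds: float = 0.2):
    """Offload two blocking calls and wait for both; return results and elapsed time."""
    start = time.perf_counter()
    results = await asyncio.gather(
        asyncio.to_thread(blocking_function, seconds),
        asyncio.to_thread(blocking_function, seconds),
    )
    return results, time.perf_counter() - start

if __name__ == "__main__":
    results, elapsed = asyncio.run(run_both())
    print(results, f"{elapsed:.2f}s")
```

Because each call runs on its own worker thread, the elapsed time should be close to one sleep rather than two.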

Asynchronous usage in FastAPI

Asynchronous routing vs synchronous routing

from fastapi import FastAPI
import asyncio

app = FastAPI()

# ✅ Async route: for I/O-bound work
@app.get("/async-data")
async def get_async_data():
    await asyncio.sleep(1)  # simulate waiting on I/O
    return {"source": "async", "data": [1, 2, 3]}

# ✅ Sync route: for CPU-bound work
@app.get("/sync-data")
def get_sync_data():
    result = sum(range(10**7))  # pure computation
    return {"source": "sync", "result": result}

FastAPI will automatically detect the routing type:

  • If the route is defined with async def, it runs on the event loop and is suspended whenever it reaches an await
  • If the route is defined with def, it is handed to a thread pool, so it does not block the main loop (at the cost of a small amount of thread overhead)
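
The two cases can be modelled with the standard library alone. The sketch below is a simplified illustration of the idea, not FastAPI's actual dispatch code: dispatch, async_route, sync_route, and probe are hypothetical names, and asyncio.to_thread stands in for the framework's own thread pool.

```python
import asyncio
import inspect
import threading

async def dispatch(handler):
    """Simplified model of route dispatch; not FastAPI's real implementation."""
    if inspect.iscoroutinefunction(handler):
        return await handler()               # runs on the event loop thread
    return await asyncio.to_thread(handler)  # runs on a worker thread

async def async_route():
    return threading.get_ident()

def sync_route():
    return threading.get_ident()

async def probe():
    """Return the loop's thread id and the thread id each route ran on."""
    loop_thread = threading.get_ident()
    return loop_thread, await dispatch(async_route), await dispatch(sync_route)

if __name__ == "__main__":
    loop_thread, async_thread, sync_thread = asyncio.run(probe())
    print("async route on loop thread:", async_thread == loop_thread)
    print("sync route on loop thread: ", sync_thread == loop_thread)
```

The async handler reports the same thread as the loop, while the sync handler reports a different one, which is why a blocking call is harmless in a def route but harmful in an async def route.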

Asynchronous HTTP request (take httpx as an example)

from fastapi import FastAPI
import httpx
import asyncio

app = FastAPI()

@app.get("/fetch-multiple")
async def fetch_multiple():
    urls = [
        "https://api.github.com/users/octocat",
        "https://api.github.com/users/torvalds",
    ]
    async with httpx.AsyncClient(timeout=10) as client:
        tasks = [client.get(url) for url in urls]
        responses = await asyncio.gather(*tasks)
    return [r.json() for r in responses]

💡 Using httpx.AsyncClient as a context manager lets the requests share pooled connections, which is more efficient than creating a temporary AsyncClient for every request.

When to use synchronous code

Scenarios suited to synchronous code

Scenario                        | Reason                                                    | Example
--------------------------------|-----------------------------------------------------------|------------------------------------------------------
CPU-intensive tasks             | asyncio does not help with CPU-bound work                 | Image processing, encryption, large-scale computation
No async version of the library | The third-party library only provides a synchronous API   | Pillow, the computational parts of NumPy
Simple synchronous operations   | They run so quickly that making them async is unnecessary | os.path.exists(), basic arithmetic
Startup/initialization code     | It runs only once, so blocking has no impact              | app = FastAPI()

Rule of Thumb:

  • The task is mainly I/O (network, disk, database) ➔ use async def
  • The task is mainly computation (CPU-intensive) ➔ use def; FastAPI hands it to the thread pool automatically
  • If you are not sure, write it as def first, then optimize after performance testing.

Common Pitfalls and How to Avoid Them

Trap 1: Forgetting await

# ❌ Wrong
async def bad():
    data = some_async_function()  # returns a coroutine object; the body never runs
    return data

# ✅ Right
async def good():
    data = await some_async_function()
    return data
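
The difference is easy to confirm at runtime. In this sketch, some_async_function is a stand-in defined only for the demo, and bad() closes the unused coroutine so Python does not emit a "never awaited" warning.

```python
import asyncio

async def some_async_function():
    await asyncio.sleep(0)
    return {"value": 1}

async def bad():
    data = some_async_function()              # coroutine object, body has not run
    forgot_await = asyncio.iscoroutine(data)
    data.close()                              # silence the "never awaited" warning
    return forgot_await

async def good():
    return await some_async_function()        # the actual result

print(asyncio.run(bad()))   # True
print(asyncio.run(good()))  # {'value': 1}
```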

Trap 2: Serial await in a loop

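# ❌ Wrong: awaiting one by one is slow
async def slow():
    results = []
    for url in urls:
        result = await fetch(url)  # fetched one at a time, so the delays add up
        results.append(result)
    return results

# ✅ Right: await them concurrently
async def fast():
    tasks = [fetch(url) for url in urls]
    results = await asyncio.gather(*tasks)  # all requests are in flight at once
    return results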
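
A detail worth knowing when switching to asyncio.gather is that it returns results in the order the awaitables were passed in, not in the order they finish. The sketch below uses a made-up fetch that sleeps instead of doing network I/O, with placeholder host names.

```python
import asyncio

async def fetch(url: str, delay: float) -> str:
    await asyncio.sleep(delay)   # stands in for network latency
    return f"body of {url}"

async def fast():
    # The first job is slower, yet its result still comes first.
    jobs = [("slow.example", 0.05), ("quick.example", 0.01)]
    return await asyncio.gather(*(fetch(url, delay) for url, delay in jobs))

print(asyncio.run(fast()))  # ['body of slow.example', 'body of quick.example']
```

This means results can be matched back to the input list by position.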

Trap 3: Mixing synchronous blocking calls in asynchronous functions

import time

async def fragile_route():
    time.sleep(2)  # ❌ blocks the event loop, so every request is stuck
    return {"status": "done"}

How to fix it:

  • Replace it with await asyncio.sleep(2)
  • or wrap the blocking call: await asyncio.to_thread(time.sleep, 2)

Trap 4: Misuse of global variables

Because the event loop is single-threaded, code between two await points runs without interruption, so a simple update needs no lock. However, the order in which tasks run is not deterministic, and any read-modify-write sequence that spans an await can interleave with other tasks, so heavy reliance on shared state is prone to bugs. Prefer state that is local to each request.
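
A lost update is easy to reproduce. The following sketch uses a module-level counter and hypothetical helper names; asyncio.sleep(0) marks the point where another task can run, and asyncio.Lock is shown as one possible fix when shared state cannot be avoided.

```python
import asyncio

counter = 0  # shared module-level state

async def unsafe_increment():
    global counter
    current = counter
    await asyncio.sleep(0)      # yields to the loop; other tasks run here
    counter = current + 1       # may overwrite another task's update

async def safe_increment(lock: asyncio.Lock):
    global counter
    async with lock:            # only one task at a time in this section
        current = counter
        await asyncio.sleep(0)
        counter = current + 1

async def run(n: int, safe: bool) -> int:
    """Run n increments concurrently and return the final counter value."""
    global counter
    counter = 0
    lock = asyncio.Lock()
    jobs = [safe_increment(lock) if safe else unsafe_increment() for _ in range(n)]
    await asyncio.gather(*jobs)
    return counter

if __name__ == "__main__":
    print("unsafe:", asyncio.run(run(50, safe=False)))
    print("safe:  ", asyncio.run(run(50, safe=True)))
```

The unsafe version ends with far fewer than 50, because every task reads the old value before any of them writes. The locked version reaches 50.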

Hands-On: Building an Asynchronous API Service

The following demonstrates a practical example: concurrently obtaining local "database" data and external GitHub API data.

from fastapi import FastAPI, HTTPException
import httpx
import asyncio

app = FastAPI()

# Simulated asynchronous database query
async def query_users():
    await asyncio.sleep(0.1)  # simulate I/O latency
    return [
        {"id": 1, "name": "Alice"},
        {"id": 2, "name": "Bob"},
    ]

# Fetch GitHub user info asynchronously
async def fetch_github_user(username: str):
    async with httpx.AsyncClient() as client:
        resp = await client.get(f"https://api.github.com/users/{username}")
        if resp.status_code == 404:
            raise HTTPException(status_code=404, detail="User not found")
        resp.raise_for_status()
        return resp.json()

@app.get("/users")
async def list_users():
    """Concurrently fetch the local user list and public GitHub info."""
    users, octocat = await asyncio.gather(
        query_users(),
        fetch_github_user("octocat")
    )
    users.append({
        "id": octocat["id"],
        "name": octocat["name"],
        "is_github": True,
        "avatar": octocat["avatar_url"]
    })
    return users

Result: the two independent I/O operations run concurrently, so the total time is determined by the slower one rather than by the sum of the two.

Performance optimization suggestions

  1. Use connection pools: share an httpx.AsyncClient or an asyncpg.create_pool pool to avoid creating a new connection for each request.
  2. Cache appropriately: synchronous computations can use functools.lru_cache; asynchronous results need an async-aware cache, either hand-written or from a third-party library.
  3. Avoid unnecessary serial awaits: run independent tasks concurrently with asyncio.gather.
  4. Monitor event loop latency: in production you can enable asyncio debug mode or use a tool such as prometheus_client to observe blocking.
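
Suggestion 2 can be illustrated with a small cache for coroutine results. This is a minimal sketch, not a production cache: async_cache and load_profile are hypothetical names, entries never expire, and two concurrent first calls with the same arguments would both run the function. It exists because functools.lru_cache applied to an async def function would cache the coroutine object, which can be awaited only once.

```python
import asyncio
import functools

def async_cache(func):
    """Cache awaited results by positional arguments (hypothetical helper)."""
    results = {}

    @functools.wraps(func)
    async def wrapper(*args):
        if args not in results:
            results[args] = await func(*args)  # store the result, not the coroutine
        return results[args]

    return wrapper

call_count = 0

@async_cache
async def load_profile(user_id: int) -> dict:
    global call_count
    call_count += 1
    await asyncio.sleep(0.01)  # stands in for a database or HTTP call
    return {"id": user_id}

async def demo():
    first = await load_profile(1)
    second = await load_profile(1)  # served from the cache
    return first, second, call_count
```

Calling demo() once should show that load_profile ran a single time even though it was awaited twice.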

Summary

Concept           | Core Points
------------------|--------------------------------------------------------------------
async def         | Defines a coroutine function; calling it returns a coroutine object
await             | Waits for an awaitable and yields control to the event loop
asyncio.sleep     | Asynchronous sleep; does not block the event loop
asyncio.gather    | Runs multiple awaitables concurrently
asyncio.to_thread | Offloads slow synchronous work to a thread pool
FastAPI routing   | Use async def for I/O-intensive work, def for CPU-intensive work

💡 Remember: The core of asynchronous programming is handling a large number of concurrent I/O requests efficiently. Do not run CPU-intensive calculations inside async functions, because that stalls the entire event loop. FastAPI performs best when the thread pool is used sensibly and synchronous and asynchronous code each do the job they are suited to.