Modern Python Coroutine Programming Guide

In Python concurrent programming, coroutines have always been powerful but somewhat subtle. From the early tricks built on generators to today's elegant async/await syntactic sugar, the evolution of coroutines reflects both the growth of the language and the widespread adoption of the asynchronous programming paradigm. This article takes you from basic concepts to advanced patterns so that you can fully master modern Python coroutines.


Basic concepts of coroutines

A coroutine, sometimes also called a micro-thread or fiber, is a unit of concurrent execution that is lighter than a thread. Its biggest difference from an ordinary function (subroutine) is that it can voluntarily suspend itself during execution and be resumed later by external code.

Coroutine vs subroutine

Feature | Subroutines (functions) | Coroutines
Execution flow | One entry, one return, strict call stack | Multiple pauses and resumes, multiple entry points
Calling method | Direct func() call | Cooperation through yield/send or await
Execution context | A new stack frame is created for each call | Maintains its own execution state across suspensions
Concurrency model | Synchronous, blocking | Asynchronous, non-blocking
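The contrast in the table can be made concrete with a minimal sketch. The names `add` and `running_total` are made up for illustration: the plain function forgets everything once it returns, while the generator-based coroutine keeps its local state alive between resumptions.

```python
def add(total, value):
    """Subroutine: one entry, one return, no state kept between calls."""
    return total + value


def running_total():
    """Generator-based coroutine: `total` survives every pause."""
    total = 0
    while True:
        value = yield total   # pause here; resume when send() delivers a value
        total += value


acc = running_total()
print(next(acc))      # run to the first yield (priming)
print(acc.send(10))   # resume with 10, pause again at the same yield
print(acc.send(5))    # resume with 5; the earlier 10 is still remembered
print(add(0, 5))      # the subroutine has no memory of earlier calls
```

Each `send()` re-enters the function body at the point where it last paused, which is what "multiple entry points" means in the table.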

Advantages of coroutines

  1. Lightweight switching: Coroutine switching is controlled entirely by the program, so it avoids the operating-system overhead of a thread context switch.
  2. Far fewer locks: Coroutines are scheduled within a single thread, and the code between two suspension points runs without interruption, which avoids most race conditions. State that is shared across an await may still need protection such as asyncio.Lock.
  3. High concurrency: One thread can drive thousands of coroutines that appear to run at the same time.
  4. Asynchronous logic that reads like synchronous code: await pauses the current coroutine without blocking the whole thread, which greatly improves readability.
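As a rough illustration of the "high concurrency" point, the sketch below runs 1000 coroutines on a single thread. The helper names `tick` and `run_many` are hypothetical, and the 0.1-second sleep stands in for real IO waits.

```python
import asyncio
import time


async def tick(i: int) -> int:
    await asyncio.sleep(0.1)   # stand-in for an IO wait
    return i


async def run_many(count: int):
    start = time.perf_counter()
    # All coroutines wait at the same time on one thread.
    results = await asyncio.gather(*(tick(i) for i in range(count)))
    return results, time.perf_counter() - start


results, elapsed = asyncio.run(run_many(1000))
print(f"{len(results)} coroutines finished in {elapsed:.2f}s")
```

Run one after another, the same waits would add up to roughly 100 seconds; run concurrently, the total stays close to the length of a single wait.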

Coroutine evolution in Python

The Python community has been exploring coroutines for a long time and has gone through three main stages along the way.

Phase 1: Generator coroutine (Python 2.5+)

Before async/await was born, people cleverly used the yield feature of generators to simulate coroutine behavior:

def consumer():
    r = ''
    while True:
        n = yield r          # pause here and wait for send() to deliver data
        if not n:
            return
        print(f'[CONSUMER] Consuming {n}...')
        r = '200 OK'

def producer(c):
    c.send(None)             # prime the coroutine so consumer runs to its first yield
    for n in range(1, 6):
        print(f'[PRODUCER] Producing {n}...')
        r = c.send(n)        # send data to the coroutine and resume it
        print(f'[PRODUCER] Reply received: {r}')
    c.close()

c = consumer()
producer(c)

Coroutines at this stage were usable, but they required manual send and close calls and values had to be passed back and forth between several generators, so the code was obscure and error-prone.

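Phase 2: asyncio + yield from (Python 3.4+)

Python 3.4 introduced the standard library module asyncio, which made coroutines an officially supported feature. At that time we used the @asyncio.coroutine decorator and the yield from syntax. Note that the decorator was deprecated in Python 3.8 and removed in Python 3.11, so the following example is historical and only runs on older interpreters: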

import asyncio

@asyncio.coroutine
def consumer(n):
    print(f'[CONSUMER] Consuming {n}...')
    yield from asyncio.sleep(1)   # suspend the coroutine and hand control to the event loop
    return '200 OK'

@asyncio.coroutine
def producer():
    for n in range(1, 6):
        print(f'[PRODUCER] Producing {n}...')
        r = yield from consumer(n)
        print(f'[PRODUCER] Reply received: {r}')

loop = asyncio.get_event_loop()
loop.run_until_complete(producer())
loop.close()

yield from simplified calling sub-coroutines, but it was still not intuitive enough.

Phase 3: modern async/await coroutines (Python 3.7+)

Python 3.5 introduced async/await, and 3.7 further simplified starting the event loop with asyncio.run(), giving us the asynchronous code style we know today:

import asyncio

async def task(name, delay):
    print(f"{name} started")
    await asyncio.sleep(delay)    # suspend voluntarily so the event loop can run other tasks
    print(f"{name} finished after {delay}s")
    return delay

async def main():
    # Run several coroutines concurrently and wait for all of them to finish
    results = await asyncio.gather(
        task("Task 1", 2),
        task("Task 2", 1),
        task("Task 3", 3)
    )
    print(f"All done, results: {results}")

asyncio.run(main())   # one line starts the event loop

At this point, coroutine programming finally feels natural, and the top-to-bottom readability of synchronous code is preserved.


Core concepts of coroutines

To truly make good use of modern coroutines, you need to understand several key roles behind them.

1. Event loop

The event loop acts as a central scheduler. When a coroutine reaches an await, it tells the event loop: "I need to wait for a while, so you can handle other coroutines first." The event loop switches control to other ready coroutines and resumes the suspended one once the condition it is waiting for is met.

import asyncio

async def hello_world():
    print("Hello")
    await asyncio.sleep(1)
    print("World")

# The old way, before Python 3.7
# loop = asyncio.get_event_loop()
# loop.run_until_complete(hello_world())
# loop.close()

# The recommended way today
asyncio.run(hello_world())

asyncio.run() creates an event loop, runs the coroutine passed to it, and cleans up automatically when it finishes. It is the entry point of most programs.
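To see the scheduler at work, here is a minimal sketch in which two coroutines record when they start and finish. The `worker` helper and the shared `log` list are invented for this illustration.

```python
import asyncio


async def worker(name: str, delay: float, log: list) -> None:
    log.append(f"{name} start")
    await asyncio.sleep(delay)   # hand control back to the event loop
    log.append(f"{name} end")


async def main() -> list:
    log = []
    await asyncio.gather(worker("A", 0.2, log), worker("B", 0.1, log))
    return log


print(asyncio.run(main()))
# ['A start', 'B start', 'B end', 'A end']
```

B starts while A is still sleeping and finishes first, which shows that an await suspends only the current coroutine, not the whole thread.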

2. Awaitable objects

Only awaitable objects (Awaitable) can follow the await keyword. Python has three main kinds:

  • Coroutine object: the object returned by calling an async def function. Its body runs only when it is awaited or wrapped in a task.
  • Task: an object that wraps a coroutine, created with asyncio.create_task(). Once created, the task is scheduled on the event loop and starts at the loop's next opportunity. You can await it later to get its result.
  • Future: represents a result that is not yet available. It is mostly used inside low-level libraries and is rarely touched directly in ordinary application code.
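The three kinds listed above can be put side by side in a short sketch. The `compute` coroutine is a made-up example, and the Future is resolved by hand only to keep the demonstration self-contained; application code rarely does this.

```python
import asyncio
import inspect


async def compute() -> int:
    return 42


async def main():
    coro = compute()                                  # coroutine object: nothing has run yet
    task = asyncio.create_task(compute())             # Task: scheduled on the running loop
    fut = asyncio.get_running_loop().create_future()  # bare Future
    fut.set_result("done")                            # resolved by hand for the demo

    kinds = [inspect.isawaitable(obj) for obj in (coro, task, fut)]
    values = [await coro, await task, await fut]
    return kinds, values


print(asyncio.run(main()))
```

All three pass `inspect.isawaitable`, and each `await` yields the underlying result.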

3. Coroutines and tasks

The coroutine object itself will not actively run. You need to decide whether to wait for it directly or turn it into a task.

import asyncio

async def nested():
    return 42

async def main():
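    # Option 1: await the coroutine directly (runs sequentially)
    result = await nested()
    print(result)

    # Option 2: create a task (runs concurrently)
    task = asyncio.create_task(nested())
    # nested() is now scheduled; it starts as soon as main() yields at an await
    print(await task)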

asyncio.run(main())

Awaiting a coroutine directly means waiting for it to finish before continuing. Creating a task starts the work in the background so that you can collect the result later, when you actually need it, and this is the basis for concurrency. Keep a reference to every task you create, because the event loop itself holds only a weak reference to it.
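The difference shows up clearly in timing. The following sketch, with the hypothetical helpers `step`, `sequential`, and `concurrent`, runs the same two waits both ways and measures the elapsed time.

```python
import asyncio
import time


async def step(delay: float) -> float:
    await asyncio.sleep(delay)
    return delay


async def sequential() -> float:
    start = time.perf_counter()
    await step(0.2)              # the second step cannot start until the first ends
    await step(0.2)
    return time.perf_counter() - start


async def concurrent() -> float:
    start = time.perf_counter()
    t1 = asyncio.create_task(step(0.2))   # both tasks are scheduled up front
    t2 = asyncio.create_task(step(0.2))
    await t1
    await t2
    return time.perf_counter() - start


print(f"sequential: {asyncio.run(sequential()):.2f}s")
print(f"concurrent: {asyncio.run(concurrent()):.2f}s")
```

The sequential version takes about the sum of the two delays, while the task-based version takes about as long as a single delay.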


Advanced coroutine patterns

1. Concurrent execution of coroutines

When you have a batch of independent asynchronous operations, you can use asyncio.gather to run them concurrently:

import asyncio

async def fetch_data(delay, id):
    print(f"Fetching data {id}...")
    await asyncio.sleep(delay)
    print(f"Data {id} fetched")
    return {"id": id, "delay": delay}

async def main():
    # "Launch" three operations at the same time
    results = await asyncio.gather(
        fetch_data(2, 1),
        fetch_data(1, 2),
        fetch_data(3, 3)
    )
    print("All data:", results)

asyncio.run(main())
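Two details of asyncio.gather are worth a quick sketch: results come back in argument order rather than completion order, and return_exceptions=True returns a failure as a value instead of raising it. The `fetch` helper and its `fail` flag are assumptions made for this illustration.

```python
import asyncio


async def fetch(item_id: int, delay: float, fail: bool = False) -> int:
    await asyncio.sleep(delay)
    if fail:
        raise RuntimeError(f"fetch {item_id} failed")
    return item_id


async def main() -> list:
    # Item 2 finishes first and fails; items 3 and 1 finish later.
    return await asyncio.gather(
        fetch(1, 0.03),
        fetch(2, 0.01, fail=True),
        fetch(3, 0.02),
        return_exceptions=True,   # collect exceptions as results instead of raising
    )


print(asyncio.run(main()))
```

Without return_exceptions=True, the first exception would propagate out of the `await asyncio.gather(...)` call while the other awaitables keep running.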

If you want more flexible control over when to react to completed tasks, use asyncio.wait. It returns two sets, done and pending, and with return_when=asyncio.FIRST_COMPLETED it returns as soon as any one task finishes:

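import asyncio

# Reuses fetch_data() from the previous example.

async def main():
    tasks = [
        asyncio.create_task(fetch_data(2, 1)),
        asyncio.create_task(fetch_data(1, 2)),
        asyncio.create_task(fetch_data(3, 3))
    ]
    # Wait until the first task completes
    done, pending = await asyncio.wait(
        tasks, return_when=asyncio.FIRST_COMPLETED
    )
    first = done.pop()
    print(f"Result of the first completed task: {first.result()}")

    # Keep waiting for the remaining tasks (asyncio.wait rejects an empty set)
    if pending:
        await asyncio.wait(pending)

asyncio.run(main())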
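When only the fastest answer matters, a common variation is to cancel whatever is still pending instead of waiting for it. The sketch below is one possible shape for that; `first_result` and `job` are hypothetical helpers, not part of asyncio.

```python
import asyncio


async def job(name: str, delay: float) -> str:
    await asyncio.sleep(delay)
    return name


async def first_result(coros):
    tasks = [asyncio.create_task(c) for c in coros]
    done, pending = await asyncio.wait(tasks, return_when=asyncio.FIRST_COMPLETED)
    for task in pending:
        task.cancel()                     # the slower tasks are no longer needed
    # Let the cancelled tasks finish unwinding before returning.
    await asyncio.gather(*pending, return_exceptions=True)
    return done.pop().result()


async def main() -> str:
    return await first_result(
        [job("slow", 0.3), job("fast", 0.05), job("medium", 0.2)]
    )


print(asyncio.run(main()))
```

Gathering the cancelled tasks with return_exceptions=True absorbs their CancelledError so that nothing is left running when the helper returns.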

2. Combining coroutines with a thread pool

Coroutines are well suited to IO-intensive work, but if your code has to use synchronous blocking calls (such as time.sleep or ordinary file reads and writes), calling them directly blocks the entire event loop. In that case, hand the blocking operation to a thread pool:

import asyncio
import time

def blocking_io():
    print(f"Blocking IO started {time.strftime('%X')}")
    time.sleep(2)          # simulate a synchronous blocking operation
    print(f"Blocking IO finished {time.strftime('%X')}")
    return "IO result"

async def main():
    print(f"Main coroutine started {time.strftime('%X')}")
    loop = asyncio.get_running_loop()

    # Run the blocking function in the default thread pool (executor=None);
    # the event loop stays responsive
    result = await loop.run_in_executor(None, blocking_io)
    print("Got result:", result)

    print(f"Main coroutine finished {time.strftime('%X')}")

asyncio.run(main())

For CPU-intensive tasks, the same approach works with a process pool: pass a concurrent.futures.ProcessPoolExecutor instance as the first argument of run_in_executor instead of None.
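A minimal sketch of passing an explicit executor to run_in_executor is shown below. The names `sum_of_squares` and `offload` are made up for illustration, and a ThreadPoolExecutor is used only to keep the sketch portable; for genuinely CPU-bound work you would pass a ProcessPoolExecutor in the same position, with the worker function defined at module level and the entry point guarded by `if __name__ == "__main__":`.

```python
import asyncio
from concurrent.futures import Executor, ThreadPoolExecutor


def sum_of_squares(n: int) -> int:
    """Stand-in for a CPU-heavy computation."""
    return sum(i * i for i in range(n))


async def offload(executor: Executor, n: int) -> int:
    loop = asyncio.get_running_loop()
    # Extra positional arguments are forwarded to the function.
    return await loop.run_in_executor(executor, sum_of_squares, n)


async def main() -> int:
    # Assumption for portability: a thread pool. Swap in
    # concurrent.futures.ProcessPoolExecutor for real CPU-bound work.
    with ThreadPoolExecutor(max_workers=2) as pool:
        return await offload(pool, 10)


print(asyncio.run(main()))   # 285
```

Keeping the executor as a parameter means the calling code does not change when you switch between threads and processes.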

3. Coroutine timeout control

In actual projects, network requests or external services may not respond for a long time. We can set a timeout for the coroutine to prevent the task from getting stuck forever.

import asyncio

async def long_running_task():
    try:
        print("Long-running task started")
        await asyncio.sleep(3600)    # simulate a slow operation
        return "Task finished"
    except asyncio.CancelledError:
        print("Task was cancelled")
        raise

async def main():
    try:
        # 1-second timeout: on expiry the task is cancelled and TimeoutError is raised
        await asyncio.wait_for(long_running_task(), timeout=1.0)
    except asyncio.TimeoutError:
        print("Timed out!")

asyncio.run(main())

Tip: Since Python 3.11, you can also use the more elegant async with asyncio.timeout(1.0): context manager, which has a similar effect.
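The two timeout styles can be wrapped behind one helper. In this sketch, `with_deadline` is a hypothetical function that uses asyncio.timeout when the interpreter provides it (Python 3.11+) and falls back to asyncio.wait_for otherwise.

```python
import asyncio


async def with_deadline(coro, seconds: float):
    if hasattr(asyncio, "timeout"):            # Python 3.11+
        async with asyncio.timeout(seconds):
            return await coro
    return await asyncio.wait_for(coro, timeout=seconds)


async def slow() -> str:
    await asyncio.sleep(10)
    return "done"


async def main() -> str:
    try:
        return await with_deadline(slow(), 0.05)
    except asyncio.TimeoutError:               # same class as TimeoutError on 3.11+
        return "timed out"


print(asyncio.run(main()))
```

In both branches the slow coroutine is cancelled when the deadline passes, so it does not keep running in the background.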


Best Practices

Mastering the following principles in daily development can help you write stable and efficient coroutine code:

  1. Never call synchronous blocking functions in coroutines. Use asyncio.sleep() when you want to pause, and hand synchronous IO to a thread pool.

  2. Choose concurrency tools sensibly. asyncio.gather suits "I want all the results", while asyncio.wait suits "I only need the first one" or cases that need fine control over how completions are handled.

  3. Use async with to manage asynchronous resources. Resources such as database connections and files should use asynchronous context managers so that they are closed cleanly.

  4. Don't forget to catch exceptions. If a task raises an exception but nothing ever awaits it, the exception does not propagate; asyncio only logs a "Task exception was never retrieved" message when the task is garbage-collected, which leads to hard-to-find bugs. Be sure to use try/except in the right places.

  5. Enable debug mode to detect missed awaits. During development, you can pass debug=True to asyncio.run(coro, debug=True) or set the environment variable PYTHONASYNCIODEBUG=1, so that asyncio reports coroutines that were created but never awaited in more detail and logs callbacks that block the loop for too long.
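Practices 3 and 4 can be sketched together. Everything named here (`open_connection`, `failing_query`, the `events` list) is invented for illustration; a real project would use the asynchronous context manager provided by its database or HTTP library.

```python
import asyncio
from contextlib import asynccontextmanager


@asynccontextmanager
async def open_connection(name: str, events: list):
    events.append(f"open {name}")
    try:
        yield name
    finally:
        events.append(f"close {name}")   # runs even if the body raises


async def failing_query(conn: str) -> None:
    await asyncio.sleep(0.01)
    raise ValueError(f"query failed on {conn}")


async def main() -> list:
    events = []
    async with open_connection("db", events) as conn:
        task = asyncio.create_task(failing_query(conn))
        try:
            await task                   # awaiting the task surfaces its exception
        except ValueError as exc:
            events.append(f"caught: {exc}")
    return events


print(asyncio.run(main()))
# ['open db', 'caught: query failed on db', 'close db']
```

The exception is handled where the task is awaited, and the connection is closed afterwards regardless of the outcome.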


Summary

Through async/await, modern Python coroutines give us a clear and intuitive asynchronous programming model. Compared with the old generator-based coroutines, they bring not only simpler syntax but also complete concurrency control and deep integration with the asynchronous IO ecosystem.

As computer scientist Donald Knuth observed, subroutines are just a special case of coroutines. Once you are comfortable thinking in coroutines, your code can handle high-concurrency IO scenarios much more easily, and web crawlers, API services, and real-time communication systems alike can run fast and stably.

I hope this guide can help you open the door to coroutine programming and make your Python asynchronous code even more powerful.