Python asyncio asynchronous programming tutorial

1. Overview

asyncio is the Python standard library's framework for writing asynchronous I/O programs. It joined the standard library in Python 3.4, and since Python 3.7 (which added asyncio.run()) the high-level API has been stable and easy to use. It achieves concurrency, not true parallelism, by running many tasks on a single-threaded event loop, which suits I/O-bound workloads such as crawlers, microservice communication, and real-time chat systems. If your tasks are mainly CPU-bound, such as numerical computation or encryption and decryption, prefer multiprocessing; for highly concurrent I/O, asyncio is usually the better fit.

Compared with traditional multi-threaded or multi-process solutions, asyncio avoids operating-system-level context switches between threads, uses fewer resources per task, and lets a single process manage thousands of concurrent connections.


2. Core concepts: event loop and coroutine

Once you understand these two concepts, the code in the rest of this tutorial becomes much easier to follow.

2.1 Event Loop

The event loop is asyncio's central scheduler. It is responsible for three things:

  • Accepting coroutine tasks and putting them into the scheduling queue;
  • When a task starts waiting on I/O (e.g. await asyncio.sleep(1) or awaiting a network request), handing control to other tasks that are ready to run;
  • When the wait is over, switching back to the original task so it can continue.
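
The three steps above can be observed with a small sketch. The worker names, delays, and the shared log list are illustrative assumptions; asyncio.run() and asyncio.gather() are introduced later in this tutorial and are used here only to drive the loop.

```python
import asyncio

async def worker(name: str, delay: float, log: list) -> None:
    log.append(f"{name} start")
    await asyncio.sleep(delay)   # the task waits here; the loop runs other tasks
    log.append(f"{name} end")    # the loop switches back once the wait is over

async def main() -> list:
    log = []
    # A is scheduled first but sleeps longer, so B finishes before A.
    await asyncio.gather(worker("A", 0.2, log), worker("B", 0.1, log))
    return log

print(asyncio.run(main()))
# → ['A start', 'B start', 'B end', 'A end']
```

Both workers start before either one ends, which shows the loop handing control to B while A is still waiting.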

In Python 3.7 and later, we no longer need to create and manage the event loop manually: asyncio.run() creates the top-level event loop, runs the coroutine, and closes the loop for us.
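
For comparison, here is a rough sketch of the manual loop management that asyncio.run() replaces. The helper run_manually() is a hypothetical name, and it is a simplification: the real asyncio.run() also cancels leftover tasks and shuts down asynchronous generators before closing the loop.

```python
import asyncio

async def answer() -> int:
    await asyncio.sleep(0)   # give control back to the loop once
    return 42

def run_manually(coro):
    """Simplified stand-in for asyncio.run(): create a loop, run, close."""
    loop = asyncio.new_event_loop()
    try:
        return loop.run_until_complete(coro)
    finally:
        loop.close()

print(run_manually(answer()))   # → 42
print(asyncio.run(answer()))    # → 42
```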

2.2 Coroutine

A coroutine is asyncio's smallest unit of execution and is defined with async def:

async def my_first_coro():
    print("I am a coroutine")

Note: calling my_first_coro() directly does not execute the function body; it only returns a coroutine object. To actually run the coroutine, you must await it, pass it to asyncio.run(), or schedule it on the event loop as a task.
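
A minimal sketch of this behaviour follows. The return value and the demo() helper are illustrative assumptions; inspect.iscoroutine() is used only to confirm what the call returns.

```python
import asyncio
import inspect

async def my_first_coro() -> str:
    return "done"

def demo():
    coro = my_first_coro()               # nothing runs yet: this only builds a coroutine object
    is_coro = inspect.iscoroutine(coro)
    result = asyncio.run(coro)           # the function body executes here
    return is_coro, result

print(demo())   # → (True, 'done')
```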


3. Basic operation: start from a single coroutine

3.1 Running the top-level coroutine

The simplest and recommended way is to use asyncio.run():

import asyncio

async def hello():
    print("Hello world!")
    # Simulate an I/O operation that takes 1 second (e.g. reading a file or a network request)
    await asyncio.sleep(1)
    print("Hello again!")

# Start the top-level coroutine
asyncio.run(hello())

Run this code and you will first see Hello world!, then, after a 1-second wait, Hello again!. With a single coroutine this looks the same as synchronous code, but once several tasks run at the same time, the benefit of concurrency becomes apparent.

3.2 async/await syntax

  • async def: declares a coroutine function. It is also used to write asynchronous generators and the methods behind asynchronous context managers and asynchronous iterators;
  • await: suspends the current coroutine, returns control to the event loop, and resumes execution once the awaited asynchronous operation completes.

There are two hard rules for using await:

  • It can only be used inside a function defined with async def;
  • It must be followed by an awaitable: a coroutine object, a Task object, or a Future object (as a beginner you only need to care about the first two).
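
Both rules can be checked with a short sketch. The function names are hypothetical, and compile() is used only so that the invalid snippet can be tested without breaking this file.

```python
import asyncio

async def compute() -> int:
    return 42

def await_outside_async_is_rejected() -> bool:
    # Rule 1: the compiler refuses await inside a plain def.
    source = "def broken():\n    await compute()\n"
    try:
        compile(source, "<demo>", "exec")
    except SyntaxError:
        return True
    return False

async def main():
    from_coro = await compute()                        # awaiting a coroutine object
    from_task = await asyncio.create_task(compute())   # awaiting a Task
    try:
        await 42                                       # Rule 2: an int is not awaitable
    except TypeError:
        rejected = True
    else:
        rejected = False
    return from_coro, from_task, rejected

print(await_outside_async_is_rejected())   # → True
print(asyncio.run(main()))                 # → (42, 42, True)
```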

4. Concurrent execution: run multiple tasks at once

A single coroutine behaves no differently from synchronous code; concurrency is where asyncio shows its value. There are two common ways to run tasks concurrently.

4.1 Batch concurrency: asyncio.gather()

When you want to submit a batch of tasks together and collect the results once they have all finished, gather() is the most convenient option:

import asyncio

async def greet(name: str):
    print(f"👋 Hello {name}!")
    await asyncio.sleep(1)   # simulate the delay of sending a message to each person
    print(f"👋 Goodbye {name}!")
    return name

async def main():
    # gather collects the return values of all coroutines, in the order they were passed
    results = await asyncio.gather(
        greet("Alice"),
        greet("Bob"),
        greet("Charlie")
    )
    print("📋 All greetings done, participants:", results)

asyncio.run(main())

Run this code and you will see the three Hello lines appear almost simultaneously, followed about 1 second later by the three Goodbye lines. The total time is about 1 second, whereas running the three greetings one after another would take about 3 seconds.
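
The timing claim can be measured directly. This is a sketch under assumed values: the delay is shortened to 0.2 seconds, and sequential(), concurrent(), and timed() are hypothetical helper names.

```python
import asyncio
import time

async def greet(name: str, delay: float = 0.2) -> str:
    await asyncio.sleep(delay)
    return name

async def sequential(names):
    # Each await finishes before the next greeting starts.
    return [await greet(n) for n in names]

async def concurrent(names):
    # All greetings wait at the same time.
    return await asyncio.gather(*(greet(n) for n in names))

def timed(coro_factory, names):
    start = time.perf_counter()
    results = asyncio.run(coro_factory(names))
    return results, time.perf_counter() - start

names = ["Alice", "Bob", "Charlie"]
seq_results, seq_time = timed(sequential, names)
con_results, con_time = timed(concurrent, names)
print(f"sequential: {seq_time:.2f}s, concurrent: {con_time:.2f}s")
```

On a typical machine the sequential run takes roughly three times as long as the concurrent one, while both return the names in the same order.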

4.2 Flexible scheduling: asyncio.create_task()

gather() waits for a whole batch of tasks to finish, whereas create_task() immediately wraps a coroutine in a Task object and schedules it on the event loop. With the Task in hand you can cancel it, await it individually, or add more tasks along the way.

import asyncio

async def slow_task(name: str, delay: int):
    print(f"⏳ Task {name} started, expected to finish in {delay}s")
    await asyncio.sleep(delay)
    print(f"✅ Task {name} finished")
    return name

async def main():
    # Create and schedule two tasks right away
    task_a = asyncio.create_task(slow_task("A", 2))
    task_b = asyncio.create_task(slow_task("B", 1))

    # Wait for all tasks to finish
    results = await asyncio.gather(task_a, task_b)
    print(f"🎉 All tasks finished: {results}")

asyncio.run(main())

Note: on Python 3.7+, use asyncio.create_task(). Older code used loop.create_task(); that style is rarely seen today.
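
As an illustration of the flexibility a Task object gives you, the sketch below awaits one task on its own and cancels another. The task names and delays are assumptions chosen to keep the example fast.

```python
import asyncio

async def slow_task(name: str, delay: float) -> str:
    try:
        await asyncio.sleep(delay)
        return f"{name} finished"
    except asyncio.CancelledError:
        # A real task would release its resources here, then re-raise.
        raise

async def main():
    fast = asyncio.create_task(slow_task("fast", 0.1))
    slow = asyncio.create_task(slow_task("slow", 5))

    first = await fast      # wait for one task individually
    slow.cancel()           # we no longer need the slow one
    try:
        await slow          # awaiting a cancelled task raises CancelledError
    except asyncio.CancelledError:
        pass
    return first, slow.cancelled()

print(asyncio.run(main()))   # → ('fast finished', True)
```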


5. Practical case: asynchronous web crawler

Network requests are the most typical I/O-bound scenario. Below we use asyncio.open_connection() to hand-write a simple HTTP crawler so you can see how asynchronous network interaction works. In real projects a third-party library such as aiohttp is recommended, but writing it by hand here exposes the underlying mechanism.

import asyncio

async def fetch_raw_page(host: str):
    """Asynchronously fetch the raw home page data for the given host"""
    print(f"🔍 Fetching {host} ...")
    # Open an asynchronous TCP connection
    reader, writer = await asyncio.open_connection(host, 80)

    # Build the HTTP GET request
    request = f"GET / HTTP/1.1\r\nHost: {host}\r\nConnection: close\r\n\r\n"
    writer.write(request.encode("utf-8"))
    # Make sure the request has been fully sent
    await writer.drain()

    # Read the response asynchronously until the server closes the connection
    raw_response = await reader.read()
    # Close the connection
    writer.close()
    await writer.wait_closed()

    print(f"✅ Fetched {host}, received {len(raw_response)} bytes")
    return raw_response

async def main():
    hosts = ["www.example.com", "www.github.com", "www.python.org"]
    # Fetch several pages concurrently
    raw_pages = await asyncio.gather(*[fetch_raw_page(host) for host in hosts])
    print(f"📦 Fetched {len(raw_pages)} pages in total")

asyncio.run(main())

Similarly, the total time taken depends only on the slowest request, not the sum of all request times.


6. Common pitfalls and best practices

These are the most common mistakes when writing asyncio code; it pays to avoid them from the start:

6.1 Do not call blocking synchronous code in coroutines

If you call time.sleep(), the synchronous requests.get(), or ordinary file I/O via open() inside a coroutine, the entire event loop stalls and every other task is blocked until the call returns.

✅ Correct alternatives:

  • time.sleep() → await asyncio.sleep()
  • Synchronous network requests → aiohttp or httpx (in async mode)
  • Synchronous file reads and writes → aiofiles
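
The effect of a blocking call can be measured with a small sketch. The worker names and the 0.2-second delay are assumptions. The third worker uses asyncio.to_thread() (Python 3.9+), which is not one of the libraries listed above; it is shown only as one possible fallback when no asynchronous library exists for a blocking call.

```python
import asyncio
import time

async def blocking_worker():
    time.sleep(0.2)                            # blocks the whole event loop

async def friendly_worker():
    await asyncio.sleep(0.2)                   # yields to the event loop

async def threaded_worker():
    await asyncio.to_thread(time.sleep, 0.2)   # runs the blocking call in a worker thread

async def run_three(worker) -> float:
    start = time.perf_counter()
    await asyncio.gather(worker(), worker(), worker())
    return time.perf_counter() - start

for worker in (blocking_worker, friendly_worker, threaded_worker):
    elapsed = asyncio.run(run_three(worker))
    print(f"{worker.__name__}: {elapsed:.2f}s")
```

The blocking version runs its three workers back to back, so it takes about three times as long as the other two.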

6.2 Add timeout control

Many I/O operations can hang because of network instability or an unresponsive server, so always set a timeout:

import asyncio

async def maybe_stuck_task():
    await asyncio.sleep(10)   # pretend this is a task that hangs
    return "Success"

async def main():
    try:
        result = await asyncio.wait_for(maybe_stuck_task(), timeout=5)
        print(result)
    except asyncio.TimeoutError:
        print("⏰ Task timed out!")

asyncio.run(main())

6.3 Use async with to manage asynchronous resources

For objects that support the asynchronous context manager protocol (such as aiohttp.ClientSession), use async with so that resources are released automatically, which prevents resource leaks and connection exhaustion.
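
To show the release guarantee without a third-party dependency, the sketch below builds a stand-in resource with contextlib.asynccontextmanager. The open_resource() helper and the log list are hypothetical; a real aiohttp.ClientSession is used with async with in the same way.

```python
import asyncio
from contextlib import asynccontextmanager

@asynccontextmanager
async def open_resource(name: str, log: list):
    log.append(f"open {name}")
    await asyncio.sleep(0)            # stand-in for an asynchronous connect
    try:
        yield name
    finally:
        await asyncio.sleep(0)        # stand-in for an asynchronous close
        log.append(f"close {name}")   # runs even if the body raised

async def main() -> list:
    log = []
    try:
        async with open_resource("session", log) as res:
            log.append(f"use {res}")
            raise RuntimeError("boom")
    except RuntimeError:
        log.append("error handled")
    return log

print(asyncio.run(main()))
# → ['open session', 'use session', 'close session', 'error handled']
```

The resource is closed before the exception reaches the caller, which is the behaviour that protects against leaked connections.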

6.4 Handle exceptions correctly

  • By default, asyncio.gather() propagates the first exception raised by any task to the caller; the other tasks are not cancelled and keep running. If you do not want one failing task to affect the others, set return_exceptions=True, in which case exceptions are returned in the results list like ordinary return values;
  • Exceptions from a single Task can be caught by wrapping await task in try/except.
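
The two gather() modes can be compared side by side. In this sketch the work() coroutine and its failure on input 2 are assumptions made for illustration.

```python
import asyncio

async def work(n: int) -> int:
    await asyncio.sleep(0.01 * n)
    if n == 2:
        raise ValueError(f"task {n} failed")
    return n

async def main():
    # Default mode: the first exception propagates out of gather().
    try:
        await asyncio.gather(work(1), work(2), work(3))
        default_outcome = "no error"
    except ValueError as exc:
        default_outcome = str(exc)

    # return_exceptions=True: exceptions come back as items in the results list.
    results = await asyncio.gather(work(1), work(2), work(3), return_exceptions=True)
    summary = [type(r).__name__ if isinstance(r, Exception) else r for r in results]
    return default_outcome, summary

print(asyncio.run(main()))
# → ('task 2 failed', [1, 'ValueError', 3])
```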

7. Advanced features: make asynchronous code more elegant (optional reading)

Mastering the following features will make you more comfortable handling complex asynchronous processes.

7.1 Asynchronous iterators: async for

An ordinary iterator implements __iter__ and __next__; an asynchronous iterator implements __aiter__ (which returns the iterator itself) and __anext__ (which must be a coroutine):

import asyncio

class AsyncTimer:
    """Asynchronous countdown timer"""
    def __init__(self, start: int):
        self.current = start

    def __aiter__(self):
        return self

    async def __anext__(self):
        if self.current <= 0:
            raise StopAsyncIteration
        await asyncio.sleep(1)
        self.current -= 1
        return self.current + 1

async def main():
    async for num in AsyncTimer(3):
        print(f"⏲️ Countdown: {num}")

asyncio.run(main())

7.2 Asynchronous generators: a more concise style

Asynchronous iterators are somewhat verbose to write. Combining async def with yield creates an asynchronous generator with much less code:

import asyncio

async def async_timer_gen(start: int):
    """Asynchronous countdown generator (more concise)"""
    for num in range(start, 0, -1):
        await asyncio.sleep(1)
        yield num

async def main():
    async for num in async_timer_gen(3):
        print(f"⏲️ Countdown: {num}")

asyncio.run(main())

8. Summary

asyncio is the preferred solution for I/O-bound concurrency in Python. The key points are:

  1. Use async def to define coroutines and await to suspend and resume them;
  2. Use asyncio.run() to manage the top-level event loop automatically;
  3. Use asyncio.gather() for batch concurrency and asyncio.create_task() for flexible scheduling;
  4. Never make blocking synchronous calls in coroutines; replace them with the corresponding asynchronous library;
  5. Add timeout control and exception handling, and use async with to manage resources.

With the above under your belt, you can use asyncio to write high-performance crawlers, chatbot backends, microservice communication components, and more. Give it a try!