使用asyncio

Python asyncio 异步编程教程

1. 概述

asyncio 是 Python 3.4 引入的标准库,提供了完整的异步 I/O 支持。Python 3.7 后,asyncio API 变得更加稳定和易用。

2. 核心概念

2.1 事件循环 (Event Loop)

asyncio 的核心是一个事件循环,负责调度和执行协程。

2.2 协程 (Coroutine)

协程是 asyncio 的基本执行单元,使用 async def 定义:

async def my_coroutine():
    # 协程代码

3. 基本用法

3.1 运行协程

Python 3.7+ 推荐使用 asyncio.run() 运行顶层协程:

import asyncio

async def hello():
    print("Hello world!")
    await asyncio.sleep(1)  # 模拟I/O操作
    print("Hello again!")

asyncio.run(hello())

3.2 async/await 语法

  • async def: 定义协程函数
  • await: 等待协程或异步操作完成

4. 并发执行

4.1 使用 asyncio.gather()

async def hello(name):
    print(f"Hello {name}!")
    await asyncio.sleep(1)
    print(f"Hello {name} again!")
    return name

async def main():
    results = await asyncio.gather(
        hello("Alice"),
        hello("Bob"),
        hello("Charlie")
    )
    print(results)  # ['Alice', 'Bob', 'Charlie']

asyncio.run(main())

4.2 使用 asyncio.create_task()

async def task(name, delay):
    print(f"Task {name} started")
    await asyncio.sleep(delay)
    print(f"Task {name} completed")
    return name

async def main():
    task1 = asyncio.create_task(task("A", 2))
    task2 = asyncio.create_task(task("B", 1))
    
    results = await asyncio.gather(task1, task2)
    print(f"All tasks completed: {results}")

asyncio.run(main())

5. 网络请求示例

import asyncio

async def fetch_url(url):
    print(f"Fetching {url}...")
    reader, writer = await asyncio.open_connection(url, 80)
    
    request = f"GET / HTTP/1.1\r\nHost: {url}\r\nConnection: close\r\n\r\n"
    writer.write(request.encode())
    await writer.drain()
    
    response = await reader.read()
    writer.close()
    await writer.wait_closed()
    
    print(f"Finished {url}, received {len(response)} bytes")
    return response

async def main():
    urls = ["www.python.org", "www.github.com", "www.example.com"]
    results = await asyncio.gather(*[fetch_url(url) for url in urls])
    print(f"Fetched {len(results)} pages")

asyncio.run(main())

6. 最佳实践

  1. 避免阻塞操作:不要在协程中使用同步阻塞操作
  2. 使用 async with:对于支持异步上下文管理的对象
  3. 异常处理:使用 try/except 捕获协程中的异常
  4. 超时控制:使用 asyncio.wait_for() 设置超时
async def long_running_task():
    try:
        await asyncio.sleep(10)
        return "Done"
    except asyncio.CancelledError:
        print("Task was cancelled")
        raise

async def main():
    try:
        result = await asyncio.wait_for(long_running_task(), timeout=5)
        print(result)
    except asyncio.TimeoutError:
        print("Task timed out")

asyncio.run(main())

7. 高级特性

7.1 异步迭代器

class AsyncCounter:
    def __init__(self, stop):
        self.current = 0
        self.stop = stop

    def __aiter__(self):
        return self

    async def __anext__(self):
        if self.current >= self.stop:
            raise StopAsyncIteration
        await asyncio.sleep(0.5)
        self.current += 1
        return self.current

async def main():
    async for num in AsyncCounter(5):
        print(num)

asyncio.run(main())

7.2 异步生成器

async def async_gen(n):
    for i in range(n):
        await asyncio.sleep(0.5)
        yield i

async def main():
    async for item in async_gen(5):
        print(item)

asyncio.run(main())

8. 总结

asyncio 提供了强大的异步编程能力,适用于 I/O 密集型应用。关键点:

  1. 使用 async/await 语法定义协程
  2. 使用 asyncio.run() 运行主协程
  3. 使用 asyncio.gather()asyncio.create_task() 实现并发
  4. 避免阻塞操作,使用异步替代方案

通过合理使用 asyncio,可以显著提高 Python 程序的并发性能。