title: Basic use of aiohttp
description: Python asynchronous crawler tutorial: aiohttp detailed explanation

Python asynchronous crawler tutorial: aiohttp explained in detail

Imagine you want to scrape data from 10 websites, each of which takes exactly 1 second to respond. With the familiar requests library, the program takes at least 10 seconds: the requests are queued one after another, and each one waits for the previous one to finish. aiohttp, the subject of this tutorial, lets you issue all the requests at the same time, so the total time can be close to that of a single request, roughly 1 second plus overhead. This is the appeal of asynchronous crawlers.
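
The arithmetic above can be checked without touching the network. The following is a minimal sketch, assuming asyncio.sleep as a stand-in for a slow response; the site names, the fake_request helper, and the scaled-down delay are all hypothetical and chosen only to keep the demo quick.

```python
import asyncio
import time

DELAY = 0.05                                 # stand-in for the 1-second response, scaled down
SITES = [f"site-{i}" for i in range(10)]     # hypothetical site names

async def fake_request(site: str) -> str:
    # Simulates waiting for a server; no real HTTP request is made
    await asyncio.sleep(DELAY)
    return f"{site}: ok"

async def run_sequential() -> float:
    # One request at a time, like a synchronous client
    start = time.perf_counter()
    for site in SITES:
        await fake_request(site)
    return time.perf_counter() - start

async def run_concurrent() -> float:
    # All requests in flight at once
    start = time.perf_counter()
    await asyncio.gather(*(fake_request(site) for site in SITES))
    return time.perf_counter() - start

sequential_elapsed = asyncio.run(run_sequential())
concurrent_elapsed = asyncio.run(run_concurrent())
print(f"sequential: {sequential_elapsed:.2f}s, concurrent: {concurrent_elapsed:.2f}s")
```

The sequential run takes about ten times the delay, while the concurrent run takes about one delay, which is the same ratio as 10 seconds versus roughly 1 second in the scenario above.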

1. Overview

aiohttp is the core HTTP library in the ecosystem around asyncio, Python's asynchronous I/O library. It provides both client-side and server-side capabilities. In crawler development we mainly use its client to issue highly concurrent, non-blocking HTTP requests.

Simply put, it makes your crawler behave like an efficient waiter: instead of standing by until one table's dish is served, the waiter moves on to greet the next table.

1.1 Why choose it?

  • 🚀 Fully asynchronous and non-blocking: requests run concurrently, so 10 websites that each take 1 second to respond can finish in little more than 1 second in total
  • 📡 Supports HTTP, HTTPS, and WebSocket: it handles both ordinary crawling and real-time data streams
  • 🤝 Built-in connection pooling, sessions, proxy support, and cookie management: no need to reinvent the wheel
  • ⚡ Much higher throughput than the synchronous requests library for I/O-bound work: the advantage is most visible in large-scale collection

2. Basic introduction

2.1 Installation

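You can install it with a single command. The Python version you need depends on the aiohttp release: older releases ran on Python 3.6 and 3.7, while recent releases require a newer interpreter. The examples in this tutorial use syntax such as str | None, so Python 3.10 or later is recommended: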

pip install aiohttp

2.2 The first asynchronous crawler

Before writing asynchronous code, remember two core keywords:

  • async def: defines an asynchronous (coroutine) function. Calling it does not run the body; it returns a coroutine object that must be awaited or handed to the event loop for scheduling.
  • await: waits for an asynchronous operation to complete (such as sending a request or reading response data). It can only appear inside an async def function.
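
To make the first keyword concrete, here is a small sketch using only the standard library. The add function is a hypothetical example; it shows that calling an async def function produces a coroutine object, and that the body only runs once the event loop drives it.

```python
import asyncio
import inspect

async def add(a: int, b: int) -> int:
    # await hands control back to the event loop until the operation completes
    await asyncio.sleep(0)
    return a + b

coro = add(1, 2)                      # nothing has run yet
is_coro = inspect.iscoroutine(coro)   # the call only created a coroutine object
result = asyncio.run(coro)            # the event loop actually executes the body
print(is_coro, result)
```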

Let's use them to fetch example.com and print the first 200 characters:

import aiohttp
import asyncio

async def fetch(session: aiohttp.ClientSession, url: str) -> str:
    # Issue the request with async with so the connection is released when the block exits
    async with session.get(url) as response:
        # Reading the body also needs await, because the data arrives asynchronously
        return await response.text()

async def main():
    # Create the ClientSession with async with so it is closed automatically
    # A session reuses cookies, headers, and the connection pool; do not create one per request!
    async with aiohttp.ClientSession() as session:
        html = await fetch(session, "https://example.com")
        print(html[:200])

# On Python 3.7+, asyncio.run() is the recommended way to start the event loop
if __name__ == "__main__":
    asyncio.run(main())

💡 Key point: a ClientSession is similar to a browser's incognito window. All pages opened in it share the same environment (cookies, headers, connections), and it is closed when you are done. Do not create sessions repeatedly.


3. Common request configurations

3.1 URL with parameters

Avoid splicing and escaping the query string by hand. Pass the parameters as a dictionary through params, and aiohttp will encode them automatically:

async def search_baidu(session: aiohttp.ClientSession, keyword: str) -> str:
    params = {"wd": keyword, "ie": "utf-8"}
    async with session.get("https://www.baidu.com/s", params=params) as response:
        return await response.text()
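
To see why hand-splicing is error-prone, the sketch below builds a comparable query string with the standard library. This only illustrates the escaping involved; it is not how aiohttp builds URLs internally, and the keyword value is a made-up example containing a space and non-ASCII characters.

```python
from urllib.parse import parse_qs, urlencode, urlsplit

base = "https://www.baidu.com/s"
params = {"wd": "python 爬虫", "ie": "utf-8"}   # hypothetical keyword with a space and non-ASCII text

# urlencode percent-escapes unsafe characters so the URL stays valid
url = f"{base}?{urlencode(params)}"
print(url)

# Decoding the query string recovers the original values
decoded = parse_qs(urlsplit(url).query)
print(decoded)
```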

3.2 Set request header

The most common disguise for a crawler is a modified User-Agent. Put the headers you want to customize in a dictionary and pass it as headers:

headers = {
    "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/128.0.0.0 Safari/537.36"
}
async with session.get(url, headers=headers) as response:
    ...

3.3 Timeout control

One slow request can bog down the entire application. aiohttp.ClientTimeout gives fine-grained control over the total timeout and over individual stages such as connecting and reading:

# Total timeout 10 s, at most 2 s to obtain a connection, at most 3 s for each read of a chunk
timeout = aiohttp.ClientTimeout(total=10, connect=2, sock_read=3)
async with session.get(url, timeout=timeout) as response:
    ...
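
The effect of a total deadline can be tried without a network. This sketch assumes asyncio.wait_for as a stand-in for the timeout that aiohttp applies internally; slow_request and fetch_with_deadline are hypothetical helpers, not aiohttp APIs.

```python
import asyncio
from typing import Optional

async def slow_request(delay: float) -> str:
    # Stand-in for a slow HTTP request; no real network traffic
    await asyncio.sleep(delay)
    return "body"

async def fetch_with_deadline(delay: float, deadline: float) -> Optional[str]:
    try:
        # Cancel the request if it exceeds the overall deadline
        return await asyncio.wait_for(slow_request(delay), timeout=deadline)
    except asyncio.TimeoutError:
        return None

timed_out = asyncio.run(fetch_with_deadline(delay=0.5, deadline=0.05))
completed = asyncio.run(fetch_with_deadline(delay=0.01, deadline=0.5))
print(timed_out, completed)
```

The slow call is abandoned once the deadline passes instead of holding up the rest of the program, which is the behavior a total timeout is meant to provide.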

4. Supported request methods

Besides the most commonly used GET, aiohttp also supports POST, PUT, DELETE, and the other standard HTTP methods.

4.1 POST request

Form submission (equivalent to data in requests)

Commonly used to simulate login or ordinary forms:

form_data = {"username": "test", "password": "123456"}
async with session.post("https://example.com/login", data=form_data) as response:
    ...

JSON submission (equivalent to json in requests)

Use this when interacting with a RESTful API:

json_payload = {"name": "Alice", "age": 25, "city": "Beijing"}
async with session.post("https://example.com/api/users", json=json_payload) as response:
    ...
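
The two submission styles differ in how the body is encoded on the wire. The sketch below uses the standard library to show roughly what each body looks like: a dictionary passed as data is sent form-encoded (application/x-www-form-urlencoded), while json sends a JSON document (application/json). It is an illustration of the encodings, not aiohttp's internal code path.

```python
import json
from urllib.parse import urlencode

form_data = {"username": "test", "password": "123456"}
json_payload = {"name": "Alice", "age": 25, "city": "Beijing"}

# Form encoding: key=value pairs joined with &
form_body = urlencode(form_data)
# JSON encoding: a serialized JSON object
json_body = json.dumps(json_payload)

print(form_body)   # username=test&password=123456
print(json_body)   # {"name": "Alice", "age": 25, "city": "Beijing"}
```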

4.2 PUT / DELETE etc.

Usage is almost exactly the same as for GET and POST; just change the method name to match what the interface requires:

# PUT: update a resource
async with session.put("https://example.com/api/users/1", json={"age": 26}) as response:
    ...

# DELETE: delete a resource
async with session.delete("https://example.com/api/users/1") as response:
    ...

5. Process response data

5.1 Basic response information

Attributes such as the status code and response headers are available directly, but the response body must be read with await because it arrives as an asynchronous data stream.

async with session.get(url) as response:
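    print(f"Status code: {response.status}")                 # e.g. 200
    print(f"Headers: {response.headers}")                    # case-insensitive, dict-like mapping
    print(f"Content-Type: {response.headers.get('Content-Type')}")

    # The three most common ways to read the body; pick the one that fits the content
    text = await response.text()          # decoded text
    bytes_data = await response.read()    # raw bytes (suitable for images, PDFs)
    json_data = await response.json()     # parsed JSON; only valid for a JSON response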

5.2 Large file streaming download

If you want to download a file of several hundred MB, calling read() loads the entire file into memory, which may crash the program. The correct approach is to read it in chunks:

async def download_large_file(session: aiohttp.ClientSession, url: str, save_path: str) -> None:
    async with session.get(url) as response:
        response.raise_for_status()   # Raise an exception for 4xx/5xx status codes
        with open(save_path, "wb") as f:
            # Read up to 1 MB at a time to avoid holding the whole file in memory
            while chunk := await response.content.read(1024 * 1024):
                f.write(chunk)
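
The same read-until-empty loop can be exercised offline. This sketch assumes asyncio.StreamReader as a stand-in for response.content, since both expose an awaitable read(n) that returns an empty bytes object at end of stream; the save_stream helper, the 2500-byte payload, and the small chunk size are hypothetical values for the demo.

```python
import asyncio
import os
import tempfile

CHUNK_SIZE = 1000   # small on purpose; the download example above uses 1 MB

async def save_stream(reader: asyncio.StreamReader, save_path: str) -> list:
    sizes = []
    with open(save_path, "wb") as f:
        # read() returns b"" at end of stream, which ends the loop
        while chunk := await reader.read(CHUNK_SIZE):
            f.write(chunk)
            sizes.append(len(chunk))
    return sizes

async def main() -> tuple:
    reader = asyncio.StreamReader()
    reader.feed_data(b"x" * 2500)   # pretend 2500 bytes arrived from the server
    reader.feed_eof()
    with tempfile.TemporaryDirectory() as tmp:
        path = os.path.join(tmp, "download.bin")
        sizes = await save_stream(reader, path)
        return sizes, os.path.getsize(path)

chunk_sizes, file_size = asyncio.run(main())
print(chunk_sizes, file_size)
```

At no point does the loop hold more than one chunk in memory, regardless of how large the full payload is.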

6. Advanced practical functions

6.1 Limiting concurrency

Opening hundreds or thousands of concurrent requests without any limit can get your IP blocked by the server and saturate your local bandwidth. asyncio.Semaphore lets you cap the number of simultaneous requests:

# Allow at most 10 requests to run at the same time
sem = asyncio.Semaphore(10)

async def safe_fetch(session: aiohttp.ClientSession, url: str) -> str:
    # async with acquires and releases the semaphore automatically
    async with sem:
        async with session.get(url) as response:
            return await response.text()
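
To confirm that the semaphore really caps concurrency, the sketch below records the peak number of jobs running at once. It assumes asyncio.sleep as a stand-in for the request; limited_job and the counters are hypothetical names, and the limit of 3 is chosen only for the demo.

```python
import asyncio

MAX_CONCURRENCY = 3
active = 0   # jobs currently inside the semaphore
peak = 0     # highest value of active seen so far

async def limited_job(sem: asyncio.Semaphore, i: int) -> int:
    global active, peak
    async with sem:
        active += 1
        peak = max(peak, active)
        await asyncio.sleep(0.01)   # stand-in for the HTTP request
        active -= 1
        return i

async def main() -> list:
    # Creating the semaphore inside the running loop avoids loop-binding issues on older Pythons
    sem = asyncio.Semaphore(MAX_CONCURRENCY)
    return await asyncio.gather(*(limited_job(sem, i) for i in range(10)))

results = asyncio.run(main())
print(f"peak concurrency: {peak}, results: {results}")
```

All ten jobs are scheduled immediately, but only three are ever inside the semaphore at the same time.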

6.2 Session persistence

ClientSession maintains cookies, the connection pool, and default request headers internally. Configure them once when creating the session and every subsequent request uses them automatically:

async def persistent_session_demo() -> None:
    # Set the default cookies, User-Agent, and timeout in one place
    session = aiohttp.ClientSession(
        cookies={"session_id": "abc123"},
        headers={"User-Agent": "MyCrawler/1.0"},
        timeout=aiohttp.ClientTimeout(total=20)
    )
    try:
        # Both requests automatically reuse the cookies and any existing connection
        async with session.get("https://example.com/page1") as r1:
            ...
        async with session.get("https://example.com/page2") as r2:
            ...
    finally:
        # A session that was not created with async with must be closed manually
        await session.close()

6.3 Proxy settings

Pass the proxy parameter directly to configure an HTTP/HTTPS proxy; proxies that require authentication are also supported:

# Plain proxy
async with session.get(url, proxy="http://127.0.0.1:7890") as response:
    ...

# Proxy with username and password
async with session.get(url, proxy="http://user:pass@proxy.example.com:8080") as response:
    ...

7. Error handling

Network requests fail often, so be sure to catch exceptions proactively! aiohttp's common exceptions inherit from aiohttp.ClientError, while an overall timeout surfaces as asyncio.TimeoutError. It is recommended to handle the different types separately, listing the more specific exceptions first:

import asyncio
import aiohttp

async def fetch_with_error_handling(session: aiohttp.ClientSession, url: str) -> str | None:
    try:
        timeout = aiohttp.ClientTimeout(total=10)
        async with session.get(url, timeout=timeout) as response:
            response.raise_for_status()   # Raises ClientResponseError for 4xx/5xx
            return await response.text()
    except aiohttp.ClientConnectorError:
        print(f"Could not connect to the server: {url}")
    except asyncio.TimeoutError:
        print(f"Request timed out: {url}")
    except aiohttp.ClientResponseError as e:
        print(f"HTTP error: {e.status} - {url}")
    except aiohttp.ClientError as e:
        print(f"Other request error: {e} - {url}")
    return None

8. Complete practice: crawling websites in batches

Now let's combine the points above into a working crawler: fetch 3 websites concurrently, cap concurrency at 2, and handle errors for every request.

import aiohttp
import asyncio

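# Target URLs to crawl in one batch
URLS = [
    "https://example.com",
    "https://httpbin.org/status/404",  # deliberately triggers a 404 error
    "https://httpbin.org/delay/2",     # simulates a 2-second delay
]

MAX_CONCURRENCY = 2   # maximum number of concurrent requests
TIMEOUT = aiohttp.ClientTimeout(total=5)   # total timeout of 5 seconds per request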

async def fetch(session: aiohttp.ClientSession, url: str, sem: asyncio.Semaphore) -> tuple[str, int | None]:
    async with sem:
        try:
            async with session.get(url, timeout=TIMEOUT) as response:
                response.raise_for_status()
                content = await response.text()
                return (url, len(content))
        except Exception as e:
            print(f"❌ Failed to process {url}: {e!r}")
            return (url, None)

async def main():
    sem = asyncio.Semaphore(MAX_CONCURRENCY)
    async with aiohttp.ClientSession() as session:
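        # Create one coroutine per URL
        tasks = [fetch(session, url, sem) for url in URLS]
        # Run them concurrently; fetch() catches its own errors, so every URL yields a result
        results = await asyncio.gather(*tasks)

        # Summarize the results
        print("\n📊 Crawl summary:")
        for url, length in results:
            if length is not None:
                print(f"✅ {url}: fetched {length} characters")
            else:
                print(f"❌ {url}: failed")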

if __name__ == "__main__":
    asyncio.run(main())

Run this code and you will find that, even though one page has a 2-second delay, the two fast pages finish almost immediately and the total run time is roughly that of the slowest request rather than the sum of all three.
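
The crawler above gets a result for every URL because fetch() catches its own exceptions. An alternative worth knowing is to let asyncio.gather collect the exceptions itself with return_exceptions=True. The following is a minimal sketch with a hypothetical job function that fails on purpose; it is a variation on the approach, not a replacement for the code above.

```python
import asyncio

async def job(i: int) -> int:
    # Stand-in for a request; job 1 fails deliberately
    await asyncio.sleep(0.01)
    if i == 1:
        raise ValueError(f"job {i} failed")
    return i * 10

async def main() -> list:
    # Exceptions are returned in place of results instead of being raised
    return await asyncio.gather(*(job(i) for i in range(3)), return_exceptions=True)

results = asyncio.run(main())
for index, item in enumerate(results):
    status = "failed" if isinstance(item, Exception) else "ok"
    print(index, status, item)
```

Results stay in the same order as the inputs, so each entry can be matched back to its URL and checked with isinstance.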


9. Summary of best practices

  1. Always reuse ClientSession: do not create a new one for every request, since initialization is relatively expensive
  2. Always cap concurrency: adjust it to what the server can bear and to your own network conditions; 5 to 20 is a common starting range
  3. Set reasonable timeouts: keep individual slow requests from dragging down the whole crawler
  4. Download large files in chunks: this prevents running out of memory
  5. Handle errors thoroughly: the network is outside your control, so cover the exceptions you can expect
  6. Use async with to manage lifetimes: it closes sessions and releases semaphores automatically, which is less error-prone

10. Extended learning

If you want to learn aiohttp in more depth, read its official documentation and study it together with asyncio until you thoroughly understand the event loop and the asynchronous programming model:

  • aiohttp official documentation
  • For more complex concurrent task scheduling, it can also be combined with libraries such as aiomultiprocess to further improve throughput on large batches of tasks.

I hope this tutorial helps you get started with aiohttp quickly and write efficient, stable asynchronous crawlers!