Python 异步爬虫教程:aiohttp 详解

1. 概述

aiohttp 是一个基于 asyncio 的异步 HTTP 网络库,提供了完整的客户端和服务器端实现。在爬虫开发中,我们主要使用其客户端功能来发起异步 HTTP 请求。

1.1 为什么选择 aiohttp?

  • 完全异步支持,性能优异
  • 同时支持 HTTP 和 WebSocket
  • 客户端和服务器端一体化
  • 完善的连接池管理
  • 支持代理、Cookie、Session 等高级功能

2. 基本使用

2.1 安装

pip install aiohttp

2.2 简单示例

import aiohttp
import asyncio

async def fetch(session, url):
    async with session.get(url) as response:
        return await response.text()

async def main():
    async with aiohttp.ClientSession() as session:
        html = await fetch(session, 'https://example.com')
        print(html[:200])  # 打印前200个字符

# Python 3.7+ 推荐使用
asyncio.run(main())

# 兼容旧版本写法
# loop = asyncio.get_event_loop()
# loop.run_until_complete(main())

2.3 关键点说明

  1. 必须使用 async with 创建 ClientSession
  2. 所有网络操作都需要 await
  3. 响应内容获取方法需要 await(如 response.text()
  4. 建议使用 asyncio.run() 运行主函数(Python 3.7+)

3. 请求参数设置

3.1 URL 参数

params = {'key1': 'value1', 'key2': 'value2'}
async with session.get('https://example.com', params=params) as response:
    ...

3.2 请求头设置

headers = {'User-Agent': 'MyApp/1.0'}
async with session.get('https://example.com', headers=headers) as response:
    ...

3.3 超时设置

timeout = aiohttp.ClientTimeout(total=10)  # 10秒总超时
async with session.get('https://example.com', timeout=timeout) as response:
    ...

4. 不同请求方法

4.1 POST 请求

表单提交

data = {'username': 'admin', 'password': '123456'}
async with session.post('https://example.com/login', data=data) as response:
    ...

JSON 提交

json_data = {'name': 'John', 'age': 30}
async with session.post('https://example.com/api', json=json_data) as response:
    ...

4.2 PUT/DELETE 请求

# PUT 请求
async with session.put('https://example.com/resource', data=b'data') as response:
    ...

# DELETE 请求
async with session.delete('https://example.com/resource') as response:
    ...

5. 处理响应

5.1 获取响应内容

async with session.get('https://example.com') as response:
    print(response.status)  # 状态码
    print(await response.text())  # 文本内容
    print(await response.read())  # 二进制内容
    print(await response.json())  # JSON 内容
    print(response.headers)  # 响应头

5.2 流式响应

对于大文件下载,可以使用流式处理:

async with session.get('https://example.com/large_file') as response:
    with open('large_file', 'wb') as fd:
        while True:
            chunk = await response.content.read(1024)
            if not chunk:
                break
            fd.write(chunk)

6. 高级功能

6.1 并发控制

使用信号量控制并发数:

semaphore = asyncio.Semaphore(10)  # 最大并发10

async def fetch(url):
    async with semaphore:
        async with session.get(url) as response:
            return await response.text()

6.2 会话持久化

async with aiohttp.ClientSession(
    cookies={'my_cookie': 'value'},
    headers={'User-Agent': 'MyApp/1.0'},
    timeout=aiohttp.ClientTimeout(total=30)
) as session:
    # 所有请求共享相同的cookies和headers
    ...

6.3 代理设置

async with session.get('https://example.com', 
                      proxy='http://proxy.example.com') as response:
    ...

7. 错误处理

try:
    async with session.get('https://example.com', timeout=10) as response:
        response.raise_for_status()  # 检查HTTP错误
        return await response.text()
except aiohttp.ClientError as e:
    print(f"请求错误: {e}")
except asyncio.TimeoutError:
    print("请求超时")

8. 最佳实践

  1. 重用 ClientSession:不要在每次请求时都创建新会话
  2. 合理设置超时:避免请求长时间挂起
  3. 控制并发量:防止对目标服务器造成过大压力
  4. 处理异常:网络请求可能因各种原因失败
  5. 资源清理:确保正确关闭响应和会话

9. 完整示例

import aiohttp
import asyncio

async def fetch(session, url, semaphore):
    async with semaphore:
        try:
            async with session.get(url, timeout=10) as response:
                response.raise_for_status()
                return await response.text()
        except Exception as e:
            print(f"Error fetching {url}: {str(e)}")
            return None

async def main():
    urls = [
        'https://example.com/page1',
        'https://example.com/page2',
        'https://example.com/page3'
    ]
    
    semaphore = asyncio.Semaphore(5)  # 最大并发5
    connector = aiohttp.TCPConnector(limit=100)  # 连接池大小
    
    async with aiohttp.ClientSession(connector=connector) as session:
        tasks = [fetch(session, url, semaphore) for url in urls]
        results = await asyncio.gather(*tasks)
        
        for url, content in zip(urls, results):
            if content:
                print(f"{url} - {len(content)} bytes")

if __name__ == '__main__':
    asyncio.run(main())

10. 总结

aiohttp 是 Python 异步爬虫开发的重要工具,相比传统的 requests 库,它能提供更高的并发性能。通过合理使用会话管理、并发控制和错误处理,可以构建出高效稳定的异步爬虫程序。

更多高级用法和详细配置请参考 aiohttp 官方文档