🛠️ Python 爬虫实战教学:豆瓣电影 Top250 同步与异步实战手册

前言: 在爬虫领域,效率即生命。当我们需要采集数万条数据时,单线程同步爬取的等待时间是无法接受的。本文将通过实战代码,带你从最基础的同步逻辑演进到极致并发的协程方案。

🛠️ 一、 核心工具栈 (The Toolkit)

本案例的核心逻辑构建在以下几个顶级库之上:

  1. 数据采集requests(同步标准库)/ aiohttp(异步高性能库)。
  2. 数据解析lxml (etree)。利用 XPath 语法直接定位 DOM 节点,解析效率远超正则表达式。
  3. 高性能存储DataRecorder。支持多线程/多进程安全写入,自动处理 Excel 文件锁和表头生成。
  4. 逻辑辅助itertools.zip_longest。解决多字段对齐问题,防止因数据缺失导致解析报错。

🧠 二、 代码逻辑深度解析

1. 任务拆分思想 (Task Decoupling)

在并发编程中,不能直接套用同步的 for 循环。我们需要将任务“原子化”:

  • 同步逻辑:一个大函数包含从请求、解析到存储的所有步骤。
  • 异步/并发逻辑:将“请求一页数据并返回”封装成独立任务,由调度中心(Pool 或 Event Loop)统一分发。

2. 多线程与多进程的差异

  • 多进程 (Multiprocessing):利用 OS 级别并行,规避 Python GIL 限制,适合 CPU 密集型任务。
  • 多线程 (Threading):在 I/O 等待(网络请求)时自动释放 GIL,资源占用更低,适合爬虫场景。

3. 协程的“分身”原理 (Awaitable)

协程利用 async/await 关键字。当 await 遇到网络延迟时,CPU 会立即跳转执行下一个任务,而不是在那儿“傻等”,这使得单线程也能跑出上千并发。


🚀 三、 全方案代码实战

1. 同步爬取:一步一个脚印

这是所有爬虫的基石,逻辑最清晰,但 I/O 阻塞严重。

# 核心片段:循环请求,阻塞等待
for j in range(10):
    res = session.get(url, headers=headers).text  # 阻塞点:等待网络响应
    tree = etree.HTML(res)
    # 解析并逐行写入
    recorder.add_data(map_)
    recorder.record()
  • 实测耗时:~7.24s

2. 多进程/多线程:任务池分发

我们将页面索引 range(10) 作为参数传递给池对象。

# 多进程 (Pool) / 多线程 (Executor) 逻辑类似
with ThreadPoolExecutor(max_workers=5) as executor:
    results = list(executor.map(get_msg, range(10))) # 并发下发 10 个任务
  • 技术点:每个任务执行完后返回 data 列表,最后由主进程/主线程统一 recorder.record()
  • 实测耗时:~5.79s (多线程) / ~6.95s (多进程)

3. 协程:极致异步

这是目前最优雅的方案。

async def get_msg(page_index):
    async with aiohttp.ClientSession() as sess:
        async with await sess.get(url, headers=headers) as resp:
            res = await resp.text() # 遇到 I/O 自动切换
            # ... 解析逻辑 ...
  • 关键点:必须配套异步 HTTP 库 aiohttp
  • 实测耗时~5.23s

📝 四、 实战运行与优化步骤

第一步:环境配置

安装所有必需依赖,确保 Python 版本 \ge 3.6。

pip install requests aiohttp lxml DataRecorder openpyxl

第二步:初始化文件管理

在代码中加入 get_excel() 逻辑。每次运行自动清除旧文件,确保数据不重叠且 Excel 不会被占用报错。

第三步:性能调优建议

  • 存储优化:将 recorder.record() 移出高频循环。建议每爬取完一整页(25条)或全部爬取完后执行一次性写入,可极大降低磁盘 I/O 开销。
  • 并发数控制:不要盲目追求高并发。对于豆瓣等网站,建议 max_workers 或并发协程数控制在 5-10 个,避免触发反爬机制。

🚀 五、 完整实战源代码整合

1. 同步爬取方案 (基础入门)

特点:逻辑线性,适合初学者理解爬取全过程。

import os, time
from itertools import zip_longest
from lxml import etree
import requests
from DataRecorder import Recorder


def get_excel(mode):
    filename = f'top250_{mode}.xlsx'
    if os.path.exists(filename): os.remove(filename)
    recorder = Recorder(filename)
    recorder.show_msg = False
    return recorder


def run_sync():
    recorder = get_excel('同步')
    session = requests.Session()
    headers = {
        'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/130.0.0.0 Safari/537.36'}
    for j in range(10):
        url = f'https://movie.douban.com/top250?start={j * 25}'
        res = session.get(url, headers=headers).text
        tree = etree.HTML(res)
        titles = tree.xpath('//ol[@class="grid_view"]//span[@class="title"][1]/text()')
        scores = tree.xpath('//span[@class="rating_num"]/text()')
        comments = tree.xpath('//span[@class="inq"]/text()')
        for title, score, comment in zip_longest(titles, scores, comments, fillvalue='无'):
            recorder.add_data({
                '电影名': title,
                '评分': score,
                '短评': comment
            })
        recorder.record()  
        print(f"已完成第 {j + 1} 页采集")


if __name__ == '__main__':
    start = time.time()
    run_sync()
    print(f'同步爬取耗时: {time.time() - start:.2f}秒')

2. 多线程方案 (生产首选)

特点:兼容 requests,通过 ThreadPoolExecutor 快速提速。

from concurrent.futures import ThreadPoolExecutor
import os, time
from itertools import zip_longest
from lxml import etree
import requests
from DataRecorder import Recorder

def get_excel(mode):
    filename = f'top250_{mode}.xlsx'
    if os.path.exists(filename): os.remove(filename)
    recorder = Recorder(filename)
    recorder.show_msg = False
    return recorder

def fetch_page(page_index):
    # 构造 URL
    url = f'https://movie.douban.com/top250?start={page_index*25}'
    headers = {
        'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/138.0.0.0 Safari/537.36',
        'Referer': 'https://movie.douban.com/top250'
    }
    try:
        # 每个线程独立发送请求
        res = requests.get(url, headers=headers, timeout=10).text
        tree = etree.HTML(res)
        
        # 解析三个字段
        titles = tree.xpath('//ol[@class="grid_view"]//li//div[@class="hd"]/a/span[1]/text()')
        scores = tree.xpath('//span[@class="rating_num"]/text()')
        comments = tree.xpath('//span[@class="inq"]/text()')
        
        # 封装成字典列表返回
        page_data = []
        for t, s, c in zip_longest(titles, scores, comments, fillvalue='无'):
            page_data.append({'电影名': t, '评分': s, '短评': c})
        
        print(f"线程已完成第 {page_index + 1} 页抓取")
        return page_data
    except Exception as e:
        print(f"抓取第 {page_index + 1} 页失败: {e}")
        return []

if __name__ == '__main__':
    recorder = get_excel('多线程_完整版')
    start = time.time()
    
    # 1. 使用线程池并发抓取网络数据
    with ThreadPoolExecutor(max_workers=5) as executor:
        # executor.map 保证了返回结果的顺序与 range(10) 一致
        all_results = list(executor.map(fetch_page, range(10)))
    
    # 2. 抓取完成后,在内存中统一整合数据
    total_index = 1
    for page_data in all_results:
        for item in page_data:
            item['序号'] = total_index
            recorder.add_data(item)
            total_index += 1
            
    # 3. 最后【一次性】写入硬盘
    recorder.record()
    
    print(f'\n全部完成!')
    print(f'总计条数: {total_index - 1}')
    print(f'多线程极致耗时: {time.time() - start:.2f}秒')

多进程方案 (真正的并行)

特点:每个进程拥有独立的 CPU 核心支持,适合需要大量计算(如数据解密、图像处理)的爬虫任务。

import os, time
from itertools import zip_longest
from lxml import etree
import requests
from DataRecorder import Recorder
import multiprocessing # 引入多进程模块

# 1. 初始化文件(多进程下建议由主进程统一处理)
def get_excel():
    filename = 'top250_多进程.xlsx'
    if os.path.exists(filename): os.remove(filename)
    recorder = Recorder(filename)
    recorder.show_msg = False
    return recorder

# 2. 任务函数:每个进程独立执行
def fetch_page_mp(page_index):
    # 每个进程内部建议重新建立 Session,避免进程间资源竞争
    url = f'https://movie.douban.com/top250?start={page_index*25}'
    headers = {'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/138.0.0.0 Safari/537.36'}
    
    try:
        res = requests.get(url, headers=headers, timeout=10).text
        tree = etree.HTML(res)
        titles = tree.xpath('//span[@class="title"][1]/text()')
        scores = tree.xpath('//span[@class="rating_num"]/text()')
        comments = tree.xpath('//span[@class="inq"]/text()')
        # 将解析后的结果作为列表返回
        page_data = []
        for t, s, c in zip_longest(titles, scores, comments, fillvalue='无'):
            page_data.append({'电影名': t, '评分': s, '短评': c})
        return page_data
    except Exception as e:
        print(f"进程 {os.getpid()} 处理第 {page_index} 页出错: {e}")
        return []

if __name__ == '__main__':
    start_time = time.time()
    recorder = get_excel()

    # 3. 创建进程池
    # processes=5 表示同时启动 5 个独立的 Python 解释器进程
    pool = multiprocessing.Pool(processes=5)
    
    # 4. 分发任务并获取结果(results 为嵌套列表)
    results = pool.map(fetch_page_mp, range(10))
    
    pool.close() # 关闭进程池,不再接受新任务
    pool.join()  # 等待所有子进程执行完毕

    # 5. 主进程统一录入数据
    total_index = 1
    for page_data in results:
        for movie in page_data:
            movie['序号'] = total_index
            recorder.add_data(movie)
            total_index += 1
    
    recorder.record() # 批量保存
    print(f'\n多进程爬取共用时:{time.time() - start_time:.2f}秒!')

3. 协程方案 (极致性能)

特点:单线程下的最高吞吐量,需使用 aiohttp

import asyncio
import aiohttp
import os, time
from itertools import zip_longest
from lxml import etree
import requests
from DataRecorder import Recorder

def get_excel(mode):
    filename = f'top250_{mode}.xlsx'
    if os.path.exists(filename): os.remove(filename)
    recorder = Recorder(filename)
    recorder.show_msg = False
    return recorder

async def fetch_async(page_index, session):
    url = f'https://movie.douban.com/top250?start={page_index*25}'
    headers = {'User-Agent': 'Mozilla/5.0...'}
    async with session.get(url, headers=headers) as resp:
        html = await resp.text()
        tree = etree.HTML(html)
        titles = tree.xpath('//ol[@class="grid_view"]//li//div[@class="hd"]/a/span[1]/text()')
        scores = tree.xpath('//span[@class="rating_num"]/text()')
        comments = tree.xpath('//span[@class="inq"]/text()')
        
        page_data = []
        for t, s, c in zip_longest(titles, scores, comments, fillvalue='无'):
            page_data.append({'电影名': t, '评分': s, '短评': c})
        return page_data

async def main_async():
    recorder = get_excel('协程')
    start = time.time()
    async with aiohttp.ClientSession() as session:
        tasks = [fetch_async(i, session) for i in range(10)]
        # 真正意义上的同时发起请求
        pages = await asyncio.gather(*tasks)
        
    for page_data in all_pages_data:
        for item in page_data:
            recorder.add_data(item)
    recorder.record()
    print(f'协程爬取耗时: {time.time() - start:.2f}秒')

if __name__ == '__main__':
    asyncio.run(main_async())

📊 六、 总结与选型指南

场景推荐方案理由
新手练习/数据量极小同步逻辑简单,无需考虑线程安全。
中大型爬虫项目协程 (Asyncio)资源消耗极低,吞吐量最大。
包含复杂计算的项目多进程真正利用多核,防止计算卡死网络请求。
维护旧代码/不想改库多线程兼容 requests 库,提速立竿见影。