Spider Middleware完全指南 - 数据预处理与后处理技术详解

📂 所属阶段:第五阶段 — 战力升级(分布式与进阶篇)
🔗 相关章节:Downloader Middleware · Pipeline管道实战


目录


核心基础与位置

Spider Middleware是Scrapy引擎与Spider之间的钩子组件,用于拦截两类数据:

  1. 到达Spider前的响应Response(预处理)
  2. Spider生成后离开的Items/Requests(后处理)

Scrapy请求处理简化流程(突出核心位置): Engine → Scheduler → [Downloader] → Engine → 输入层 → Spider → 输出层 → Engine → [Pipeline/Scheduler]


配置与极简生命周期

配置说明(优先级表格)

优先级范围:0-1000,数字越小越先处理输入、先处理输出

配置项说明
SPIDER_MIDDLEWARES字典格式,键为中间件路径,值为优先级
# settings.py 示例
SPIDER_MIDDLEWARES = {
    # 先处理输入(反爬→编码),先处理输出(增强→去重)
    'myproject.middlewares.AntiCrawlEncodingFixMiddleware': 400,
    'myproject.middlewares.EnrichCleanMiddleware': 450,
    'myproject.middlewares.ClassifiedExceptionMiddleware': 500,
    'myproject.middlewares.DistributedPreDedupMiddleware': 600,
}

三大核心方法调用规则

📌 记住核心返回值规则,避免踩坑:

  1. process_spider_input(response, spider):响应到达Spider前触发
    • 返回None:继续传递
    • 抛出异常:中断→跳转到异常处理
  2. process_spider_output(response, result, spider):Spider生成结果后触发
    • 必须返回可迭代对象(生成器优先,节省内存)
  3. process_spider_exception(response, exception, spider):输入/Spider内部抛异常时触发
    • 返回None:继续传递异常
    • 返回可迭代对象:异常处理完成

三大核心方法实战

3.1 输入预处理:反爬检测+编码修正

合并两个高频场景,快速过滤无效响应并修正编码:

import chardet
import re
from scrapy.exceptions import IgnoreRequest

class AntiCrawlEncodingFixMiddleware:
    """输入预处理:反爬检测+编码修正(极简实用版)"""
    def process_spider_input(self, response, spider):
        # 1. 快速反爬检测(10KB以内响应优先检查)
        if len(response.body) < 10240:
            anti_indicators = ['验证码', 'blocked', '访问频繁', 'forbidden']
            if any(i in response.text.lower() for i in anti_indicators):
                spider.logger.warning(f"反爬触发跳过:{response.url}")
                raise IgnoreRequest("Anti-crawling detected")
        
        # 2. 编码修正(置信度>90%覆盖声明编码)
        detect = chardet.detect(response.body)
        if detect['confidence'] > 0.9:
            response._encoding = detect['encoding']  # 临时覆盖,后续解析生效
            response.meta['fixed_encoding'] = detect['encoding']
        return None

3.2 输出后处理:数据增强+文本清洗

合并基础增强(去重标记、来源、时间)和文本清洗(去多余空格):

import time
import hashlib
from itemadapter import ItemAdapter
from scrapy.item import Item

class EnrichCleanMiddleware:
    """输出后处理:数据增强+文本清洗(极简实用版)"""
    def process_spider_output(self, response, result, spider):
        for obj in result:
            if isinstance(obj, (dict, Item)):
                # 文本清洗
                self._clean_text(obj)
                # 数据增强
                self._enrich_meta(obj, response)
                yield obj
            else:
                yield obj  # Request直接返回

    def _clean_text(self, obj):
        adapter = ItemAdapter(obj)
        for k, v in adapter.items():
            if isinstance(v, str):
                adapter[k] = re.sub(r'\s+', ' ', v.strip())  # 去换行、连续空格

    def _enrich_meta(self, obj, response):
        adapter = ItemAdapter(obj)
        # 加MD5去重标记(依赖sorted保证稳定性)
        content = str(sorted(adapter.asdict().items())).encode()
        adapter['_item_md5'] = hashlib.md5(content).hexdigest()
        # 加来源和时间
        adapter['_source_url'] = response.url
        adapter['_crawl_time'] = time.strftime('%Y-%m-%d %H:%M:%S')

3.3 异常分类处理:重试+跳过

处理最常见的两类异常,避免爬虫中断:

from scrapy.http import Request

class ClassifiedExceptionMiddleware:
    """异常分类处理(带配置项的极简版)"""
    def __init__(self, max_retry=2):
        self.max_retry = max_retry

    @classmethod
    def from_crawler(cls, crawler):
        return cls(max_retry=crawler.settings.getint('SM_MAX_RETRY', 2))

    def process_spider_exception(self, response, exception, spider):
        exc_name = type(exception).__name__
        spider.logger.error(f"{exc_name} 处理失败:{response.url} | {exception}")

        # 1. 网络相关异常:重试
        if exc_name in ['ConnectionError', 'TimeoutError', 'ConnectTimeout']:
            retry_cnt = response.meta.get('retry_cnt', 0) + 1
            if retry_cnt <= self.max_retry:
                spider.logger.info(f"第{retry_cnt}次重试:{response.url}")
                return [response.request.replace(dont_filter=True, meta={'retry_cnt': retry_cnt})]
        # 2. 解析相关异常:跳过
        elif exc_name in ['ValueError', 'KeyError', 'IndexError']:
            spider.logger.warning(f"跳过解析失败页面:{response.url}")
            return []
        # 3. 其他异常:继续传递
        return None

高频场景:分布式前置去重

Pipeline去重已经太晚(浪费了Spider的处理资源),用Spider Middleware的输出层前置去重更高效,依赖前面的_item_md5和Redis实现:

import redis
from scrapy.exceptions import IgnoreRequest
from itemadapter import ItemAdapter

class DistributedPreDedupMiddleware:
    """分布式前置去重(Redis实现,节省处理资源)"""
    def __init__(self, redis_url, dedup_key, expire_days=7):
        self.redis = redis.from_url(redis_url)
        self.dedup_key = dedup_key
        self.expire_days = expire_days

    @classmethod
    def from_crawler(cls, crawler):
        return cls(
            redis_url=crawler.settings.get('REDIS_URL', 'redis://localhost:6379'),
            dedup_key=crawler.settings.get('SM_DEDUP_KEY', 'spider:item_dedup:md5'),
            expire_days=crawler.settings.getint('SM_DEDUP_EXPIRE', 7)
        )

    def process_spider_output(self, response, result, spider):
        for obj in result:
            if isinstance(obj, (dict, Item)):
                # 检查是否已存在
                item_md5 = ItemAdapter(obj).get('_item_md5')
                if item_md5 and self.redis.sismember(self.dedup_key, item_md5):
                    spider.logger.debug(f"前置去重跳过:{response.url}")
                    continue
                # 不存在则添加
                self.redis.sadd(self.dedup_key, item_md5)
                self.redis.expire(self.dedup_key, 86400 * self.expire_days)
            yield obj

避坑指南&最佳实践

⚠️ 避坑指南(Top3)

  1. 优先级搞反
    • 输入层:先反爬→再编码→优先级400(反爬)< 500(编码)
    • 输出层:先增强→再去重→优先级400(增强)< 500(去重)
  2. 内存泄漏
    • 不要存大量实例/全局缓存(如需缓存,用functools.lru_cache或Redis)
    • 输出处理用生成器,别转成列表
  3. 异常吞掉
    • 别随便返回可迭代对象,除非明确处理逻辑
    • 返回前必须记录日志

🎯 最佳实践(Top4)

  1. 单一职责:每个中间件只做1-2件事(比如反爬+编码,不要混OCR)
  2. 可配置:把参数放到settings.py(重试次数、Redis地址)
  3. 前置去重:用输出层做Item前置去重,比Pipeline省资源
  4. 轻量处理:CPU密集型操作扔到Pipeline或异步任务(如Celery)

🔗 相关教程推荐

🏷️ 标签云: Scrapy Spider Middleware 数据预处理 数据后处理 异常处理 网络爬虫