The Complete Guide to Spider Middleware: Data Preprocessing and Postprocessing in Depth

📂 Stage: Stage 5 — Leveling Up (Distributed and Advanced Topics)
🔗 Related chapters: Downloader Middleware · Pipelines in Practice · Distributed Deduplication and Scheduling

Spider Middleware Basics

Spider Middleware is the Scrapy component that sits between the Engine and the Spider. It intercepts the responses flowing into the Spider and the Items/Requests flowing out of it, giving you a hook to process data both before it reaches the Spider and after it leaves.

Roles and Advantages of Spider Middleware

"""
Main roles of Spider Middleware:
1. Input preprocessing: process a response before it reaches the Spider
2. Output postprocessing: process Items/Requests after they leave the Spider
3. Exception handling: handle exceptions raised while the Spider processes a response
4. Signal integration: listen for and react to crawler events
5. Data filtering: drop unwanted Items or Requests
"""

How Spider Middleware Relates to Other Components

"""
Scrapy request/response flow:
Engine -> Scheduler -> Engine -> Downloader -> Engine ->
Spider Middleware -> Spider -> Spider Middleware -> Engine -> Item Pipeline
"""

The Spider Middleware Lifecycle

Spider Middleware exposes a full set of lifecycle methods that let you act on responses and results at different stages.

Basic Spider Middleware Structure

class BaseSpiderMiddleware:
    """
    A basic Spider Middleware skeleton
    """

    def __init__(self):
        """
        Initializer, called once when the middleware is instantiated
        """
        pass

    @classmethod
    def from_crawler(cls, crawler):
        """
        Factory method that builds the middleware from a crawler instance;
        use it to access settings and other crawler-level objects
        """
        return cls()

    def process_spider_input(self, response, spider):
        """
        Called for each response before it is handed to the Spider
        """
        # Return None to continue processing
        # Raise an exception to abort and trigger process_spider_exception
        return None

    def process_spider_output(self, response, result, spider):
        """
        Called with the Items/Requests produced by the Spider
        """
        # Must return an iterable of Items/Requests
        for item_or_request in result:
            yield item_or_request

    def process_spider_exception(self, response, exception, spider):
        """
        Called when the Spider (or process_spider_input) raises an exception
        """
        # Return None to leave the exception unhandled for the next middleware
        # Return an iterable to mark the exception as handled
        return None

Configuring Spider Middleware

# settings.py
SPIDER_MIDDLEWARES = {
    # Format: 'path.to.MiddlewareClass': priority number
    'myproject.middlewares.CustomSpiderMiddleware': 543,
    'myproject.middlewares.SignalMiddleware': 544,
}

# Priority notes:
# Lower numbers sit closer to the engine, higher numbers closer to the spider.
# process_spider_input runs in ascending priority order (engine -> spider),
# while process_spider_output and process_spider_exception run in descending order.
# Scrapy's built-in spider middlewares use values from 50 to 900
# (see SPIDER_MIDDLEWARES_BASE); place custom middleware relative to the
# built-ins it must run before or after.
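
The input/output asymmetry is easy to see with a pair of do-nothing middlewares that only log. A minimal sketch (the class names and the myproject.middlewares path are illustrative):

class OrderDemoMiddleware:
    """Logs each hook so the call order becomes visible."""
    name = 'base'

    def process_spider_input(self, response, spider):
        spider.logger.info(f"[{self.name}] process_spider_input")
        return None

    def process_spider_output(self, response, result, spider):
        spider.logger.info(f"[{self.name}] process_spider_output")
        yield from result


class FirstMiddleware(OrderDemoMiddleware):
    name = 'first'   # priority 100: input hook fires first, output hook fires last


class SecondMiddleware(OrderDemoMiddleware):
    name = 'second'  # priority 200: input hook fires second, output hook fires first

# settings.py
# SPIDER_MIDDLEWARES = {
#     'myproject.middlewares.FirstMiddleware': 100,
#     'myproject.middlewares.SecondMiddleware': 200,
# }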

Input Processing

process_spider_input is called for each response before the Spider handles it, which makes it the natural place to preprocess response data.

Basic Input Processing

import time


class BasicInputMiddleware:
    """
    Basic input-processing middleware
    """

    def process_spider_input(self, response, spider):
        """
        Basic input-processing logic
        """
        spider.logger.info(f"Processing response: {response.url}")

        # Check the response status
        if response.status != 200:
            spider.logger.warning(f"Response status {response.status} for {response.url}")

        # Check the response content type
        content_type = response.headers.get('Content-Type', b'').decode('utf-8').lower()
        if 'text/html' not in content_type:
            spider.logger.debug(f"Non-HTML content: {content_type}")

        # Attach custom metadata (response.meta proxies the request's meta dict)
        response.meta['processed_by'] = 'BasicInputMiddleware'
        response.meta['process_time'] = time.time()

        return None

Response Preprocessing

import json


class ResponsePreprocessingMiddleware:
    """
    Response preprocessing middleware
    """

    def process_spider_input(self, response, spider):
        """
        Response preprocessing logic
        """
        # Is this a JSON response?
        content_type = response.headers.get('Content-Type', b'').decode('utf-8').lower()

        if 'application/json' in content_type:
            try:
                # Pre-parse the JSON body
                json_data = json.loads(response.text)
                response.meta['parsed_json'] = json_data
            except json.JSONDecodeError:
                spider.logger.error(f"Failed to parse JSON response: {response.url}")

        # Is this an XML response?
        elif 'application/xml' in content_type or 'text/xml' in content_type:
            try:
                # Stash the XML body for later processing
                response.meta['xml_content'] = response.text
            except Exception as e:
                spider.logger.error(f"Failed to process XML response: {e}")

        # Check the page encoding
        if response.encoding != 'utf-8':
            spider.logger.debug(f"Response encoding: {response.encoding}")

        # Check the page size
        content_length = len(response.body)
        if content_length > 10 * 1024 * 1024:  # 10MB
            spider.logger.warning(f"Large response size: {content_length} bytes")

        return None

Input Validation Middleware

from scrapy.exceptions import IgnoreRequest


class InputValidationMiddleware:
    """
    Input validation middleware
    """

    def process_spider_input(self, response, spider):
        """
        Validate the incoming response
        """
        # Guard against oversized responses; raising here aborts processing
        # and routes the exception through the process_spider_exception chain
        if len(response.body) > spider.crawler.settings.getint('MAX_RESPONSE_SIZE', 10 * 1024 * 1024):
            raise IgnoreRequest("Response too large")

        # Inspect the response content
        response_text = response.text.lower()

        # Look for anti-crawling pages
        anti_crawl_indicators = [
            '访问过于频繁', '请稍后重试', 'blocked', 'forbidden',
            '验证码', 'captcha', 'rate limit', 'too many requests'
        ]

        if any(indicator in response_text for indicator in anti_crawl_indicators):
            spider.logger.warning(f"Anti-crawling detected for {response.url}")
            # Either raise here, or just flag the response and keep going
            response.meta['anti_crawl_detected'] = True

        # Sanity-check page completeness
        if '<html' not in response_text or '</html>' not in response_text:
            spider.logger.warning(f"Potentially incomplete HTML for {response.url}")

        return None

Output Processing

process_spider_output is called with the Items/Requests the Spider produces, which makes it the place to postprocess output data.

Basic Output Processing

import time

from itemadapter import ItemAdapter


class BasicOutputMiddleware:
    """
    Basic output-processing middleware
    """

    def process_spider_output(self, response, result, spider):
        """
        Basic output-processing logic
        """
        for item_or_request in result:
            if isinstance(item_or_request, dict):
                # dict-style item
                item_or_request['processed_at'] = time.time()
                item_or_request['source_url'] = response.url
                yield item_or_request
            elif hasattr(item_or_request, 'fields'):  # Scrapy Item
                # Scrapy Item (the Item class must declare these fields)
                item_adapter = ItemAdapter(item_or_request)
                item_adapter['processed_at'] = time.time()
                item_adapter['source_url'] = response.url
                yield item_or_request
            else:
                # Request
                item_or_request.meta['processed_at'] = time.time()
                yield item_or_request

Data Filtering Middleware

class DataFilteringMiddleware:
    """
    Data-filtering middleware
    """

    def __init__(self):
        self.filtered_count = 0

    def process_spider_output(self, response, result, spider):
        """
        Drop unwanted output
        """
        for item_or_request in result:
            if self._should_filter(item_or_request, spider):
                self.filtered_count += 1
                spider.logger.debug(f"Filtered item: {item_or_request}")
                continue

            yield item_or_request

    def _should_filter(self, item_or_request, spider):
        """
        Decide whether to drop this entry
        """
        if isinstance(item_or_request, dict):
            # Drop empty items
            if not item_or_request:
                return True

            # Drop items containing blacklisted keywords
            filter_keywords = spider.crawler.settings.getlist('FILTER_KEYWORDS', [])
            for keyword in filter_keywords:
                if any(keyword.lower() in str(value).lower()
                       for value in item_or_request.values() if isinstance(value, str)):
                    return True

        elif hasattr(item_or_request, 'fields'):
            # Drop empty Scrapy Items
            adapter = ItemAdapter(item_or_request)
            if not adapter.asdict():
                return True

        return False

Data Enrichment Middleware

import hashlib
import time
from urllib.parse import urlparse


class DataEnrichmentMiddleware:
    """
    Data-enrichment middleware
    """

    def process_spider_output(self, response, result, spider):
        """
        Enrich the output with extra information
        """
        for item_or_request in result:
            if isinstance(item_or_request, dict):
                # Enrich dict-style items
                enriched_item = self._enrich_item(item_or_request, response)
                yield enriched_item
            elif hasattr(item_or_request, 'fields'):
                # Scrapy Item: enrich a dict copy, then write the values back
                # (the Item class must declare the enrichment fields)
                adapter = ItemAdapter(item_or_request)
                enriched_adapter = self._enrich_item(adapter.asdict(), response)

                for key, value in enriched_adapter.items():
                    adapter[key] = value

                yield item_or_request
            else:
                # Request
                item_or_request.meta['enriched_at'] = time.time()
                yield item_or_request

    def _enrich_item(self, item_dict, response):
        """
        Enrich a single item
        """
        enriched = item_dict.copy()

        # Provenance
        enriched['source_url'] = response.url
        enriched['crawl_timestamp'] = time.time()
        enriched['crawl_date'] = time.strftime('%Y-%m-%d %H:%M:%S')

        # Stable fingerprint of the original data
        content_str = str(sorted(item_dict.items()))
        enriched['data_hash'] = hashlib.md5(content_str.encode()).hexdigest()

        # Response details
        enriched['response_status'] = response.status
        enriched['response_size'] = len(response.body)

        # Classification info, where applicable
        if 'url' in enriched:
            enriched['domain'] = self._extract_domain(enriched['url'])

        return enriched

    def _extract_domain(self, url):
        """
        Extract the domain from a URL
        """
        return urlparse(url).netloc

Exception Handling

process_spider_exception is called when an exception is raised while the Spider (or an earlier process_spider_input) handles a response.

Basic Exception Handling

import traceback


class BasicExceptionMiddleware:
    """
    Basic exception-handling middleware
    """

    def process_spider_exception(self, response, exception, spider):
        """
        Baseline handling of Spider exceptions
        """
        spider.logger.error(f"Spider exception in {response.url}: {exception}")
        # format_exc() only sees the current except block, so format the
        # exception object passed in by Scrapy explicitly
        tb = ''.join(traceback.format_exception(type(exception), exception, exception.__traceback__))
        spider.logger.error(f"Traceback: {tb}")

        # Decide how to proceed based on the exception type
        if isinstance(exception, ValueError):
            # For ValueError, swallow it and return an empty result
            spider.logger.warning(f"ValueError handled for {response.url}")
            return []
        elif isinstance(exception, KeyError):
            # For KeyError, log it and continue
            spider.logger.warning(f"KeyError handled for {response.url}")
            return []

        # For anything else, return None so later middlewares can handle it
        return None

Advanced Exception Handling

class AdvancedExceptionMiddleware:
    """
    Advanced exception-handling middleware
    """

    def process_spider_exception(self, response, exception, spider):
        """
        Advanced exception-handling logic
        """
        exception_type = type(exception).__name__

        spider.logger.error(f"Exception {exception_type} in {response.url}: {str(exception)}")

        # Dispatch on the exception category
        if self._is_network_related_exception(exception):
            return self._handle_network_exception(response, exception, spider)
        elif self._is_parsing_exception(exception):
            return self._handle_parsing_exception(response, exception, spider)
        elif self._is_data_validation_exception(exception):
            return self._handle_validation_exception(response, exception, spider)

        # Unknown exception type: log it and return an empty result
        spider.logger.error(f"Unhandled exception type: {exception_type}")
        return []

    def _is_network_related_exception(self, exception):
        """
        Check whether the exception is network-related
        """
        network_exceptions = [
            'ConnectionError', 'TimeoutError', 'ConnectTimeout',
            'ReadTimeout', 'TooManyRedirects'
        ]
        return type(exception).__name__ in network_exceptions

    def _is_parsing_exception(self, exception):
        """
        Check whether the exception is parsing-related
        """
        parsing_exceptions = [
            'ValueError', 'TypeError', 'AttributeError',
            'IndexError', 'KeyError'
        ]
        return type(exception).__name__ in parsing_exceptions

    def _is_data_validation_exception(self, exception):
        """
        Check whether the exception is validation-related
        """
        validation_exceptions = [
            'ValidationError', 'AssertionError'
        ]
        return type(exception).__name__ in validation_exceptions

    def _handle_network_exception(self, response, exception, spider):
        """
        Handle network-related exceptions
        """
        spider.logger.warning(f"Network exception, scheduling retry for {response.url}")

        # Return a fresh copy of the request so it gets retried
        retry_request = response.request.replace(
            dont_filter=True,
            meta={
                **response.request.meta,
                'retry_count': response.request.meta.get('retry_count', 0) + 1
            }
        )

        return [retry_request]

    def _handle_parsing_exception(self, response, exception, spider):
        """
        Handle parsing-related exceptions
        """
        spider.logger.warning(f"Parsing exception, returning empty result for {response.url}")

        # Return an empty result, but keep the error message on the meta dict
        response.meta['parsing_error'] = str(exception)
        return []

    def _handle_validation_exception(self, response, exception, spider):
        """
        Handle data-validation exceptions
        """
        spider.logger.warning(f"Validation exception, skipping invalid data from {response.url}")

        # Return an empty result, i.e. skip the invalid data
        return []

Signal System Integration

Spider Middleware can hook into Scrapy's signal system to listen for crawler lifecycle events.

Signal Middleware Basics

from scrapy import signals


class SignalMiddleware:
    """
    Signal-handling middleware
    """

    def __init__(self, crawler):
        self.crawler = crawler
        self.stats = crawler.stats

        # Connect signal handlers
        crawler.signals.connect(self.spider_opened, signal=signals.spider_opened)
        crawler.signals.connect(self.spider_closed, signal=signals.spider_closed)
        crawler.signals.connect(self.spider_idle, signal=signals.spider_idle)
        crawler.signals.connect(self.request_scheduled, signal=signals.request_scheduled)
        crawler.signals.connect(self.response_received, signal=signals.response_received)
        crawler.signals.connect(self.item_scraped, signal=signals.item_scraped)

    @classmethod
    def from_crawler(cls, crawler):
        """
        Build the middleware from a crawler instance
        """
        return cls(crawler)

    def spider_opened(self, spider):
        """
        Called when the spider opens
        """
        spider.logger.info(f"Spider {spider.name} opened")
        self.stats.inc_value('spider/opened_count')

    def spider_closed(self, spider, reason):
        """
        Called when the spider closes
        """
        spider.logger.info(f"Spider {spider.name} closed: {reason}")
        self.stats.inc_value('spider/closed_count')

    def spider_idle(self, spider):
        """
        Called when the spider goes idle
        """
        spider.logger.debug(f"Spider {spider.name} is idle")
        self.stats.inc_value('spider/idle_count')

    def request_scheduled(self, request, spider):
        """
        Called when a request is scheduled
        """
        spider.logger.debug(f"Request scheduled: {request.url}")
        self.stats.inc_value('requests/scheduled')

    def response_received(self, response, request, spider):
        """
        Called when a response is received
        """
        spider.logger.debug(f"Response received: {response.url} (Status: {response.status})")
        self.stats.inc_value('responses/received')
        self.stats.inc_value(f'responses/status_count/{response.status}')

    def item_scraped(self, item, response, spider):
        """
        Called when an item is scraped
        """
        spider.logger.debug(f"Item scraped from {response.url}")
        self.stats.inc_value('items/scraped')

Advanced Signal Handling

import time
from collections import defaultdict, deque

from scrapy import signals


class AdvancedSignalMiddleware:
    """
    Advanced signal-handling middleware
    """

    def __init__(self, crawler):
        self.crawler = crawler
        self.stats = crawler.stats
        # Bound each metric series so the process cannot grow without limit
        self.performance_metrics = defaultdict(lambda: deque(maxlen=1000))
        self.start_time = time.time()

        # Connect signal handlers
        self._connect_signals()

    def _connect_signals(self):
        """
        Connect all required signals
        """
        signal_connections = [
            (self.spider_opened, signals.spider_opened),
            (self.spider_closed, signals.spider_closed),
            (self.request_scheduled, signals.request_scheduled),
            (self.response_received, signals.response_received),
            (self.item_scraped, signals.item_scraped),
            (self.request_dropped, signals.request_dropped),
            (self.response_downloaded, signals.response_downloaded),
        ]

        for handler, signal in signal_connections:
            self.crawler.signals.connect(handler, signal=signal)

    def spider_opened(self, spider):
        """
        Initialize performance monitoring when the spider opens
        """
        spider.logger.info(f"Performance monitoring started for {spider.name}")
        self.stats.set_value('performance/start_time', time.time())

    def spider_closed(self, spider, reason):
        """
        Emit a performance report when the spider closes
        """
        total_time = time.time() - self.start_time
        spider.logger.info(f"Performance report for {spider.name}:")
        spider.logger.info(f"  Total time: {total_time:.2f}s")

        # Report aggregate statistics
        requests_count = self.stats.get_value('requests/scheduled', 0)
        responses_count = self.stats.get_value('responses/received', 0)
        items_count = self.stats.get_value('items/scraped', 0)

        if total_time > 0:
            spider.logger.info(f"  Requests/sec: {requests_count/total_time:.2f}")
            spider.logger.info(f"  Responses/sec: {responses_count/total_time:.2f}")
            spider.logger.info(f"  Items/sec: {items_count/total_time:.2f}")

    def request_scheduled(self, request, spider):
        """
        Record when each request was scheduled
        """
        request.meta['scheduled_time'] = time.time()
        self.stats.inc_value('requests/scheduled')

    def response_received(self, response, request, spider):
        """
        Handle the response-received event
        """
        scheduled_time = request.meta.get('scheduled_time', time.time())
        download_time = time.time() - scheduled_time

        # Record the scheduling-to-response latency
        self.performance_metrics['download_times'].append(download_time)
        self.stats.inc_value('responses/received')
        self.stats.inc_value('responses/download_time', download_time)

    def response_downloaded(self, response, request, spider):
        """
        Handle the response-downloaded event
        """
        download_start_time = request.meta.get('download_start_time', time.time())
        actual_download_time = time.time() - download_start_time

        self.stats.inc_value('responses/actual_download_time', actual_download_time)

    def item_scraped(self, item, response, spider):
        """
        Handle the item-scraped event
        """
        self.stats.inc_value('items/scraped')

        # Record the scheduling-to-scrape latency
        if 'scheduled_time' in response.meta:
            scraping_time = time.time() - response.meta['scheduled_time']
            self.stats.inc_value('items/scraping_time', scraping_time)

    def request_dropped(self, request, spider):
        """
        Handle the request-dropped event (this signal carries no response)
        """
        self.stats.inc_value('requests/dropped')
        spider.logger.warning(f"Request dropped: {request.url}")

    @classmethod
    def from_crawler(cls, crawler):
        """
        Build the middleware from a crawler instance
        """
        return cls(crawler)

Data Preprocessing Techniques

Data preprocessing happens before data reaches the Spider and can improve both data quality and processing efficiency.

Content Preprocessing Middleware

import json

from lxml import etree


class ContentPreprocessingMiddleware:
    """
    Content preprocessing middleware
    """

    def process_spider_input(self, response, spider):
        """
        Preprocess the response body by content type
        """
        content_type = response.headers.get('Content-Type', b'').decode('utf-8').lower()

        if 'application/json' in content_type:
            self._preprocess_json(response, spider)
        elif 'text/html' in content_type:
            self._preprocess_html(response, spider)
        elif 'application/xml' in content_type or 'text/xml' in content_type:
            self._preprocess_xml(response, spider)

        return None

    def _preprocess_json(self, response, spider):
        """
        Preprocess a JSON response
        """
        try:
            data = json.loads(response.text)

            # Flatten nested structures for easier field access
            if isinstance(data, dict):
                flattened = self._flatten_dict(data)
                response.meta['flattened_json'] = flattened

            response.meta['json_data'] = data
            response.meta['json_processed'] = True

        except json.JSONDecodeError as e:
            spider.logger.error(f"JSON decode error: {e}")
            response.meta['json_processed'] = False

    def _preprocess_html(self, response, spider):
        """
        Preprocess an HTML response
        """
        try:
            # Parse the HTML document (etree.HTML returns None for empty input)
            tree = etree.HTML(response.text)
            if tree is None:
                response.meta['html_processed'] = False
                return

            # Extract basic metadata
            title = tree.xpath('//title/text()')
            response.meta['page_title'] = title[0] if title else ''

            # Inspect the page structure
            links = tree.xpath('//a/@href')
            response.meta['link_count'] = len(links)

            # Rough classification: listing page vs. detail page
            if self._is_list_page(tree):
                response.meta['page_type'] = 'list'
            elif self._is_detail_page(tree):
                response.meta['page_type'] = 'detail'
            else:
                response.meta['page_type'] = 'other'

            response.meta['html_processed'] = True

        except Exception as e:
            spider.logger.error(f"HTML preprocessing error: {e}")
            response.meta['html_processed'] = False

    def _preprocess_xml(self, response, spider):
        """
        Preprocess an XML response
        """
        try:
            tree = etree.fromstring(response.text.encode('utf-8'))

            # Extract basic XML information
            response.meta['xml_root'] = tree.tag
            response.meta['xml_elements_count'] = len(list(tree.iter()))

            response.meta['xml_processed'] = True

        except Exception as e:
            spider.logger.error(f"XML preprocessing error: {e}")
            response.meta['xml_processed'] = False

    def _flatten_dict(self, d, parent_key='', sep='_'):
        """
        Flatten a nested dict into a single level
        """
        items = []
        for k, v in d.items():
            new_key = f"{parent_key}{sep}{k}" if parent_key else k
            if isinstance(v, dict):
                items.extend(self._flatten_dict(v, new_key, sep=sep).items())
            else:
                items.append((new_key, v))
        return dict(items)

    def _is_list_page(self, tree):
        """
        Heuristic: does this look like a listing page?
        """
        list_elements = tree.xpath('//ul/li | //ol/li | //div[contains(@class, "item")]')
        return len(list_elements) > 5  # treat more than 5 list items as a listing page

    def _is_detail_page(self, tree):
        """
        Heuristic: does this look like a detail page?
        """
        detail_elements = tree.xpath('//div[contains(@class, "detail")] | //div[contains(@class, "content")]')
        return len(detail_elements) > 0

Encoding Preprocessing Middleware

import chardet


class EncodingPreprocessingMiddleware:
    """
    Encoding preprocessing middleware
    """

    def process_spider_input(self, response, spider):
        """
        Preprocess the response encoding
        """
        # Detect the actual encoding from the raw bytes
        detected = chardet.detect(response.body)
        response.meta['detected_encoding'] = detected

        # The encoding Scrapy resolved from headers and meta tags
        declared_encoding = response.encoding
        response.meta['declared_encoding'] = declared_encoding

        # If detection disagrees with high confidence, flag the mismatch
        if detected['encoding'] and detected['confidence'] > 0.8:
            if (declared_encoding and
                    declared_encoding.lower() != detected['encoding'].lower()):
                spider.logger.warning(
                    f"Encoding mismatch: declared={declared_encoding}, detected={detected['encoding']}"
                )

                # Spider middleware cannot swap the response object the way a
                # downloader middleware can, so stash the re-decoded text in meta
                try:
                    response.meta['corrected_text'] = response.body.decode(detected['encoding'])
                    response.meta['corrected_encoding'] = detected['encoding']
                except UnicodeDecodeError:
                    spider.logger.error(f"Failed to decode with detected encoding: {detected['encoding']}")

        # Record the detection confidence
        response.meta['encoding_confidence'] = detected['confidence']

        return None

Data Postprocessing Techniques

Data postprocessing happens after data leaves the Spider and is typically used for cleaning and formatting.

Data-Cleaning Postprocessing

import re

from itemadapter import ItemAdapter


class DataCleaningPostprocessingMiddleware:
    """
    Data-cleaning postprocessing middleware
    """

    def __init__(self):
        # Cleaning rules keyed by inferred field type
        self.cleaning_rules = {
            'text': self._clean_text,
            'number': self._clean_number,
            'url': self._clean_url,
            'email': self._clean_email,
        }

    def process_spider_output(self, response, result, spider):
        """
        Postprocess the output data
        """
        for item_or_request in result:
            if isinstance(item_or_request, dict):
                yield self._clean_item(item_or_request, spider)
            elif hasattr(item_or_request, 'fields'):
                # Scrapy Item: clean a dict copy, then write the values back
                adapter = ItemAdapter(item_or_request)
                cleaned_data = self._clean_item(adapter.asdict(), spider)

                for key, value in cleaned_data.items():
                    adapter[key] = value

                yield item_or_request
            else:
                # Requests (and anything else) pass through untouched
                yield item_or_request

    def _clean_item(self, item_dict, spider):
        """
        Clean a single item
        """
        cleaned = {}

        for field_name, value in item_dict.items():
            if value is None:
                cleaned[field_name] = None
                continue

            field_type = self._detect_field_type(field_name, value)

            if field_type in self.cleaning_rules:
                cleaned[field_name] = self.cleaning_rules[field_type](value)
            else:
                # Fallback: treat strings as text, leave other types alone
                if isinstance(value, str):
                    cleaned[field_name] = self._clean_text(value)
                else:
                    cleaned[field_name] = value

        return cleaned

    def _detect_field_type(self, field_name, value):
        """
        Infer the field type from its name and value
        """
        field_name_lower = field_name.lower()

        # By field name
        if any(keyword in field_name_lower for keyword in ['name', 'title', 'description', 'content']):
            return 'text'
        elif any(keyword in field_name_lower for keyword in ['price', 'amount', 'count', 'number']):
            return 'number'
        elif any(keyword in field_name_lower for keyword in ['url', 'link', 'href']):
            return 'url'
        elif 'email' in field_name_lower:
            return 'email'

        # By value
        if isinstance(value, (int, float)):
            return 'number'
        elif isinstance(value, str):
            if self._looks_like_url(value):
                return 'url'
            elif self._looks_like_email(value):
                return 'email'

        return 'text'  # default to text

    def _clean_text(self, text):
        """
        Clean a text value
        """
        if not isinstance(text, str):
            return text

        # Collapse runs of whitespace
        cleaned = re.sub(r'\s+', ' ', text.strip())

        # Strip special characters (keep CJK, letters, digits, basic punctuation)
        cleaned = re.sub(r'[^\w\s\u4e00-\u9fff.,!?;:()""''-]', '', cleaned)

        return cleaned

    def _clean_number(self, number):
        """
        Clean a numeric value
        """
        if isinstance(number, (int, float)):
            return number

        if isinstance(number, str):
            # Extract the first numeric token
            numbers = re.findall(r'-?\d+\.?\d*', number.replace(',', ''))
            if numbers:
                try:
                    if '.' in numbers[0]:
                        return float(numbers[0])
                    else:
                        return int(numbers[0])
                except ValueError:
                    pass

        return number

    def _clean_url(self, url):
        """
        Clean a URL value
        """
        if not isinstance(url, str):
            return url

        # Trim surrounding whitespace
        url = url.strip()

        # Handle protocol-relative URLs
        if url.startswith('//'):
            url = 'https:' + url
        elif url.startswith('/'):
            # Resolving a path-relative URL needs a base URL; left as-is here
            pass

        return url

    def _clean_email(self, email):
        """
        Clean an email value
        """
        if not isinstance(email, str):
            return email

        email = email.strip().lower()

        # Validate the email format
        if re.match(r'^[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}$', email):
            return email

        return email

    def _looks_like_url(self, text):
        """
        Does the text look like a URL?
        """
        return bool(re.match(r'^https?://', text.strip()))

    def _looks_like_email(self, text):
        """
        Does the text look like an email address?
        """
        return bool(re.match(r'^[^@]+@[^@]+\.[^@]+$', text.strip()))

Data-Validation Postprocessing

import re

from itemadapter import ItemAdapter


class DataValidationPostprocessingMiddleware:
    """
    Data-validation postprocessing middleware
    """

    def __init__(self):
        self.validation_rules = {
            'required': self._validate_required,
            'min_length': self._validate_min_length,
            'max_length': self._validate_max_length,
            'pattern': self._validate_pattern,
            'range': self._validate_range,
        }

    def process_spider_output(self, response, result, spider):
        """
        Validate the output data
        """
        for item_or_request in result:
            if isinstance(item_or_request, dict):
                validated_item = self._validate_item(item_or_request, spider)
                if validated_item is not None:
                    yield validated_item
            elif hasattr(item_or_request, 'fields'):
                # Scrapy Item
                adapter = ItemAdapter(item_or_request)
                validated_data = self._validate_item(adapter.asdict(), spider)

                if validated_data is not None:
                    # Write values back, skipping bookkeeping keys that are
                    # not declared fields on the Item
                    for key, value in validated_data.items():
                        if not key.startswith('_'):
                            adapter[key] = value
                    yield item_or_request
            else:
                yield item_or_request

    def _validate_item(self, item_dict, spider):
        """
        Validate a single item
        """
        # Per-field validation rules are read from the spider
        validation_config = getattr(spider, 'validation_config', {})

        validated = {}
        errors = []

        for field_name, value in item_dict.items():
            field_validations = validation_config.get(field_name, [])

            is_valid = True
            for validation_rule in field_validations:
                rule_type = validation_rule['type']
                rule_params = validation_rule.get('params', {})

                if rule_type in self.validation_rules:
                    valid, error_msg = self.validation_rules[rule_type](value, **rule_params)
                    if not valid:
                        is_valid = False
                        errors.append(f"{field_name}: {error_msg}")
                        break

            if is_valid:
                validated[field_name] = value
            else:
                # On failure, either drop the whole item (strict mode)
                # or fall back to a configured default value
                if validation_config.get('strict_mode', False):
                    spider.logger.error(f"Validation failed for item: {errors}")
                    return None  # drop the whole item
                else:
                    default_value = validation_config.get('default_values', {}).get(field_name)
                    if default_value is not None:
                        validated[field_name] = default_value

        # Attach the validation outcome as bookkeeping fields
        validated['_validation_passed'] = len(errors) == 0
        validated['_validation_errors'] = errors

        return validated

    def _validate_required(self, value, **kwargs):
        """
        Required-field check
        """
        if value is None or (isinstance(value, str) and not value.strip()):
            return False, "Field is required"
        return True, ""

    def _validate_min_length(self, value, length=0, **kwargs):
        """
        Minimum-length check
        """
        if isinstance(value, str) and len(value) < length:
            return False, f"Length must be at least {length}"
        return True, ""

    def _validate_max_length(self, value, length=1000, **kwargs):
        """
        Maximum-length check
        """
        if isinstance(value, str) and len(value) > length:
            return False, f"Length must be at most {length}"
        return True, ""

    def _validate_pattern(self, value, pattern='', **kwargs):
        """
        Regular-expression check
        """
        if isinstance(value, str) and not re.match(pattern, value):
            return False, f"Does not match pattern: {pattern}"
        return True, ""

    def _validate_range(self, value, min_val=None, max_val=None, **kwargs):
        """
        Numeric-range check
        """
        try:
            num_value = float(value)
            if min_val is not None and num_value < min_val:
                return False, f"Value must be at least {min_val}"
            if max_val is not None and num_value > max_val:
                return False, f"Value must be at most {max_val}"
            return True, ""
        except (ValueError, TypeError):
            return False, "Value must be a number"

Advanced Middleware Techniques

Conditional Middleware

class ConditionalMiddleware:
    """
    Conditional middleware: apply a different strategy per site type
    """

    def __init__(self):
        self.strategies = {
            'ecommerce': self._handle_ecommerce,
            'news': self._handle_news,
            'social': self._handle_social,
            'forum': self._handle_forum
        }

    def process_spider_input(self, response, spider):
        """
        Pick an input-processing strategy based on the site type
        """
        site_type = self._classify_site(response.url)

        if site_type in self.strategies:
            return self.strategies[site_type](response, spider, 'input')

        return None

    def process_spider_output(self, response, result, spider):
        """
        Pick an output-processing strategy based on the site type
        """
        site_type = self._classify_site(response.url)

        for item_or_request in result:
            if site_type in self.strategies:
                # In the output phase the handlers take a list of results
                processed = self.strategies[site_type]([item_or_request], spider, 'output')
                yield from processed
            else:
                yield item_or_request

    def _classify_site(self, url):
        """
        Classify the site type from its URL
        """
        url_lower = url.lower()

        ecommerce_keywords = ['shop', 'buy', 'cart', 'product', 'mall', 'store', 'taobao', 'jd', 'amazon', 'aliexpress']
        news_keywords = ['news', 'article', 'blog', 'journal', 'media', 'press']
        social_keywords = ['facebook', 'twitter', 'instagram', 'weibo', 'tiktok', 'youtube']
        forum_keywords = ['forum', 'bbs', 'discuss', 'thread', 'topic']

        for keyword in ecommerce_keywords:
            if keyword in url_lower:
                return 'ecommerce'
        for keyword in news_keywords:
            if keyword in url_lower:
                return 'news'
        for keyword in social_keywords:
            if keyword in url_lower:
                return 'social'
        for keyword in forum_keywords:
            if keyword in url_lower:
                return 'forum'

        return 'general'

    def _handle_ecommerce(self, data, spider, phase):
        """
        Handle e-commerce sites (data is a response on input, a result list on output)
        """
        if phase == 'input':
            spider.logger.debug(f"E-commerce input processing for {data.url}")
            data.meta['site_type'] = 'ecommerce'
            return None
        else:
            processed = []
            for item in data:
                if isinstance(item, dict):
                    # E-commerce-specific fields
                    item['category_type'] = 'ecommerce'
                    item['processing_strategy'] = 'detailed'
                processed.append(item)
            return processed

    def _handle_news(self, data, spider, phase):
        """
        Handle news sites
        """
        if phase == 'input':
            spider.logger.debug(f"News input processing for {data.url}")
            data.meta['site_type'] = 'news'
            return None
        else:
            processed = []
            for item in data:
                if isinstance(item, dict):
                    item['category_type'] = 'news'
                    item['processing_strategy'] = 'fast'
                processed.append(item)
            return processed

    def _handle_social(self, data, spider, phase):
        """
        Handle social-media sites
        """
        if phase == 'input':
            spider.logger.debug(f"Social input processing for {data.url}")
            data.meta['site_type'] = 'social'
            return None
        else:
            processed = []
            for item in data:
                if isinstance(item, dict):
                    item['category_type'] = 'social'
                    item['processing_strategy'] = 'compliance'
                processed.append(item)
            return processed

    def _handle_forum(self, data, spider, phase):
        """
        Handle forum sites
        """
        if phase == 'input':
            spider.logger.debug(f"Forum input processing for {data.url}")
            data.meta['site_type'] = 'forum'
            return None
        else:
            processed = []
            for item in data:
                if isinstance(item, dict):
                    item['category_type'] = 'forum'
                    item['processing_strategy'] = 'contextual'
                processed.append(item)
            return processed

Caching Middleware

import hashlib
import os
import pickle
from datetime import datetime, timedelta


class CachingMiddleware:
    """
    Caching middleware: cache processed results to avoid repeated work
    """

    def __init__(self, cache_dir='spider_cache'):
        self.cache_dir = cache_dir
        self.cache_ttl = timedelta(hours=24)  # cache entries are valid for 24 hours
        os.makedirs(cache_dir, exist_ok=True)

    def process_spider_input(self, response, spider):
        """
        Check for a cached result
        """
        cache_key = self._generate_cache_key(response.url, response.body)
        cache_file = os.path.join(self.cache_dir, f"{cache_key}.pkl")

        if os.path.exists(cache_file):
            cache_time = datetime.fromtimestamp(os.path.getmtime(cache_file))
            if datetime.now() - cache_time < self.cache_ttl:
                try:
                    with open(cache_file, 'rb') as f:
                        cached_result = pickle.load(f)

                    spider.logger.info(f"Cache hit for {response.url}")
                    response.meta['cached_result'] = cached_result
                    response.meta['cache_hit'] = True
                    return None
                except Exception:
                    # Corrupt cache entry: delete it
                    os.remove(cache_file)

        response.meta['cache_hit'] = False
        return None

    def process_spider_output(self, response, result, spider):
        """
        Cache the processed result
        """
        if response.meta.get('cache_hit'):
            # On a cache hit, return the cached result directly
            cached_result = response.meta.get('cached_result', [])
            yield from cached_result
        else:
            # Materialize the result so it can be cached. Note: Request
            # objects with bound callbacks generally do not pickle, so in
            # practice this approach suits item-only outputs.
            results = list(result)

            # Build the cache key and persist the result
            cache_key = self._generate_cache_key(response.url, response.body)
            cache_file = os.path.join(self.cache_dir, f"{cache_key}.pkl")

            try:
                with open(cache_file, 'wb') as f:
                    pickle.dump(results, f)
                spider.logger.info(f"Result cached for {response.url}")
            except Exception as e:
                spider.logger.error(f"Failed to cache result: {e}")

            yield from results

    def _generate_cache_key(self, url, content):
        """
        Build a cache key from the URL and body
        """
        content_hash = hashlib.md5(content).hexdigest()
        url_hash = hashlib.md5(url.encode()).hexdigest()
        return f"{url_hash}_{content_hash[:8]}"

Performance Optimization Strategies

Memory-Optimized Middleware

import gc
import sys
from collections import defaultdict


class MemoryOptimizedMiddleware:
    """
    Memory-optimized middleware
    """

    def __init__(self, max_items_before_gc=1000):
        self.max_items_before_gc = max_items_before_gc
        self.processed_count = 0
        self.memory_monitor = defaultdict(int)

    def process_spider_input(self, response, spider):
        """
        Memory bookkeeping on the input side
        """
        # Track cumulative response size
        response_size = sys.getsizeof(response.body)
        self.memory_monitor['total_response_size'] += response_size

        # Trigger garbage collection periodically
        if self.processed_count > 0 and self.processed_count % self.max_items_before_gc == 0:
            collected = gc.collect()
            spider.logger.debug(f"Garbage collected: {collected} objects")

        self.processed_count += 1
        return None

    def process_spider_output(self, response, result, spider):
        """
        Memory safeguards on the output side
        """
        for item_or_request in result:
            # Flag unusually large items
            item_size = sys.getsizeof(str(item_or_request))
            if item_size > 1024 * 1024:  # 1MB
                spider.logger.warning(f"Large item detected: {item_size} bytes")

            # Truncate very long string fields
            if isinstance(item_or_request, dict):
                for key, value in item_or_request.items():
                    if isinstance(value, str) and len(value) > 10000:  # ~10KB
                        item_or_request[key] = value[:10000] + "...[truncated]"

            yield item_or_request

    def spider_closed(self, spider):
        """
        Final memory cleanup (connect this to signals.spider_closed in from_crawler)
        """
        spider.logger.info(f"Memory usage summary: {dict(self.memory_monitor)}")
        gc.collect()
Concurrency-Optimized Middleware

import threading
from concurrent.futures import ThreadPoolExecutor


class ConcurrentOptimizedMiddleware:
    """
    Concurrency-optimized middleware
    """

    def __init__(self):
        self.executor = ThreadPoolExecutor(max_workers=4)
        self.semaphore = threading.Semaphore(10)  # cap concurrent preprocessing

    def process_spider_input(self, response, spider):
        """
        Concurrency-aware input processing
        """
        with self.semaphore:
            # Offload CPU-heavy preprocessing to the thread pool
            self._cpu_intensive_preprocessing(response)
            return None

    def _cpu_intensive_preprocessing(self, response):
        """
        CPU-heavy preprocessing task
        """
        # e.g. complex text analysis or image processing;
        # a thread pool keeps the main thread from blocking
        future = self.executor.submit(self._heavy_computation, response)
        # Either wait synchronously or handle the future asynchronously
        return future.result(timeout=5)  # 5-second timeout

    def _heavy_computation(self, response):
        """
        Placeholder for the actual heavy computation
        """
        return response

Middleware in Distributed Environments

In a distributed crawl, Spider Middleware also has to deal with cross-node coordination, data consistency, and load balancing. The middlewares below show design patterns and implementations suited to distributed deployments.

Distributed Task Coordination Middleware

import hashlib
import json
import time

import redis
from scrapy import signals
from scrapy.exceptions import IgnoreRequest


class DistributedTaskCoordinationMiddleware:
    """
    Distributed task coordination: make sure each task is handled by exactly one node
    """

    def __init__(self, crawler):
        self.crawler = crawler
        self.redis_client = redis.from_url(crawler.settings.get('REDIS_URL', 'redis://localhost:6379'))
        self.task_coordination_key = 'distributed_spider:task_coordination'
        self.node_id = crawler.settings.get('NODE_ID', 'default_node')

        # Connect signal handlers
        crawler.signals.connect(self.spider_opened, signal=signals.spider_opened)
        crawler.signals.connect(self.spider_closed, signal=signals.spider_closed)

    @classmethod
    def from_crawler(cls, crawler):
        return cls(crawler)

    def spider_opened(self, spider):
        """
        Register this node when the spider opens
        """
        node_info = {
            'node_id': self.node_id,
            'started_at': time.time(),
            'status': 'active'
        }

        self.redis_client.hset(
            f'{self.task_coordination_key}:nodes',
            self.node_id,
            json.dumps(node_info)
        )

        spider.logger.info(f"Node {self.node_id} registered in distributed coordination")

    def spider_closed(self, spider, reason):
        """
        Deregister this node when the spider closes
        """
        self.redis_client.hdel(
            f'{self.task_coordination_key}:nodes',
            self.node_id
        )

        spider.logger.info(f"Node {self.node_id} unregistered from distributed coordination")

    def process_spider_input(self, response, spider):
        """
        Check the task assignment before processing the input
        """
        url = response.url
        task_key = f"{self.task_coordination_key}:assigned_tasks:{url}"

        # Skip tasks already claimed by another node
        assigned_node = self.redis_client.get(task_key)
        if assigned_node and assigned_node.decode() != self.node_id:
            spider.logger.info(f"Task {url} already assigned to node {assigned_node.decode()}, skipping")
            raise IgnoreRequest("Task already assigned to another node")

        # Claim the task for this node
        self.redis_client.setex(task_key, 3600, self.node_id)  # expires after 1 hour
        response.meta['distributed_task_assigned'] = True

        return None

    def process_spider_output(self, response, result, spider):
        """
        Update the task state on output
        """
        for item_or_request in result:
            # Mark the task as completed
            if response.meta.get('distributed_task_assigned'):
                url = response.url
                task_complete_key = f"{self.task_coordination_key}:completed_tasks:{url}"
                self.redis_client.setex(task_complete_key, 86400, self.node_id)  # expires after 24 hours

            yield item_or_request

class DistributedRateLimitMiddleware:
    """
    Distributed rate limiting: coordinate request rates across nodes
    """

    def __init__(self, crawler):
        self.crawler = crawler
        self.redis_client = redis.from_url(crawler.settings.get('REDIS_URL', 'redis://localhost:6379'))
        self.node_id = crawler.settings.get('NODE_ID', 'default_node')
        self.rate_limits = {}

        # Load rate-limit rules from settings
        rate_limit_config = crawler.settings.get('DISTRIBUTED_RATE_LIMITS', {})
        for domain, limit_info in rate_limit_config.items():
            self.rate_limits[domain] = {
                'requests': limit_info.get('requests', 10),
                'window': limit_info.get('window', 60)
            }

    @classmethod
    def from_crawler(cls, crawler):
        return cls(crawler)

    def process_spider_input(self, response, spider):
        """
        Check the distributed rate limit
        """
        from urllib.parse import urlparse
        domain = urlparse(response.url).netloc

        if domain in self.rate_limits:
            limit_info = self.rate_limits[domain]
            requests_allowed = limit_info['requests']
            window = limit_info['window']

            # Sliding-window rate check backed by a Redis sorted set
            current_time = int(time.time())
            window_start = current_time - window

            requests_key = f"distributed_rate_limit:{domain}:requests"
            pipe = self.redis_client.pipeline()
            pipe.zremrangebyscore(requests_key, 0, window_start)  # drop entries outside the window
            pipe.zcard(requests_key)                              # count what remains
            pipe.zadd(requests_key, {f"{self.node_id}:{current_time}": current_time})
            pipe.expire(requests_key, window * 2)                 # keep the key from living forever
            results = pipe.execute()

            current_requests = results[1]

            if current_requests > requests_allowed:
                # Over the limit: flag the response for delayed handling
                delay_needed = window - (current_time % window)
                spider.logger.warning(f"Distributed rate limit exceeded for {domain}, delaying for {delay_needed}s")

                # Requeueing requires cooperation from a custom scheduler;
                # here we only annotate the response
                response.meta['distributed_rate_limited'] = True
                response.meta['delay_until'] = current_time + delay_needed

        return None

class DistributedDataConsistencyMiddleware:
    """
    Distributed data consistency: keep cross-node processing consistent
    """

    def __init__(self, crawler):
        self.crawler = crawler
        self.redis_client = redis.from_url(crawler.settings.get('REDIS_URL', 'redis://localhost:6379'))
        self.node_id = crawler.settings.get('NODE_ID', 'default_node')
        self.consistency_key = 'distributed_spider:data_consistency'

    @classmethod
    def from_crawler(cls, crawler):
        return cls(crawler)

    def process_spider_output(self, response, result, spider):
        """
        Enforce consistency when emitting output
        """
        for item_or_request in result:
            if isinstance(item_or_request, dict):
                # Derive a globally unique ID for the item
                item_content = str(sorted(item_or_request.items()))
                global_item_id = hashlib.md5(item_content.encode()).hexdigest()

                consistency_check_key = f"{self.consistency_key}:processed:{global_item_id}"

                # A Redis distributed lock guards the check-then-set
                lock_key = f"{consistency_check_key}:lock"
                lock_timeout = 30  # the lock expires after 30 seconds

                # Try to acquire the distributed lock
                if self._acquire_lock(lock_key, self.node_id, lock_timeout):
                    try:
                        # Skip items another node has already processed
                        already_processed_by = self.redis_client.get(consistency_check_key)
                        if already_processed_by:
                            spider.logger.info(f"Item already processed by node {already_processed_by.decode()}")
                            continue

                        # Mark the item as processed by this node
                        self.redis_client.setex(
                            consistency_check_key,
                            86400 * 7,  # expires after 7 days
                            self.node_id
                        )

                        # Record which node processed it and when
                        item_or_request['distributed_processed_by'] = self.node_id
                        item_or_request['distributed_process_time'] = time.time()

                        yield item_or_request
                    finally:
                        # Release the lock
                        self._release_lock(lock_key, self.node_id)
                else:
                    spider.logger.warning(f"Could not acquire lock for item {global_item_id}, skipping")
            else:
                yield item_or_request

    def _acquire_lock(self, lock_key, node_id, timeout):
        """
        Acquire the distributed lock
        """
        lua_acquire = """
        if redis.call("GET", KEYS[1]) == false then
            redis.call("SET", KEYS[1], ARGV[1])
            redis.call("EXPIRE", KEYS[1], ARGV[2])
            return 1
        else
            return 0
        end
        """
        return bool(self.redis_client.eval(lua_acquire, 1, lock_key, node_id, timeout))

    def _release_lock(self, lock_key, node_id):
        """
        Release the distributed lock (only if this node still holds it)
        """
        lua_release = """
        if redis.call("GET", KEYS[1]) == ARGV[1] then
            return redis.call("DEL", KEYS[1])
        else
            return 0
        end
        """
        return bool(self.redis_client.eval(lua_release, 1, lock_key, node_id))

class DistributedLoadBalancingMiddleware:
    """
    Distributed load balancing: adjust task assignment to node load
    """

    def __init__(self, crawler):
        self.crawler = crawler
        self.redis_client = redis.from_url(crawler.settings.get('REDIS_URL', 'redis://localhost:6379'))
        self.node_id = crawler.settings.get('NODE_ID', 'default_node')
        self.load_balance_key = 'distributed_spider:load_balance'
        self.task_assignment_key = f"{self.load_balance_key}:assignments:{self.node_id}"

    @classmethod
    def from_crawler(cls, crawler):
        return cls(crawler)

    def process_spider_input(self, response, spider):
        """
        Decide whether to process the request based on node load
        """
        # Update this node's load accounting
        self._update_node_load(spider)

        # Check whether this node is overloaded
        current_load = self._get_current_node_load()
        max_load = self.crawler.settings.get('NODE_MAX_LOAD', 100)

        if current_load > max_load:
            spider.logger.warning(f"Node {self.node_id} load too high ({current_load}), considering task offloading")

            # Task reassignment logic could go here;
            # for now we only flag the overload
            response.meta['node_load_high'] = True

        return None

    def process_spider_output(self, response, result, spider):
        """
        Update the load accounting after the task finishes
        """
        # Decrement this node's in-flight task counter
        self.redis_client.decr(self.task_assignment_key, 1)

        for item_or_request in result:
            yield item_or_request

    def _update_node_load(self, spider):
        """
        Update this node's load information
        """
        # Increment the in-flight task counter
        self.redis_client.incr(self.task_assignment_key, 1)

        # Refresh the last-active timestamp
        self.redis_client.setex(
            f"{self.load_balance_key}:node_last_active:{self.node_id}",
            300,  # expires after 5 minutes
            time.time()
        )

    def _get_current_node_load(self):
        """
        Read this node's current load
        """
        task_count = int(self.redis_client.get(self.task_assignment_key) or 0)
        return task_count

Common Problems and Solutions

Problem 1: Middlewares run in the wrong order

Symptom: middlewares do not execute in the expected order.
Solution:

# Set priorities explicitly in settings.py. Remember that
# process_spider_input runs low -> high and process_spider_output
# runs high -> low.
SPIDER_MIDDLEWARES = {
    'myproject.middlewares.ValidationMiddleware': 400,
    'myproject.middlewares.CleaningMiddleware': 500,
    'myproject.middlewares.EnrichmentMiddleware': 600,
}

Problem 2: Memory leaks

Symptom: memory usage keeps climbing on long runs.
Solution:

class MemorySafeMiddleware:
    def __init__(self):
        self.item_cache = {}  # use weak references or cap the size
        self.max_cache_size = 1000

    def process_spider_output(self, response, result, spider):
        # Keep the cache bounded
        if len(self.item_cache) > self.max_cache_size:
            self.item_cache.clear()

        for item in result:
            yield item

Problem 3: Poor exception handling

Symptom: a single exception brings the whole crawl down.
Solution:

class RobustExceptionMiddleware:
    def process_spider_exception(self, response, exception, spider):
        spider.logger.error(f"Exception occurred: {exception}", exc_info=exception)

        # Record the error without aborting the crawl
        response.meta['error_occurred'] = True
        response.meta['error_message'] = str(exception)

        # Return an empty result so processing continues
        return []

Best-Practice Summary

Design Principles

  1. Single responsibility: each middleware handles one kind of logic, such as input preprocessing, output postprocessing, or exception handling
  2. Configurability: expose middleware parameters through settings.py for flexibility (see the sketch after this list)
  3. Robustness: handle exceptions carefully so a single failure cannot take down the whole crawl
  4. Performance awareness: avoid expensive work inside middleware, and consider asynchronous processing where needed
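
As a minimal sketch of principle 2, the middleware below pulls its parameters from settings and removes itself from the chain when disabled; the MYMW_* setting names are made up for this example:

from scrapy.exceptions import NotConfigured


class ConfigurableMiddleware:
    """Reads its parameters from settings and can disable itself."""

    def __init__(self, max_size):
        self.max_size = max_size

    @classmethod
    def from_crawler(cls, crawler):
        settings = crawler.settings
        if not settings.getbool('MYMW_ENABLED', True):
            # Raising NotConfigured removes this middleware from the chain
            raise NotConfigured
        return cls(max_size=settings.getint('MYMW_MAX_SIZE', 1024 * 1024))

    def process_spider_input(self, response, spider):
        if len(response.body) > self.max_size:
            spider.logger.warning(f"Oversized response: {response.url}")
        return None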

Performance Optimization Strategies

  1. Memory management: release data you no longer need and prefer generators over materialized lists (see the sketch after this list)
  2. Caching: cache data that would otherwise be processed repeatedly
  3. Concurrency: use concurrent processing where it genuinely improves throughput
  4. Resource limits: set sensible upper bounds on resource usage
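
To make point 1 concrete, here is a minimal contrast between streaming and materializing inside process_spider_output (dict items assumed):

class StreamingOutputMiddleware:
    """Streams items through one at a time instead of building a list."""

    def process_spider_output(self, response, result, spider):
        # Good: only one item is held in memory at any moment
        for item in result:
            if isinstance(item, dict):
                item['source_url'] = response.url
            yield item

        # Avoid: items = list(result) keeps every item and request alive
        # until the whole list is built, defeating Scrapy's streaming model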

Distributed-Environment Best Practices

  1. State management: keep distributed state in external storage such as Redis
  2. Consistency guarantees: use distributed locks to keep data consistent
  3. Load balancing: assign tasks dynamically according to node load
  4. Fault tolerance: design for failover and recovery

Deployment Recommendations

  1. Monitoring: put proper monitoring and logging in place (see the sketch after this list)
  2. Testing: exercise the edge cases thoroughly
  3. Documentation: write clear documentation for custom middleware
  4. Version control: track changes to middleware versions
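
For point 1, a small extension sketch that logs a final stats snapshot when the spider closes; the class is illustrative (enable it via the EXTENSIONS setting), not a Scrapy built-in:

from scrapy import signals


class StatsSnapshotLogger:
    """Logs the collected stats once the spider closes."""

    def __init__(self, stats):
        self.stats = stats

    @classmethod
    def from_crawler(cls, crawler):
        ext = cls(crawler.stats)
        crawler.signals.connect(ext.spider_closed, signal=signals.spider_closed)
        return ext

    def spider_closed(self, spider, reason):
        spider.logger.info(f"Final stats ({reason}): {self.stats.get_stats()}")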

💡 Key takeaway: Spider Middleware is a pivotal component in Scrapy's data-processing pipeline. Used well, it enables sophisticated preprocessing and postprocessing logic that makes crawlers more flexible and their data cleaner.
