大规模爬虫优化 - 内存管理、网络优化与性能调优详解

📂 所属阶段:第五阶段 — 战力升级(分布式与进阶篇)
🔗 相关章节:自动限速AutoThrottle · 数据去重与增量更新 · 分布式去重与调度

大规模爬虫优化概述

大规模爬虫系统面临着诸多挑战,包括内存使用、网络带宽、并发控制、数据处理效率等问题。优化这些方面对于确保爬虫系统的稳定性和高效性至关重要。

大规模爬虫面临的挑战

"""
大规模爬虫面临的主要挑战:

1. 内存压力:
   - 大量URL队列存储
   - 响应数据缓存
   - 中间数据处理
   - 状态跟踪维护

2. 网络瓶颈:
   - 连接池管理
   - DNS解析效率
   - 带宽利用率
   - 超时和重试策略

3. 并发控制:
   - 请求频率限制
   - 线程/协程管理
   - 资源竞争协调
   - 负载均衡策略

4. 数据处理:
   - 实时处理能力
   - 存储效率优化
   - 数据质量保障
   - 错误处理机制
"""

优化目标与原则

"""
大规模爬虫优化的核心目标:

1. 稳定性:
   - 7x24小时稳定运行
   - 容错和恢复机制
   - 异常处理完善

2. 效率性:
   - 最大化吞吐量
   - 最小化资源消耗
   - 降低响应延迟

3. 可扩展性:
   - 支持动态扩容
   - 水平扩展能力
   - 负载均衡支持

4. 可维护性:
   - 详细的监控指标
   - 清晰的日志记录
   - 易于调试和优化
"""

内存管理优化

内存管理是大规模爬虫优化的核心环节,直接影响系统的稳定性和性能。

内存配置优化

# advanced_settings.py - 高级内存配置
# 说明:settings 文件只包含配置项,不需要额外导入模块

# 内存使用限制
MEMUSAGE_ENABLED = True
MEMUSAGE_LIMIT_MB = 2048  # 限制内存使用为2GB
MEMUSAGE_WARNING_MB = 1536  # 1.5GB警告

# 内存调试
MEMDEBUG_ENABLED = True
MEMDEBUG_NOTIFY = ['admin@example.com']  # 内存泄露通知邮箱

# 自动关闭爬虫当内存超过限制
CLOSESPIDER_MEMUSAGE = 2048  # 2GB

# 优化内存使用的配置
FEED_EXPORTERS = {
    'json': 'scrapy.exporters.JsonItemExporter',
    'csv': 'scrapy.exporters.CsvItemExporter',
}
FEED_STORAGES = {
    'file': 'scrapy.extensions.feedexport.FileFeedStorage',
}

# 限制并发项目处理数量
CONCURRENT_ITEMS = 100

# 优化下载器内存使用
DOWNLOAD_HANDLERS = {
    'http': 'scrapy.core.downloader.handlers.http.HTTPDownloadHandler',
    'https': 'scrapy.core.downloader.handlers.http.HTTPDownloadHandler',
}

# 限制下载响应大小
DOWNLOAD_MAXSIZE = 50 * 1024 * 1024  # 50MB
DOWNLOAD_WARNSIZE = 10 * 1024 * 1024  # 10MB警告

# 优化请求队列内存使用(内存队列不做序列化,正确的类名是 LifoMemoryQueue)
SCHEDULER_MEMORY_QUEUE = 'scrapy.squeues.LifoMemoryQueue'

内存监控与管理

# memory_manager.py - 内存管理器
import psutil
import gc
import time
import logging
from scrapy import signals

class MemoryManager:
    """
    内存管理器
    """
    
    def __init__(self, crawler):
        self.crawler = crawler
        self.stats = crawler.stats
        self.max_memory_mb = crawler.settings.getint('MEMUSAGE_LIMIT_MB', 2048)
        self.warning_memory_mb = crawler.settings.getint('MEMUSAGE_WARNING_MB', 1536)
        self.logger = logging.getLogger(__name__)
        
        # 连接信号(crawler.signals 是 Scrapy 推荐的信号挂接方式)
        crawler.signals.connect(self.spider_opened, signal=signals.spider_opened)
        crawler.signals.connect(self.spider_closed, signal=signals.spider_closed)
    
    def spider_opened(self, spider):
        """
        爬虫开启时的内存监控
        """
        self.logger.info(f"Memory manager started for spider: {spider.name}")
        
        # 启动内存监控
        self.start_memory_monitoring()
    
    def spider_closed(self, spider, reason):
        """
        爬虫关闭时的内存清理
        """
        self.logger.info(f"Memory manager stopped for spider: {spider.name}")
        
        # 执行垃圾回收
        collected = gc.collect()
        self.logger.info(f"Garbage collected: {collected} objects")
    
    def start_memory_monitoring(self):
        """
        开始内存监控
        """
        self.last_check_time = time.time()
    
    def check_memory_usage(self) -> bool:
        """
        检查内存使用情况
        
        Returns:
            True: 内存使用正常,False: 内存超限需要关闭爬虫
        """
        process = psutil.Process()
        memory_info = process.memory_info()
        memory_mb = memory_info.rss / 1024 / 1024
        
        # 更新统计信息
        self.stats.set_value('memusage/rss', memory_mb)
        
        # 检查是否达到警告阈值
        if memory_mb > self.warning_memory_mb:
            self.logger.warning(f"Memory usage warning: {memory_mb:.2f}MB / {self.warning_memory_mb}MB")
        
        # 检查是否达到限制阈值
        if memory_mb > self.max_memory_mb:
            self.logger.error(f"Memory usage exceeded limit: {memory_mb:.2f}MB / {self.max_memory_mb}MB")
            return False
        
        return True
    
    def get_memory_info(self) -> dict:
        """
        获取内存信息
        """
        process = psutil.Process()
        memory_info = process.memory_info()
        
        return {
            'rss': memory_info.rss / 1024 / 1024,  # MB
            'vms': memory_info.vms / 1024 / 1024,  # MB
            'percent': process.memory_percent(),
            'max_limit_mb': self.max_memory_mb,
            'warning_limit_mb': self.warning_memory_mb
        }

class MemoryUsageExtension:
    """
    内存使用扩展
    """
    
    def __init__(self, crawler):
        self.crawler = crawler
        self.memory_manager = MemoryManager(crawler)
    
    @classmethod
    def from_crawler(cls, crawler):
        ext = cls(crawler)
        
        # 监听spider_idle信号来检查内存使用
        crawler.signals.connect(ext.spider_idle, signal=signals.spider_idle)
        
        return ext
    
    def spider_idle(self, spider):
        """
        爬虫空闲时检查内存使用
        """
        if not self.memory_manager.check_memory_usage():
            # 内存超限,主动关闭爬虫。注意:在信号处理器中抛出 CloseSpider
            # 异常不会传播,应直接调用 engine.close_spider
            self.crawler.engine.close_spider(spider, 'memory_exceeded')
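
上面的扩展需要在 settings 中注册后才会生效。下面是一个最小示例(myproject 模块路径是假设的,请按实际项目调整):

# settings.py - 注册内存管理扩展(示例,模块路径为假设)
EXTENSIONS = {
    'myproject.memory_manager.MemoryUsageExtension': 500,
}
MEMUSAGE_LIMIT_MB = 2048
MEMUSAGE_WARNING_MB = 1536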

内存优化中间件

# memory_optimization_middleware.py - 内存优化中间件
import gc
import weakref
from itemadapter import ItemAdapter

class MemoryOptimizationMiddleware:
    """
    内存优化中间件
    """
    
    def __init__(self):
        self.item_cache = weakref.WeakSet()  # 使用弱引用避免内存泄漏
        self.response_cache = {}
        self.cache_size_limit = 1000
    
    def process_spider_input(self, response, spider):
        """
        处理输入响应的内存优化
        """
        # 检查响应大小
        if len(response.body) > 10 * 1024 * 1024:  # 10MB
            spider.logger.warning(f"Large response detected: {len(response.body)} bytes for {response.url}")
        
        # 清理缓存
        if len(self.response_cache) > self.cache_size_limit:
            # 清理一半缓存
            keys_to_remove = list(self.response_cache.keys())[:self.cache_size_limit // 2]
            for key in keys_to_remove:
                del self.response_cache[key]
        
        return None
    
    def process_spider_output(self, response, result, spider):
        """
        处理输出结果的内存优化
        """
        for item_or_request in result:
            if hasattr(item_or_request, 'fields'):  # Item
                # 优化Item内存使用
                adapter = ItemAdapter(item_or_request)
                
                # 限制大字段大小
                for field_name, value in adapter.items():
                    if isinstance(value, str) and len(value) > 10000:  # 10KB
                        adapter[field_name] = value[:10000] + "...[TRUNCATED]"
                
                # 添加到弱引用缓存
                self.item_cache.add(item_or_request)
            
            yield item_or_request
    
    def spider_closed(self, spider):
        """
        爬虫关闭时的内存清理
        """
        # 执行垃圾回收
        collected = gc.collect()
        spider.logger.info(f"Memory optimization middleware cleanup: collected {collected} objects")

网络性能优化

网络性能优化是提高爬虫效率的关键因素。

网络配置优化

# network_optimization_settings.py - 网络优化配置

# DNS优化
DNSCACHE_ENABLED = True
DNSCACHE_SIZE = 10000  # 增大DNS缓存
DNS_TIMEOUT = 60  # DNS超时时间

# 连接池优化
CONCURRENT_REQUESTS = 32  # 并发请求数
CONCURRENT_REQUESTS_PER_DOMAIN = 8  # 每个域名的并发请求数
CONCURRENT_REQUESTS_PER_IP = 4  # 每个IP的并发请求数

# 下载超时设置
DOWNLOAD_TIMEOUT = 30  # 下载超时时间
DOWNLOAD_DELAY = 1  # 下载延迟
RANDOMIZE_DOWNLOAD_DELAY = 0.5  # 随机延迟比例

# 重试设置
RETRY_TIMES = 3  # 重试次数
RETRY_HTTP_CODES = [500, 502, 503, 504, 408, 429, 403]  # 需要重试的HTTP状态码
RETRY_PRIORITY_ADJUST = -1  # 重试请求优先级调整

# 下载器设置
DOWNLOADER_STATS = True  # 启用下载器统计
DOWNLOAD_FAIL_ON_DATALOSS = False  # 数据丢失时不失败

# TCP连接优化
REACTOR_THREADPOOL_MAXSIZE = 20  # Reactor线程池最大大小

# HTTP缓存(通常在大规模爬虫中关闭)
HTTPCACHE_ENABLED = False  # 关闭HTTP缓存以节省内存
HTTPCACHE_EXPIRATION_SECS = 0

# User-Agent优化
USER_AGENT = 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36'

# 下载器中间件配置
DOWNLOADER_MIDDLEWARES = {
    'scrapy.downloadermiddlewares.useragent.UserAgentMiddleware': None,
    'scrapy.downloadermiddlewares.retry.RetryMiddleware': 90,
    'scrapy.downloadermiddlewares.redirect.MetaRefreshMiddleware': 80,
    'scrapy.downloadermiddlewares.httpcompression.HttpCompressionMiddleware': 81,
    'scrapy.downloadermiddlewares.stats.DownloaderStats': 82,
}

连接池管理

# connection_pool_manager.py - 连接池管理器
from twisted.internet import reactor
from twisted.web.client import Agent, HTTPConnectionPool
from scrapy.core.downloader.contextfactory import ScrapyClientContextFactory
import logging

class OptimizedHTTPPool:
    """
    优化的HTTP连接池
    """
    
    def __init__(self, maxsize=20, persistent=True):
        self.pool = HTTPConnectionPool(reactor, persistent=persistent)
        self.pool.maxPersistentPerHost = maxsize
        self.pool.cachedConnectionTimeout = 30  # 连接超时
        self.pool.retryAutomatically = True  # 自动重试
    
    def get_agent(self, crawler):
        """
        获取优化的HTTP代理
        """
        context_factory = ScrapyClientContextFactory.from_settings(crawler.settings)
        agent = Agent(reactor, contextFactory=context_factory, pool=self.pool)
        return agent
    
    def close_pool(self):
        """
        关闭连接池
        """
        d = self.pool.closeCachedConnections()
        return d

class NetworkOptimizationMiddleware:
    """
    网络优化中间件
    """
    
    def __init__(self, crawler):
        self.crawler = crawler
        self.connection_pool = OptimizedHTTPPool(maxsize=crawler.settings.getint('CONCURRENT_REQUESTS_PER_DOMAIN', 8))
        self.logger = logging.getLogger(__name__)
    
    @classmethod
    def from_crawler(cls, crawler):
        return cls(crawler)
    
    def process_request(self, request, spider):
        """
        处理请求的网络优化
        """
        # 添加网络优化头部
        request.headers.setdefault('Accept-Encoding', 'gzip, deflate')
        request.headers.setdefault('Connection', 'keep-alive')
        request.headers.setdefault('Keep-Alive', 'timeout=30, max=100')
        
        # 根据URL优化请求策略
        if self._is_api_request(request.url):
            # API请求可能需要不同的策略
            request.meta.setdefault('download_timeout', 15)
        elif self._is_static_resource(request.url):
            # 静态资源可能需要不同的策略
            request.meta.setdefault('download_timeout', 10)
        
        return None
    
    def process_response(self, request, response, spider):
        """
        处理响应的网络优化
        """
        # 检查响应头信息
        content_length = response.headers.get('Content-Length')
        if content_length:
            try:
                size = int(content_length)
                if size > 50 * 1024 * 1024:  # 50MB
                    spider.logger.warning(f"Large response: {size} bytes from {response.url}")
            except ValueError:
                pass
        
        # 检查压缩情况
        content_encoding = response.headers.get('Content-Encoding', b'').decode('utf-8')
        if 'gzip' not in content_encoding and len(response.body) > 1024:  # 1KB
            spider.logger.debug(f"Response not compressed: {response.url}")
        
        return response
    
    def _is_api_request(self, url: str) -> bool:
        """
        判断是否为API请求
        """
        api_patterns = ['/api/', '/v1/', '/v2/', '/json/', '.json', '/rest/']
        return any(pattern in url.lower() for pattern in api_patterns)
    
    def _is_static_resource(self, url: str) -> bool:
        """
        判断是否为静态资源
        """
        static_extensions = ['.css', '.js', '.jpg', '.jpeg', '.png', '.gif', '.ico', '.svg', '.woff', '.woff2', '.ttf']
        return any(url.lower().endswith(ext) for ext in static_extensions)
    
    def spider_closed(self, spider):
        """
        爬虫关闭时清理连接池
        """
        self.connection_pool.close_pool()
        self.logger.info("Connection pool closed")

自适应限速优化

# adaptive_throttling.py - 自适应限速
import time
import statistics
from collections import deque, defaultdict
from scrapy.extensions.throttle import AutoThrottle  # AutoThrottle 是扩展,不在 downloadermiddlewares 里

class AdaptiveAutoThrottle(AutoThrottle):
    """
    自适应自动限速
    """
    
    def __init__(self, crawler):
        super().__init__(crawler)
        self.response_times = defaultdict(lambda: deque(maxlen=100))  # 响应时间记录
        self.success_rates = defaultdict(lambda: deque(maxlen=50))   # 成功率记录
        self.adjustment_history = defaultdict(list)                  # 调整历史
        # AUTOTHROTTLE_MIN_DELAY 是本类自定义的设置(Scrapy 内置的延迟下限来自 DOWNLOAD_DELAY)
        self.min_delay = crawler.settings.getfloat('AUTOTHROTTLE_MIN_DELAY', 1)
        self.max_delay = crawler.settings.getfloat('AUTOTHROTTLE_MAX_DELAY', 60)
        self.target_concurrency = crawler.settings.getfloat('AUTOTHROTTLE_TARGET_CONCURRENCY', 1.0)
    
    def _adjust_delay(self, slot, latency, response=None):
        """
        自适应调整延迟
        """
        # 记录响应时间(下载槽 Slot 对象没有 key 属性,用 id(slot) 作为稳定标识)
        slot_key = id(slot)
        self.response_times[slot_key].append(latency)
        
        # 计算平均响应时间和变化趋势
        if len(self.response_times[slot_key]) >= 10:
            recent_avg = statistics.mean(list(self.response_times[slot_key])[-10:])
            overall_avg = statistics.mean(self.response_times[slot_key])
            
            # 根据响应时间趋势调整延迟
            if recent_avg > overall_avg * 1.2:  # 响应时间变慢
                # 增加延迟
                new_delay = min(slot.delay * 1.2, self.max_delay)
                slot.delay = new_delay
            elif recent_avg < overall_avg * 0.8:  # 响应时间变快
                # 减少延迟
                new_delay = max(slot.delay * 0.9, self.min_delay)
                slot.delay = new_delay
        
        # 根据成功率调整(如果可用)
        if response and hasattr(response, 'status'):
            success = 200 <= response.status < 400
            self.success_rates[slot_key].append(1 if success else 0)
            
            if len(self.success_rates[slot_key]) >= 10:
                recent_success_rate = sum(list(self.success_rates[slot_key])[-10:]) / 10
                
                if recent_success_rate < 0.8:  # 成功率低,可能是被限制
                    new_delay = min(slot.delay * 1.5, self.max_delay)
                    slot.delay = new_delay
                elif recent_success_rate > 0.95:  # 成功率很高,可以加快
                    new_delay = max(slot.delay * 0.9, self.min_delay)
                    slot.delay = new_delay
    
    # 说明:基类 AutoThrottle 通过 request.meta['download_slot'] 区分下载槽;
    # 如需自定义分槽,应在生成请求时设置该 meta 键,而不是覆盖并不存在的 _get_slot_key 方法

class AdvancedThrottleMiddleware:
    """
    高级限速中间件
    """
    
    def __init__(self, crawler):
        self.crawler = crawler
        # 注意:AutoThrottle 在 AUTOTHROTTLE_ENABLED=False 时会抛出 NotConfigured;
        # 若已作为扩展启用,不要在中间件里重复实例化,以免重复调整延迟
        self.throttle = AdaptiveAutoThrottle(crawler)
        self.domain_configs = {}
        
    @classmethod
    def from_crawler(cls, crawler):
        return cls(crawler)
    
    def process_request(self, request, spider):
        """
        处理请求时记录域名级限速配置
        """
        # 获取域名配置
        domain = self._get_domain(request.url)
        config = self.domain_configs.get(domain, {})
        
        # 注意:不能在这里调用 time.sleep,那会阻塞整个 Twisted 反应器;
        # 自定义延迟记录在 meta 中,由 process_response 调整下载槽的 delay 来生效
        custom_delay = config.get('delay')
        if custom_delay:
            request.meta['custom_min_delay'] = custom_delay
        
        return None
    
    def process_response(self, request, response, spider):
        """
        处理响应后更新限速参数
        """
        latency = request.meta.get('download_latency', 0)
        # _get_slot 返回 (key, slot) 元组,只把 slot 传给 _adjust_delay
        key, slot = self.throttle._get_slot(request, spider)
        if slot is not None:
            self.throttle._adjust_delay(slot, latency, response)
            # 应用域名级最小延迟
            custom_min = request.meta.get('custom_min_delay')
            if custom_min:
                slot.delay = max(slot.delay, custom_min)
        
        return response
    
    def _get_domain(self, url: str) -> str:
        """
        从URL提取域名
        """
        from urllib.parse import urlparse
        parsed = urlparse(url)
        return parsed.netloc
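
让自适应限速生效的一种做法,是在 EXTENSIONS 中关闭内置 AutoThrottle、启用自定义子类(模块路径为假设,优先级数值为常见取值):

# settings.py - 启用自适应限速(示例,模块路径为假设)
AUTOTHROTTLE_ENABLED = True
EXTENSIONS = {
    'scrapy.extensions.throttle.AutoThrottle': None,  # 关闭内置实现,避免重复调整
    'myproject.adaptive_throttling.AdaptiveAutoThrottle': 0,
}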

并发控制策略

合理的并发控制是大规模爬虫性能的关键。

并发配置优化

# concurrency_settings.py - 并发配置
# Scrapy内置并发设置
CONCURRENT_REQUESTS = 64  # 总并发请求数
CONCURRENT_REQUESTS_PER_DOMAIN = 16  # 每个域名的并发请求数
CONCURRENT_REQUESTS_PER_IP = 8  # 每个IP的并发请求数

# 下载器设置
DOWNLOAD_DELAY = 0.5  # 下载延迟
RANDOMIZE_DOWNLOAD_DELAY = 0.3  # 随机延迟比例

# 自动限速设置
AUTOTHROTTLE_ENABLED = True
AUTOTHROTTLE_START_DELAY = 0.5
AUTOTHROTTLE_MAX_DELAY = 10
AUTOTHROTTLE_TARGET_CONCURRENCY = 4.0
AUTOTHROTTLE_DEBUG = True

# 线程池设置
REACTOR_THREADPOOL_MAXSIZE = 20

# 队列设置
SCHEDULER_PRIORITY_QUEUE = 'scrapy.pqueues.ScrapyPriorityQueue'
SCHEDULER_DISK_QUEUE = 'scrapy.squeues.PickleLifoDiskQueue'
SCHEDULER_MEMORY_QUEUE = 'scrapy.squeues.LifoMemoryQueue'  # 内存队列无需序列化

# 处理器设置
CONCURRENT_ITEMS = 200  # 并发处理的项目数

动态并发控制器

# dynamic_concurrency_controller.py - 动态并发控制器
import time
from collections import deque
from scrapy import signals
import psutil

class DynamicConcurrencyController:
    """
    动态并发控制器
    """
    
    def __init__(self, crawler):
        self.crawler = crawler
        self.stats = crawler.stats
        self.original_concurrent_requests = crawler.settings.getint('CONCURRENT_REQUESTS', 16)
        self.original_per_domain = crawler.settings.getint('CONCURRENT_REQUESTS_PER_DOMAIN', 8)
        
        # 性能监控
        self.response_times = deque(maxlen=100)
        self.success_counts = deque(maxlen=100)
        self.error_counts = deque(maxlen=100)
        
        # 系统资源监控
        self.cpu_threshold = crawler.settings.getfloat('CONCURRENCY_CPU_THRESHOLD', 80.0)
        self.memory_threshold = crawler.settings.getfloat('CONCURRENCY_MEMORY_THRESHOLD', 80.0)
        
        # 控制参数
        self.adjustment_interval = crawler.settings.getint('CONCURRENCY_ADJUSTMENT_INTERVAL', 30)
        self.last_adjustment = time.time()
        
        # 连接信号
        crawler.signals.connect(self.response_received, signal=signals.response_received)
        crawler.signals.connect(self.request_dropped, signal=signals.request_dropped)
    
    def response_received(self, response, request, spider):
        """
        响应接收时更新统计信息
        """
        latency = response.meta.get('download_latency', 0)
        if latency > 0:
            self.response_times.append(latency)
        
        # 成功与失败都记入 success_counts(1/0),否则后面计算的成功率恒为 1
        success = 200 <= response.status < 400
        self.success_counts.append(1 if success else 0)
        if not success:
            self.error_counts.append(1)
    
    def request_dropped(self, request, spider):
        """
        请求被丢弃时更新统计信息(request_dropped 信号只携带 request 和 spider)
        """
        self.success_counts.append(0)
        self.error_counts.append(1)
    
    def should_adjust_concurrency(self) -> bool:
        """
        判断是否应该调整并发数
        """
        return time.time() - self.last_adjustment > self.adjustment_interval
    
    def get_system_load(self) -> dict:
        """
        获取系统负载信息
        """
        # interval=None 立即返回自上次调用以来的均值,避免阻塞反应器 1 秒
        cpu_percent = psutil.cpu_percent(interval=None)
        memory_percent = psutil.virtual_memory().percent
        
        return {
            'cpu_percent': cpu_percent,
            'memory_percent': memory_percent,
            'cpu_threshold_exceeded': cpu_percent > self.cpu_threshold,
            'memory_threshold_exceeded': memory_percent > self.memory_threshold
        }
    
    def calculate_optimal_concurrency(self) -> tuple:
        """
        计算最优并发数
        
        Returns:
            (总并发数, 每域名并发数)
        """
        system_load = self.get_system_load()
        
        # 基于系统负载调整
        if system_load['cpu_threshold_exceeded'] or system_load['memory_threshold_exceeded']:
            # 负载过高,降低并发
            factor = 0.7
        else:
            # 负载正常,可以适当增加并发
            factor = 1.2
        
        # 基于成功率调整
        if len(self.success_counts) >= 20:
            recent_success_rate = sum(list(self.success_counts)[-20:]) / 20
            if recent_success_rate < 0.7:  # 成功率低
                factor *= 0.8
            elif recent_success_rate > 0.95:  # 成功率高
                factor *= 1.1
        
        # 基于响应时间调整
        if len(self.response_times) >= 20:
            avg_response_time = sum(list(self.response_times)[-20:]) / 20
            if avg_response_time > 5.0:  # 响应时间过长
                factor *= 0.8
            elif avg_response_time < 1.0:  # 响应时间很短
                factor *= 1.1
        
        # 计算新的并发数
        new_concurrent_requests = int(self.original_concurrent_requests * factor)
        new_per_domain = int(self.original_per_domain * factor)
        
        # 限制在合理范围内
        new_concurrent_requests = max(4, min(new_concurrent_requests, 128))
        new_per_domain = max(2, min(new_per_domain, 32))
        
        return new_concurrent_requests, new_per_domain
    
    def apply_concurrency_adjustment(self, spider):
        """
        应用并发数调整
        """
        if not self.should_adjust_concurrency():
            return
        
        new_total, new_per_domain = self.calculate_optimal_concurrency()
        
        # 运行时的并发上限保存在下载器对象上(Downloader.total_concurrency /
        # domain_concurrency),而不是调度器上
        downloader = self.crawler.engine.downloader
        current_total = downloader.total_concurrency
        current_per_domain = downloader.domain_concurrency
        
        if new_total != current_total or new_per_domain != current_per_domain:
            spider.logger.info(
                f"Adjusting concurrency: total={new_total}(was {current_total}), "
                f"per_domain={new_per_domain}(was {current_per_domain})"
            )
            
            # 实际应用新的并发数(domain_concurrency 只影响之后新建的下载槽)
            downloader.total_concurrency = new_total
            downloader.domain_concurrency = new_per_domain
            
            # 更新统计信息
            self.stats.set_value('concurrency/total', new_total)
            self.stats.set_value('concurrency/per_domain', new_per_domain)
            self.stats.set_value('concurrency/last_adjustment', time.time())
        
        self.last_adjustment = time.time()

class ConcurrencyControlMiddleware:
    """
    并发控制中间件
    """
    
    def __init__(self, crawler):
        self.controller = DynamicConcurrencyController(crawler)
    
    @classmethod
    def from_crawler(cls, crawler):
        return cls(crawler)
    
    def process_request(self, request, spider):
        """
        处理请求时检查并发控制
        """
        # 应用动态并发调整
        self.controller.apply_concurrency_adjustment(spider)
        
        return None
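
动态并发控制中间件的注册方式如下(模块路径与优先级均为示例值,阈值设置键与上面代码读取的一致):

# settings.py - 注册动态并发控制中间件(示例,模块路径为假设)
DOWNLOADER_MIDDLEWARES = {
    'myproject.dynamic_concurrency_controller.ConcurrencyControlMiddleware': 200,
}
CONCURRENCY_CPU_THRESHOLD = 80.0
CONCURRENCY_MEMORY_THRESHOLD = 80.0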

数据处理优化

数据处理优化对于大规模爬虫的效率至关重要。

高效数据处理管道

# efficient_pipeline.py - 高效数据处理管道
import json
import gzip
import hashlib
import time
import logging
from itemadapter import ItemAdapter
from scrapy.exceptions import DropItem

class EfficientDataPipeline:
    """
    高效数据处理管道
    """
    
    def __init__(self, settings):
        self.settings = settings
        self.batch_size = settings.getint('PIPELINE_BATCH_SIZE', 100)
        self.batch_buffer = []
        self.dropped_count = 0
        self.processed_count = 0
        self.logger = logging.getLogger(__name__)
    
    @classmethod
    def from_crawler(cls, crawler):
        return cls(crawler.settings)
    
    def open_spider(self, spider):
        """
        爬虫开启时的初始化
        """
        self.start_time = time.time()
        self.logger.info("Efficient data pipeline opened")
    
    def close_spider(self, spider):
        """
        爬虫关闭时处理剩余数据
        """
        if self.batch_buffer:
            self._process_batch(spider)
        
        processing_time = time.time() - self.start_time
        self.logger.info(
            f"Pipeline stats - Processed: {self.processed_count}, "
            f"Dropped: {self.dropped_count}, Time: {processing_time:.2f}s"
        )
    
    def process_item(self, item, spider):
        """
        处理单个项目
        """
        adapter = ItemAdapter(item)
        
        # 数据验证和清洗
        if not self._validate_item(adapter):
            self.dropped_count += 1
            raise DropItem(f"Invalid item: {dict(adapter)}")
        
        # 数据标准化
        self._normalize_item(adapter)
        
        # 添加处理元数据
        adapter['processed_at'] = time.time()
        adapter['source_spider'] = spider.name
        
        # 添加到批处理缓冲区
        self.batch_buffer.append(dict(adapter))
        self.processed_count += 1
        
        # 检查是否需要处理批次
        if len(self.batch_buffer) >= self.batch_size:
            self._process_batch(spider)
        
        return item
    
    def _validate_item(self, adapter) -> bool:
        """
        验证项目数据
        """
        # 检查必需字段
        required_fields = self.settings.getlist('REQUIRED_FIELDS', [])
        for field in required_fields:
            if not adapter.get(field):
                return False
        
        # 检查数据长度限制
        max_lengths = self.settings.getdict('FIELD_MAX_LENGTHS', {})
        for field, max_len in max_lengths.items():
            value = adapter.get(field)
            if value and isinstance(value, str) and len(value) > max_len:
                return False
        
        return True
    
    def _normalize_item(self, adapter):
        """
        标准化项目数据
        """
        # 清理文本字段
        for field, value in adapter.items():
            if isinstance(value, str):
                # 去除多余空白
                adapter[field] = ' '.join(value.split())
        
        # 标准化URL(只去除首尾空白;URL 的路径部分可能大小写敏感,不宜整体转小写)
        if 'url' in adapter:
            adapter['url'] = adapter['url'].strip()
    
    def _process_batch(self, spider):
        """
        处理一批数据
        """
        if not self.batch_buffer:
            return
        
        # 批量处理逻辑
        processed_batch = self._batch_process_logic(self.batch_buffer)
        
        # 输出处理结果
        self._output_batch(processed_batch, spider)
        
        # 清空缓冲区
        self.batch_buffer = []
    
    def _batch_process_logic(self, batch):
        """
        批量处理逻辑
        """
        # 这里可以实现具体的批量处理逻辑
        # 如:批量数据库插入、批量文件写入等
        return batch
    
    def _output_batch(self, batch, spider):
        """
        输出批处理结果
        """
        # 根据配置选择输出方式
        output_method = self.settings.get('PIPELINE_OUTPUT_METHOD', 'json')
        
        if output_method == 'json':
            self._output_json(batch, spider)
        elif output_method == 'csv':
            self._output_csv(batch, spider)       # 可按 _output_json 的模式自行实现
        elif output_method == 'database':
            self._output_database(batch, spider)  # 可按 _output_json 的模式自行实现
    
    def _output_json(self, batch, spider):
        """
        JSON格式输出
        """
        filename = f"items_{spider.name}_{int(time.time())}.json.gz"
        
        # 压缩输出
        json_data = json.dumps(batch, ensure_ascii=False, indent=2)
        compressed_data = gzip.compress(json_data.encode('utf-8'))
        
        with open(filename, 'wb') as f:
            f.write(compressed_data)

class DeduplicationPipeline:
    """
    去重处理管道
    """
    
    def __init__(self, settings):
        self.settings = settings
        self.use_redis = settings.getbool('DEDUPLICATION_USE_REDIS', True)
        self.dedup_field = settings.get('DEDUPLICATION_FIELD', 'url')
        
        if self.use_redis:
            import redis
            self.redis_client = redis.from_url(settings.get('REDIS_URL', 'redis://localhost:6379'))
            self.redis_key = settings.get('DEDUPLICATION_REDIS_KEY', 'scrapy:duplicates')
        else:
            self.seen_items = set()
    
    @classmethod
    def from_crawler(cls, crawler):
        return cls(crawler.settings)
    
    def process_item(self, item, spider):
        """
        处理去重逻辑
        """
        adapter = ItemAdapter(item)
        item_value = adapter.get(self.dedup_field)
        
        if item_value is None:
            return item  # 没有去重字段,直接通过
        
        # 生成去重标识
        dedup_id = self._generate_dedup_id(item_value)
        
        if self._is_duplicate(dedup_id):
            raise DropItem(f"Duplicate item: {item_value}")
        
        # 标记为已见过
        self._mark_seen(dedup_id)
        
        # 添加去重标识到项目
        adapter['dedup_id'] = dedup_id
        
        return item
    
    def _generate_dedup_id(self, value) -> str:
        """
        生成去重标识
        """
        return hashlib.md5(str(value).encode()).hexdigest()
    
    def _is_duplicate(self, dedup_id: str) -> bool:
        """
        检查是否为重复项
        """
        if self.use_redis:
            return bool(self.redis_client.sismember(self.redis_key, dedup_id))
        else:
            return dedup_id in self.seen_items
    
    def _mark_seen(self, dedup_id: str):
        """
        标记项目为已见过
        """
        if self.use_redis:
            self.redis_client.sadd(self.redis_key, dedup_id)
        else:
            self.seen_items.add(dedup_id)
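
两个管道通过 ITEM_PIPELINES 注册,数字越小越先执行,去重应放在批量输出之前(模块路径为假设):

# settings.py - 注册数据处理管道(示例,模块路径为假设)
ITEM_PIPELINES = {
    'myproject.efficient_pipeline.DeduplicationPipeline': 100,
    'myproject.efficient_pipeline.EfficientDataPipeline': 300,
}
PIPELINE_BATCH_SIZE = 100
DEDUPLICATION_FIELD = 'url'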

内存高效的数据处理

# memory_efficient_processing.py - 内存高效数据处理
import io
import os
import time
from contextlib import contextmanager
from itemadapter import ItemAdapter
from scrapy.utils.serialize import ScrapyJSONEncoder

class MemoryEfficientPipeline:
    """
    内存高效处理管道
    """
    
    def __init__(self, settings):
        self.settings = settings
        self.buffer_size = settings.getint('PROCESSING_BUFFER_SIZE', 1024 * 1024)  # 1MB
        self.temp_storage = settings.get('TEMP_STORAGE_PATH', '/tmp/scrapy_temp')
        os.makedirs(self.temp_storage, exist_ok=True)  # 确保临时目录存在
        self.active_files = {}  # 活跃的临时文件
    
    @classmethod
    def from_crawler(cls, crawler):
        return cls(crawler.settings)
    
    @contextmanager
    def _get_temp_file(self, prefix: str):
        """
        获取临时文件上下文管理器
        """
        import tempfile
        temp_file = tempfile.NamedTemporaryFile(
            mode='w+b',
            prefix=f"{prefix}_",
            suffix='.tmp',
            dir=self.temp_storage,
            delete=False
        )
        
        try:
            yield temp_file
        finally:
            temp_file.close()
    
    def process_item(self, item, spider):
        """
        处理项目(内存高效)
        """
        adapter = ItemAdapter(item)
        
        # 序列化项目到字节
        item_bytes = self._serialize_item(adapter)
        
        # 检查是否需要写入临时文件
        if len(item_bytes) > self.buffer_size // 4:  # 单个项目过大
            # 直接写入临时文件
            self._write_to_temp_file(item_bytes, spider.name)
        else:
            # 添加到内存缓冲区
            self._add_to_buffer(item_bytes, spider.name)
        
        return item
    
    def _serialize_item(self, adapter) -> bytes:
        """
        序列化项目为字节
        """
        json_str = ScrapyJSONEncoder().encode(dict(adapter))
        return json_str.encode('utf-8')
    
    def _add_to_buffer(self, item_bytes: bytes, spider_name: str):
        """
        添加到内存缓冲区
        """
        buffer_key = f"{spider_name}_buffer"
        if buffer_key not in self.__dict__:
            self.__dict__[buffer_key] = io.BytesIO()
            self.__dict__[f"{buffer_key}_size"] = 0
        
        buffer = self.__dict__[buffer_key]
        buffer_size = self.__dict__[f"{buffer_key}_size"]
        
        if buffer_size + len(item_bytes) > self.buffer_size:
            # 缓冲区满了,写入文件
            self._flush_buffer(buffer_key, spider_name)
            # 刷新后缓冲区对象已被替换,必须重新获取引用,否则会写进被丢弃的旧缓冲区
            buffer = self.__dict__[buffer_key]
        
        buffer.write(item_bytes)
        buffer.write(b'\n')  # 分隔符
        self.__dict__[f"{buffer_key}_size"] += len(item_bytes) + 1
    
    def _flush_buffer(self, buffer_key: str, spider_name: str):
        """
        刷新缓冲区到文件
        """
        buffer = self.__dict__.pop(buffer_key)
        buffer_size = self.__dict__.pop(f"{buffer_key}_size")
        
        if buffer.tell() > 0:
            buffer.seek(0)
            temp_filename = f"{self.temp_storage}/{spider_name}_{int(time.time())}_{id(buffer)}.tmp"
            
            with open(temp_filename, 'wb') as f:
                while True:
                    chunk = buffer.read(8192)  # 8KB chunks
                    if not chunk:
                        break
                    f.write(chunk)
        
        # 重置缓冲区
        self.__dict__[buffer_key] = io.BytesIO()
        self.__dict__[f"{buffer_key}_size"] = 0
    
    def _write_to_temp_file(self, item_bytes: bytes, spider_name: str):
        """
        直接写入临时文件
        """
        temp_filename = f"{self.temp_storage}/{spider_name}_large_{int(time.time())}_{len(item_bytes)}.tmp"
        with open(temp_filename, 'wb') as f:
            f.write(item_bytes)
    
    def close_spider(self, spider):
        """
        爬虫关闭时处理剩余数据
        """
        # 处理所有剩余的缓冲区
        for attr_name in list(self.__dict__.keys()):
            if attr_name.endswith('_buffer'):
                spider_name = attr_name.replace('_buffer', '')
                self._flush_buffer(attr_name, spider_name)

调度与队列优化

调度和队列优化对于大规模爬虫的性能至关重要。

高性能调度器

# high_performance_scheduler.py - 高性能调度器
from scrapy.core.scheduler import Scheduler
from collections import deque
import time

class HighPerformanceScheduler(Scheduler):
    """
    高性能调度器
    """
    
    @classmethod
    def from_crawler(cls, crawler):
        """
        复用基类的构造逻辑(去重过滤器、磁盘/内存队列等由基类负责初始化),
        再附加本调度器的自定义配置和统计字段
        """
        scheduler = super().from_crawler(crawler)
        scheduler.crawler = crawler
        scheduler.stats = crawler.stats
        
        # 性能优化配置
        scheduler.batch_size = crawler.settings.getint('SCHEDULER_BATCH_SIZE', 10)
        scheduler.prefetch_multiplier = crawler.settings.getint('SCHEDULER_PREFETCH_MULTIPLIER', 3)
        
        # 统计信息
        scheduler.enqueued_count = 0
        scheduler.dequeue_count = 0
        scheduler.start_time = time.time()
        
        # 预取缓存(注意:缓存中的请求只存在于内存,不参与磁盘持久化)
        scheduler._prefetched = deque()
        
        return scheduler
    
    def enqueue_request(self, request):
        """
        入队请求(优化版本)
        """
        if not request.dont_filter and self.df.request_seen(request):
            self.df.log(request, self.spider)
            return False
        
        dqok = self._dqpush(request)  # 基类的方法名是 _dqpush
        if dqok:
            self.stats.inc_value('scheduler/enqueued/disk', spider=self.spider)
        else:
            self._mqpush(request)
            self.stats.inc_value('scheduler/enqueued/memory', spider=self.spider)
        
        self.stats.inc_value('scheduler/enqueued', spider=self.spider)
        self.enqueued_count += 1
        
        return True
    
    def next_request(self):
        """
        获取下一个请求(带批量预取缓存;预取出的请求不会被丢弃)
        """
        if not self._prefetched:
            # 一次性从底层队列取出一批请求,减少队列操作的开销
            for _ in range(self.batch_size):
                request = super().next_request()
                if request is None:
                    break
                self._prefetched.append(request)
        
        if not self._prefetched:
            return None
        
        request = self._prefetched.popleft()
        self.dequeue_count += 1
        return request
    
    def has_pending_requests(self):
        """
        检查是否有待处理请求(需要把预取缓存也计算在内)
        """
        result = bool(self._prefetched) or super().has_pending_requests()
        
        # 更新统计信息
        self.stats.set_value('scheduler/pending_count', self._get_pending_count())
        
        return result
    
    def _get_pending_count(self) -> int:
        """
        获取待处理请求数量(内存队列 + 预取缓存)
        """
        count = len(self._prefetched)
        if getattr(self, 'mqs', None) is not None:
            count += len(self.mqs)
        return count
    
    def close(self, reason):
        """
        关闭调度器(磁盘队列的持久化由基类 close 负责,无需手动处理)
        """
        
        # 输出性能统计
        total_time = time.time() - self.start_time
        self.stats.set_value('scheduler/enqueued_count', self.enqueued_count)
        self.stats.set_value('scheduler/dequeued_count', self.dequeue_count)
        self.stats.set_value('scheduler/efficiency_ratio', 
                           self.dequeue_count / max(self.enqueued_count, 1))
        self.stats.set_value('scheduler/processing_time', total_time)
        
        return super().close(reason)

class PriorityBasedScheduler(HighPerformanceScheduler):
    """
    基于优先级的调度器
    """
    
    @classmethod
    def from_crawler(cls, crawler):
        scheduler = super().from_crawler(crawler)
        scheduler.priority_weights = crawler.settings.getdict('PRIORITY_WEIGHTS', {
            'high': 10,
            'medium': 5,
            'low': 1
        })
        return scheduler
    
    def enqueue_request(self, request):
        """
        根据优先级入队请求
        """
        # 计算优先级(Request.priority 默认为 0,用加法而不是乘法,否则结果恒为 0)
        priority_factor = self._calculate_priority(request)
        request.priority = request.priority + priority_factor
        
        return super().enqueue_request(request)
    
    def _calculate_priority(self, request) -> float:
        """
        计算请求优先级因子
        """
        # 根据URL模式设置优先级
        url = request.url.lower()
        
        if any(keyword in url for keyword in ['urgent', 'priority', 'important']):
            return self.priority_weights.get('high', 10)
        elif any(keyword in url for keyword in ['normal', 'standard']):
            return self.priority_weights.get('medium', 5)
        else:
            return self.priority_weights.get('low', 1)
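
自定义调度器通过 SCHEDULER 设置整体替换默认调度器(模块路径为假设):

# settings.py - 启用高性能调度器(示例,模块路径为假设)
SCHEDULER = 'myproject.high_performance_scheduler.PriorityBasedScheduler'
SCHEDULER_BATCH_SIZE = 10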

队列性能监控

# queue_monitor.py - 队列监控器
import time
from collections import deque
from scrapy import signals
from scrapy.http import Request
import logging

class QueuePerformanceMonitor:
    """
    队列性能监控器
    """
    
    def __init__(self, crawler):
        self.crawler = crawler
        self.stats = crawler.stats
        self.logger = logging.getLogger(__name__)
        
        # 性能指标
        self.enqueue_times = deque(maxlen=1000)
        self.dequeue_times = deque(maxlen=1000)
        self.queue_sizes = deque(maxlen=100)
        
        # 统计信息
        self.enqueued_count = 0
        self.dequeued_count = 0
        self.failed_count = 0
        
        # 连接信号:Scrapy 没有 request_enqueued/request_dequeued 信号,
        # 这里用 request_scheduled(入队)和 request_reached_downloader(出队)替代
        crawler.signals.connect(self.request_scheduled, signal=signals.request_scheduled)
        crawler.signals.connect(self.request_reached_downloader, signal=signals.request_reached_downloader)
        crawler.signals.connect(self.request_dropped, signal=signals.request_dropped)
    
    def request_scheduled(self, request, spider):
        """
        请求被调度(入队)时的监控
        """
        self.enqueued_count += 1
        self.enqueue_times.append(time.time())
        
        # 更新统计
        self.stats.inc_value('queue_monitor/enqueued', spider=spider)
        self._update_queue_stats(spider)
    
    def request_reached_downloader(self, request, spider):
        """
        请求到达下载器(出队)时的监控
        """
        self.dequeued_count += 1
        self.dequeue_times.append(time.time())
        
        # 更新统计
        self.stats.inc_value('queue_monitor/dequeued', spider=spider)
        self._update_queue_stats(spider)
    
    def request_dropped(self, request, spider):
        """
        请求被丢弃时的监控(该信号只携带 request 和 spider)
        """
        self.failed_count += 1
        
        # 更新统计
        self.stats.inc_value('queue_monitor/failed', spider=spider)
        self._update_queue_stats(spider)
    
    def _update_queue_stats(self, spider):
        """
        更新队列统计信息
        """
        # 获取队列大小(需要访问调度器)
        if hasattr(self.crawler.engine, 'slot') and self.crawler.engine.slot:
            scheduler = self.crawler.engine.slot.scheduler
            if hasattr(scheduler, 'has_pending_requests'):
                pending = scheduler.has_pending_requests()
                self.stats.set_value('queue_monitor/pending', pending, spider=spider)
        
        # 计算性能指标
        if self.enqueue_times and self.dequeue_times:
            current_time = time.time()
            
            # 计算请求速率
            recent_enqueued = sum(1 for t in self.enqueue_times if current_time - t < 60)
            recent_dequeued = sum(1 for t in self.dequeue_times if current_time - t < 60)
            
            self.stats.set_value('queue_monitor/rate/enqueued_per_minute', recent_enqueued, spider=spider)
            self.stats.set_value('queue_monitor/rate/dequeued_per_minute', recent_dequeued, spider=spider)
    
    def get_performance_report(self) -> dict:
        """
        获取性能报告
        """
        current_time = time.time()
        # 'start_time' 是 CoreStats 扩展写入的 datetime 统计项
        start_time = self.crawler.stats.get_value('start_time')
        
        return {
            'enqueued_count': self.enqueued_count,
            'dequeued_count': self.dequeued_count,
            'failed_count': self.failed_count,
            'success_rate': self.dequeued_count / max(self.enqueued_count, 1),
            'average_enqueue_rate': len(self.enqueue_times) / 60 if self.enqueue_times else 0,
            'average_dequeue_rate': len(self.dequeue_times) / 60 if self.dequeue_times else 0,
            'uptime_minutes': (current_time - start_time.timestamp()) / 60 if start_time else 0
        }

class QueueOptimizationMiddleware:
    """
    队列优化中间件
    """
    
    def __init__(self, crawler):
        self.monitor = QueuePerformanceMonitor(crawler)
    
    @classmethod
    def from_crawler(cls, crawler):
        return cls(crawler)
    
    def process_spider_output(self, response, result, spider):
        """
        处理蜘蛛输出时优化队列
        """
        for item_or_request in result:
            if isinstance(item_or_request, Request):  # 明确区分 Request 与 Item
                # 根据响应情况调整请求优先级
                if response.status == 429:  # Too Many Requests
                    # 降低优先级,延后处理
                    item_or_request.priority -= 10
                elif response.status == 200:  # Success
                    # 可以提高相关请求的优先级
                    pass
            
            yield item_or_request

监控与诊断

完善的监控体系是大规模爬虫稳定运行的保障。

系统监控组件

# system_monitor.py - 系统监控组件
import psutil
import time
import logging
from collections import deque
from scrapy import signals
from prometheus_client import Counter, Gauge, Histogram, start_http_server

# Prometheus指标定义
REQUESTS_TOTAL = Counter('scrapy_requests_total', 'Total requests processed', ['spider', 'status'])
ITEMS_SCRAPED = Counter('scrapy_items_scraped_total', 'Total items scraped', ['spider'])
RESPONSE_TIME = Histogram('scrapy_response_time_seconds', 'Response time in seconds', ['spider'])
MEMORY_USAGE = Gauge('scrapy_memory_usage_bytes', 'Memory usage in bytes', ['spider'])
CPU_USAGE = Gauge('scrapy_cpu_usage_percent', 'CPU usage percentage', ['spider'])

class SystemMonitor:
    """
    系统监控器
    """
    
    def __init__(self, crawler):
        self.crawler = crawler
        self.stats = crawler.stats
        self.logger = logging.getLogger(__name__)
        
        # 系统资源监控
        self.process = psutil.Process()
        self.cpu_samples = deque(maxlen=100)
        self.memory_samples = deque(maxlen=100)
        
        # 性能指标
        self.start_time = time.time()
        self.last_sample_time = time.time()
        
        # Prometheus监控
        self.enable_prometheus = crawler.settings.getbool('MONITOR_PROMETHEUS_ENABLED', False)
        if self.enable_prometheus:
            prometheus_port = crawler.settings.getint('MONITOR_PROMETHEUS_PORT', 8000)
            start_http_server(prometheus_port)
        
        # 连接信号
        crawler.signals.connect(self.spider_opened, signal=signals.spider_opened)
        crawler.signals.connect(self.spider_closed, signal=signals.spider_closed)
        crawler.signals.connect(self.response_received, signal=signals.response_received)
        crawler.signals.connect(self.item_scraped, signal=signals.item_scraped)
    
    def spider_opened(self, spider):
        """
        爬虫开启监控
        """
        self.logger.info(f"System monitoring started for spider: {spider.name}")
        spider.crawler.stats.set_value('monitor/start_time', time.time())
    
    def spider_closed(self, spider, reason):
        """
        爬虫关闭监控
        """
        total_time = time.time() - self.start_time
        self.logger.info(f"System monitoring stopped for spider: {spider.name}, runtime: {total_time:.2f}s")
        
        # 输出最终统计
        self._log_final_stats(spider)
    
    def response_received(self, response, request, spider):
        """
        响应接收监控
        """
        # 记录响应时间
        download_latency = response.meta.get('download_latency', 0)
        if download_latency > 0:
            RESPONSE_TIME.labels(spider=spider.name).observe(download_latency)
        
        # 记录请求状态
        # Prometheus 标签值必须是字符串
        REQUESTS_TOTAL.labels(spider=spider.name, status=str(response.status)).inc()
        
        # 采样系统资源
        self._sample_resources(spider)
    
    def item_scraped(self, item, response, spider):
        """
        项目抓取监控
        """
        ITEMS_SCRAPED.labels(spider=spider.name).inc()
        
        # 采样系统资源
        self._sample_resources(spider)
    
    def _sample_resources(self, spider):
        """
        采样系统资源使用情况
        """
        current_time = time.time()
        
        # 每秒采样一次
        if current_time - self.last_sample_time >= 1.0:
            try:
                # CPU使用率
                cpu_percent = self.process.cpu_percent()
                self.cpu_samples.append(cpu_percent)
                CPU_USAGE.labels(spider=spider.name).set(cpu_percent)
                
                # 内存使用
                memory_info = self.process.memory_info()
                self.memory_samples.append(memory_info.rss)
                MEMORY_USAGE.labels(spider=spider.name).set(memory_info.rss)
                
                # 更新统计数据
                spider.crawler.stats.set_value('monitor/cpu_percent', cpu_percent)
                spider.crawler.stats.set_value('monitor/memory_rss', memory_info.rss)
                
                self.last_sample_time = current_time
                
            except Exception as e:
                self.logger.error(f"Error sampling resources: {e}")
    
    def _log_final_stats(self, spider):
        """
        记录最终统计信息
        """
        if self.cpu_samples:
            avg_cpu = sum(self.cpu_samples) / len(self.cpu_samples)
            max_cpu = max(self.cpu_samples) if self.cpu_samples else 0
            spider.crawler.stats.set_value('monitor/avg_cpu_percent', avg_cpu)
            spider.crawler.stats.set_value('monitor/max_cpu_percent', max_cpu)
        
        if self.memory_samples:
            avg_memory = sum(self.memory_samples) / len(self.memory_samples)
            max_memory = max(self.memory_samples) if self.memory_samples else 0
            spider.crawler.stats.set_value('monitor/avg_memory_bytes', avg_memory)
            spider.crawler.stats.set_value('monitor/max_memory_bytes', max_memory)
    
    def get_current_status(self) -> dict:
        """
        获取当前状态
        """
        try:
            cpu_percent = self.process.cpu_percent()
            memory_info = self.process.memory_info()
            
            return {
                'cpu_percent': cpu_percent,
                'memory_rss': memory_info.rss,
                'memory_vms': memory_info.vms,
                'num_threads': self.process.num_threads(),
                'connections': len(self.process.connections()),
                'open_files': len(self.process.open_files()) if self.process.is_running() else 0,
                'uptime': time.time() - self.start_time
            }
        except Exception as e:
            self.logger.error(f"Error getting current status: {e}")
            return {}

class DetailedStatsCollector:
    """
    详细统计收集器
    """
    
    def __init__(self, crawler):
        self.crawler = crawler
        self.stats = crawler.stats
        
        # 详细统计指标
        self.response_status_counts = {}
        self.domain_request_counts = {}
        self.download_sizes = deque(maxlen=1000)
        self.processing_times = deque(maxlen=1000)
    
    def collect_response_stats(self, response, request):
        """
        收集响应统计
        """
        # 状态码统计
        status = response.status
        self.response_status_counts[status] = self.response_status_counts.get(status, 0) + 1
        
        # 域名请求统计
        from urllib.parse import urlparse
        domain = urlparse(request.url).netloc
        self.domain_request_counts[domain] = self.domain_request_counts.get(domain, 0) + 1
        
        # 下载大小统计
        content_length = len(response.body)
        self.download_sizes.append(content_length)
        
        # 更新统计数据
        self.stats.set_value(f'response_stats/status/{status}', 
                           self.response_status_counts[status])
        self.stats.set_value(f'domain_stats/{domain}', 
                           self.domain_request_counts[domain])
        self.stats.set_value('download/avg_size', 
                           sum(self.download_sizes) / len(self.download_sizes) if self.download_sizes else 0)
    
    def collect_processing_stats(self, start_time, end_time):
        """
        收集处理时间统计
        """
        processing_time = end_time - start_time
        self.processing_times.append(processing_time)
        
        avg_time = sum(self.processing_times) / len(self.processing_times) if self.processing_times else 0
        self.stats.set_value('processing/avg_time', avg_time)
        self.stats.set_value('processing/current_time', processing_time)
    
    def get_detailed_report(self) -> dict:
        """
        获取详细报告
        """
        return {
            'response_statuses': dict(self.response_status_counts),
            'domain_requests': dict(self.domain_request_counts),
            'avg_download_size': sum(self.download_sizes) / len(self.download_sizes) if self.download_sizes else 0,
            'max_download_size': max(self.download_sizes) if self.download_sizes else 0,
            'avg_processing_time': sum(self.processing_times) / len(self.processing_times) if self.processing_times else 0,
            'total_requests': sum(self.response_status_counts.values()),
            'successful_requests': sum(v for k, v in self.response_status_counts.items() if 200 <= k < 400)
        }
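
监控组件的 Prometheus 输出通过两个自定义设置开启(设置键与上面代码中读取的一致,端口可按部署环境调整):

# settings.py - 开启 Prometheus 指标输出(示例)
MONITOR_PROMETHEUS_ENABLED = True
MONITOR_PROMETHEUS_PORT = 8000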

告警与通知系统

# alert_system.py - 告警系统
import smtplib
import time
from email.mime.text import MIMEText
from email.mime.multipart import MIMEMultipart
from datetime import datetime
import logging

class AlertSystem:
    """
    告警系统
    """
    
    def __init__(self, crawler):
        self.crawler = crawler
        self.settings = crawler.settings
        self.logger = logging.getLogger(__name__)
        
        # 告警配置(注意用 self.settings,直接写 settings 会 NameError)
        self.email_enabled = self.settings.getbool('ALERT_EMAIL_ENABLED', False)
        self.email_host = self.settings.get('ALERT_EMAIL_HOST', 'localhost')
        self.email_port = self.settings.getint('ALERT_EMAIL_PORT', 587)
        self.email_user = self.settings.get('ALERT_EMAIL_USER')
        self.email_password = self.settings.get('ALERT_EMAIL_PASSWORD')
        self.alert_recipients = self.settings.getlist('ALERT_RECIPIENTS', [])
        
        # 告警阈值
        self.error_rate_threshold = self.settings.getfloat('ALERT_ERROR_RATE_THRESHOLD', 0.1)  # 10%错误率
        self.slow_response_threshold = self.settings.getfloat('ALERT_SLOW_RESPONSE_THRESHOLD', 10.0)  # 10秒
        self.memory_usage_threshold = self.settings.getfloat('ALERT_MEMORY_USAGE_THRESHOLD', 90.0)  # 90%内存使用
        self.cpu_usage_threshold = self.settings.getfloat('ALERT_CPU_USAGE_THRESHOLD', 90.0)  # 90% CPU使用
        
        # 告警历史
        self.alert_history = {}
        self.suppression_window = self.settings.getint('ALERT_SUPPRESSION_WINDOW', 300)  # 5分钟抑制窗口
    
    def check_and_alert(self, spider, current_stats: dict):
        """
        检查指标并发送告警
        """
        alerts = []
        
        # 检查错误率
        error_rate = self._calculate_error_rate(current_stats)
        if error_rate > self.error_rate_threshold:
            alerts.append({
                'type': 'high_error_rate',
                'message': f'High error rate detected: {error_rate:.2%}',
                'severity': 'HIGH',
                'value': error_rate
            })
        
        # 检查响应时间
        avg_response_time = current_stats.get('avg_response_time', 0)
        if avg_response_time > self.slow_response_threshold:
            alerts.append({
                'type': 'slow_responses',
                'message': f'Slow average response time: {avg_response_time:.2f}s',
                'severity': 'MEDIUM',
                'value': avg_response_time
            })
        
        # 检查内存使用
        memory_percent = current_stats.get('memory_percent', 0)
        if memory_percent > self.memory_usage_threshold:
            alerts.append({
                'type': 'high_memory',
                'message': f'High memory usage: {memory_percent:.1f}%',
                'severity': 'HIGH',
                'value': memory_percent
            })
        
        # 检查CPU使用
        cpu_percent = current_stats.get('cpu_percent', 0)
        if cpu_percent > self.cpu_usage_threshold:
            alerts.append({
                'type': 'high_cpu',
                'message': f'High CPU usage: {cpu_percent:.1f}%',
                'severity': 'MEDIUM',
                'value': cpu_percent
            })
        
        # 发送告警
        for alert in alerts:
            self._send_alert(spider, alert)
    
    def _calculate_error_rate(self, stats: dict) -> float:
        """
        计算错误率
        """
        # 用下载器实际发出的请求数作为分母更准确
        total_requests = stats.get('downloader/request_count', 0)
        if total_requests == 0:
            return 0.0
        
        error_count = stats.get('downloader/exception_count', 0)
        return error_count / total_requests
    
    def _should_send_alert(self, alert_type: str) -> bool:
        """
        检查是否应该发送告警(避免重复告警)
        """
        current_time = time.time()
        last_alert_time = self.alert_history.get(alert_type, 0)
        
        return current_time - last_alert_time > self.suppression_window
    
    def _send_alert(self, spider, alert: dict):
        """
        发送告警
        """
        alert_type = alert['type']
        
        if not self._should_send_alert(alert_type):
            return
        
        # 更新告警历史
        self.alert_history[alert_type] = time.time()
        
        # 构建告警消息
        subject = f"[SCRAZY ALERT] {alert['type'].upper()} - Spider: {spider.name}"
        message = self._build_alert_message(spider, alert)
        
        # 发送告警
        if self.email_enabled:
            self._send_email_alert(subject, message)
        
        # 记录日志
        self.logger.warning(f"Alert sent: {alert['message']}")
    
    def _build_alert_message(self, spider, alert: dict) -> str:
        """
        构建告警消息
        """
        current_time = datetime.now().strftime('%Y-%m-%d %H:%M:%S')
        
        message = f"""
Scrapy Spider Alert

Time: {current_time}
Spider: {spider.name}
Alert Type: {alert['type']}
Severity: {alert['severity']}
Message: {alert['message']}
Value: {alert['value']}

Current Stats:
- Requests Enqueued: {self.crawler.stats.get_value('scheduler/enqueued', 0)}
- Items Scraped: {self.crawler.stats.get_value('item_scraped_count', 0)}
- Error Count: {self.crawler.stats.get_value('downloader/exception_count', 0)}
- Memory Usage: {self.crawler.stats.get_value('monitor/memory_percent', 0):.1f}%
- CPU Usage: {self.crawler.stats.get_value('monitor/cpu_percent', 0):.1f}%

This is an automated alert from your Scrapy monitoring system.
        """
        
        return message
    
    def _send_email_alert(self, subject: str, message: str):
        """
        发送邮件告警
        """
        if not self.alert_recipients:
            self.logger.warning("No alert recipients configured")
            return
        
        try:
            msg = MIMEMultipart()
            msg['From'] = self.email_user
            msg['To'] = ', '.join(self.alert_recipients)
            msg['Subject'] = subject
            
            msg.attach(MIMEText(message, 'plain', 'utf-8'))
            
            server = smtplib.SMTP(self.email_host, self.email_port)
            server.starttls()
            server.login(self.email_user, self.email_password)
            
            text = msg.as_string()
            server.sendmail(self.email_user, self.alert_recipients, text)
            server.quit()
            
            self.logger.info(f"Alert email sent to: {self.alert_recipients}")
            
        except Exception as e:
            self.logger.error(f"Failed to send alert email: {e}")

class HealthCheckMiddleware:
    """
    健康检查中间件
    """
    
    def __init__(self, crawler):
        self.alert_system = AlertSystem(crawler)
        self.check_interval = crawler.settings.getint('HEALTH_CHECK_INTERVAL', 60)  # 60秒检查一次
        self.last_check_time = time.time()
    
    @classmethod
    def from_crawler(cls, crawler):
        return cls(crawler)
    
    def process_spider_output(self, response, result, spider):
        """
        处理蜘蛛输出时进行健康检查
        """
        current_time = time.time()
        
        # 定期进行健康检查
        if current_time - self.last_check_time > self.check_interval:
            current_stats = dict(spider.crawler.stats.get_stats())
            self.alert_system.check_and_alert(spider, current_stats)
            self.last_check_time = current_time
        
        yield from result
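
要让健康检查随爬虫一起运行,需要在 settings.py 中注册该蜘蛛中间件。下面是一个最小注册草图(模块路径 myproject.middlewares 为假设值,请替换为实际路径):

```python
# settings.py - 启用健康检查中间件(路径为示例)
SPIDER_MIDDLEWARES = {
    'myproject.middlewares.HealthCheckMiddleware': 920,
}

# 健康检查间隔(秒)
HEALTH_CHECK_INTERVAL = 60
```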

## 故障恢复与容错 {#故障恢复与容错}

健壮的容错机制是大规模爬虫系统可靠性的基础。

### 容错处理机制

# fault_tolerance.py - 容错处理机制
import json
import logging
import os
import time

import scrapy
from scrapy.exceptions import IgnoreRequest
from twisted.internet import reactor

class FaultToleranceMiddleware:
    """
    容错中间件
    """
    
    def __init__(self, crawler):
        self.crawler = crawler
        self.settings = crawler.settings
        self.logger = logging.getLogger(__name__)
        
        # 重试配置(settings 为 crawler.settings 的本地别名)
        settings = self.settings
        self.max_retries = settings.getint('FAULT_TOLERANCE_MAX_RETRIES', 5)
        self.retry_http_codes = set(int(c) for c in settings.getlist(
            'FAULT_TOLERANCE_RETRY_CODES', [500, 502, 503, 504, 408, 429]))
        self.retry_factor = settings.getfloat('FAULT_TOLERANCE_RETRY_FACTOR', 1.5)
        self.retry_max_delay = settings.getfloat('FAULT_TOLERANCE_RETRY_MAX_DELAY', 60)
        
        # 熔断器配置
        self.circuit_breaker_enabled = settings.getbool('FAULT_TOLERANCE_CIRCUIT_BREAKER', True)
        self.failure_threshold = settings.getint('FAULT_TOLERANCE_FAILURE_THRESHOLD', 5)
        self.reset_timeout = settings.getint('FAULT_TOLERANCE_RESET_TIMEOUT', 60)
        
        # 域名级别熔断器
        self.domain_circuit_breakers = {}
    
    @classmethod
    def from_crawler(cls, crawler):
        return cls(crawler)
    
    def process_request(self, request, spider):
        """
        处理请求时的容错检查
        """
        # 检查熔断器状态
        if self.circuit_breaker_enabled:
            domain = self._get_domain(request.url)
            circuit_breaker = self._get_circuit_breaker(domain)
            
            if circuit_breaker.is_open():
                # 熔断器打开,跳过请求
                spider.logger.warning(f"Circuit breaker open for domain {domain}, skipping request: {request.url}")
                raise IgnoreRequest(f"Circuit breaker open for {domain}")
        
        # 添加重试元数据
        if 'fault_tolerance_retry_times' not in request.meta:
            request.meta['fault_tolerance_retry_times'] = 0
            request.meta['fault_tolerance_original_url'] = request.url
        
        return None
    
    def process_response(self, request, response, spider):
        """
        处理响应时的容错逻辑
        """
        domain = self._get_domain(request.url)
        
        # 检查是否需要重试
        if response.status in self.retry_http_codes:
            retry_times = request.meta.get('fault_tolerance_retry_times', 0)
            
            if retry_times < self.max_retries:
                # 计算重试延迟
                delay = self._calculate_retry_delay(retry_times)
                
                spider.logger.warning(
                    f"Retrying {request.url} after {response.status} (attempt {retry_times + 1}/{self.max_retries}), "
                    f"delay: {delay}s"
                )
                
                # 标记域名为失败(更新熔断器)
                self._record_failure(domain)
                
                # 创建重试请求
                retry_req = request.copy()
                retry_req.meta['fault_tolerance_retry_times'] = retry_times + 1
                
                # 延迟后重新调度重试请求
                reactor.callLater(delay, self._schedule_retry, spider, retry_req)
                
                # 丢弃本次错误响应,避免把错误页当作正常页面交给回调解析
                raise IgnoreRequest(f"Retry scheduled for {request.url}")
            else:
                spider.logger.error(f"Max retries exceeded for {request.url}")
                return response
        
        # 成功响应,重置该域名的熔断器
        self._record_success(domain)
        
        return response
    
    def process_exception(self, request, exception, spider):
        """
        处理异常时的容错逻辑
        """
        domain = self._get_domain(request.url)
        
        # 记录失败
        self._record_failure(domain)
        
        retry_times = request.meta.get('fault_tolerance_retry_times', 0)
        
        if retry_times < self.max_retries:
            delay = self._calculate_retry_delay(retry_times)
            
            spider.logger.warning(
                f"Retrying {request.url} after exception {type(exception).__name__} "
                f"(attempt {retry_times + 1}/{self.max_retries}), delay: {delay}s"
            )
            
            # 创建重试请求
            retry_req = request.copy()
            retry_req.meta['fault_tolerance_retry_times'] = retry_times + 1
            
            # 延迟后重新调度重试请求
            reactor.callLater(delay, self._schedule_retry, spider, retry_req)
            
            # 返回 None:本次异常由已调度的重试请求接管
            return None
        
        # 达到最大重试次数,记录错误
        spider.logger.error(f"Max retries exceeded for {request.url} due to {exception}")
        
        return None
    
    def _calculate_retry_delay(self, retry_times: int) -> float:
        """
        计算重试延迟时间(指数退避)
        """
        base_delay = 1.0
        delay = base_delay * (self.retry_factor ** retry_times)
        return min(delay, self.retry_max_delay)
    
    def _schedule_retry(self, spider, request):
        """
        调度重试请求(Scrapy 2.10+ 的 engine.crawl 只接收 request;旧版本需额外传入 spider)
        """
        self.crawler.engine.crawl(request)
    
    def _get_domain(self, url: str) -> str:
        """
        从URL获取域名
        """
        from urllib.parse import urlparse
        parsed = urlparse(url)
        return parsed.netloc
    
    def _get_circuit_breaker(self, domain: str):
        """
        获取域名的熔断器
        """
        if domain not in self.domain_circuit_breakers:
            self.domain_circuit_breakers[domain] = CircuitBreaker(
                failure_threshold=self.failure_threshold,
                reset_timeout=self.reset_timeout
            )
        return self.domain_circuit_breakers[domain]
    
    def _record_failure(self, domain: str):
        """
        记录失败,更新对应域名的熔断器
        """
        if self.circuit_breaker_enabled:
            self._get_circuit_breaker(domain).record_failure()
    
    def _record_success(self, domain: str):
        """
        记录成功,重置对应域名的熔断器
        """
        if self.circuit_breaker_enabled:
            self._get_circuit_breaker(domain).record_success()

class CircuitBreaker:
    """
    熔断器实现
    """
    
    def __init__(self, failure_threshold: int = 5, reset_timeout: int = 60):
        self.failure_threshold = failure_threshold
        self.reset_timeout = reset_timeout
        
        self.failure_count = 0
        self.last_failure_time = None
        self.state = 'CLOSED'  # CLOSED, OPEN, HALF_OPEN
        self.last_attempt_time = None
    
    def is_open(self) -> bool:
        """
        检查熔断器是否打开
        """
        if self.state == 'OPEN':
            # 检查重置超时
            if time.time() - self.last_failure_time >= self.reset_timeout:
                self.state = 'HALF_OPEN'
                self.last_attempt_time = time.time()
                return False  # 半开状态,允许一次尝试
            return True
        elif self.state == 'HALF_OPEN':
            # 半开状态下,如果尝试间隔太短,仍然返回True
            if (time.time() - self.last_attempt_time) < 5:  # 5秒内不允许重复尝试
                return True
            return False
        else:  # CLOSED
            return False
    
    def record_failure(self):
        """
        记录失败
        """
        self.failure_count += 1
        self.last_failure_time = time.time()
        
        if self.failure_count >= self.failure_threshold:
            self.state = 'OPEN'
    
    def record_success(self):
        """
        记录成功
        """
        self.failure_count = 0
        self.state = 'CLOSED'
    
    def allow_request(self) -> bool:
        """
        是否允许请求
        """
        return not self.is_open()
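
# --- 使用示例:演示熔断器状态流转(阈值与超时为演示用的示例值) ---
# 连续失败达到阈值 -> OPEN;超过 reset_timeout -> HALF_OPEN 放行一次试探;试探成功 -> CLOSED
demo_breaker = CircuitBreaker(failure_threshold=3, reset_timeout=2)
for _ in range(3):
    demo_breaker.record_failure()
assert demo_breaker.is_open()            # OPEN:请求被拒绝
time.sleep(2)                            # 等待重置超时
assert not demo_breaker.is_open()        # HALF_OPEN:放行一次试探请求
demo_breaker.record_success()
assert demo_breaker.state == 'CLOSED'    # 试探成功,熔断器闭合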

class CheckpointManager:
    """
    检查点管理器 - 实现断点续传功能
    """
    
    def __init__(self, checkpoint_dir: str = './checkpoints'):
        self.checkpoint_dir = checkpoint_dir
        self.checkpoint_file = os.path.join(checkpoint_dir, 'checkpoint.json')
        self.ensure_checkpoint_dir()
    
    def ensure_checkpoint_dir(self):
        """
        确保检查点目录存在
        """
        os.makedirs(self.checkpoint_dir, exist_ok=True)
    
    def save_checkpoint(self, spider_name: str, stats: dict, queue_state: dict = None):
        """
        保存检查点
        """
        checkpoint_data = {
            'spider_name': spider_name,
            'timestamp': time.time(),
            'stats': dict(stats),
            'queue_state': queue_state or {},
            'version': '1.0'
        }
        
        try:
            with open(self.checkpoint_file, 'w', encoding='utf-8') as f:
                json.dump(checkpoint_data, f, ensure_ascii=False, indent=2)
        except Exception as e:
            logging.error(f"Failed to save checkpoint: {e}")
    
    def load_checkpoint(self) -> dict:
        """
        加载检查点
        """
        if not os.path.exists(self.checkpoint_file):
            return {}
        
        try:
            with open(self.checkpoint_file, 'r', encoding='utf-8') as f:
                return json.load(f)
        except Exception as e:
            logging.error(f"Failed to load checkpoint: {e}")
            return {}
    
    def has_valid_checkpoint(self, max_age_hours: int = 24) -> bool:
        """
        检查是否有有效的检查点
        """
        checkpoint = self.load_checkpoint()
        if not checkpoint:
            return False
        
        age_hours = (time.time() - checkpoint.get('timestamp', 0)) / 3600
        return age_hours <= max_age_hours
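
# --- 使用示例:关闭前保存进度,重启后判断并恢复(字段与URL均为示例) ---
demo_manager = CheckpointManager(checkpoint_dir='./checkpoints')
demo_manager.save_checkpoint(
    spider_name='demo_spider',
    stats={'item_scraped_count': 1200},
    queue_state={'pending_urls': ['https://example.com/page/41']},
)
if demo_manager.has_valid_checkpoint(max_age_hours=24):
    print(f"从检查点恢复: {demo_manager.load_checkpoint()['spider_name']}")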

class FaultTolerantSpider(scrapy.Spider):
    """
    容错爬虫基类
    """
    
    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.checkpoint_manager = CheckpointManager()
        self.restart_count = 0
        self.max_restarts = 5
    
    def start_requests(self):
        """
        开始请求 - 支持断点续传
        """
        # 尝试从检查点恢复
        checkpoint = self.checkpoint_manager.load_checkpoint()
        
        if (checkpoint and 
            checkpoint.get('spider_name') == self.name and
            self.checkpoint_manager.has_valid_checkpoint()):
            
            # 从检查点恢复
            queue_state = checkpoint.get('queue_state', {})
            logging.info(f"Resuming from checkpoint: {checkpoint}")
            
            # 这里可以根据具体需求实现从检查点恢复逻辑
            for request in self._resume_from_checkpoint(queue_state):
                yield request
        else:
            # 正常开始请求
            for request in self._initial_requests():
                yield request
    
    def _resume_from_checkpoint(self, queue_state):
        """
        从检查点恢复请求(子类实现具体恢复逻辑;默认返回空序列)
        """
        return []
    
    def _initial_requests(self):
        """
        初始请求(子类实现具体逻辑;默认返回空序列)
        """
        return []
    
    def closed(self, reason):
        """
        爬虫关闭时保存检查点
        """
        if hasattr(self, 'crawler'):
            stats = dict(self.crawler.stats.get_stats())
            queue_state = self._get_queue_state()
            
            self.checkpoint_manager.save_checkpoint(self.name, stats, queue_state)
    
    def _get_queue_state(self) -> dict:
        """
        获取队列状态
        """
        # 这里需要根据具体调度器实现
        if hasattr(self.crawler.engine, 'slot') and self.crawler.engine.slot:
            scheduler = self.crawler.engine.slot.scheduler
            # has_pending_requests() 返回布尔值,表示队列中是否仍有待处理请求
            return {
                'has_pending_requests': getattr(scheduler, 'has_pending_requests', lambda: False)()
            }
        return {}
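
要把上述容错组件接入项目,需要在 settings.py 中注册下载中间件,并由具体爬虫继承 FaultTolerantSpider 实现恢复逻辑。下面是一个最小接入草图(模块路径、URL 与解析逻辑均为假设的示例):

```python
# settings.py - 注册容错中间件(路径为示例)
DOWNLOADER_MIDDLEWARES = {
    'myproject.fault_tolerance.FaultToleranceMiddleware': 550,
}

# spiders/news.py - 继承容错基类的示例爬虫
import scrapy
from myproject.fault_tolerance import FaultTolerantSpider

class NewsSpider(FaultTolerantSpider):
    name = 'news'

    def _initial_requests(self):
        yield scrapy.Request('https://example.com/news/page/1', callback=self.parse)

    def _resume_from_checkpoint(self, queue_state):
        # 从检查点记录的待处理URL继续抓取
        for url in queue_state.get('pending_urls', []):
            yield scrapy.Request(url, callback=self.parse, dont_filter=True)

    def parse(self, response):
        for title in response.css('h2.title::text').getall():
            yield {'title': title}
```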

## 性能调优实战 {#性能调优实战}

在实际的大规模爬虫项目中,我们需要综合运用各种优化技术来达到最佳性能。

### 性能基准测试

```python
# performance_benchmark.py - 性能基准测试
import time
from datetime import datetime
from concurrent.futures import ThreadPoolExecutor

class PerformanceBenchmark:
    """
    性能基准测试工具
    """
    
    def __init__(self):
        self.results = []
        self.test_results = {}
    
    def benchmark_single_spider(self, spider_class, urls: list, duration: int = 60):
        """
        测试单个爬虫性能
        
        Args:
            spider_class: 爬虫类
            urls: 测试URL列表
            duration: 测试持续时间(秒)
        """
        start_time = time.time()
        result = {
            'spider': spider_class.__name__,
            'urls_count': len(urls),
            'duration': duration,
            'requests_sent': 0,
            'responses_received': 0,
            'items_scraped': 0,
            'error_count': 0,
            'avg_response_time': 0,
            'requests_per_second': 0,
            'items_per_second': 0
        }
        
        # 这里需要实现具体的基准测试逻辑
        # 通常需要创建一个特殊的测试环境
        
        end_time = time.time()
        actual_duration = end_time - start_time
        
        result['actual_duration'] = actual_duration
        result['requests_per_second'] = result['requests_sent'] / actual_duration if actual_duration > 0 else 0
        result['items_per_second'] = result['items_scraped'] / actual_duration if actual_duration > 0 else 0
        
        self.results.append(result)
        return result
    
    def benchmark_concurrent_spiders(self, spider_classes: list, url_sets: list):
        """
        测试并发爬虫性能
        
        Args:
            spider_classes: 爬虫类列表
            url_sets: URL集合列表,每个爬虫对应一组URL
        """
        results = []
        
        with ThreadPoolExecutor(max_workers=len(spider_classes)) as executor:
            futures = []
            
            for spider_class, urls in zip(spider_classes, url_sets):
                future = executor.submit(self.benchmark_single_spider, spider_class, urls)
                futures.append(future)
            
            for future in futures:
                result = future.result()
                results.append(result)
        
        return results
    
    def compare_configurations(self, configs: list, spider_class, test_urls: list):
        """
        比较不同配置的性能
        
        Args:
            configs: 配置列表
            spider_class: 测试爬虫类
            test_urls: 测试URL列表
        """
        comparison_results = []
        
        for i, config in enumerate(configs):
            print(f"Testing configuration {i+1}/{len(configs)}")
            
            # 应用配置并测试
            result = self.benchmark_single_spider(spider_class, test_urls)
            result['configuration'] = f"config_{i+1}"
            result['settings'] = config
            
            comparison_results.append(result)
        
        # 分析比较结果
        self.analyze_comparison_results(comparison_results)
        
        return comparison_results
    
    def analyze_comparison_results(self, results: list):
        """
        分析比较结果
        """
        if not results:
            return
        
        print("\n=== 性能比较分析 ===")
        
        # 按请求速率排序
        sorted_by_rps = sorted(results, key=lambda x: x['requests_per_second'], reverse=True)
        print("\n按请求速率排序:")
        for i, result in enumerate(sorted_by_rps[:3]):  # 显示前三名
            print(f"{i+1}. {result['configuration']}: {result['requests_per_second']:.2f} RPS")
        
        # 按项目速率排序
        sorted_by_ips = sorted(results, key=lambda x: x['items_per_second'], reverse=True)
        print("\n按项目速率排序:")
        for i, result in enumerate(sorted_by_ips[:3]):  # 显示前三名
            print(f"{i+1}. {result['configuration']}: {result['items_per_second']:.2f} IPS")
        
        # 按错误率排序
        sorted_by_error_rate = sorted(results, 
                                    key=lambda x: x['error_count'] / max(x['responses_received'], 1), 
                                    reverse=False)
        print("\n按错误率排序(越低越好):")
        for i, result in enumerate(sorted_by_error_rate[:3]):  # 显示前三名
            error_rate = (result['error_count'] / max(result['responses_received'], 1)) * 100
            print(f"{i+1}. {result['configuration']}: {error_rate:.2f}% errors")

class OptimizationRecommendationEngine:
    """
    优化建议引擎
    """
    
    def __init__(self):
        self.knowledge_base = self._build_knowledge_base()
    
    def _build_knowledge_base(self) -> dict:
        """
        构建知识库
        """
        return {
            'high_memory_usage': {
                'symptoms': ['memory_usage > 80%', 'frequent_gc', 'slow_processing'],
                'causes': ['large_response_handling', 'item_accumulation', 'poor_caching'],
                'solutions': [
                    'enable_response_compression',
                    'implement_streaming_processing',
                    'reduce_concurrent_items',
                    'use_disk_queues'
                ]
            },
            'low_throughput': {
                'symptoms': ['rps < threshold', 'idle_time_high', 'bandwidth_underutilized'],
                'causes': ['excessive_delays', 'connection_pool_limitations', 'rate_limiting'],
                'solutions': [
                    'increase_concurrent_requests',
                    'optimize_network_settings',
                    'implement_connection_pooling',
                    'adjust_autothrottle_settings'
                ]
            },
            'high_error_rate': {
                'symptoms': ['error_rate > 5%', 'frequent_timeouts', 'blocked_by_antibot'],
                'causes': ['aggressive_settings', 'insufficient_delay', 'missing_headers'],
                'solutions': [
                    'increase_download_delay',
                    'rotate_user_agents',
                    'implement_proxy_rotation',
                    'add_retry_middleware'
                ]
            }
        }
    
    def analyze_performance_data(self, stats: dict) -> list:
        """
        分析性能数据并给出建议
        
        Args:
            stats: 性能统计数据
            
        Returns:
            优化建议列表
        """
        recommendations = []
        
        # 检查内存使用
        memory_percent = stats.get('monitor/memory_percent', 0)
        if memory_percent > 80:
            recommendations.append({
                'issue': 'high_memory_usage',
                'severity': 'HIGH',
                'description': f'High memory usage detected: {memory_percent:.1f}%',
                'suggestions': self.knowledge_base['high_memory_usage']['solutions']
            })
        
        # 检查吞吐量
        rps = stats.get('queue_monitor/rate/enqueued_per_minute', 0) / 60
        if rps < 10:  # 假设阈值为10 RPS
            recommendations.append({
                'issue': 'low_throughput',
                'severity': 'MEDIUM',
                'description': f'Low throughput detected: {rps:.2f} RPS',
                'suggestions': self.knowledge_base['low_throughput']['solutions']
            })
        
        # 检查错误率
        total_requests = stats.get('scheduler/enqueued', 1)
        error_count = stats.get('downloader/exception_count', 0)
        error_rate = error_count / total_requests
        
        if error_rate > 0.05:  # 5%错误率阈值
            recommendations.append({
                'issue': 'high_error_rate',
                'severity': 'HIGH',
                'description': f'High error rate detected: {error_rate:.2%}',
                'suggestions': self.knowledge_base['high_error_rate']['solutions']
            })
        
        return recommendations
    
    def generate_optimization_report(self, spider_name: str, stats: dict) -> str:
        """
        生成优化报告
        """
        recommendations = self.analyze_performance_data(stats)
        
        report = f"""
# 性能优化报告 - {spider_name}

## 概述
- 分析时间: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}
- 总请求数: {stats.get('scheduler/enqueued', 0)}
- 成功响应: {stats.get('response_received_count', 0)}
- 错误数量: {stats.get('downloader/exception_count', 0)}

## 性能指标
- 请求速率: {stats.get('queue_monitor/rate/enqueued_per_minute', 0) / 60:.2f} RPS
- 内存使用: {stats.get('monitor/memory_percent', 0):.1f}%
- CPU使用: {stats.get('monitor/cpu_percent', 0):.1f}%
- 平均响应大小: {stats.get('downloader/response_bytes', 0) / max(stats.get('response_received_count', 1), 1) / 1024:.2f} KB

## 优化建议
"""
        
        for rec in recommendations:
            report += f"\n### {rec['issue']} ({rec['severity']})\n"
            report += f"- **问题**: {rec['description']}\n"
            report += "- **建议方案**:\n"
            for suggestion in rec['suggestions']:
                report += f"  - {suggestion}\n"
        
        if not recommendations:
            report += "\n系统运行良好,暂无优化建议。\n"
        
        return report
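
# 用法示例:基于一份统计快照生成优化报告(统计键沿用上文监控组件写入的键,数值为演示值)
engine = OptimizationRecommendationEngine()
sample_stats = {
    'monitor/memory_percent': 87.5,
    'scheduler/enqueued': 10000,
    'downloader/exception_count': 900,
}
print(engine.generate_optimization_report('demo_spider', sample_stats))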

```

## 常见问题与解决方案 {#常见问题与解决方案}

### 问题1: 内存泄漏

**现象**: 爬虫运行一段时间后内存使用持续增长
**解决方案**:
```python
# 实现内存监控和清理
from collections import deque
class MemoryLeakDetector:
    """
    内存泄漏检测器
    """
    
    def __init__(self, crawler):
        self.crawler = crawler
        self.memory_snapshots = deque(maxlen=100)
        self.object_trackers = {}
    
    def take_snapshot(self):
        """
        获取内存快照
        """
        import tracemalloc
        snapshot = tracemalloc.take_snapshot()
        self.memory_snapshots.append(snapshot)
        
        # 分析内存使用情况
        if len(self.memory_snapshots) >= 2:
            top_stats = snapshot.compare_to(self.memory_snapshots[0], 'lineno')
            for stat in top_stats[:5]:
                print(f"Memory growth: {stat}")
    
    def detect_leaks(self) -> list:
        """
        检测内存泄漏
        """
        if len(self.memory_snapshots) < 2:
            return []
        
        # 比较最新快照与初始快照
        initial = self.memory_snapshots[0]
        current = self.memory_snapshots[-1]
        
        # 计算内存增长
        initial_size = sum(stat.size for stat in initial.statistics('lineno'))
        current_size = sum(stat.size for stat in current.statistics('lineno'))
        
        growth_rate = (current_size - initial_size) / initial_size if initial_size > 0 else 0
        
        if growth_rate > 0.5:  # 50%增长认为是潜在泄漏
            return [f"Potential memory leak detected: {growth_rate:.2%} growth"]
        
        return []

# 在设置中启用tracemalloc
import tracemalloc
tracemalloc.start()
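
# 用法示例:先后两次采样,再检查内存增长(示例中 crawler 未被用到,传 None 仅作演示)
detector = MemoryLeakDetector(crawler=None)
detector.take_snapshot()
# ... 爬取一段时间后再次采样 ...
detector.take_snapshot()
for warning in detector.detect_leaks():
    print(warning)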

```

### 问题2: 请求速率不稳定

**现象**: 请求速率波动很大,有时很快有时很慢
**解决方案**:

```python
# 实现速率平滑控制器
from collections import deque
class RateSmoothController:
    """
    速率平滑控制器
    """
    
    def __init__(self, target_rate: float, smoothing_factor: float = 0.1):
        self.target_rate = target_rate
        self.smoothing_factor = smoothing_factor
        self.current_rate = target_rate
        self.rate_history = deque(maxlen=50)
        self.time_window = 10  # 10秒窗口
    
    def adjust_rate(self, actual_rate: float) -> float:
        """
        调整目标速率
        """
        # 计算平滑后的速率
        if self.rate_history:
            smoothed_rate = self.smoothing_factor * actual_rate + \
                          (1 - self.smoothing_factor) * self.rate_history[-1]
        else:
            smoothed_rate = actual_rate
        
        self.rate_history.append(smoothed_rate)
        
        # 限制调整幅度
        adjustment = min(abs(smoothed_rate - self.current_rate), 
                        self.current_rate * 0.2)  # 最大调整20%
        
        if smoothed_rate > self.current_rate:
            self.current_rate = min(self.current_rate + adjustment, self.target_rate * 1.5)
        else:
            self.current_rate = max(self.current_rate - adjustment, self.target_rate * 0.5)
        
        return self.current_rate
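
# 用法示例:每个统计窗口结束时,用实际观测速率更新下一窗口的目标速率
controller = RateSmoothController(target_rate=20.0)  # 目标 20 RPS
next_rate = controller.adjust_rate(actual_rate=14.0)
print(f"下一窗口目标速率: {next_rate:.2f} RPS")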

```

### 问题3: 频繁被封禁

**现象**: 爬虫频繁遇到403、429等反爬状态码
**解决方案**:

```python
# 实现智能反爬对策
import random
import time

class AntiBanStrategy:
    """
    反反爬策略
    """
    
    def __init__(self):
        self.session_rotator = SessionRotator()
        # 注意:HeaderRotator 与 DelayAdaptor 为占位组件,需按项目需求自行实现
        self.header_rotator = HeaderRotator()
        self.delay_adaptor = DelayAdaptor()
    
    def adapt_to_detection(self, response, request):
        """
        根据响应自适应调整策略
        """
        status = response.status
        
        if status == 403:  # Forbidden
            # 立即更换session和headers
            self.session_rotator.rotate_session()
            self.header_rotator.rotate_headers()
            # 增加延迟
            self.delay_adaptor.increase_delay(factor=2.0)
            
        elif status == 429:  # Too Many Requests
            # 优先遵循 Retry-After 响应头(Scrapy 中 header 值为 bytes,需先解码)
            retry_after = response.headers.get('Retry-After')
            if retry_after:
                delay = int(retry_after.decode() if isinstance(retry_after, bytes) else retry_after)
                time.sleep(delay)  # 注意:time.sleep 会阻塞进程,在 Twisted 反应器中应改用 reactor.callLater
            else:
                self.delay_adaptor.increase_delay(factor=1.5)
        
        elif status == 404:  # 部分站点会把封禁伪装成404,需按目标站点的实际行为判断
            self.session_rotator.rotate_session(immediate=True)
        
        # 更新策略
        self.update_strategy(response, request)
    
    def update_strategy(self, response, request):
        """
        更新反爬策略
        """
        # 基于响应内容、状态码等更新策略
        content = response.text.lower()
        
        if 'captcha' in content or 'verify' in content:
            # 需要处理验证码,暂停并人工介入
            pass

class SessionRotator:
    """
    Session轮换器
    """
    
    def __init__(self):
        self.sessions = []
        self.current_session_index = 0
        self.create_sessions()
    
    def create_sessions(self):
        """
        创建多个session
        """
        import requests
        for i in range(10):  # 创建10个session
            session = requests.Session()
            # 配置session
            session.headers.update({
                'User-Agent': self._random_user_agent(),
                'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8',
                'Accept-Language': 'en-US,en;q=0.5',
                'Accept-Encoding': 'gzip, deflate',
                'Connection': 'keep-alive',
            })
            self.sessions.append(session)
    
    def rotate_session(self, immediate: bool = False):
        """
        轮换session
        """
        if immediate:
            # 立即轮换
            self.current_session_index = (self.current_session_index + 1) % len(self.sessions)
        else:
            # 随机轮换
            self.current_session_index = random.randint(0, len(self.sessions) - 1)
    
    def get_current_session(self):
        """
        获取当前session
        """
        return self.sessions[self.current_session_index]
    
    def _random_user_agent(self):
        """
        获取随机User-Agent
        """
        user_agents = [
            'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36',
            'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36',
            'Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36',
            'Mozilla/5.0 (Windows NT 10.0; Win64; x64; rv:89.0) Gecko/20100101 Firefox/89.0',
        ]
        return random.choice(user_agents)

```

## 最佳实践总结 {#最佳实践总结}

### 配置优化最佳实践

1. **内存配置**:

```python
# 生产环境推荐配置
MEMUSAGE_ENABLED = True
MEMUSAGE_LIMIT_MB = 4096  # 4GB限制
MEMUSAGE_WARNING_MB = 3072  # 3GB警告

# 并发与延迟配置
CONCURRENT_REQUESTS = 32
CONCURRENT_REQUESTS_PER_DOMAIN = 8
DOWNLOAD_DELAY = 1
RANDOMIZE_DOWNLOAD_DELAY = 0.5

# 调度器队列配置
SCHEDULER_PRIORITY_QUEUE = 'scrapy.pqueues.DownloaderAwarePriorityQueue'
SCHEDULER_DISK_QUEUE = 'scrapy.squeues.PickleFifoDiskQueue'
SCHEDULER_MEMORY_QUEUE = 'scrapy.squeues.FifoMemoryQueue'
```

2. **网络配置**:

```python
# DNS优化
DNSCACHE_ENABLED = True
DNSCACHE_SIZE = 10000

# 下载超时与大小限制
DOWNLOAD_TIMEOUT = 60
DOWNLOAD_MAXSIZE = 50 * 1024 * 1024  # 50MB
DOWNLOAD_WARNSIZE = 10 * 1024 * 1024  # 10MB警告

# 重试配置
RETRY_TIMES = 3
RETRY_HTTP_CODES = [500, 502, 503, 504, 408, 429, 403]  # 403 是否重试取决于目标站点的封禁策略
```

### 监控最佳实践

1. **关键指标监控**:
   - 请求成功率
   - 响应时间分布
   - 内存使用情况
   - CPU使用率
   - 错误率趋势

2. **告警设置**(阈值配置草图见下):
   - 内存使用超过85%告警
   - 错误率超过5%告警
   - 响应时间超过10秒告警
   - 爬虫停止运行告警
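
下面给出与上述阈值对应的一个配置草图(这些键名为假设值,需由自定义的 AlertSystem 读取,并非 Scrapy 内置设置):

```python
# settings.py - 告警阈值草图(键名为假设值)
ALERT_MEMORY_THRESHOLD = 85           # 内存使用超过85%告警
ALERT_ERROR_RATE_THRESHOLD = 0.05     # 错误率超过5%告警
ALERT_RESPONSE_TIME_THRESHOLD = 10    # 响应时间超过10秒告警(秒)
ALERT_SUPPRESSION_WINDOW = 300        # 同类告警抑制窗口(秒)
```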

### 容错最佳实践

1. **优雅降级**:
   - 当某个域名无法访问时,自动降低对该域名的请求频率
   - 当内存使用过高时,自动减少并发数
   - 当错误率过高时,启用更保守的请求策略

2. **恢复机制**:
   - 断点续传支持
   - 自动重启失败的爬虫
   - 状态持久化存储

💡 核心要点: 大规模爬虫优化是一个系统工程,需要综合考虑内存、网络、并发、数据处理等多个方面。通过合理的配置、完善的监控和健壮的容错机制,可以构建高效稳定的爬虫系统。
