Scrapy-Redis Distributed Architecture - Building a High-Performance Distributed Crawler Cluster

📂 Stage: Stage 5 — Leveling Up (Distributed and Advanced Topics)
🔗 Related chapters: DownloaderMiddleware · Deep Customization of Spider Middleware · Distributed Deduplication and Scheduling

Overview of Distributed Crawlers

Distributed crawling is the core technique for large-scale data collection. A traditional single-machine crawler is constrained by its hardware and network bandwidth and cannot keep up with massive collection workloads. A distributed crawler spreads the crawl across multiple nodes and processes it in parallel, raising collection throughput substantially.

Advantages of Distributed Crawlers

"""
分布式爬虫核心优势:

1. 水平扩展能力:
   - 通过增加节点提升整体性能
   - 突破单机资源瓶颈
   - 支持弹性伸缩

2. 容错性:
   - 单节点故障不影响整体运行
   - 自动故障转移
   - 高可用保障

3. 负载均衡:
   - 任务自动分配
   - 资源利用率最大化
   - 避免单点过载

4. 数据一致性:
   - 统一去重机制
   - 避免重复抓取
   - 数据完整性保障
"""

Distributed Crawler Architecture Patterns

"""
常见的分布式爬虫架构:

1. 中央队列模式:
   - Redis作为中央队列
   - 多个爬虫节点消费
   - 统一去重机制

2. 分片模式:
   - 任务预先分片
   - 每个节点负责特定分片
   - 减少协调开销

3. 混合模式:
   - 结合多种架构优势
   - 动态任务分配
   - 自适应调度
"""

How Scrapy-Redis Works

Scrapy-Redis is a distributed crawling extension built on Redis. It shares the request queue and the deduplication set through Redis, which lets multiple Scrapy instances work on the same crawl cooperatively.
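
The sharing is easy to see from the Redis side. A minimal sketch (assuming a spider named myspider and the default key names): pushing a single start URL into the shared key is enough to feed every node, and the queue and fingerprint set can be inspected with plain redis-py calls.

import redis

r = redis.Redis(host='localhost', port=6379)

# Seed one start URL -- whichever node polls first picks it up
r.lpush('myspider:start_urls', 'https://example.com')

# Inspect the state that all nodes share
print(r.zcard('myspider:requests'))    # pending requests (priority queue, a zset)
print(r.scard('myspider:dupefilter'))  # fingerprints of requests already seen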

Core Component Relationships

graph TB
    subgraph "Crawler Cluster"
        A["Crawler Node 1<br/>Scrapy Instance 1"]
        B["Crawler Node 2<br/>Scrapy Instance 2"]
        C["Crawler Node N<br/>Scrapy Instance N"]
    end
    
    subgraph "Redis Storage Layer"
        D["Request Queue"]
        E["Duplicate Filter"]
        F["Scheduler"]
    end
    
    A --> D
    B --> D
    C --> D
    A --> E
    B --> E
    C --> E
    D --> F
    E --> F
    
    style A fill:#e1f5fe
    style B fill:#e1f5fe
    style C fill:#e1f5fe
    style D fill:#f3e5f5
    style E fill:#f3e5f5
    style F fill:#fff3e0

Architecture Flow

  1. Request dispatch: the central queue receives the seed URLs, and every node pulls its requests from there
  2. Deduplication: each request is checked against the shared filter before processing, so nothing is fetched twice
  3. Concurrent processing: the nodes handle requests in parallel, multiplying throughput
  4. Result aggregation: results are written to shared storage, keeping the data consistent (a toy sketch of this cycle follows)
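
The following toy, single-process sketch makes the four steps concrete. The names are hypothetical stand-ins; in the real system the queue, the seen-set, and the result store all live in Redis rather than in local memory.

from collections import deque

queue = deque(['https://example.com'])   # (1) central request queue
seen = set()                             # (2) shared duplicate filter
results = []                             # (4) aggregated results

while queue:
    url = queue.popleft()                # (1) a node pulls a request
    if url in seen:                      # (2) skip anything already handled
        continue
    seen.add(url)
    results.append({'url': url})         # (3) "download + parse" stand-in
    for link in (url + '/a', url + '/b'):
        if len(seen) < 10:               # keep the toy crawl bounded
            queue.append(link)           # discovered requests re-enter the queue

print(len(results), 'pages processed;', len(seen), 'fingerprints stored')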

Installation and Configuration

Environment Setup

# Install Scrapy-Redis
pip install scrapy-redis

# Make sure the Redis service is running
# Ubuntu/Debian
sudo systemctl start redis-server

# CentOS/RHEL
sudo systemctl start redis

# macOS (with Homebrew)
brew services start redis

Basic Configuration

# settings.py - basic Scrapy-Redis configuration
import os

# Use the Redis-backed scheduler
SCHEDULER = "scrapy_redis.scheduler.Scheduler"

# Use the Redis-backed duplicate filter
DUPEFILTER_CLASS = "scrapy_redis.dupefilter.RFPDupeFilter"

# Redis connection settings
REDIS_URL = os.getenv('REDIS_URL', 'redis://localhost:6379')

# Or configure host and port separately
REDIS_HOST = os.getenv('REDIS_HOST', 'localhost')
REDIS_PORT = int(os.getenv('REDIS_PORT', 6379))
REDIS_PASSWORD = os.getenv('REDIS_PASSWORD', None)
REDIS_DB = int(os.getenv('REDIS_DB', 0))

# Persistence - keep the queue and dupefilter in Redis on shutdown
SCHEDULER_PERSIST = True

# Request queue type
SCHEDULER_QUEUE_CLASS = 'scrapy_redis.queue.PriorityQueue'  # priority queue (default)
# SCHEDULER_QUEUE_CLASS = 'scrapy_redis.queue.FifoQueue'    # FIFO queue
# SCHEDULER_QUEUE_CLASS = 'scrapy_redis.queue.LifoQueue'    # LIFO queue

# Key name formats
SCHEDULER_QUEUE_KEY = '%(spider)s:requests'  # request queue key
DUPEFILTER_KEY = '%(spider)s:dupefilter'     # dupefilter key

# Queue startup and start-URL handling
SCHEDULER_FLUSH_ON_START = False  # do not flush the queue on start
REDIS_START_URLS_AS_SET = True  # store start URLs in a Set instead of a List
REDIS_START_URLS_KEY = '%(name)s:start_urls'  # start-URL key name

# Connection pool settings
REDIS_PARAMS = {
    'socket_timeout': 30,
    'socket_connect_timeout': 30,
    'retry_on_timeout': True,
    'encoding': 'utf-8',
    'health_check_interval': 30,
    'max_connections': 20,
    'db': REDIS_DB,
    'password': REDIS_PASSWORD,
}

# Logging
LOG_LEVEL = 'INFO'
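
With these settings in place, scaling out is just a matter of starting the same project on more machines; nothing below is specific to scrapy-redis beyond the key names configured above (the spider name is a placeholder).

# On every node, run the same project against the shared Redis:
scrapy crawl distributed_spider

# From any machine, seed the shared start-URL key. Note SADD, not
# LPUSH, because REDIS_START_URLS_AS_SET is enabled above:
redis-cli sadd distributed_spider:start_urls "https://example.com"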

Advanced Configuration

# advanced_settings.py - advanced options
import redis

# Custom Redis connection factory
def get_redis_connection():
    """
    Return a Redis client backed by a shared connection pool
    (REDIS_HOST and friends come from the base settings above).
    """
    pool = redis.ConnectionPool(
        host=REDIS_HOST,
        port=REDIS_PORT,
        db=REDIS_DB,
        password=REDIS_PASSWORD,
        max_connections=20,
        health_check_interval=30
    )
    return redis.Redis(connection_pool=pool)

# Performance tuning
SCHEDULER_FLUSH_ON_START = True  # flush the queue on start (useful in development)

# Stats collection
STATS_CLASS = 'scrapy_redis.stats.RedisStatsCollector'
STATS_KEY = '%(spider)s:stats'  # stats key name

# Message queue keys
REDIS_JOBS_KEY = '%(spider)s:jobs'    # custom job-queue key used by this project
REDIS_ITEMS_KEY = '%(spider)s:items'  # where RedisPipeline stores scraped items

Core Components in Depth

The Scheduler

# scheduler.py - a closer look at the scheduler
from scrapy_redis.scheduler import Scheduler

class CustomScheduler(Scheduler):
    """
    Custom scheduler.
    """
    
    def __init__(self, server, persist=False, flush_on_start=False, queue_cls=None,
                 dupefilter_cls=None, idle_before_close=0, serializer=None, **kwargs):
        """
        Initialize the scheduler (mirrors the scrapy_redis.scheduler.Scheduler
        signature).
        """
        super().__init__(
            server=server,
            persist=persist,
            flush_on_start=flush_on_start,
            queue_cls=queue_cls,
            dupefilter_cls=dupefilter_cls,
            idle_before_close=idle_before_close,
            serializer=serializer,
            **kwargs
        )
        
        # Custom initialization
        self.custom_stats = {}
        self.task_distribution_strategy = 'round_robin'
    
    def enqueue_request(self, request):
        """
        Enqueue a request.
        """
        if not request.dont_filter and self.df.request_seen(request):
            # Already seen; skip it
            return False
        
        if len(self.queue) == 0:
            # Queue is empty; run any one-time setup
            self._initialize_queue()
        
        # Push the request onto the shared queue
        self.queue.push(request)
        return True
    
    def next_request(self):
        """
        Pop the next request.
        """
        request = self.queue.pop()
        if request:
            # Update statistics
            self._update_request_stats(request)
        return request
    
    def _update_request_stats(self, request):
        """
        Record request-level statistics (source, priority, etc.).
        """
        pass
    
    def _initialize_queue(self):
        """
        One-time queue setup (e.g. initial weights).
        """
        pass
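
As with any Scrapy component, the subclass is activated through settings; a sketch, with the module path as a placeholder for wherever the class lives in your project:

SCHEDULER = "myproject.scheduler.CustomScheduler"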

The RFPDupeFilter

# dupefilter.py - a closer look at the duplicate filter
from scrapy_redis.dupefilter import RFPDupeFilter
import hashlib
import time

class AdvancedDupeFilter(RFPDupeFilter):
    """
    Duplicate filter with fingerprint expiry and a local cache.
    """
    
    def __init__(self, server, key, debug=False, expire_time=86400*7):  # expire after 7 days
        """
        Initialize the filter.
        """
        super().__init__(server, key, debug)
        self.expire_time = expire_time  # fingerprint TTL in seconds
        self.fingerprint_cache = {}     # local fingerprint cache
        self.cache_size_limit = 10000   # cache size limit
    
    def request_seen(self, request):
        """
        Return True if the request has been seen before.
        """
        fp = self.request_fingerprint(request)
        
        # Check the local cache first
        if fp in self.fingerprint_cache:
            return True
        
        # Check the shared Redis set
        if self.server.sismember(self.key, fp):
            # Cache locally to speed up repeat checks
            self._cache_fingerprint(fp)
            return True
        
        # Record the new fingerprint in Redis
        self.server.sadd(self.key, fp)
        
        # Refresh the TTL on the fingerprint set
        if self.expire_time:
            self.server.expire(self.key, self.expire_time)
        
        # Cache locally as well
        self._cache_fingerprint(fp)
        return False
    
    def request_fingerprint(self, request):
        """
        Build a request fingerprint from method, URL, and body (SHA-256).
        """
        fingerprint_str = f"{request.method}:{request.url}:{request.body.decode('utf-8', errors='ignore')}"
        return hashlib.sha256(fingerprint_str.encode()).hexdigest()
    
    def _cache_fingerprint(self, fp):
        """
        Store a fingerprint locally, evicting half the cache when full.
        """
        if len(self.fingerprint_cache) >= self.cache_size_limit:
            # Drop the older half of the cache
            keys_to_remove = list(self.fingerprint_cache.keys())[:len(self.fingerprint_cache)//2]
            for key in keys_to_remove:
                del self.fingerprint_cache[key]
        
        self.fingerprint_cache[fp] = time.time()
    
    def clear_cache(self):
        """
        Clear the local cache.
        """
        self.fingerprint_cache.clear()
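
The same goes for the filter: pointing the standard setting at the subclass (placeholder path again) makes it the cluster-wide deduplication authority, and the TTL above keeps the fingerprint set from growing without bound on long crawls.

DUPEFILTER_CLASS = "myproject.dupefilter.AdvancedDupeFilter"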

Request Queue Implementations

# queues.py - request queues in depth
import zlib
from urllib.parse import urlparse

from scrapy_redis.queue import PriorityQueue, FifoQueue, LifoQueue

class OptimizedPriorityQueue(PriorityQueue):
    """
    Priority queue with optional compression and batch operations.
    """
    
    def __init__(self, server, spider, key, serializer=None):
        """
        Initialize the queue.
        """
        super().__init__(server, spider, key, serializer)
        self.batch_size = 100            # batch operation size
        self.compression_enabled = True  # compress encoded requests
    
    def _encode_request(self, request):
        """
        Encode a request, optionally compressing the serialized bytes
        to save Redis memory.
        """
        data = super()._encode_request(request)
        if self.compression_enabled:
            data = zlib.compress(data)
        return data
    
    def _decode_request(self, encoded_request):
        """
        Decode a request, decompressing first when compression is on.
        """
        if self.compression_enabled:
            encoded_request = zlib.decompress(encoded_request)
        return super()._decode_request(encoded_request)
    
    def push_batch(self, requests):
        """
        Push several requests in one pipeline round-trip.
        """
        pipe = self.server.pipeline()
        
        for request in requests:
            priority = -request.priority
            encoded_request = self._encode_request(request)
            pipe.zadd(self.key, {encoded_request: priority})
        
        pipe.execute()
    
    def pop_batch(self, batch_size=10):
        """
        Pop several requests ordered by priority.
        """
        items = self.server.zrange(self.key, 0, batch_size - 1)
        
        if items:
            pipe = self.server.pipeline()
            
            # Remove the fetched entries in one round-trip
            for item in items:
                pipe.zrem(self.key, item)
            
            pipe.execute()
            
            return [self._decode_request(item) for item in items]
        
        return []

class AdaptiveQueue:
    """
    Adaptive queue - routes requests across sub-queues based on load.
    Sketch only: sub-queues must be registered in self.queues before use;
    a LoadBalancer (see the load-balancing section below) could further
    refine the choice.
    """
    
    def __init__(self, server, spider, base_key):
        self.server = server
        self.spider = spider
        self.base_key = base_key
        self.queues = {}  # queue_key -> queue object, registered by the caller
    
    def push(self, request):
        """
        Route the request to the appropriate sub-queue.
        """
        target_queue = self._select_target_queue(request)
        return self.queues[target_queue].push(request)
    
    def pop(self):
        """
        Pop from the currently optimal sub-queue.
        """
        selected_queue = self._select_optimal_queue()
        return self.queues[selected_queue].pop()
    
    def _select_target_queue(self, request):
        """
        Choose a target sub-queue from request features (here, the domain).
        """
        domain = urlparse(request.url).netloc
        return f"{self.base_key}:{domain}"
    
    def _select_optimal_queue(self):
        """
        Choose the sub-queue to read from (here, the shortest one).
        """
        queue_lengths = {
            key: self.server.llen(key) 
            for key in self.queues.keys()
        }
        
        # Pick the shortest queue
        return min(queue_lengths, key=queue_lengths.get)
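
Custom queues plug in through the same settings mechanism; a sketch, assuming the file above is importable as myproject.queues:

SCHEDULER_QUEUE_CLASS = "myproject.queues.OptimizedPriorityQueue"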

Implementing Distributed Spiders

A Basic RedisSpider Implementation

# distributed_spider.py - distributed spider implementations
from scrapy_redis.spiders import RedisSpider
from scrapy_redis.utils import bytes_to_str
from scrapy.http import Request
import json
import time

class DistributedSpider(RedisSpider):
    """
    Base class for distributed spiders.
    """
    name = 'distributed_spider'
    
    # Redis key - every instance consumes from this shared queue
    redis_key = 'distributed_spider:start_urls'
    
    # Concurrency and throttling settings
    custom_settings = {
        'CONCURRENT_REQUESTS': 32,
        'CONCURRENT_REQUESTS_PER_DOMAIN': 4,
        'DOWNLOAD_DELAY': 1,
        'RANDOMIZE_DOWNLOAD_DELAY': 0.5,
        'AUTOTHROTTLE_ENABLED': True,
        'AUTOTHROTTLE_START_DELAY': 1,
        'AUTOTHROTTLE_MAX_DELAY': 10,
        'AUTOTHROTTLE_TARGET_CONCURRENCY': 4.0,
    }
    
    def __init__(self, *args, **kwargs):
        """
        Initialize the spider.
        """
        super().__init__(*args, **kwargs)
        
        # Per-node statistics
        self.stats = {
            'requests_sent': 0,
            'responses_received': 0,
            'items_scraped': 0,
            'errors': 0,
            'start_time': time.time()
        }
    
    def make_request_from_data(self, data):
        """
        Build a request from a Redis queue entry (a plain URL here).
        """
        url = bytes_to_str(data, self.redis_encoding)
        return Request(
            url=url,
            callback=self.parse,
            errback=self.handle_error,
            meta={
                'download_timeout': 30,
                'dont_redirect': False,
                'handle_httpstatus_list': [301, 302, 404, 500]
            }
        )
    
    def parse(self, response):
        """
        Parse a response.
        """
        try:
            # Update statistics
            self.stats['responses_received'] += 1
            
            # Extract the data of interest
            data = {
                'url': response.url,
                'status': response.status,
                'title': response.css('title::text').get(),
                'timestamp': time.time(),
                'spider_name': self.name,
                'node_id': self._get_node_id()
            }
            
            yield data
            
            # Extract links and feed them back into the shared queue
            for link in response.css('a::attr(href)').getall():
                absolute_url = response.urljoin(link)
                yield Request(
                    url=absolute_url,
                    callback=self.parse,
                    meta={'depth': response.meta.get('depth', 0) + 1}
                )
                
        except Exception as e:
            self.logger.error(f"Parse error: {response.url}, Error: {str(e)}")
            self.stats['errors'] += 1
    
    def handle_error(self, failure):
        """
        Handle a failed request.
        """
        self.logger.error(f"Request failed: {failure.request.url}")
        self.logger.error(f"Reason: {failure.value}")
        self.stats['errors'] += 1
    
    def _get_node_id(self):
        """
        Identify this node (hostname).
        """
        import socket
        return socket.gethostname()
    
    def closed(self, reason):
        """
        Clean-up hook invoked when the spider closes.
        """
        self.logger.info(f"Spider closed. Reason: {reason}")
        self.logger.info(f"Final stats: {self.stats}")
class AdvancedDistributedSpider(DistributedSpider):
    """
    Distributed spider with content-type-aware parsing.
    """
    
    name = 'advanced_distributed_spider'
    
    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        
        # Helper components (assumed to be defined elsewhere in the project)
        self.rate_limiter = RateLimiter()
        self.domain_scheduler = DomainScheduler()
        self.task_manager = TaskManager()
    
    def parse(self, response):
        """
        Dispatch parsing based on the response content type.
        """
        content_type = response.headers.get('content-type', b'').decode('utf-8')
        
        if 'json' in content_type:
            yield from self.parse_json_api(response)
        elif 'html' in content_type:
            yield from self.parse_html_page(response)
        else:
            yield from self.parse_generic_content(response)
    
    def parse_json_api(self, response):
        """
        Parse a JSON API response.
        """
        try:
            data = json.loads(response.text)
            
            # Recursively pull URLs out of the payload
            urls = self._extract_urls_from_json(data)
            for url in urls:
                yield Request(url=url, callback=self.parse)
            
            # Emit the business data itself
            yield {
                'type': 'api_data',
                'data': data,
                'url': response.url,
                'timestamp': time.time()
            }
            
        except json.JSONDecodeError:
            self.logger.error(f"Failed to decode JSON: {response.url}")
    
    def parse_html_page(self, response):
        """
        Parse an HTML page into structured data.
        """
        structured_data = {
            'url': response.url,
            'title': response.css('title::text').get(),
            'meta_description': response.css('meta[name="description"]::attr(content)').get(),
            'links': response.css('a::attr(href)').getall(),
            'images': response.css('img::attr(src)').getall(),
            'headings': {
                'h1': response.css('h1::text').getall(),
                'h2': response.css('h2::text').getall(),
                'h3': response.css('h3::text').getall(),
            }
        }
        
        yield structured_data
    
    def parse_generic_content(self, response):
        """
        Fallback for responses that are neither JSON nor HTML.
        """
        yield {
            'type': 'raw',
            'url': response.url,
            'content_length': len(response.body),
            'timestamp': time.time()
        }
    
    def _extract_urls_from_json(self, data, urls=None):
        """
        Recursively collect URLs from a JSON structure.
        """
        if urls is None:
            urls = []
        
        if isinstance(data, dict):
            for key, value in data.items():
                if key in ['url', 'link', 'href', 'src'] and isinstance(value, str):
                    if value.startswith(('http://', 'https://')):
                        urls.append(value)
                elif isinstance(value, (dict, list)):
                    self._extract_urls_from_json(value, urls)
        elif isinstance(data, list):
            for item in data:
                self._extract_urls_from_json(item, urls)
        
        return urls

class TaskBasedDistributedSpider(RedisSpider):
    """
    Distributed spider driven by JSON task descriptions.
    """
    
    name = 'task_based_spider'
    redis_key = 'task_spider:tasks'
    
    def make_request_from_data(self, data):
        """
        Each Redis entry is a JSON task description rather than a bare
        URL, so override the scrapy-redis hook that turns queue entries
        into requests.
        """
        task_data = json.loads(data)
        
        task_type = task_data.get('type')
        task_params = task_data.get('params', {})
        
        if task_type == 'crawl_page':
            return self._crawl_page(task_params)
        # Other task types (e.g. 'extract_data', 'api_call') would be
        # dispatched here in the same way.
        return None
    
    def _crawl_page(self, params):
        """
        Build the request for a page-crawl task.
        """
        url = params['url']
        selectors = params.get('selectors', {})
        
        return Request(
            url=url,
            callback=self._parse_crawl_result,
            meta={'selectors': selectors}
        )
    
    def _parse_crawl_result(self, response):
        """
        Apply the task's CSS selectors to the response.
        """
        selectors = response.meta['selectors']
        result = {}
        
        for field, selector in selectors.items():
            result[field] = response.css(selector).get()
        
        yield result
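
Seeding the two spiders differs only in payload shape; a sketch using redis-py, with key names matching the redis_key attributes above. DistributedSpider consumes bare URLs, while TaskBasedDistributedSpider expects the JSON structure that make_request_from_data() decodes.

import json
import redis

r = redis.Redis()

# Bare URLs for DistributedSpider (a List by default; use SADD instead
# if REDIS_START_URLS_AS_SET is enabled)
r.lpush('distributed_spider:start_urls', 'https://example.com')

# A JSON task for TaskBasedDistributedSpider
task = {
    'type': 'crawl_page',
    'params': {
        'url': 'https://example.com/product/1',
        'selectors': {'title': 'h1::text', 'price': '.price::text'},
    },
}
r.lpush('task_spider:tasks', json.dumps(task))

# then start the spiders on each node:
#   scrapy crawl distributed_spider
#   scrapy crawl task_based_spider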

Dynamic Task Dispatching

# task_dispatcher.py - task dispatcher
import redis
import json
import time
from typing import Dict, List, Any

class TaskDispatcher:
    """
    Task dispatcher.
    """
    
    def __init__(self, redis_url='redis://localhost:6379', namespace='distributed_spider'):
        self.redis_client = redis.from_url(redis_url)
        self.namespace = namespace
        self.task_queue_key = f"{namespace}:task_queue"
        self.node_registry_key = f"{namespace}:nodes"
        self.task_status_key = f"{namespace}:task_status"
    
    def register_node(self, node_id: str, capabilities: Dict[str, Any]):
        """
        Register a crawler node.
        """
        node_info = {
            'id': node_id,
            'capabilities': capabilities,
            'registered_at': time.time(),
            'status': 'active'
        }
        
        self.redis_client.hset(
            self.node_registry_key, 
            node_id, 
            json.dumps(node_info)
        )
    
    def submit_task(self, task_data: Dict[str, Any], priority: int = 0):
        """
        Submit a task.
        """
        task_id = f"task_{int(time.time() * 1000000)}"
        
        task_info = {
            'id': task_id,
            'data': task_data,
            'priority': priority,
            'submitted_at': time.time(),
            'status': 'pending'
        }
        
        # Add the task to the queue
        self.redis_client.zadd(
            self.task_queue_key,
            {json.dumps(task_info): -priority}  # negated so higher priorities sort first
        )
        
        # Initialize the task status record
        self.redis_client.hset(
            self.task_status_key,
            task_id,
            json.dumps({'status': 'pending', 'submitted_at': time.time()})
        )
        
        return task_id
    
    def get_available_nodes(self) -> List[Dict[str, Any]]:
        """
        List the currently active nodes.
        """
        nodes = []
        node_keys = self.redis_client.hkeys(self.node_registry_key)
        
        for node_key in node_keys:
            node_info = self.redis_client.hget(self.node_registry_key, node_key)
            if node_info:
                node_data = json.loads(node_info)
                if node_data.get('status') == 'active':
                    nodes.append(node_data)
        
        return nodes
    
    def dispatch_tasks(self, max_tasks_per_cycle: int = 100):
        """
        Dispatch pending tasks to nodes.
        """
        # Fetch the highest-priority pending tasks
        pending_tasks = self.redis_client.zrange(
            self.task_queue_key, 
            0, 
            max_tasks_per_cycle - 1
        )
        
        available_nodes = self.get_available_nodes()
        
        if not pending_tasks or not available_nodes:
            return
        
        # Simple round-robin assignment across the active nodes
        for i, task_data in enumerate(pending_tasks):
            task_info = json.loads(task_data)
            target_node = available_nodes[i % len(available_nodes)]
            
            # Push the task onto the node's private queue
            node_queue_key = f"{self.namespace}:node:{target_node['id']}:queue"
            self.redis_client.lpush(node_queue_key, task_data)
            
            # Update the task status
            self.redis_client.hset(
                self.task_status_key,
                task_info['id'],
                json.dumps({
                    'status': 'dispatched',
                    'dispatched_to': target_node['id'],
                    'dispatched_at': time.time()
                })
            )
            
            # Remove the task from the main queue
            self.redis_client.zrem(self.task_queue_key, task_data)

class LoadBalancer:
    """
    Load balancer keyed on per-node load counters in Redis.
    """
    
    def __init__(self, redis_client):
        self.redis_client = redis_client
        self.node_loads_key = 'distributed_spider:node_loads'
        self.task_distribution_key = 'distributed_spider:task_distribution'
    
    def get_optimal_node(self, task_type: str = 'default') -> str:
        """
        Return the node with the lowest recorded load.
        """
        node_loads = self.redis_client.hgetall(self.node_loads_key)
        
        if not node_loads:
            return 'default_node'
        
        # Find the least-loaded node
        min_load = float('inf')
        optimal_node = None
        
        for node_id, load_str in node_loads.items():
            try:
                load = int(load_str.decode('utf-8'))
                if load < min_load:
                    min_load = load
                    optimal_node = node_id.decode('utf-8')
            except (ValueError, UnicodeDecodeError):
                continue
        
        return optimal_node or 'default_node'
    
    def update_node_load(self, node_id: str, increment: int = 1):
        """
        Adjust a node's load counter.
        """
        self.redis_client.hincrby(self.node_loads_key, node_id, increment)
    
    def record_task_distribution(self, node_id: str, task_type: str):
        """
        Record which node received which kind of task.
        """
        distribution_key = f"{self.task_distribution_key}:{task_type}"
        self.redis_client.hincrby(distribution_key, node_id, 1)

# Usage example
if __name__ == "__main__":
    dispatcher = TaskDispatcher()
    
    # Register a node
    dispatcher.register_node(
        'node_1', 
        {'cpu_cores': 8, 'memory_gb': 16, 'bandwidth_mbps': 100}
    )
    
    # Submit a task
    task_id = dispatcher.submit_task({
        'type': 'crawl',
        'url': 'https://example.com',
        'depth': 2
    }, priority=1)
    
    print(f"Submitted task: {task_id}")

Queue Management Strategies

Multi-Queue Strategy

# queue_strategy.py - queue management strategies
from urllib.parse import urlparse

from scrapy_redis.queue import PriorityQueue
import time
import hashlib

class MultiQueueStrategy:
    """
    Multi-queue management strategy.
    """
    
    def __init__(self, redis_client, base_namespace='distributed_spider'):
        self.redis_client = redis_client
        self.base_namespace = base_namespace
        self.queue_configs = {}
        self.routing_rules = []
    
    def add_queue_config(self, queue_name: str, config: dict):
        """
        Register a queue configuration.
        """
        self.queue_configs[queue_name] = config
    
    def add_routing_rule(self, rule_func, target_queue: str):
        """
        Register a routing rule.
        """
        self.routing_rules.append((rule_func, target_queue))
    
    def route_request(self, request):
        """
        Route a request to the first queue whose rule matches.
        """
        for rule_func, target_queue in self.routing_rules:
            if rule_func(request):
                return target_queue
        
        # Fallback queue
        return 'default_queue'
    
    def prioritize_request(self, request, base_priority: int = 0):
        """
        Compute a priority for the request.
        """
        priority = base_priority
        
        # Adjust by URL features
        url = request.url
        if 'important' in url:
            priority += 100
        elif 'urgent' in url:
            priority += 50
        
        # Penalize domains that have been hit frequently
        domain = urlparse(url).netloc
        recent_requests = self.redis_client.get(f"recent_requests:{domain}")
        if recent_requests and int(recent_requests) > 10:
            priority -= 20
        
        # Penalize deep requests
        depth = request.meta.get('depth', 0)
        if depth > 5:
            priority -= depth * 5
        
        return priority

class PriorityAwareQueue(PriorityQueue):
    """
    Priority queue that tracks the distribution of pushed priorities.
    """
    
    def __init__(self, server, spider, key, serializer=None):
        super().__init__(server, spider, key, serializer)
        self.priority_stats_key = f"{key}:priority_stats"
        self.stats_window = 3600  # one-hour stats window
    
    def push(self, request):
        """
        Push a request and record its priority.
        """
        result = super().push(request)
        
        # Track the priority distribution
        self._update_priority_stats(-request.priority)
        return result
    
    def _update_priority_stats(self, priority: int):
        """
        Record the priority in the current window's histogram
        (self.server is the queue's Redis connection).
        """
        current_time = int(time.time())
        hour_bucket = current_time - (current_time % self.stats_window)
        
        self.server.hincrby(
            f"{self.priority_stats_key}:{hour_bucket}",
            f"priority_{priority}",
            1
        )
    
    def get_queue_statistics(self):
        """
        Return the current window's priority histogram.
        """
        current_time = int(time.time())
        hour_bucket = current_time - (current_time % self.stats_window)
        
        stats = self.server.hgetall(f"{self.priority_stats_key}:{hour_bucket}")
        return {
            key.decode('utf-8'): int(value.decode('utf-8'))
            for key, value in stats.items()
        }

class AdaptiveQueueManager:
    """
    Adaptive queue manager.
    """
    
    def __init__(self, redis_client):
        self.redis_client = redis_client
        self.queue_performance_key = 'queue_performance'
        self.adaptation_threshold = 0.8  # performance threshold
    
    def evaluate_queue_performance(self, queue_name: str) -> float:
        """
        Score a queue's recent performance in [0, 1].
        """
        processed_count = self.redis_client.get(f"queue:{queue_name}:processed")
        error_count = self.redis_client.get(f"queue:{queue_name}:errors")
        timeout_count = self.redis_client.get(f"queue:{queue_name}:timeouts")
        
        processed = int(processed_count) if processed_count else 0
        errors = int(error_count) if error_count else 0
        timeouts = int(timeout_count) if timeout_count else 0
        
        total = processed + errors + timeouts
        if total == 0:
            return 1.0  # no data yet; assume healthy
        
        success_rate = processed / total
        error_rate = (errors + timeouts) / total
        
        # Weighted score, clamped to [0, 1]
        performance = success_rate * 0.7 - error_rate * 0.3
        return max(0, min(1, performance))
    
    def adapt_queue_configuration(self, queue_name: str):
        """
        Halve or double a queue's concurrency based on its performance.
        """
        performance = self.evaluate_queue_performance(queue_name)
        
        current_concurrency = self.redis_client.get(f"queue:{queue_name}:concurrency")
        if not current_concurrency:
            return
        
        if performance < self.adaptation_threshold:
            # Struggling: shed load
            new_concurrency = max(1, int(current_concurrency) // 2)
        else:
            # Healthy: take on more load
            new_concurrency = min(100, int(current_concurrency) * 2)
        
        self.redis_client.set(f"queue:{queue_name}:concurrency", new_concurrency)

class QueuePartitioner:
    """
    Queue partitioner.
    """
    
    def __init__(self, num_partitions: int = 10):
        self.num_partitions = num_partitions
    
    def partition_by_domain(self, url: str) -> int:
        """
        Partition by domain.
        """
        domain = urlparse(url).netloc
        hash_value = int(hashlib.md5(domain.encode()).hexdigest(), 16)
        return hash_value % self.num_partitions
    
    def partition_by_content_type(self, content_type: str) -> int:
        """
        Partition by content type.
        """
        type_hash = int(hashlib.md5(content_type.encode()).hexdigest(), 16)
        return type_hash % self.num_partitions
    
    def partition_by_priority(self, priority: int) -> int:
        """
        Partition by priority.
        """
        return abs(priority) % self.num_partitions
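
For example, combining the partitioner with the key-name convention used throughout this chapter yields one sub-queue per domain shard, so a single hot site cannot monopolize the shared queue (the key format here is illustrative):

partitioner = QueuePartitioner(num_partitions=10)

partition = partitioner.partition_by_domain('https://example.com/page')
queue_key = f"distributed_spider:requests:{partition}"
print(queue_key)  # e.g. distributed_spider:requests:3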

Deduplication Mechanisms

Optimizing the Deduplication Algorithm

# deduplication_analysis.py - deduplication mechanisms
import hashlib
import mmh3                    # third-party: pip install mmh3
import time
from bitarray import bitarray  # third-party: pip install bitarray
import redis

class BloomFilter:
    """
    In-memory Bloom filter.
    """
    
    def __init__(self, capacity: int, error_rate: float = 0.01):
        """
        Initialize the filter for `capacity` items at `error_rate`.
        """
        self.capacity = capacity
        self.error_rate = error_rate
        
        # Derive the bit-array size and hash-function count
        self.bit_array_size = self._get_bit_array_size()
        self.hash_count = self._get_hash_count()
        
        self.bit_array = bitarray(self.bit_array_size)
        self.bit_array.setall(0)
    
    def _get_bit_array_size(self) -> int:
        """
        Bit-array size: m = -n * ln(p) / (ln 2)^2.
        """
        import math
        m = -(self.capacity * math.log(self.error_rate)) / (math.log(2) ** 2)
        return int(m)
    
    def _get_hash_count(self) -> int:
        """
        Hash-function count: k = (m / n) * ln 2.
        """
        import math
        k = (self.bit_array_size / self.capacity) * math.log(2)
        return int(k)
    
    def _hash(self, item: str, seed: int) -> int:
        """
        Hash an item with the given seed.
        """
        return mmh3.hash(item, seed) % self.bit_array_size
    
    def add(self, item: str):
        """
        Add an item by setting its k bits.
        """
        for i in range(self.hash_count):
            index = self._hash(item, i)
            self.bit_array[index] = 1
    
    def contains(self, item: str) -> bool:
        """
        Return True if the item may be present (false positives possible).
        """
        for i in range(self.hash_count):
            index = self._hash(item, i)
            if not self.bit_array[index]:
                return False
        return True
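
# Worked example for the two formulas above: n = 1,000,000 fingerprints
# at p = 0.01 gives
#   m = -(n * ln p) / (ln 2)^2 ≈ 9,585,059 bits ≈ 1.14 MB
#   k = (m / n) * ln 2 ≈ 6.6 hash functions (int() truncates this to 6)
# so deduplicating a million URLs costs roughly a megabyte of memory.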

class RedisBloomFilter:
    """
    Bloom filter backed by a Redis bitmap.
    """
    
    def __init__(self, redis_client, key_prefix: str, capacity: int = 1000000, error_rate: float = 0.01):
        self.redis_client = redis_client
        self.key_prefix = key_prefix
        self.capacity = capacity
        self.error_rate = error_rate
        
        # Derive the parameters (same formulas as above)
        import math
        self.bit_array_size = int(-(capacity * math.log(error_rate)) / (math.log(2) ** 2))
        self.hash_count = int((self.bit_array_size / capacity) * math.log(2))
        
        # Redis bitmap key
        self.bitmap_key = f"{key_prefix}:bloom_bitmap"
    
    def _get_hashes(self, item: str) -> list:
        """
        Compute the item's k bit positions.
        """
        hashes = []
        for i in range(self.hash_count):
            hash_val = mmh3.hash(item, i) % self.bit_array_size
            hashes.append(hash_val)
        return hashes
    
    def add(self, item: str):
        """
        Set the item's bits in one pipeline round-trip.
        """
        hashes = self._get_hashes(item)
        pipe = self.redis_client.pipeline()
        
        for hash_val in hashes:
            pipe.setbit(self.bitmap_key, hash_val, 1)
        
        pipe.execute()
    
    def contains(self, item: str) -> bool:
        """
        Return True if the item may be present.
        """
        hashes = self._get_hashes(item)
        
        for hash_val in hashes:
            if not self.redis_client.getbit(self.bitmap_key, hash_val):
                return False
        
        return True
    
    def clear(self):
        """
        Drop the bitmap entirely.
        """
        self.redis_client.delete(self.bitmap_key)

class AdvancedDupeFilter:
    """
    Multi-layer duplicate filter: local cache, Bloom filter, Redis set.
    """
    
    def __init__(self, redis_client, namespace: str = 'distributed_spider'):
        self.redis_client = redis_client
        self.namespace = namespace
        
        # The three layers
        self.local_cache = {}  # local cache
        self.redis_set_key = f"{namespace}:dupefilter:set"  # exact Redis set
        self.bloom_filter = RedisBloomFilter(
            redis_client, 
            f"{namespace}:dupefilter:bloom"
        )
        
        # Performance counters
        self.stats = {
            'local_hits': 0,
            'redis_hits': 0,
            'bloom_hits': 0,
            'misses': 0,
            'false_positives': 0
        }
    
    def request_seen(self, request) -> bool:
        """
        Return True if the request has been seen before.
        """
        fingerprint = self._get_fingerprint(request)
        
        # 1. Local cache (fastest)
        if fingerprint in self.local_cache:
            self.stats['local_hits'] += 1
            return True
        
        # 2. Bloom filter (may produce false positives)
        if self.bloom_filter.contains(fingerprint):
            # Confirm against the exact Redis set
            if self.redis_client.sismember(self.redis_set_key, fingerprint):
                self.stats['redis_hits'] += 1
                self._cache_fingerprint(fingerprint)  # warm the local cache
                return True
            else:
                # Bloom filter false positive
                self.stats['false_positives'] += 1
        
        # 3. Exact Redis set
        if self.redis_client.sismember(self.redis_set_key, fingerprint):
            self.stats['redis_hits'] += 1
            self._cache_fingerprint(fingerprint)
            return True
        
        # 4. New fingerprint: record it in every layer
        self.redis_client.sadd(self.redis_set_key, fingerprint)
        self.bloom_filter.add(fingerprint)
        self._cache_fingerprint(fingerprint)
        
        self.stats['misses'] += 1
        return False
    
    def _get_fingerprint(self, request) -> str:
        """
        Fingerprint from method, URL, and body (SHA-256).
        """
        fingerprint_str = f"{request.method}:{request.url}:{request.body.decode('utf-8', errors='ignore')}"
        return hashlib.sha256(fingerprint_str.encode()).hexdigest()
    
    def _cache_fingerprint(self, fingerprint: str):
        """
        Cache a fingerprint locally, evicting the older half when full.
        """
        if len(self.local_cache) > 10000:
            keys_to_remove = list(self.local_cache.keys())[:5000]
            for key in keys_to_remove:
                del self.local_cache[key]
        
        self.local_cache[fingerprint] = time.time()
    
    def get_stats(self) -> dict:
        """
        Return counters plus the overall hit rate.
        """
        total_checks = (self.stats['local_hits'] + self.stats['redis_hits']
                        + self.stats['misses'])
        hit_rate = (self.stats['local_hits'] + self.stats['redis_hits']) / total_checks if total_checks > 0 else 0
        
        return {
            **self.stats,
            'hit_rate': hit_rate,
            'cache_size': len(self.local_cache)
        }

class DuplicateAnalysis:
    """
    Duplicate-pattern analyzer.
    """
    
    def __init__(self, redis_client):
        self.redis_client = redis_client
        self.analysis_key = 'duplicate_analysis'
    
    def analyze_duplication_patterns(self, days: int = 7) -> dict:
        """
        Summarize duplication patterns over the last `days` days.
        """
        patterns = {
            'daily_duplication_rate': {},
            'top_duplicate_domains': {},
            'duplication_by_hour': {},
            'similarity_clusters': []
        }
        
        # Daily duplication rate
        for day in range(days):
            date_key = f"{self.analysis_key}:day_{day}"
            daily_stats = self.redis_client.hgetall(date_key)
            
            if daily_stats:
                processed = int(daily_stats.get(b'processed', 0))
                duplicates = int(daily_stats.get(b'duplicates', 0))
                
                if processed > 0:
                    duplication_rate = duplicates / processed
                    patterns['daily_duplication_rate'][day] = duplication_rate
        
        # Duplicates per domain
        domain_stats = self.redis_client.hgetall(f"{self.analysis_key}:domains")
        for domain, count in domain_stats.items():
            patterns['top_duplicate_domains'][domain.decode()] = int(count)
        
        # Duplicates per hour
        hourly_stats = self.redis_client.hgetall(f"{self.analysis_key}:hours")
        for hour, count in hourly_stats.items():
            patterns['duplication_by_hour'][int(hour)] = int(count)
        
        return patterns
    
    def identify_similar_requests(self, threshold: float = 0.8) -> list:
        """
        Find pairs of near-duplicate requests in a recent sample.
        """
        recent_requests = self.redis_client.lrange(
            f"{self.analysis_key}:recent_requests", 0, 1000
        )
        
        similar_pairs = []
        
        for i, req1 in enumerate(recent_requests):
            for j, req2 in enumerate(recent_requests[i+1:], i+1):
                similarity = self._calculate_similarity(req1, req2)
                if similarity > threshold:
                    similar_pairs.append({
                        'request1': req1.decode(),
                        'request2': req2.decode(),
                        'similarity': similarity
                    })
        
        return sorted(similar_pairs, key=lambda x: x['similarity'], reverse=True)
    
    def _calculate_similarity(self, req1: bytes, req2: bytes) -> float:
        """
        Character-level Jaccard similarity between two requests.
        """
        str1, str2 = req1.decode(), req2.decode()
        
        set1, set2 = set(str1), set(str2)
        intersection = len(set1.intersection(set2))
        union = len(set1.union(set2))
        
        return intersection / union if union > 0 else 0

Load Balancing Strategies

Load Balancing Algorithms

# load_balancing.py - load balancing strategies
import bisect
import hashlib
import time
import random
from typing import List, Dict, Any

class LoadBalancer:
    """
    Pluggable-algorithm load balancer.
    """
    
    def __init__(self, nodes: List[Dict[str, Any]]):
        self.nodes = nodes
        self.node_stats = {node['id']: {'load': 0, 'success_rate': 1.0, 'response_time': 0.1} 
                          for node in nodes}
        self.algorithm = 'weighted_round_robin'  # default algorithm
    
    def select_node(self, request_info: Dict[str, Any] = None) -> str:
        """
        Pick a node using the configured algorithm.
        """
        if self.algorithm == 'random':
            return self._random_selection()
        elif self.algorithm == 'round_robin':
            return self._round_robin_selection()
        elif self.algorithm == 'weighted_round_robin':
            return self._weighted_round_robin_selection()
        elif self.algorithm == 'least_connections':
            return self._least_connections_selection()
        elif self.algorithm == 'response_time':
            return self._response_time_selection()
        elif self.algorithm == 'adaptive':
            return self._adaptive_selection(request_info)
        else:
            return self._round_robin_selection()
    
    def _random_selection(self) -> str:
        """
        Uniform random choice.
        """
        return random.choice(self.nodes)['id']
    
    def _round_robin_selection(self) -> str:
        """
        Round-robin choice (simplified; a real implementation would
        keep a rotating counter rather than hashing the clock).
        """
        active_nodes = [node for node in self.nodes if node.get('status') == 'active']
        if not active_nodes:
            return self.nodes[0]['id'] if self.nodes else None
        return active_nodes[hash(time.time()) % len(active_nodes)]['id']
    
    def _weighted_round_robin_selection(self) -> str:
        """
        Weighted choice based on capability, load, and success rate.
        """
        weighted_nodes = []
        
        for node in self.nodes:
            if node.get('status') != 'active':
                continue
            
            # Composite weight
            capability_weight = node.get('cpu_cores', 1) * node.get('memory_gb', 1)
            load_factor = 1.0 / (1.0 + self.node_stats[node['id']]['load'])
            success_weight = self.node_stats[node['id']]['success_rate']
            
            weight = capability_weight * load_factor * success_weight
            weighted_nodes.append((weight, node['id']))
        
        if not weighted_nodes:
            return self.nodes[0]['id'] if self.nodes else None
        
        # Roulette-wheel selection over the weights
        total_weight = sum(weight for weight, _ in weighted_nodes)
        rand_num = random.uniform(0, total_weight)
        
        cumulative_weight = 0
        for weight, node_id in weighted_nodes:
            cumulative_weight += weight
            if rand_num <= cumulative_weight:
                return node_id
        
        return weighted_nodes[-1][1]
    
    def _least_connections_selection(self) -> str:
        """
        Pick the node with the fewest active connections.
        """
        active_nodes = [node for node in self.nodes if node.get('status') == 'active']
        if not active_nodes:
            return self.nodes[0]['id'] if self.nodes else None
        
        min_load_node = min(active_nodes, 
                           key=lambda n: self.node_stats[n['id']]['load'])
        return min_load_node['id']
    
    def _response_time_selection(self) -> str:
        """
        Pick the node with the lowest average response time.
        """
        active_nodes = [node for node in self.nodes if node.get('status') == 'active']
        if not active_nodes:
            return self.nodes[0]['id'] if self.nodes else None
        
        best_node = min(active_nodes,
                       key=lambda n: self.node_stats[n['id']]['response_time'])
        return best_node['id']
    
    def _adaptive_selection(self, request_info: Dict[str, Any]) -> str:
        """
        Match request requirements against node capabilities.
        """
        if request_info:
            # If the request states resource requirements, prefer a
            # node that satisfies all of them
            required_resources = request_info.get('resources', {})
            
            for node in self.nodes:
                if node.get('status') != 'active':
                    continue
                
                if self._check_resource_match(node, required_resources):
                    return node['id']
        
        # Fall back to weighted round robin
        return self._weighted_round_robin_selection()
    
    def _check_resource_match(self, node: dict, required_resources: dict) -> bool:
        """
        Return True if the node meets every stated requirement.
        """
        for resource, required_value in required_resources.items():
            node_value = node.get(resource, 0)
            if node_value < required_value:
                return False
        return True
    
    def update_node_stats(self, node_id: str, load_change: int = 0, 
                         success: bool = True, response_time: float = 0.0):
        """
        Update a node's statistics.
        """
        if node_id not in self.node_stats:
            return
        
        stats = self.node_stats[node_id]
        stats['load'] += load_change
        
        # Success rate (exponential moving average)
        alpha = 0.1  # smoothing factor
        current_success_rate = 1.0 if success else 0.0
        stats['success_rate'] = (alpha * current_success_rate + 
                                (1 - alpha) * stats['success_rate'])
        
        # Response time (exponential moving average)
        current_response_time = response_time or stats['response_time']
        stats['response_time'] = (alpha * current_response_time + 
                                 (1 - alpha) * stats['response_time'])
class ConsistentHashBalancer:
    """
    Consistent-hash load balancer.
    """
    
    def __init__(self, nodes: List[str], virtual_nodes: int = 150):
        self.nodes = nodes
        self.virtual_nodes = virtual_nodes
        self.ring = {}          # ring position -> node
        self.sorted_keys = []   # sorted ring positions
        
        self._initialize_ring()
    
    @staticmethod
    def _stable_hash(key: str) -> int:
        """
        Process-independent hash. Python's built-in hash() is salted
        per process, which would give every node a different ring.
        """
        return int(hashlib.md5(key.encode()).hexdigest(), 16) % (2 ** 32)
    
    def _initialize_ring(self):
        """
        Place virtual nodes on the ring to smooth the distribution.
        """
        for node in self.nodes:
            for i in range(self.virtual_nodes):
                hash_key = self._stable_hash(f"{node}:{i}")
                self.ring[hash_key] = node
                self.sorted_keys.append(hash_key)
        
        self.sorted_keys.sort()
    
    def get_node(self, key: str) -> str:
        """
        Map a key to the first ring position at or after its hash.
        """
        if not self.ring:
            return None
        
        hash_key = self._stable_hash(key)
        idx = bisect.bisect_left(self.sorted_keys, hash_key)
        if idx == len(self.sorted_keys):
            idx = 0  # wrap around the ring
        return self.ring[self.sorted_keys[idx]]
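
# Example: keys map to stable owners, and the mapping survives process
# restarts because the ring is md5-based:
#
#   balancer = ConsistentHashBalancer(['node-1', 'node-2', 'node-3'])
#   balancer.get_node('https://a.example/1')  # same node every time;
#   # adding 'node-4' later only remaps roughly 1/4 of the keys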

class TaskAffinityBalancer:
    """
    Affinity-aware load balancer.
    """
    
    def __init__(self, nodes: List[Dict[str, Any]]):
        self.nodes = nodes
        self.task_affinities = {}  # task -> (node, created_at)
        self.node_loads = {node['id']: 0 for node in nodes}
        self.max_affinity_duration = 3600  # affinity lifetime (seconds)
    
    def assign_task(self, task_id: str, task_attributes: Dict[str, Any]) -> str:
        """
        Assign a task to a node.
        """
        # Reuse an existing affinity if one is still valid
        affinity_node = self._get_affinity_node(task_id)
        if affinity_node:
            return affinity_node
        
        # Otherwise pick the best node for the task's attributes
        optimal_node = self._select_optimal_node(task_attributes)
        
        # Record an affinity when the task calls for one
        if self._should_create_affinity(task_attributes):
            self._create_affinity(task_id, optimal_node)
        
        # Bump the node's load
        self.node_loads[optimal_node] += 1
        
        return optimal_node
    
    def _get_affinity_node(self, task_id: str) -> str:
        """
        Return the task's affinity node, if the affinity is still fresh.
        """
        if task_id in self.task_affinities:
            node_id, timestamp = self.task_affinities[task_id]
            if time.time() - timestamp < self.max_affinity_duration:
                return node_id
            else:
                # Affinity expired; drop it
                del self.task_affinities[task_id]
        
        return None
    
    def _select_optimal_node(self, task_attributes: Dict[str, Any]) -> str:
        """
        Pick a node suited to the task's attributes.
        """
        domain = task_attributes.get('domain', '')
        
        # Prefer a node that has handled this domain, as long as it is
        # not already near the top of the load range
        for node_id in self.node_loads.keys():
            if self._has_domain_experience(node_id, domain):
                if self.node_loads[node_id] < max(self.node_loads.values()) * 0.8:
                    return node_id
        
        # Otherwise pick among the least-loaded nodes
        min_load = min(self.node_loads.values())
        candidates = [node_id for node_id, load in self.node_loads.items() 
                     if load == min_load]
        
        return random.choice(candidates) if candidates else list(self.node_loads.keys())[0]
    
    def _has_domain_experience(self, node_id: str, domain: str) -> bool:
        """
        Whether the node has handled this domain before.
        """
        # Placeholder: a real implementation would consult Redis or
        # another store under a key like the one below
        experience_key = f"node_experience:{node_id}:{domain}"
        return False
    
    def _should_create_affinity(self, task_attributes: Dict[str, Any]) -> bool:
        """
        Create an affinity for tasks that must keep a session or state.
        """
        return task_attributes.get('requires_session', False)
    
    def _create_affinity(self, task_id: str, node_id: str):
        """
        Record a task-node affinity.
        """
        self.task_affinities[task_id] = (node_id, time.time())
    
    def release_task(self, task_id: str, node_id: str):
        """
        Release a finished task.
        """
        self.node_loads[node_id] -= 1
        
        # Drop the affinity record
        if task_id in self.task_affinities:
            del self.task_affinities[task_id]

class LoadBalancerManager:
    """
    Load balancer registry and selector.
    """
    
    def __init__(self):
        self.balancers = {}
        self.performance_monitor = PerformanceMonitor()
    
    def register_balancer(self, name: str, balancer):
        """
        Register a load balancer.
        """
        self.balancers[name] = balancer
    
    def get_balancer(self, name: str):
        """
        Look up a load balancer.
        """
        return self.balancers.get(name)
    
    def switch_algorithm(self, name: str, algorithm: str):
        """
        Switch a balancer's algorithm.
        """
        if name in self.balancers:
            balancer = self.balancers[name]
            if hasattr(balancer, 'algorithm'):
                balancer.algorithm = algorithm
    
    def get_optimal_balancer(self, traffic_pattern: str) -> str:
        """
        Choose a balancing strategy for a traffic pattern (the collected
        stats could refine this; here the mapping is static).
        """
        performance_stats = self.performance_monitor.get_stats()
        
        if traffic_pattern == 'uniform':
            return 'round_robin'
        elif traffic_pattern == 'bursty':
            return 'least_connections'
        elif traffic_pattern == 'sticky':
            return 'consistent_hash'
        else:
            return 'adaptive'

class PerformanceMonitor:
    """
    Performance monitor.
    """
    
    def __init__(self):
        self.metrics = {}
        self.history = {}
    
    def record_metric(self, balancer_name: str, node_id: str, 
                     response_time: float, success: bool):
        """
        Record one observation.
        """
        key = f"{balancer_name}:{node_id}"
        
        if key not in self.metrics:
            self.metrics[key] = {
                'response_times': [],
                'success_rates': [],
                'throughput': 0
            }
        
        metrics = self.metrics[key]
        metrics['response_times'].append(response_time)
        metrics['success_rates'].append(1 if success else 0)
        metrics['throughput'] += 1
    
    def get_stats(self) -> Dict[str, Any]:
        """
        Aggregate the recorded observations.
        """
        stats = {}
        
        for key, metrics in self.metrics.items():
            response_times = metrics['response_times']
            success_rates = metrics['success_rates']
            
            if response_times:
                avg_response_time = sum(response_times) / len(response_times)
                success_rate = sum(success_rates) / len(success_rates) if success_rates else 0
                
                stats[key] = {
                    'avg_response_time': avg_response_time,
                    'success_rate': success_rate,
                    'throughput': metrics['throughput']
                }
        
        return stats

Cluster Deployment

Cluster Architecture

# docker-compose-cluster.yml - distributed crawler cluster
version: '3.8'

services:
  # Redis (master and replica)
  redis-master:
    image: redis:6.2-alpine
    container_name: redis-master
    ports:
      - "6379:6379"
    command: redis-server --appendonly yes --maxmemory 2gb --maxmemory-policy allkeys-lru
    volumes:
      - redis-data:/data
    networks:
      - crawler-network

  redis-slave:
    image: redis:6.2-alpine
    container_name: redis-slave
    ports:
      - "6380:6379"
    command: redis-server --slaveof redis-master 6379
    depends_on:
      - redis-master
    networks:
      - crawler-network

  # Redis monitoring UI
  redis-commander:
    image: rediscommander/redis-commander:latest
    container_name: redis-commander
    environment:
      - REDIS_HOSTS=local:redis-master:6379
    ports:
      - "8081:8081"
    depends_on:
      - redis-master
    networks:
      - crawler-network

  # Crawler node 1
  spider-node-1:
    build: .
    container_name: spider-node-1
    environment:
      - REDIS_URL=redis://redis-master:6379
      - NODE_ID=node-1
    volumes:
      - ./output:/app/output
      - ./logs:/app/logs
    depends_on:
      - redis-master
    networks:
      - crawler-network
    deploy:
      resources:
        limits:
          cpus: '2'
          memory: 2G

  # Crawler node 2
  spider-node-2:
    build: .
    container_name: spider-node-2
    environment:
      - REDIS_URL=redis://redis-master:6379
      - NODE_ID=node-2
    volumes:
      - ./output:/app/output
      - ./logs:/app/logs
    depends_on:
      - redis-master
    networks:
      - crawler-network
    deploy:
      resources:
        limits:
          cpus: '2'
          memory: 2G

  # Crawler node 3
  spider-node-3:
    build: .
    container_name: spider-node-3
    environment:
      - REDIS_URL=redis://redis-master:6379
      - NODE_ID=node-3
    volumes:
      - ./output:/app/output
      - ./logs:/app/logs
    depends_on:
      - redis-master
    networks:
      - crawler-network
    deploy:
      resources:
        limits:
          cpus: '2'
          memory: 2G

  # Data-processing node
  processor-node:
    build: 
      context: .
      dockerfile: Dockerfile.processor
    container_name: processor-node
    environment:
      - REDIS_URL=redis://redis-master:6379
      - MONGODB_URI=mongodb://mongo:27017/crawler_db
    depends_on:
      - redis-master
      - mongo
    networks:
      - crawler-network

  # Database
  mongo:
    image: mongo:4.4
    container_name: mongodb
    ports:
      - "27017:27017"
    volumes:
      - mongo-data:/data/db
    networks:
      - crawler-network

  # Dashboards
  grafana:
    image: grafana/grafana-enterprise
    container_name: grafana
    ports:
      - "3000:3000"
    environment:
      - GF_SECURITY_ADMIN_PASSWORD=admin
    volumes:
      - grafana-storage:/var/lib/grafana
    networks:
      - crawler-network
    depends_on:
      - prometheus

  # Metrics collection
  prometheus:
    image: prom/prometheus
    container_name: prometheus
    ports:
      - "9090:9090"
    volumes:
      - ./prometheus.yml:/etc/prometheus/prometheus.yml
    networks:
      - crawler-network

volumes:
  redis-data:
  mongo-data:
  grafana-storage:

networks:
  crawler-network:
    driver: bridge
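
Bringing the stack up is a single compose command; the checks below (container names match the file above) confirm that every node talks to the same Redis.

# Start the whole stack
docker compose -f docker-compose-cluster.yml up -d

# Verify the shared Redis is reachable and list the shared crawl keys
docker exec redis-master redis-cli ping                          # -> PONG
docker exec redis-master redis-cli keys 'distributed_spider:*'   # queue/dupefilter keys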

Deployment Configuration

# cluster_config.py - 集群配置
import os
from dataclasses import dataclass
from typing import List, Dict, Any

@dataclass
class NodeConfig:
    """
    节点配置
    """
    node_id: str
    role: str  # spider, processor, coordinator
    cpu_cores: int
    memory_gb: int
    bandwidth_mbps: int
    max_concurrent_requests: int
    download_delay: float
    retry_times: int

@dataclass
class ClusterConfig:
    """
    集群配置
    """
    cluster_name: str
    redis_config: Dict[str, Any]
    nodes: List[NodeConfig]
    scheduling_policy: str
    load_balancing_algorithm: str
    monitoring_enabled: bool
    auto_scaling_enabled: bool

class ClusterManager:
    """
    集群管理器
    """
    
    def __init__(self, config: ClusterConfig):
        self.config = config
        self.nodes = {}
        self.cluster_state = 'initialized'
    
    def start_cluster(self):
        """
        启动集群
        """
        print(f"Starting cluster: {self.config.cluster_name}")
        
        # 启动Redis
        self._start_redis()
        
        # 启动各节点
        for node_config in self.config.nodes:
            self._start_node(node_config)
        
        # 启动监控
        if self.config.monitoring_enabled:
            self._start_monitoring()
        
        self.cluster_state = 'running'
        print("Cluster started successfully")
    
    def _start_redis(self):
        """
        启动Redis服务
        """
        redis_config = self.config.redis_config
        print(f"Starting Redis: {redis_config['host']}:{redis_config['port']}")
        # 实际实现中会启动Redis服务
    
    def _start_node(self, node_config: NodeConfig):
        """
        启动节点
        """
        print(f"Starting node {node_config.node_id} with role {node_config.role}")
        
        # 根据节点角色启动相应的服务
        if node_config.role == 'spider':
            self._start_spider_node(node_config)
        elif node_config.role == 'processor':
            self._start_processor_node(node_config)
        elif node_config.role == 'coordinator':
            self._start_coordinator_node(node_config)
    
    def _start_spider_node(self, node_config: NodeConfig):
        """
        启动爬虫节点
        """
        # 配置爬虫参数
        spider_config = {
            'CONCURRENT_REQUESTS': node_config.max_concurrent_requests,
            'DOWNLOAD_DELAY': node_config.download_delay,
            'RETRY_TIMES': node_config.retry_times,
        }
        
        print(f"Spider node {node_config.node_id} configured with: {spider_config}")
    
    def _start_processor_node(self, node_config: NodeConfig):
        """
        启动处理节点
        """
        print(f"Processor node {node_config.node_id} started")
    
    def _start_coordinator_node(self, node_config: NodeConfig):
        """
        启动协调节点
        """
        print(f"Coordinator node {node_config.node_id} started")
    
    def _start_monitoring(self):
        """
        启动监控
        """
        print("Starting monitoring services...")
        # 启动Prometheus, Grafana等监控服务
    
    def scale_up(self, additional_nodes: List[NodeConfig]):
        """
        扩容集群
        """
        for node_config in additional_nodes:
            self._start_node(node_config)
            self.config.nodes.append(node_config)
        
        print(f"Scaled up cluster with {len(additional_nodes)} nodes")
    
    def scale_down(self, node_ids: List[str]):
        """
        缩容集群
        """
        nodes_to_remove = []
        for node_id in node_ids:
            node = next((n for n in self.config.nodes if n.node_id == node_id), None)
            if node:
                self._graceful_shutdown_node(node)
                nodes_to_remove.append(node)
        
        for node in nodes_to_remove:
            self.config.nodes.remove(node)
        
        print(f"Scaled down cluster by removing {len(nodes_to_remove)} nodes")
    
    def _graceful_shutdown_node(self, node_config: NodeConfig):
        """
        优雅关闭节点
        """
        print(f"Gracefully shutting down node {node_config.node_id}")
        # 实现节点优雅关闭逻辑
    
    def get_cluster_status(self) -> Dict[str, Any]:
        """
        获取集群状态
        """
        return {
            'state': self.cluster_state,
            'total_nodes': len(self.config.nodes),
            'active_nodes': len([n for n in self.config.nodes if n.role != 'inactive']),
            'nodes': [
                {
                    'id': node.node_id,
                    'role': node.role,
                    'cpu_cores': node.cpu_cores,
                    'memory_gb': node.memory_gb
                } for node in self.config.nodes
            ]
        }

class AutoScaler:
    """
    自动扩缩容器
    """
    
    def __init__(self, cluster_manager: ClusterManager):
        self.cluster_manager = cluster_manager
        self.metrics_collector = MetricsCollector()
        self.scaling_policies = {
            'cpu_threshold': 80,  # CPU使用率阈值
            'memory_threshold': 85,  # 内存使用率阈值
            'queue_length_threshold': 1000,  # 队列长度阈值
            'response_time_threshold': 5.0,  # 响应时间阈值(秒)
        }
    
    def check_scaling_requirements(self) -> Dict[str, Any]:
        """
        检查扩缩容需求
        """
        metrics = self.metrics_collector.get_cluster_metrics()
        
        scaling_decision = {
            'scale_up': False,
            'scale_down': False,
            'reasons': [],
            'recommended_nodes': 0
        }
        
        # 检查CPU使用率
        cpu_usage = metrics.get('cpu_usage') or []
        avg_cpu = sum(cpu_usage) / len(cpu_usage) if cpu_usage else 0
        if avg_cpu > self.scaling_policies['cpu_threshold']:
            scaling_decision['scale_up'] = True
            scaling_decision['reasons'].append(f"High CPU usage: {avg_cpu}%")
            scaling_decision['recommended_nodes'] += 2
        
        # 检查内存使用率
        memory_usage = metrics.get('memory_usage') or []
        avg_memory = sum(memory_usage) / len(memory_usage) if memory_usage else 0
        if avg_memory > self.scaling_policies['memory_threshold']:
            scaling_decision['scale_up'] = True
            scaling_decision['reasons'].append(f"High memory usage: {avg_memory}%")
            scaling_decision['recommended_nodes'] += 1
        
        # 检查队列长度
        queue_length = metrics.get('queue_length', 0)
        if queue_length > self.scaling_policies['queue_length_threshold']:
            scaling_decision['scale_up'] = True
            scaling_decision['reasons'].append(f"Long queue: {queue_length} items")
            scaling_decision['recommended_nodes'] += max(1, queue_length // 1000)
        
        # 检查响应时间
        avg_response_time = metrics.get('avg_response_time', 0)
        if avg_response_time > self.scaling_policies['response_time_threshold']:
            scaling_decision['scale_up'] = True
            scaling_decision['reasons'].append(f"Slow response: {avg_response_time}s")
            scaling_decision['recommended_nodes'] += 1
        
        return scaling_decision
    
    def perform_scaling(self):
        """
        执行扩缩容
        """
        decision = self.check_scaling_requirements()
        
        if decision['scale_up']:
            # 当前节点数用于给新节点编号
            current_nodes = len(self.cluster_manager.config.nodes)
            
            # 创建新的节点配置
            new_nodes = []
            for i in range(decision['recommended_nodes']):
                new_node = NodeConfig(
                    node_id=f"auto_node_{current_nodes + i + 1}",
                    role='spider',
                    cpu_cores=2,
                    memory_gb=4,
                    bandwidth_mbps=100,
                    max_concurrent_requests=16,
                    download_delay=1.0,
                    retry_times=3
                )
                new_nodes.append(new_node)
            
            # 扩容集群
            self.cluster_manager.scale_up(new_nodes)
            print(f"Auto scaled up cluster by {decision['recommended_nodes']} nodes")
        
        elif decision['scale_down']:
            # 实现缩容逻辑(略)
            pass

class MetricsCollector:
    """
    指标收集器
    """
    
    def __init__(self):
        self.metrics = {}
    
    def get_cluster_metrics(self) -> Dict[str, Any]:
        """
        获取集群指标
        """
        # 这里会从各个节点收集指标
        # 实际实现中会连接到Prometheus或其他监控系统
        return {
            'cpu_usage': [75, 80, 65],  # 各节点CPU使用率
            'memory_usage': [80, 70, 85],  # 各节点内存使用率
            'queue_length': 1200,  # 队列长度
            'avg_response_time': 3.2,  # 平均响应时间
            'requests_per_second': 50,  # 每秒请求数
        }
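
# 使用示例(演示用最小脚手架,节点参数均为假设值):
if __name__ == '__main__':
    demo_config = ClusterConfig(
        cluster_name='demo-cluster',
        redis_config={'host': 'localhost', 'port': 6379},
        nodes=[
            NodeConfig(node_id='node_1', role='spider', cpu_cores=4,
                       memory_gb=8, bandwidth_mbps=100,
                       max_concurrent_requests=32, download_delay=0.5,
                       retry_times=3),
        ],
        scheduling_policy='priority',
        load_balancing_algorithm='round_robin',
        monitoring_enabled=True,
        auto_scaling_enabled=True,
    )
    manager = ClusterManager(demo_config)
    manager.start_cluster()
    # 基于MetricsCollector返回的模拟指标检查扩容决策
    print(AutoScaler(manager).check_scaling_requirements())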

```

## 性能优化技巧 {#性能优化技巧}

### Redis性能优化

```python
# redis_optimization.py - Redis性能优化
import redis
from redis.sentinel import Sentinel
import time

class RedisOptimizer:
    """
    Redis性能优化器
    """
    
    def __init__(self, redis_config):
        self.redis_config = redis_config
        self.connection_pool = None
        self.optimization_strategies = {
            'connection_pooling': True,
            'pipelining': True,
            'compression': True,
            'serialization': 'pickle'
        }
    
    def create_optimized_connection(self):
        """
        创建优化的Redis连接
        """
        self.connection_pool = redis.ConnectionPool(
            host=self.redis_config['host'],
            port=self.redis_config['port'],
            db=self.redis_config['db'],
            password=self.redis_config['password'],
            max_connections=self.redis_config.get('max_connections', 20),
            retry_on_timeout=True,
            health_check_interval=30,
            socket_keepalive=True,
            encoding='utf-8',
            decode_responses=False
        )
        
        return redis.Redis(connection_pool=self.connection_pool)
    
    def optimize_for_crawling(self):
        """
        针对爬虫场景优化Redis
        """
        redis_client = self.create_optimized_connection()
        
        # 设置适合爬虫的Redis配置
        optimization_commands = [
            "CONFIG SET maxmemory 2gb",
            "CONFIG SET maxmemory-policy allkeys-lru",
            "CONFIG SET tcp-keepalive 300",
            "CONFIG SET timeout 300",
            "CONFIG SET hz 10",
            "CONFIG SET notify-keyspace-events KEA"
        ]
        
        for command in optimization_commands:
            try:
                redis_client.execute_command(*command.split())
            except redis.RedisError:
                # 某些配置项在托管Redis或低版本中不可用,忽略即可
                pass
        
        return redis_client
    
    def batch_operations(self, redis_client, operations):
        """
        批量操作优化
        """
        pipeline = redis_client.pipeline()
        
        for operation in operations:
            op_type, key, value = operation
            if op_type == 'set':
                pipeline.set(key, value)
            elif op_type == 'sadd':
                pipeline.sadd(key, value)
            elif op_type == 'lpush':
                pipeline.lpush(key, value)
            elif op_type == 'zadd':
                score, member = value
                pipeline.zadd(key, {member: score})
        
        return pipeline.execute()
    
    def compression_enabled_operations(self, redis_client, key, value):
        """
        启用压缩的操作
        """
        import zlib
        compressed_value = zlib.compress(str(value).encode('utf-8'))
        return redis_client.set(key, compressed_value)
    
    def decompression_enabled_get(self, redis_client, key):
        """
        启用解压的获取操作
        """
        import zlib
        compressed_value = redis_client.get(key)
        if compressed_value:
            return zlib.decompress(compressed_value).decode('utf-8')
        return None

class QueuePerformanceOptimizer:
    """
    队列性能优化器
    """
    
    def __init__(self, redis_client):
        self.redis_client = redis_client
        self.queue_optimization_strategies = {
            'batch_processing': True,
            'async_operations': True,
            'connection_pooling': True,
            'pipeline_usage': True
        }
    
    def optimize_queue_operations(self, queue_key, batch_size=100):
        """
        优化队列操作性能
        """
        # 批量获取请求
        def batch_pop():
            pipeline = self.redis_client.pipeline()
            
            # 获取多个请求
            for _ in range(batch_size):
                pipeline.lpop(queue_key)
            
            results = pipeline.execute()
            return [result for result in results if result is not None]
        
        # 批量推送请求
        def batch_push(items):
            pipeline = self.redis_client.pipeline()
            
            for item in items:
                pipeline.rpush(queue_key, item)
            
            return pipeline.execute()
        
        return batch_pop, batch_push
    
    def async_queue_operations(self):
        """
        异步队列操作(基于aioredis 2.x;该库现已并入redis-py的redis.asyncio)
        """
        import aioredis
        
        conn_kwargs = self.redis_client.connection_pool.connection_kwargs
        
        def get_async_redis():
            # aioredis 2.x中from_url直接返回客户端实例,连接惰性建立
            return aioredis.from_url(
                f"redis://{conn_kwargs['host']}:{conn_kwargs['port']}"
            )
        
        return get_async_redis

class ConnectionPoolOptimizer:
    """
    连接池优化器
    """
    
    def __init__(self):
        self.pool_config = {
            'max_connections': 20,
            'timeout': 30,
            'retry_on_timeout': True,
            'health_check_interval': 30
        }
    
    def create_optimized_pool(self, **overrides):
        """
        创建优化的连接池
        """
        config = {**self.pool_config, **overrides}
        
        return redis.ConnectionPool(
            max_connections=config['max_connections'],
            socket_timeout=config['timeout'],
            socket_connect_timeout=config['timeout'],
            retry_on_timeout=config['retry_on_timeout'],
            health_check_interval=config['health_check_interval'],
            **config.get('extra_kwargs', {})
        )
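
# 使用示例(假设本地Redis、无密码;演示pipeline批量写入):
if __name__ == '__main__':
    optimizer = RedisOptimizer({
        'host': 'localhost', 'port': 6379, 'db': 0, 'password': None,
    })
    client = optimizer.create_optimized_connection()
    ops = [
        ('set', 'demo:key', 'value'),
        ('sadd', 'demo:set', 'member'),
        ('zadd', 'demo:zset', (1.0, 'member')),  # zadd的value为(score, member)
    ]
    print(optimizer.batch_operations(client, ops))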

```

### 爬虫性能优化

```python
# crawler_optimization.py - 爬虫性能优化
import asyncio
import time
import aiohttp
from concurrent.futures import ThreadPoolExecutor
from functools import wraps

class CrawlerPerformanceOptimizer:
    """
    爬虫性能优化器
    """
    
    def __init__(self):
        self.executor = ThreadPoolExecutor(max_workers=10)
        self.session = None
        self.performance_metrics = {
            'requests_per_second': 0,
            'average_response_time': 0,
            'success_rate': 0,
            'memory_usage': 0
        }
    
    def async_http_client(self):
        """
        异步HTTP客户端
        """
        connector = aiohttp.TCPConnector(
            limit=100,  # 并发连接数
            limit_per_host=30,  # 每个主机的并发连接数
            ttl_dns_cache=300,  # DNS缓存时间
            use_dns_cache=True,
            keepalive_timeout=75,
        )
        
        timeout = aiohttp.ClientTimeout(total=60, connect=10)
        
        self.session = aiohttp.ClientSession(
            connector=connector,
            timeout=timeout,
            headers={
                'User-Agent': 'Mozilla/5.0 (compatible; DistributedBot/1.0)'
            }
        )
        
        return self.session
    
    def optimize_middleware(self):
        """
        优化中间件性能
        """
        middleware_config = {
            'enable_retry_middleware': True,
            'enable_proxy_middleware': True,
            'enable_cache_middleware': False,  # 分布式环境下谨慎使用
            'enable_rate_limit_middleware': True,
            'middleware_order': [
                'custom_user_agent',
                'proxy_middleware',
                'retry_middleware',
                'rate_limit_middleware'
            ]
        }
        return middleware_config
    
    def memory_efficient_parsing(self, response_text, selectors):
        """
        内存高效的解析(selectors中的XPath应返回文本节点,如 //h1/text())
        """
        from lxml import html
        
        # 使用lxml进行高效解析
        tree = html.fromstring(response_text)
        
        results = {}
        for field, selector in selectors.items():
            elements = tree.xpath(selector)
            results[field] = [elem.strip() for elem in elements
                              if isinstance(elem, str) and elem.strip()]
        
        return results
    
    def async_item_processing(self, items):
        """
        异步处理项目
        """
        async def process_item(item):
            # 异步处理单个项目
            # 可以包括数据清洗、验证、存储等操作
            processed_item = await self._process_single_item(item)
            return processed_item
        
        async def process_all():
            tasks = [process_item(item) for item in items]
            return await asyncio.gather(*tasks)
        
        return asyncio.run(process_all())
    
    async def _process_single_item(self, item):
        """
        处理单个项目
        """
        # 模拟异步处理
        await asyncio.sleep(0.01)  # 模拟处理时间
        return item

def performance_monitor(func):
    """
    性能监控装饰器(适用于async协程函数)
    """
    @wraps(func)
    async def wrapper(*args, **kwargs):
        start_time = time.time()
        start_memory = 0  # 可以集成内存监控
        
        try:
            result = await func(*args, **kwargs)
            success = True
        except Exception as e:
            result = None
            success = False
            print(f"Function {func.__name__} failed: {str(e)}")
        
        end_time = time.time()
        duration = end_time - start_time
        
        # 记录性能指标
        print(f"Function {func.__name__} took {duration:.2f}s, Success: {success}")
        
        return result
    return wrapper
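
# 使用示例(fetch_page为假设的抓取函数,装饰后自动记录耗时与成败):
# @performance_monitor
# async def fetch_page(session, url):
#     async with session.get(url) as resp:
#         return await resp.text()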

class LoadTestingTool:
    """
    负载测试工具
    """
    
    def __init__(self, target_url, concurrency_level=10):
        self.target_url = target_url
        self.concurrency_level = concurrency_level
        self.results = []
        self.total_duration = 0.0  # 整次压测的墙钟耗时,用于计算RPS
    
    async def single_request(self, session, url):
        """
        发送单个请求
        """
        start_time = time.time()
        
        try:
            async with session.get(url) as response:
                content = await response.text()
                status = response.status
        except Exception as e:
            status = 0
            content = str(e)
        
        end_time = time.time()
        
        return {
            'url': url,
            'status': status,
            'response_time': end_time - start_time,
            'content_length': len(content) if content else 0
        }
    
    async def load_test(self, num_requests=100):
        """
        执行负载测试
        """
        connector = aiohttp.TCPConnector(limit=self.concurrency_level)
        timeout = aiohttp.ClientTimeout(total=30)
        
        start_time = time.time()
        async with aiohttp.ClientSession(connector=connector, timeout=timeout) as session:
            tasks = []
            for _ in range(num_requests):
                task = self.single_request(session, self.target_url)
                tasks.append(task)
            
            results = await asyncio.gather(*tasks, return_exceptions=True)
            self.results = [r for r in results if not isinstance(r, Exception)]
        self.total_duration = time.time() - start_time
        
        return self.analyze_results()
    
    def analyze_results(self):
        """
        分析测试结果
        """
        if not self.results:
            return {}
        
        response_times = [r['response_time'] for r in self.results]
        statuses = [r['status'] for r in self.results]
        
        analysis = {
            'total_requests': len(self.results),
            'successful_requests': len([s for s in statuses if 200 <= s < 400]),
            'failed_requests': len([s for s in statuses if s == 0 or s >= 400]),
            'average_response_time': sum(response_times) / len(response_times),
            'min_response_time': min(response_times),
            'max_response_time': max(response_times),
            'requests_per_second': len(self.results) / self.total_duration if self.total_duration else 0
        }
        
        return analysis
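
# 使用示例(目标URL为占位;压测请只针对自己可控的服务):
if __name__ == '__main__':
    tool = LoadTestingTool('http://localhost:8000/', concurrency_level=10)
    print(asyncio.run(tool.load_test(num_requests=50)))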

```

## 监控与运维

### 监控系统设计

```python

# monitoring_system.py - 监控系统
import psutil
import time
import json
from datetime import datetime
import redis
from typing import Dict, List, Any

class DistributedCrawlerMonitor:
    """
    分布式爬虫监控系统
    """
    
    def __init__(self, redis_client, namespace='crawler_monitor'):
        self.redis_client = redis_client
        self.namespace = namespace
        self.metrics = {}
        self.alerts = []
        self.health_check_interval = 30
    
    def collect_system_metrics(self) -> Dict[str, Any]:
        """
        收集系统指标
        """
        return {
            'cpu_percent': psutil.cpu_percent(interval=1),
            'memory_percent': psutil.virtual_memory().percent,
            'disk_usage': psutil.disk_usage('/').percent,
            'network_io': psutil.net_io_counters()._asdict(),
            'process_count': len(psutil.pids()),
            'timestamp': datetime.now().isoformat()
        }
    
    def collect_redis_metrics(self) -> Dict[str, Any]:
        """
        收集Redis指标
        """
        info = self.redis_client.info()
        
        hits = info.get('keyspace_hits', 0)
        misses = info.get('keyspace_misses', 0)
        total_lookups = hits + misses
        
        return {
            'used_memory': info.get('used_memory_human'),
            'connected_clients': info.get('connected_clients'),
            'total_commands_processed': info.get('total_commands_processed'),
            'instantaneous_ops_per_sec': info.get('instantaneous_ops_per_sec'),
            'keyspace_hits': hits,
            'keyspace_misses': misses,
            'hit_rate': hits / total_lookups * 100 if total_lookups else 0,
            'timestamp': datetime.now().isoformat()
        }
    
    def collect_crawler_metrics(self) -> Dict[str, Any]:
        """
        收集爬虫指标
        """
        # 从Redis获取爬虫统计数据
        stats_keys = self.redis_client.keys(f"{self.namespace}:stats:*")
        
        stats = {}
        for key in stats_keys:
            node_stats = self.redis_client.hgetall(key)
            stats[key.decode()] = {k.decode(): v.decode() for k, v in node_stats.items()}
        
        return stats
    
    def check_health(self) -> Dict[str, Any]:
        """
        检查健康状态
        """
        system_metrics = self.collect_system_metrics()
        redis_metrics = self.collect_redis_metrics()
        
        health_status = {
            'system_healthy': (
                system_metrics['cpu_percent'] < 80 and
                system_metrics['memory_percent'] < 85
            ),
            'redis_healthy': (
                redis_metrics['connected_clients'] > 0 and
                redis_metrics['hit_rate'] > 80
            ),
            'crawler_healthy': self._check_crawler_health(),
            'overall_status': 'healthy'
        }
        
        if not health_status['system_healthy']:
            health_status['overall_status'] = 'degraded'
            self._add_alert('System resources overloaded')
        
        if not health_status['redis_healthy']:
            health_status['overall_status'] = 'critical'
            self._add_alert('Redis performance issues')
        
        return health_status
    
    def _check_crawler_health(self) -> bool:
        """
        检查爬虫健康状态
        """
        # 检查队列长度是否正常
        queue_keys = self.redis_client.keys('*:requests')
        for key in queue_keys:
            if self.redis_client.llen(key) > 10000:  # 队列过长
                self._add_alert(f'Queue {key.decode()} too long')
                return False
        
        # 检查去重集合大小
        dupefilter_keys = self.redis_client.keys('*:dupefilter')
        for key in dupefilter_keys:
            if self.redis_client.scard(key) > 1000000:  # 去重集合过大
                self._add_alert(f'Dupefilter {key.decode()} too large')
                return False
        
        return True
    
    def _add_alert(self, message: str):
        """
        添加告警
        """
        alert = {
            'message': message,
            'timestamp': datetime.now().isoformat(),
            'severity': 'warning'
        }
        self.alerts.append(alert)
        
        # 存储到Redis
        self.redis_client.lpush(f"{self.namespace}:alerts", json.dumps(alert))
        self.redis_client.ltrim(f"{self.namespace}:alerts", 0, 99)  # 只保留最近100个告警

class AlertManager:
    """
    告警管理器
    """
    
    def __init__(self, redis_client):
        self.redis_client = redis_client
        self.alert_handlers = []
        self.severity_levels = {
            'info': 0,
            'warning': 1,
            'error': 2,
            'critical': 3
        }
    
    def add_alert_handler(self, handler):
        """
        添加告警处理器
        """
        self.alert_handlers.append(handler)
    
    def trigger_alert(self, message: str, severity: str = 'warning', **context):
        """
        触发告警
        """
        alert = {
            'message': message,
            'severity': severity,
            'timestamp': datetime.now().isoformat(),
            'context': context
        }
        
        # 存储告警
        self.redis_client.lpush('crawler_alerts', json.dumps(alert))
        self.redis_client.ltrim('crawler_alerts', 0, 99)  # 只保留最近100个告警
        
        # 执行告警处理器
        for handler in self.alert_handlers:
            try:
                handler(alert)
            except Exception as e:
                print(f"Alert handler failed: {str(e)}")
    
    def email_alert_handler(self, smtp_config):
        """
        邮件告警处理器
        """
        import smtplib
        from email.mime.text import MIMEText
        from email.mime.multipart import MIMEMultipart
        
        def send_email(alert):
            msg = MIMEMultipart()
            msg['Subject'] = f"Crawler Alert: {alert['severity'].upper()}"
            msg['From'] = smtp_config['from']
            msg['To'] = smtp_config['to']
            
            body = f"""
            Crawler Alert
            
            Message: {alert['message']}
            Severity: {alert['severity']}
            Time: {alert['timestamp']}
            Context: {json.dumps(alert['context'], indent=2)}
            """
            
            msg.attach(MIMEText(body, 'plain'))
            
            with smtplib.SMTP(smtp_config['server'], smtp_config['port']) as server:
                server.starttls()
                server.login(smtp_config['username'], smtp_config['password'])
                server.send_message(msg)
        
        return send_email
    
    def webhook_alert_handler(self, webhook_url: str):
        """
        Webhook告警处理器
        """
        import requests
        
        def send_webhook(alert):
            try:
                response = requests.post(webhook_url, json=alert)
                response.raise_for_status()
            except Exception as e:
                print(f"Webhook alert failed: {str(e)}")
        
        return send_webhook

class DashboardGenerator:
    """
    仪表板生成器
    """
    
    def __init__(self, redis_client):
        self.redis_client = redis_client
    
    def generate_overview_dashboard(self) -> Dict[str, Any]:
        """
        生成概览仪表板
        """
        dashboard = {
            'summary': self._get_summary_stats(),
            'performance_trends': self._get_performance_trends(),
            'health_status': self._get_health_status(),
            'recent_alerts': self._get_recent_alerts(),
            'node_status': self._get_node_status(),
            'generated_at': datetime.now().isoformat()
        }
        
        return dashboard
    
    def _get_summary_stats(self) -> Dict[str, Any]:
        """
        获取汇总统计
        """
        # 获取总的请求数量
        total_requests = 0
        queue_keys = self.redis_client.keys('*:requests')
        for key in queue_keys:
            total_requests += self.redis_client.llen(key)
        
        # 获取总的去重请求数量
        total_processed = 0
        stats_keys = self.redis_client.keys('*:stats')
        for key in stats_keys:
            stats = self.redis_client.hgetall(key)
            if b'response_count' in stats:
                total_processed += int(stats[b'response_count'])
        
        # 获取活跃节点数
        node_keys = self.redis_client.keys('distributed_spider:nodes:*')
        active_nodes = len(node_keys)
        
        return {
            'total_requests_in_queue': total_requests,
            'total_processed_requests': total_processed,
            'active_nodes': active_nodes,
            'uptime_hours': self._calculate_uptime()
        }
    
    def _get_performance_trends(self) -> Dict[str, Any]:
        """
        获取性能趋势
        """
        # 这里可以实现历史数据的趋势分析
        # 从Redis的时间序列数据中获取
        return {
            'requests_per_minute': self._get_requests_per_minute(),
            'average_response_time': self._get_average_response_time(),
            'success_rate_trend': self._get_success_rate_trend()
        }
    
    def _get_health_status(self) -> Dict[str, str]:
        """
        获取健康状态
        """
        # 检查各个组件的健康状态
        redis_ping = self.redis_client.ping()
        
        return {
            'redis': 'healthy' if redis_ping else 'unhealthy',
            'nodes': self._assess_node_health(),
            'queues': self._assess_queue_health()
        }
    
    def _get_recent_alerts(self) -> List[Dict[str, Any]]:
        """
        获取最近的告警
        """
        alert_data = self.redis_client.lrange('crawler_alerts', 0, 9)
        alerts = []
        
        for data in alert_data:
            try:
                alert = json.loads(data.decode())
                alerts.append(alert)
            except (json.JSONDecodeError, UnicodeDecodeError):
                continue
        
        return alerts
    
    def _get_node_status(self) -> Dict[str, Any]:
        """
        获取节点状态
        """
        node_keys = self.redis_client.keys('distributed_spider:nodes:*')
        node_status = {}
        
        for key in node_keys:
            node_info = self.redis_client.hgetall(key)
            node_id = key.decode().split(':')[-1]
            
            node_status[node_id] = {
                'status': node_info.get(b'status', b'unknown').decode(),
                'registered_at': node_info.get(b'registered_at', b'').decode(),
                'capabilities': json.loads(node_info.get(b'capabilities', b'{}').decode())
            }
        
        return node_status
    
    def _calculate_uptime(self) -> float:
        """
        计算运行时间
        """
        # 可以从Redis中存储的启动时间计算
        startup_time = self.redis_client.get('crawler_startup_time')
        if startup_time:
            startup_timestamp = float(startup_time.decode())
            uptime_seconds = time.time() - startup_timestamp
            return uptime_seconds / 3600  # 转换为小时
        return 0
    
    # 以下方法为占位实现:实际项目中应从Prometheus等
    # 时间序列数据源读取历史指标后计算
    def _get_requests_per_minute(self) -> float:
        return 0.0
    
    def _get_average_response_time(self) -> float:
        return 0.0
    
    def _get_success_rate_trend(self) -> List[float]:
        return []
    
    def _assess_node_health(self) -> str:
        node_keys = self.redis_client.keys('distributed_spider:nodes:*')
        return 'healthy' if node_keys else 'no_nodes'
    
    def _assess_queue_health(self) -> str:
        queue_keys = self.redis_client.keys('*:requests')
        backlogged = [k for k in queue_keys if self.redis_client.llen(k) > 10000]
        return 'backlogged' if backlogged else 'healthy'
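
# 使用示例(假设本地Redis;演示健康检查与概览仪表板):
if __name__ == '__main__':
    client = redis.Redis(host='localhost', port=6379, db=0)
    monitor = DistributedCrawlerMonitor(client)
    print(monitor.check_health())
    dashboard = DashboardGenerator(client)
    print(json.dumps(dashboard.generate_overview_dashboard(),
                     indent=2, ensure_ascii=False))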

```

### 自动运维

```python
# automation.py - 自动运维
import json
import schedule
import time
from datetime import datetime, timedelta
from typing import Any, Dict, List

class AutoOpsManager:
    """
    自动运维管理器
    """
    
    def __init__(self, monitor, redis_client):
        self.monitor = monitor
        self.redis_client = redis_client
        self.automation_rules = []
        self.job_scheduler = schedule.Scheduler()
    
    def setup_regular_tasks(self):
        """
        设置定期任务
        """
        # 每分钟检查系统健康
        self.job_scheduler.every(1).minutes.do(self._regular_health_check)
        
        # 每5分钟清理过期数据
        self.job_scheduler.every(5).minutes.do(self._cleanup_expired_data)
        
        # 每小时生成报告
        self.job_scheduler.every(1).hour.do(self._generate_hourly_report)
        
        # 每天清理日志
        self.job_scheduler.every().day.at("02:00").do(self._cleanup_logs)
    
    def _regular_health_check(self):
        """
        定期健康检查
        """
        health_status = self.monitor.check_health()
        
        if health_status['overall_status'] == 'critical':
            self._trigger_emergency_procedures()
        elif health_status['overall_status'] == 'degraded':
            self._adjust_performance_settings()
    
    def _cleanup_expired_data(self):
        """
        清理过期数据
        """
        # 清理超过7天的统计数据
        week_ago = (datetime.now() - timedelta(days=7)).isoformat()
        
        # 这里可以实现具体的数据清理逻辑
        print(f"[{datetime.now()}] Cleaned up expired data")
    
    def _generate_hourly_report(self):
        """
        生成小时报告
        """
        # 假设monitor聚合了仪表板生成能力;若直接使用上文的
        # DistributedCrawlerMonitor,可在此组合一个DashboardGenerator来生成报告
        dashboard = self.monitor.generate_overview_dashboard()
        
        # 存储报告
        report_key = f"crawler_reports:{datetime.now().strftime('%Y%m%d_%H')}"
        self.redis_client.setex(report_key, 86400 * 30, json.dumps(dashboard))
        
        print(f"[{datetime.now()}] Generated hourly report")
    
    def _cleanup_logs(self):
        """
        清理日志
        """
        # 实现日志清理逻辑
        print(f"[{datetime.now()}] Cleaned up old logs")
    
    def _trigger_emergency_procedures(self):
        """
        触发应急程序
        """
        print(f"[{datetime.now()}] Emergency procedures triggered")
        
        # 可以实现降级、暂停爬虫等操作
        self._pause_crawling_if_needed()
    
    def _adjust_performance_settings(self):
        """
        调整性能设置
        """
        print(f"[{datetime.now()}] Adjusting performance settings")
        
        # 根据负载动态调整并发数等参数
        self._adjust_concurrency_based_on_load()
    
    def _pause_crawling_if_needed(self):
        """
        根据需要暂停爬虫
        """
        # 设置暂停标志
        self.redis_client.setex('crawler_pause_flag', 300, 'paused')  # 暂停5分钟
    
    def _adjust_concurrency_based_on_load(self):
        """
        根据负载调整并发数
        """
        system_metrics = self.monitor.collect_system_metrics()
        
        if system_metrics['cpu_percent'] > 80:
            # 降低并发数
            self._reduce_concurrency()
        elif system_metrics['cpu_percent'] < 50:
            # 增加并发数
            self._increase_concurrency()
    
    def _reduce_concurrency(self):
        """
        降低并发数
        """
        # 向所有节点发送降低并发数的指令
        self.redis_client.publish('crawler_control', json.dumps({
            'action': 'reduce_concurrency',
            'factor': 0.8
        }))
    
    def _increase_concurrency(self):
        """
        增加并发数
        """
        # 向所有节点发送增加并发数的指令
        self.redis_client.publish('crawler_control', json.dumps({
            'action': 'increase_concurrency',
            'factor': 1.2
        }))

class DeploymentManager:
    """
    部署管理器
    """
    
    def __init__(self, redis_client):
        self.redis_client = redis_client
        self.deployment_configs = {}
    
    def deploy_new_version(self, version: str, nodes: List[str] = None):
        """
        部署新版本
        """
        deployment_plan = {
            'version': version,
            'nodes': nodes or self._get_all_nodes(),
            'strategy': 'rolling_update',
            'rollback_point': self._create_rollback_point(),
            'start_time': datetime.now().isoformat()
        }
        
        # 执行滚动更新
        self._execute_rolling_update(deployment_plan)
        
        return deployment_plan
    
    def _get_all_nodes(self) -> List[str]:
        """
        获取所有节点
        """
        # 从Redis获取所有注册的节点
        node_keys = self.redis_client.keys('distributed_spider:nodes:*')
        return [key.decode().split(':')[-1] for key in node_keys]
    
    def _create_rollback_point(self) -> str:
        """
        创建回滚点
        """
        rollback_id = f"rollback_{datetime.now().strftime('%Y%m%d_%H%M%S')}"
        # 这里可以保存当前配置的状态
        return rollback_id
    
    def _execute_rolling_update(self, plan: Dict[str, Any]):
        """
        执行滚动更新
        """
        print(f"Starting rolling update for version {plan['version']}")
        
        for node in plan['nodes']:
            print(f"Updating node: {node}")
            
            # 停止节点
            self._stop_node(node)
            
            # 更新代码
            self._update_node_code(node, plan['version'])
            
            # 启动节点
            self._start_node(node)
            
            # 等待节点健康检查通过
            if not self._wait_for_node_healthy(node):
                print(f"Node {node} failed to become healthy, initiating rollback")
                self._rollback_deployment(plan)
                return
        
        print("Rolling update completed successfully")
    
    def _stop_node(self, node: str):
        """
        停止节点
        """
        # 发送停止指令到节点
        self.redis_client.publish(f'node_control:{node}', json.dumps({'action': 'stop'}))
    
    def _update_node_code(self, node: str, version: str):
        """
        更新节点代码
        """
        # 这里实现具体的代码更新逻辑
        # 可以通过git pull、scp等方式更新代码
        pass
    
    def _start_node(self, node: str):
        """
        启动节点
        """
        # 发送启动指令到节点
        self.redis_client.publish(f'node_control:{node}', json.dumps({'action': 'start'}))
    
    def _wait_for_node_healthy(self, node: str, timeout: int = 60) -> bool:
        """
        等待节点健康
        """
        start_time = time.time()
        while time.time() - start_time < timeout:
            # 检查节点状态
            node_status = self.redis_client.hgetall(f'distributed_spider:nodes:{node}')
            if node_status and node_status.get(b'status') == b'active':
                return True
            time.sleep(1)
        return False
    
    def _rollback_deployment(self, plan: Dict[str, Any]):
        """
        回滚部署
        """
        print(f"Initiating rollback to {plan['rollback_point']}")
        # 实现回滚逻辑
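
# 使用示例(最小化草图;假设monitoring_system.py与本文件位于同一目录):
if __name__ == '__main__':
    import redis
    from monitoring_system import DistributedCrawlerMonitor
    
    client = redis.Redis(host='localhost', port=6379)
    ops_manager = AutoOpsManager(DistributedCrawlerMonitor(client), client)
    ops_manager.setup_regular_tasks()
    # 简单的调度主循环
    while True:
        ops_manager.job_scheduler.run_pending()
        time.sleep(1)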

```

## 最佳实践总结

### 设计原则

"""
分布式爬虫系统设计最佳实践:

1. 高可用性原则:
   - 多节点部署,避免单点故障
   - 健康检查和自动恢复机制
   - 数据备份和恢复策略

2. 可扩展性原则:
   - 水平扩展能力
   - 模块化设计
   - 配置驱动的扩展

3. 性能优化原则:
   - 连接池复用
   - 批量操作
   - 缓存策略
   - 异步处理

4. 安全性原则:
   - 认证授权机制
   - 数据加密传输
   - 访问控制策略

5. 监控治理原则:
   - 全链路监控
   - 告警通知
   - 性能分析
   - 日志审计
"""

### 性能调优建议

```python
"""
性能调优最佳实践:

1. Redis优化:
   - 合理设置内存限制和淘汰策略
   - 使用连接池避免频繁连接创建
   - 批量操作减少网络开销
   - 启用压缩节省存储空间

2. 爬虫配置优化:
   - 合理设置并发数避免过度负载
   - 适当的延迟策略避免被封禁
   - 有效的重试机制处理临时故障
   - 智能的请求调度算法

3. 网络优化:
   - 使用连接复用减少握手开销
   - 启用HTTP/2提升传输效率
   - 合理的超时设置避免阻塞
   - DNS缓存减少解析时间

4. 数据处理优化:
   - 流式处理大数据集
   - 内存复用减少GC压力
   - 异步IO提升吞吐量
   - 数据分区并行处理
"""

```

### 部署运维最佳实践

```python
"""
部署运维最佳实践:

1. 容器化部署:
   - 使用Docker标准化环境
   - Docker Compose简化多服务编排
   - Kubernetes实现自动化运维

2. 监控告警:
   - 实时监控系统状态
   - 自定义业务指标监控
   - 多渠道告警通知
   - 告警分级处理

3. 日志管理:
   - 结构化日志输出
   - 集中化日志收集
   - 日志轮转防止单文件过大
   - 日志分析挖掘价值信息

4. 安全防护:
   - 网络隔离和访问控制
   - 敏感信息加密存储
   - 定期安全扫描和漏洞修复
   - 安全审计和合规检查
"""

```

通过本章的学习,你应该已经掌握了Scrapy-Redis分布式架构的核心技术和实现方法。分布式爬虫系统不仅能显著提升数据采集效率,还能提供高可用性和可扩展性。在实际项目中,需要根据具体需求和环境特点,灵活运用这些技术和最佳实践,构建稳定高效的分布式爬虫系统。