# Scrapy-Redis分布式架构 - 构建高性能分布式爬虫集群
📂 所属阶段:第五阶段 — 战力升级(分布式与进阶篇)
🔗 相关章节:DownloaderMiddleware · Spider中间件深度定制 · 分布式去重与调度
## 目录

- 分布式爬虫概述
- Scrapy-Redis架构原理
- 安装与配置
- 核心组件详解
- 分布式爬虫实现
- 队列管理策略
- 去重机制分析
- 负载均衡策略
- 集群部署方案
- 性能优化技巧
- 监控与运维
## 分布式爬虫概述
分布式爬虫是解决大规模数据采集需求的核心技术方案。传统的单机爬虫受限于硬件资源和网络带宽,难以满足海量数据的采集需求。分布式爬虫通过将爬取任务分配到多个节点,实现并行处理,显著提升数据采集效率。
### 分布式爬虫的优势
"""
分布式爬虫核心优势:
1. 水平扩展能力:
- 通过增加节点提升整体性能
- 突破单机资源瓶颈
- 支持弹性伸缩
2. 容错性:
- 单节点故障不影响整体运行
- 自动故障转移
- 高可用保障
3. 负载均衡:
- 任务自动分配
- 资源利用率最大化
- 避免单点过载
4. 数据一致性:
- 统一去重机制
- 避免重复抓取
- 数据完整性保障
"""#分布式爬虫架构模式
"""
常见的分布式爬虫架构:
1. 中央队列模式:
- Redis作为中央队列
- 多个爬虫节点消费
- 统一去重机制
2. 分片模式:
- 任务预先分片
- 每个节点负责特定分片
- 减少协调开销
3. 混合模式:
- 结合多种架构优势
- 动态任务分配
- 自适应调度
"""#Scrapy-Redis架构原理
Scrapy-Redis是基于Redis的分布式爬虫扩展库,通过Redis实现请求队列和去重集合的共享,使多个Scrapy爬虫实例能够协同工作。
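下面给出一个最小示意(爬虫名 demo、键名 demo:start_urls 与解析逻辑均为示例假设,并假设 settings.py 已按下文"基础配置"启用 scrapy-redis 的调度器与去重器):同一份爬虫代码部署到任意多个节点后,所有实例都从同一个 Redis 键领取 URL,由共享的去重集合保证同一 URL 只被其中一个节点处理。

```python
# minimal_demo.py - 多节点协同的最小示意(名称与键名均为假设)
from scrapy_redis.spiders import RedisSpider


class DemoSpider(RedisSpider):
    """同一份代码在每个节点上运行,共同消费 demo:start_urls 队列"""
    name = 'demo'
    redis_key = 'demo:start_urls'  # 所有节点共享的起始URL键

    def parse(self, response):
        # 仅作示意:抽取页面标题
        yield {'url': response.url, 'title': response.css('title::text').get()}
```

在每个节点上执行 `scrapy crawl demo`,再向 Redis 写入起始 URL(见下文"架构流程详解"之后的示例),各节点即开始协同抓取。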
### 核心组件关系
graph TB
subgraph "分布式爬虫集群"
A["爬虫节点1<br/>Scrapy Instance 1"]
B["爬虫节点2<br/>Scrapy Instance 2"]
C["爬虫节点N<br/>Scrapy Instance N"]
end
subgraph "Redis存储层"
D["请求队列<br/>Request Queue"]
E["去重集合<br/>Duplicate Filter"]
F["调度器<br/>Scheduler"]
end
A --> D
B --> D
C --> D
A --> E
B --> E
C --> E
D --> F
E --> F
style A fill:#e1f5fe
style B fill:#e1f5fe
style C fill:#e1f5fe
style D fill:#f3e5f5
style E fill:#f3e5f5
style F fill:#fff3e0

### 架构流程详解
- 请求分发:中央队列接收初始URL,各节点从中获取请求
- 去重检查:每个请求进入前检查是否已处理,避免重复
- 并发处理:多个节点并行处理请求,提升效率
- 结果汇总:处理结果统一存储,保证数据一致性
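下面的示意代码基于 redis-py,键名沿用下文配置中的默认格式(`%(name)s:start_urls`、`%(spider)s:requests`、`%(spider)s:dupefilter`),属于示例假设,用来直观观察上述流程在 Redis 侧的体现:先写入种子 URL 触发请求分发,再查看请求队列与去重集合的规模。

```python
# inspect_cluster.py - 观察共享队列与去重集合(示意代码,键名为假设的默认格式)
import redis

r = redis.Redis(host='localhost', port=6379, db=0)

# 请求分发:向共享的起始URL键写入种子URL
# (若配置 REDIS_START_URLS_AS_SET = True 用 sadd;列表模式则用 lpush)
r.sadd('distributed_spider:start_urls', 'https://example.com')

# 去重检查 / 并发处理:随时查看各共享键的规模
print('待处理请求数:', r.zcard('distributed_spider:requests'))    # 优先级队列是有序集合
print('已记录指纹数:', r.scard('distributed_spider:dupefilter'))  # 去重集合
```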
## 安装与配置
### 环境准备
# 安装Scrapy-Redis
pip install scrapy-redis
# 确保Redis服务已启动
# Ubuntu/Debian
sudo systemctl start redis-server
# CentOS/RHEL
sudo systemctl start redis
# macOS (使用Homebrew)
brew services start redis

### 基础配置
# settings.py - Scrapy-Redis基础配置
import os
# 启用Redis调度器
SCHEDULER = "scrapy_redis.scheduler.Scheduler"
# 启用Redis去重过滤器
DUPEFILTER_CLASS = "scrapy_redis.dupefilter.RFPDupeFilter"
# Redis连接配置
REDIS_URL = os.getenv('REDIS_URL', 'redis://localhost:6379')
# 或者分别配置主机和端口
REDIS_HOST = os.getenv('REDIS_HOST', 'localhost')
REDIS_PORT = int(os.getenv('REDIS_PORT', 6379))
REDIS_PASSWORD = os.getenv('REDIS_PASSWORD', None)
REDIS_DB = int(os.getenv('REDIS_DB', 0))
# 持久化配置 - 关闭时不清空队列
SCHEDULER_PERSIST = True
# 请求队列类型配置
SCHEDULER_QUEUE_CLASS = 'scrapy_redis.queue.PriorityQueue' # 优先级队列(默认)
# SCHEDULER_QUEUE_CLASS = 'scrapy_redis.queue.FifoQueue' # FIFO队列
# SCHEDULER_QUEUE_CLASS = 'scrapy_redis.queue.LifoQueue' # LIFO队列
# 请求队列与去重集合的键名格式
SCHEDULER_QUEUE_KEY = '%(spider)s:requests' # 请求队列键名格式
DUPEFILTER_KEY = '%(spider)s:dupefilter' # 去重键名格式
# 启动时是否清空已有队列(与上面的 SCHEDULER_PERSIST 配合使用,True 会丢弃未完成的请求)
SCHEDULER_FLUSH_ON_START = False
REDIS_START_URLS_AS_SET = True # 使用Set存储起始URL
REDIS_START_URLS_KEY = '%(name)s:start_urls' # 起始URL键名
# 连接池配置
REDIS_PARAMS = {
'socket_timeout': 30,
'socket_connect_timeout': 30,
'retry_on_timeout': True,
'encoding': 'utf-8',
'health_check_interval': 30,
'max_connections': 20,
'db': REDIS_DB,
'password': REDIS_PASSWORD,
}
# 日志配置
LOG_LEVEL = 'INFO'

### 高级配置
# advanced_settings.py - 高级配置选项
import redis
# 自定义Redis连接工厂
def get_redis_connection():
"""
获取Redis连接
"""
pool = redis.ConnectionPool(
host=REDIS_HOST,
port=REDIS_PORT,
db=REDIS_DB,
password=REDIS_PASSWORD,
max_connections=20,
health_check_interval=30
)
return redis.Redis(connection_pool=pool)
# 性能优化配置
# Redis连接池大小通过基础配置中的 REDIS_PARAMS['max_connections'] 控制
SCHEDULER_FLUSH_ON_START = True # 启动时清空队列
# 监控配置
STATS_CLASS = 'scrapy_redis.stats.RedisStatsCollector'
STATS_KEY = '%(spider)s:stats' # 统计数据键名
# 消息队列配置
REDIS_JOBS_KEY = '%(spider)s:jobs' # 任务队列键名
REDIS_ITEMS_KEY = '%(spider)s:items' # 结果队列键名

## 核心组件详解
### Scheduler调度器
# scheduler.py - 调度器详解
from scrapy_redis.scheduler import Scheduler
from scrapy.utils.misc import load_object
import redis
class CustomScheduler(Scheduler):
"""
自定义调度器
"""
def __init__(self, server, persist=False, flush_on_start=False, queue_cls=None,
dupefilter_cls=None, idle_before_close=0, serializer=None, **kwargs):
"""
初始化调度器
"""
super().__init__(
server=server,
persist=persist,
flush_on_start=flush_on_start,
queue_cls=queue_cls,
dupefilter_cls=dupefilter_cls,
idle_before_close=idle_before_close,
serializer=serializer,
**kwargs
)
# 自定义初始化逻辑
self.custom_stats = {}
self.task_distribution_strategy = 'round_robin'
def enqueue_request(self, request):
"""
入队请求
"""
if not request.dont_filter and self.df.request_seen(request):
# 已经见过的请求,跳过
return False
if len(self.queue) == 0:
# 队列为空时的处理
self._initialize_queue()
# 添加请求到队列
self.queue.push(request)
return True
def next_request(self):
"""
获取下一个请求
"""
request = self.queue.pop()
if request:
# 更新统计信息
self._update_request_stats(request)
return request
def _update_request_stats(self, request):
"""
更新请求统计
"""
# 记录请求来源、优先级等信息
pass
def _initialize_queue(self):
"""
初始化队列
"""
# 预处理队列,如设置初始权重等
pass

### RFPDupeFilter去重过滤器
# dupefilter.py - 去重过滤器详解
from scrapy_redis.dupefilter import RFPDupeFilter
import hashlib
import time
class AdvancedDupeFilter(RFPDupeFilter):
"""
高级去重过滤器
"""
def __init__(self, server, key, debug=False, expire_time=86400*7): # 7天过期
"""
初始化去重过滤器
"""
super().__init__(server, key, debug)
self.expire_time = expire_time # 过期时间
self.fingerprint_cache = {} # 指纹缓存
self.cache_size_limit = 10000 # 缓存大小限制
def request_seen(self, request):
"""
检查请求是否已见过
"""
fp = self.request_fingerprint(request)
# 检查本地缓存
if fp in self.fingerprint_cache:
return True
# 检查Redis集合
if self.server.sismember(self.key, fp):
# 缓存到本地,提高后续检查速度
self._cache_fingerprint(fp)
return True
# 添加到Redis集合
self.server.sadd(self.key, fp)
# 设置过期时间
if self.expire_time:
self.server.expire(self.key, self.expire_time)
# 缓存到本地
self._cache_fingerprint(fp)
return False
def request_fingerprint(self, request):
"""
生成请求指纹
"""
# 使用更复杂的指纹生成算法
fingerprint_str = f"{request.method}:{request.url}:{request.body.decode('utf-8', errors='ignore')}"
return hashlib.sha256(fingerprint_str.encode()).hexdigest()
def _cache_fingerprint(self, fp):
"""
缓存指纹到本地
"""
if len(self.fingerprint_cache) >= self.cache_size_limit:
# 清理一半缓存
keys_to_remove = list(self.fingerprint_cache.keys())[:len(self.fingerprint_cache)//2]
for key in keys_to_remove:
del self.fingerprint_cache[key]
self.fingerprint_cache[fp] = time.time()
def clear_cache(self):
"""
清理缓存
"""
self.fingerprint_cache.clear()

### 请求队列实现
# queues.py - 请求队列详解
from scrapy_redis.queue import PriorityQueue, FifoQueue, LifoQueue
import pickle
import json
class OptimizedPriorityQueue(PriorityQueue):
"""
优化的优先级队列
"""
def __init__(self, server, spider, key, serializer=None):
"""
初始化队列
"""
super().__init__(server, spider, key, serializer)
self.batch_size = 100 # 批量操作大小
self.compression_enabled = True # 启用压缩
def _encode_request(self, request):
"""
编码请求:在父类序列化结果之上可选地做zlib压缩
"""
data = super()._encode_request(request)
if isinstance(data, str):
data = data.encode('utf-8')
if self.compression_enabled:
# 启用压缩以节省Redis空间
import zlib
return zlib.compress(data)
return data
def _decode_request(self, encoded_request):
"""
解码请求:先解压,再交由父类反序列化为Request对象
"""
if self.compression_enabled:
import zlib
encoded_request = zlib.decompress(encoded_request)
return super()._decode_request(encoded_request)
def push_batch(self, requests):
"""
批量推送请求
"""
pipe = self.server.pipeline()
for request in requests:
priority = -request.priority
encoded_request = self._encode_request(request)
pipe.zadd(self.key, {encoded_request: priority})
pipe.execute()
def pop_batch(self, batch_size=10):
"""
批量弹出请求
"""
# 按优先级获取多个请求
items = self.server.zrange(self.key, 0, batch_size - 1)
if items:
pipe = self.server.pipeline()
# 批量删除已获取的请求
for item in items:
pipe.zrem(self.key, item)
pipe.execute()
# 解码并返回请求
return [self._decode_request(item) for item in items]
return []
class AdaptiveQueue:
"""
自适应队列 - 根据负载动态调整策略
"""
def __init__(self, server, spider, base_key):
self.server = server
self.spider = spider
self.base_key = base_key
self.queues = {}
self.load_balancer = LoadBalancer()
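# 注:LoadBalancer 的具体实现见下文"负载均衡策略"一节,这里的无参构造仅作示意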
def push(self, request):
"""
智能推送请求到合适队列
"""
target_queue = self._select_target_queue(request)
return self.queues[target_queue].push(request)
def pop(self):
"""
从最优队列获取请求
"""
selected_queue = self._select_optimal_queue()
return self.queues[selected_queue].pop()
def _select_target_queue(self, request):
"""
根据请求特征选择目标队列
"""
# 根据URL域名、请求类型等因素选择队列
domain = request.url.split('/')[2]
return f"{self.base_key}:{domain}"
def _select_optimal_queue(self):
"""
选择最优队列
"""
# 根据队列长度、处理速度等因素选择
queue_lengths = {
key: self.server.llen(key)
for key in self.queues.keys()
}
# 选择长度最小的队列
return min(queue_lengths, key=queue_lengths.get)

## 分布式爬虫实现
### RedisSpider基础实现
# distributed_spider.py - 分布式爬虫实现
from scrapy_redis.spiders import RedisSpider
from scrapy.http import Request
import json
import time
class DistributedSpider(RedisSpider):
"""
分布式爬虫基类
"""
name = 'distributed_spider'
# Redis键名 - 所有实例共享此队列
redis_key = 'distributed_spider:start_urls'
# 最大并发请求数
custom_settings = {
'CONCURRENT_REQUESTS': 32,
'CONCURRENT_REQUESTS_PER_DOMAIN': 4,
'DOWNLOAD_DELAY': 1,
'RANDOMIZE_DOWNLOAD_DELAY': True,
'AUTOTHROTTLE_ENABLED': True,
'AUTOTHROTTLE_START_DELAY': 1,
'AUTOTHROTTLE_MAX_DELAY': 10,
'AUTOTHROTTLE_TARGET_CONCURRENCY': 4.0,
}
def __init__(self, *args, **kwargs):
"""
初始化分布式爬虫
"""
super().__init__(*args, **kwargs)
# 初始化统计信息
self.stats = {
'requests_sent': 0,
'responses_received': 0,
'items_scraped': 0,
'errors': 0,
'start_time': time.time()
}
def make_requests_from_url(self, url):
"""
从URL创建请求
"""
# 支持自定义请求参数
return Request(
url=url,
callback=self.parse,
errback=self.handle_error,
meta={
'download_timeout': 30,
'dont_redirect': False,
'handle_httpstatus_list': [301, 302, 404, 500]
}
)
def parse(self, response):
"""
解析响应
"""
try:
# 更新统计信息
self.stats['responses_received'] += 1
# 提取数据
data = {
'url': response.url,
'status': response.status,
'title': response.css('title::text').get(),
'timestamp': time.time(),
'spider_name': self.name,
'node_id': self._get_node_id()
}
yield data
# 提取链接并加入队列
for link in response.css('a::attr(href)').getall():
absolute_url = response.urljoin(link)
yield Request(
url=absolute_url,
callback=self.parse,
meta={'depth': response.meta.get('depth', 0) + 1}
)
except Exception as e:
self.logger.error(f"Parse error: {response.url}, Error: {str(e)}")
self.stats['errors'] += 1
def handle_error(self, failure):
"""
处理请求错误
"""
self.logger.error(f"Request failed: {failure.request.url}")
self.logger.error(f"Reason: {failure.value}")
self.stats['errors'] += 1
def _get_node_id(self):
"""
获取节点ID
"""
import socket
return socket.gethostname()
def closed(self, reason):
"""
爬虫关闭时的清理工作
"""
self.logger.info(f"Spider closed. Reason: {reason}")
self.logger.info(f"Final stats: {self.stats}")
class AdvancedDistributedSpider(DistributedSpider):
"""
高级分布式爬虫
"""
name = 'advanced_distributed_spider'
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
# 初始化高级功能
self.rate_limiter = RateLimiter()
self.domain_scheduler = DomainScheduler()
self.task_manager = TaskManager()
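# 注:RateLimiter / DomainScheduler / TaskManager 为示意用的辅助类,本文未给出其实现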
def parse(self, response):
"""
高级解析逻辑
"""
# 智能解析策略
content_type = response.headers.get('content-type', b'').decode('utf-8')
if 'json' in content_type:
yield from self.parse_json_api(response)
elif 'html' in content_type:
yield from self.parse_html_page(response)
else:
# 其他内容类型暂按HTML兜底处理(parse_generic_content 可按需自行实现)
yield from self.parse_html_page(response)
def parse_json_api(self, response):
"""
解析JSON API响应
"""
try:
data = json.loads(response.text)
# 递归提取URL
urls = self._extract_urls_from_json(data)
for url in urls:
yield Request(url=url, callback=self.parse)
# 提取业务数据
yield {
'type': 'api_data',
'data': data,
'url': response.url,
'timestamp': time.time()
}
except json.JSONDecodeError:
self.logger.error(f"Failed to decode JSON: {response.url}")
def parse_html_page(self, response):
"""
解析HTML页面
"""
# 提取结构化数据
structured_data = {
'url': response.url,
'title': response.css('title::text').get(),
'meta_description': response.css('meta[name="description"]::attr(content)').get(),
'links': response.css('a::attr(href)').getall(),
'images': response.css('img::attr(src)').getall(),
'headings': {
'h1': response.css('h1::text').getall(),
'h2': response.css('h2::text').getall(),
'h3': response.css('h3::text').getall(),
}
}
yield structured_data
def _extract_urls_from_json(self, data, urls=None):
"""
从JSON数据中提取URL
"""
if urls is None:
urls = []
if isinstance(data, dict):
for key, value in data.items():
if key in ['url', 'link', 'href', 'src'] and isinstance(value, str):
if value.startswith(('http://', 'https://')):
urls.append(value)
elif isinstance(value, (dict, list)):
self._extract_urls_from_json(value, urls)
elif isinstance(data, list):
for item in data:
self._extract_urls_from_json(item, urls)
return urls
class TaskBasedDistributedSpider(RedisSpider):
"""
基于任务的分布式爬虫
"""
name = 'task_based_spider'
redis_key = 'task_spider:tasks'
def parse(self, response):
"""
解析任务
"""
task_data = response.meta.get('task', {}) # 任务JSON由覆写的make_request_from_data放入meta(见下一节开头的示意)
task_type = task_data.get('type')
task_params = task_data.get('params', {})
if task_type == 'crawl_page':
yield from self._crawl_page(task_params)
elif task_type == 'extract_data':
yield from self._extract_data(task_params)
elif task_type == 'api_call':
yield from self._make_api_call(task_params)
def _crawl_page(self, params):
"""
爬取页面任务
"""
url = params['url']
selectors = params.get('selectors', {})
# 发起页面请求
yield Request(
url=url,
callback=self._parse_crawl_result,
meta={'selectors': selectors}
)
def _parse_crawl_result(self, response):
"""
解析爬取结果
"""
selectors = response.meta['selectors']
result = {}
for field, selector in selectors.items():
result[field] = response.css(selector).get()
yield result

### 动态任务分发
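上一段的 TaskBasedDistributedSpider 假设 redis_key 中存放的是 JSON 任务而不是裸 URL,这需要覆写 scrapy-redis 提供的 make_request_from_data 钩子,把解析后的任务放进 meta。下面是一个最小示意(type/params/url 等字段沿用上文的任务结构,均为示例假设):

```python
# task_request_factory.py - 将Redis中的JSON任务转换为请求的示意实现
import json

from scrapy.http import Request
from scrapy_redis.spiders import RedisSpider


class JsonTaskSpider(RedisSpider):
    name = 'json_task_spider'
    redis_key = 'task_spider:tasks'  # 队列中的每个元素是一个JSON任务

    def make_request_from_data(self, data):
        """data 是从 redis_key 弹出的原始字节串,这里按 JSON 任务解析"""
        task = json.loads(data)
        # 假设任务总是带有目标URL;其他任务类型需自行扩展
        url = task['params']['url']
        return Request(url, callback=self.parse, meta={'task': task}, dont_filter=True)

    def parse(self, response):
        task = response.meta['task']
        yield {'task_type': task.get('type'), 'url': response.url}
```

约定好这种任务格式之后,下面的任务分发器负责把 JSON 任务写入 Redis 并分发到各个节点。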
# task_dispatcher.py - 任务分发器
import redis
import json
import time
from typing import Dict, List, Any
class TaskDispatcher:
"""
任务分发器
"""
def __init__(self, redis_url='redis://localhost:6379', namespace='distributed_spider'):
self.redis_client = redis.from_url(redis_url)
self.namespace = namespace
self.task_queue_key = f"{namespace}:task_queue"
self.node_registry_key = f"{namespace}:nodes"
self.task_status_key = f"{namespace}:task_status"
def register_node(self, node_id: str, capabilities: Dict[str, Any]):
"""
注册爬虫节点
"""
node_info = {
'id': node_id,
'capabilities': capabilities,
'registered_at': time.time(),
'status': 'active'
}
self.redis_client.hset(
self.node_registry_key,
node_id,
json.dumps(node_info)
)
def submit_task(self, task_data: Dict[str, Any], priority: int = 0):
"""
提交任务
"""
task_id = f"task_{int(time.time() * 1000000)}"
task_info = {
'id': task_id,
'data': task_data,
'priority': priority,
'submitted_at': time.time(),
'status': 'pending'
}
# 添加到任务队列
self.redis_client.zadd(
self.task_queue_key,
{json.dumps(task_info): -priority} # 负号实现降序排序
)
# 初始化任务状态
self.redis_client.hset(
self.task_status_key,
task_id,
json.dumps({'status': 'pending', 'submitted_at': time.time()})
)
return task_id
def get_available_nodes(self) -> List[Dict[str, Any]]:
"""
获取可用节点
"""
nodes = []
node_keys = self.redis_client.hkeys(self.node_registry_key)
for node_key in node_keys:
node_info = self.redis_client.hget(self.node_registry_key, node_key)
if node_info:
node_data = json.loads(node_info)
if node_data.get('status') == 'active':
nodes.append(node_data)
return nodes
def dispatch_tasks(self, max_tasks_per_cycle: int = 100):
"""
分发任务到节点
"""
# 获取待处理任务
pending_tasks = self.redis_client.zrange(
self.task_queue_key,
0,
max_tasks_per_cycle - 1
)
available_nodes = self.get_available_nodes()
if not pending_tasks or not available_nodes:
return
# 简单轮询分配
for i, task_data in enumerate(pending_tasks):
if i >= len(available_nodes):
break
task_info = json.loads(task_data)
target_node = available_nodes[i % len(available_nodes)]
# 将任务分发到节点专用队列
node_queue_key = f"{self.namespace}:node:{target_node['id']}:queue"
self.redis_client.lpush(node_queue_key, task_data)
# 更新任务状态
self.redis_client.hset(
self.task_status_key,
task_info['id'],
json.dumps({
'status': 'dispatched',
'dispatched_to': target_node['id'],
'dispatched_at': time.time()
})
)
# 从主队列移除任务
self.redis_client.zrem(self.task_queue_key, task_data)
class LoadBalancer:
"""
负载均衡器
"""
def __init__(self, redis_client):
self.redis_client = redis_client
self.node_loads_key = 'distributed_spider:node_loads'
self.task_distribution_key = 'distributed_spider:task_distribution'
def get_optimal_node(self, task_type: str = 'default') -> str:
"""
获取最优节点
"""
# 获取所有节点负载
node_loads = self.redis_client.hgetall(self.node_loads_key)
if not node_loads:
return 'default_node'
# 找到负载最低的节点
min_load = float('inf')
optimal_node = None
for node_id, load_str in node_loads.items():
try:
load = int(load_str.decode('utf-8'))
if load < min_load:
min_load = load
optimal_node = node_id.decode('utf-8')
except (ValueError, UnicodeDecodeError):
continue
return optimal_node or 'default_node'
def update_node_load(self, node_id: str, increment: int = 1):
"""
更新节点负载
"""
self.redis_client.hincrby(self.node_loads_key, node_id, increment)
def record_task_distribution(self, node_id: str, task_type: str):
"""
记录任务分布
"""
distribution_key = f"{self.task_distribution_key}:{task_type}"
self.redis_client.hincrby(distribution_key, node_id, 1)
# 使用示例
if __name__ == "__main__":
dispatcher = TaskDispatcher()
# 注册节点
dispatcher.register_node(
'node_1',
{'cpu_cores': 8, 'memory_gb': 16, 'bandwidth_mbps': 100}
)
# 提交任务
task_id = dispatcher.submit_task({
'type': 'crawl',
'url': 'https://example.com',
'depth': 2
}, priority=1)
print(f"Submitted task: {task_id}")#队列管理策略
### 多队列策略
# queue_strategy.py - 队列管理策略
from scrapy_redis.queue import PriorityQueue
import time
import hashlib
class MultiQueueStrategy:
"""
多队列管理策略
"""
def __init__(self, redis_client, base_namespace='distributed_spider'):
self.redis_client = redis_client
self.base_namespace = base_namespace
self.queue_configs = {}
self.routing_rules = []
def add_queue_config(self, queue_name: str, config: dict):
"""
添加队列配置
"""
self.queue_configs[queue_name] = config
def add_routing_rule(self, rule_func, target_queue: str):
"""
添加路由规则
"""
self.routing_rules.append((rule_func, target_queue))
def route_request(self, request):
"""
路由请求到合适队列
"""
for rule_func, target_queue in self.routing_rules:
if rule_func(request):
return target_queue
# 默认队列
return 'default_queue'
def prioritize_request(self, request, base_priority: int = 0):
"""
为请求计算优先级
"""
priority = base_priority
# 根据URL特征调整优先级
url = request.url
if 'important' in url:
priority += 100
elif 'urgent' in url:
priority += 50
# 根据域名频率限制调整
domain = url.split('/')[2]
recent_requests = self.redis_client.get(f"recent_requests:{domain}")
if recent_requests and int(recent_requests) > 10:
priority -= 20 # 降低频繁域名的优先级
# 根据请求深度调整
depth = request.meta.get('depth', 0)
if depth > 5:
priority -= depth * 5 # 深度越大优先级越低
return priority
class PriorityAwareQueue(PriorityQueue):
"""
支持优先级感知的队列
"""
def __init__(self, server, spider, key, serializer=None):
super().__init__(server, spider, key, serializer)
self.priority_stats_key = f"{key}:priority_stats"
self.stats_window = 3600 # 1小时统计窗口
def push(self, request):
"""
推送请求时更新优先级统计
"""
priority = -request.priority
result = super().push(request)
# 更新优先级统计
self._update_priority_stats(priority)
return result
def _update_priority_stats(self, priority: int):
"""
更新优先级统计
"""
current_time = int(time.time())
hour_bucket = current_time - (current_time % self.stats_window)
# 记录优先级分布
self.server.hincrby(
f"{self.priority_stats_key}:{hour_bucket}",
f"priority_{priority}",
1
)
def get_queue_statistics(self):
"""
获取队列统计信息
"""
current_time = int(time.time())
hour_bucket = current_time - (current_time % self.stats_window)
stats = self.server.hgetall(f"{self.priority_stats_key}:{hour_bucket}")
return {
key.decode('utf-8'): int(value.decode('utf-8'))
for key, value in stats.items()
}
class AdaptiveQueueManager:
"""
自适应队列管理器
"""
def __init__(self, redis_client):
self.redis_client = redis_client
self.queue_performance_key = 'queue_performance'
self.adaptation_threshold = 0.8 # 性能阈值
def evaluate_queue_performance(self, queue_name: str) -> float:
"""
评估队列性能
"""
# 获取队列性能指标
processed_count = self.redis_client.get(f"queue:{queue_name}:processed")
error_count = self.redis_client.get(f"queue:{queue_name}:errors")
timeout_count = self.redis_client.get(f"queue:{queue_name}:timeouts")
processed = int(processed_count) if processed_count else 0
errors = int(error_count) if error_count else 0
timeouts = int(timeout_count) if timeout_count else 0
total = processed + errors + timeouts
if total == 0:
return 1.0 # 默认性能
success_rate = processed / total if total > 0 else 0
error_rate = (errors + timeouts) / total if total > 0 else 1
# 综合评估
performance = success_rate * 0.7 - error_rate * 0.3
return max(0, min(1, performance)) # 归一化到[0,1]
def adapt_queue_configuration(self, queue_name: str):
"""
自适应调整队列配置
"""
performance = self.evaluate_queue_performance(queue_name)
if performance < self.adaptation_threshold:
# 性能不佳,降低负载
current_concurrency = self.redis_client.get(f"queue:{queue_name}:concurrency")
if current_concurrency:
new_concurrency = max(1, int(current_concurrency) // 2)
self.redis_client.set(f"queue:{queue_name}:concurrency", new_concurrency)
else:
# 性能良好,可以增加负载
current_concurrency = self.redis_client.get(f"queue:{queue_name}:concurrency")
if current_concurrency:
new_concurrency = min(100, int(current_concurrency) * 2)
self.redis_client.set(f"queue:{queue_name}:concurrency", new_concurrency)
class QueuePartitioner:
"""
队列分区器
"""
def __init__(self, num_partitions: int = 10):
self.num_partitions = num_partitions
def partition_by_domain(self, url: str) -> int:
"""
按域名分区
"""
domain = url.split('/')[2]
hash_value = int(hashlib.md5(domain.encode()).hexdigest(), 16)
return hash_value % self.num_partitions
def partition_by_content_type(self, content_type: str) -> int:
"""
按内容类型分区
"""
type_hash = int(hashlib.md5(content_type.encode()).hexdigest(), 16)
return type_hash % self.num_partitions
def partition_by_priority(self, priority: int) -> int:
"""
按优先级分区
"""
return abs(priority) % self.num_partitions

## 去重机制分析
### 去重算法优化
# deduplication_analysis.py - 去重机制分析
import hashlib
import mmh3
import time
from bitarray import bitarray
import redis
class BloomFilter:
"""
布隆过滤器实现
"""
def __init__(self, capacity: int, error_rate: float = 0.01):
"""
初始化布隆过滤器
"""
self.capacity = capacity
self.error_rate = error_rate
# 计算位数组大小和哈希函数数量
self.bit_array_size = self._get_bit_array_size()
self.hash_count = self._get_hash_count()
self.bit_array = bitarray(self.bit_array_size)
self.bit_array.setall(0)
def _get_bit_array_size(self) -> int:
"""
计算位数组大小
"""
import math
m = -(self.capacity * math.log(self.error_rate)) / (math.log(2) ** 2)
return int(m)
def _get_hash_count(self) -> int:
"""
计算哈希函数数量
"""
import math
k = (self.bit_array_size / self.capacity) * math.log(2)
return int(k)
def _hash(self, item: str, seed: int) -> int:
"""
生成哈希值
"""
return mmh3.hash(item, seed) % self.bit_array_size
def add(self, item: str):
"""
添加元素
"""
for i in range(self.hash_count):
index = self._hash(item, i)
self.bit_array[index] = 1
def contains(self, item: str) -> bool:
"""
检查元素是否存在
"""
for i in range(self.hash_count):
index = self._hash(item, i)
if not self.bit_array[index]:
return False
return True
class RedisBloomFilter:
"""
基于Redis的布隆过滤器
"""
def __init__(self, redis_client, key_prefix: str, capacity: int = 1000000, error_rate: float = 0.01):
self.redis_client = redis_client
self.key_prefix = key_prefix
self.capacity = capacity
self.error_rate = error_rate
# 计算参数
import math
self.bit_array_size = int(-(capacity * math.log(error_rate)) / (math.log(2) ** 2))
self.hash_count = int((self.bit_array_size / capacity) * math.log(2))
# Redis位图键
self.bitmap_key = f"{key_prefix}:bloom_bitmap"
def _get_hashes(self, item: str) -> list:
"""
获取多个哈希值
"""
hashes = []
for i in range(self.hash_count):
hash_val = mmh3.hash(item, i) % self.bit_array_size
hashes.append(hash_val)
return hashes
def add(self, item: str):
"""
添加元素到布隆过滤器
"""
hashes = self._get_hashes(item)
pipe = self.redis_client.pipeline()
for hash_val in hashes:
pipe.setbit(self.bitmap_key, hash_val, 1)
pipe.execute()
def contains(self, item: str) -> bool:
"""
检查元素是否存在
"""
hashes = self._get_hashes(item)
for hash_val in hashes:
if not self.redis_client.getbit(self.bitmap_key, hash_val):
return False
return True
def clear(self):
"""
清空布隆过滤器
"""
self.redis_client.delete(self.bitmap_key)
class AdvancedDupeFilter:
"""
高级去重过滤器
"""
def __init__(self, redis_client, namespace: str = 'distributed_spider'):
self.redis_client = redis_client
self.namespace = namespace
# 多层去重机制
self.local_cache = {} # 本地缓存
self.redis_set_key = f"{namespace}:dupefilter:set" # Redis集合
self.bloom_filter = RedisBloomFilter(
redis_client,
f"{namespace}:dupefilter:bloom"
)
# 性能统计
self.stats = {
'local_hits': 0,
'redis_hits': 0,
'bloom_hits': 0,
'misses': 0,
'false_positives': 0
}
def request_seen(self, request) -> bool:
"""
检查请求是否已见过
"""
fingerprint = self._get_fingerprint(request)
# 1. 检查本地缓存(最快)
if fingerprint in self.local_cache:
self.stats['local_hits'] += 1
return True
# 2. 检查布隆过滤器(可能存在误判)
if self.bloom_filter.contains(fingerprint):
# 布隆过滤器可能误判,需要检查Redis集合确认
if self.redis_client.sismember(self.redis_set_key, fingerprint):
self.stats['redis_hits'] += 1
self._cache_fingerprint(fingerprint) # 缓存到本地
return True
else:
# 布隆过滤器误判
self.stats['false_positives'] += 1
# 3. 检查Redis集合
if self.redis_client.sismember(self.redis_set_key, fingerprint):
self.stats['redis_hits'] += 1
self._cache_fingerprint(fingerprint)
return True
# 4. 添加到所有层级
pipe = self.redis_client.pipeline()
pipe.sadd(self.redis_set_key, fingerprint)
pipe.execute()
self.bloom_filter.add(fingerprint)
self._cache_fingerprint(fingerprint)
self.stats['misses'] += 1
return False
def _get_fingerprint(self, request) -> str:
"""
生成请求指纹
"""
# 使用URL、方法、参数等生成唯一指纹
fingerprint_str = f"{request.method}:{request.url}:{request.body.decode('utf-8', errors='ignore')}"
return hashlib.sha256(fingerprint_str.encode()).hexdigest()
def _cache_fingerprint(self, fingerprint: str):
"""
缓存指纹到本地
"""
# 限制缓存大小
if len(self.local_cache) > 10000:
# 移除最早的一半缓存
keys_to_remove = list(self.local_cache.keys())[:5000]
for key in keys_to_remove:
del self.local_cache[key]
self.local_cache[fingerprint] = time.time()
def get_stats(self) -> dict:
"""
获取统计信息
"""
total_checks = sum(self.stats.values())
hit_rate = (self.stats['local_hits'] + self.stats['redis_hits']) / total_checks if total_checks > 0 else 0
return {
**self.stats,
'hit_rate': hit_rate,
'cache_size': len(self.local_cache)
}
class DuplicateAnalysis:
"""
重复数据分析器
"""
def __init__(self, redis_client):
self.redis_client = redis_client
self.analysis_key = 'duplicate_analysis'
def analyze_duplication_patterns(self, days: int = 7) -> dict:
"""
分析重复模式
"""
patterns = {
'daily_duplication_rate': {},
'top_duplicate_domains': {},
'duplication_by_hour': {},
'similarity_clusters': []
}
# 分析每日重复率
for day in range(days):
date_key = f"{self.analysis_key}:day_{day}"
daily_stats = self.redis_client.hgetall(date_key)
if daily_stats:
processed = int(daily_stats.get(b'processed', 0))
duplicates = int(daily_stats.get(b'duplicates', 0))
if processed > 0:
duplication_rate = duplicates / processed
patterns['daily_duplication_rate'][day] = duplication_rate
# 分析域名重复情况
domain_stats = self.redis_client.hgetall(f"{self.analysis_key}:domains")
for domain, count in domain_stats.items():
patterns['top_duplicate_domains'][domain.decode()] = int(count)
# 按小时分析
hourly_stats = self.redis_client.hgetall(f"{self.analysis_key}:hours")
for hour, count in hourly_stats.items():
patterns['duplication_by_hour'][int(hour)] = int(count)
return patterns
def identify_similar_requests(self, threshold: float = 0.8) -> list:
"""
识别相似请求
"""
# 获取最近的请求样本
recent_requests = self.redis_client.lrange(
f"{self.analysis_key}:recent_requests", 0, 1000
)
similar_pairs = []
for i, req1 in enumerate(recent_requests):
for j, req2 in enumerate(recent_requests[i+1:], i+1):
similarity = self._calculate_similarity(req1, req2)
if similarity > threshold:
similar_pairs.append({
'request1': req1.decode(),
'request2': req2.decode(),
'similarity': similarity
})
return sorted(similar_pairs, key=lambda x: x['similarity'], reverse=True)
def _calculate_similarity(self, req1: bytes, req2: bytes) -> float:
"""
计算请求相似度
"""
str1, str2 = req1.decode(), req2.decode()
# 简单的余弦相似度计算
set1, set2 = set(str1), set(str2)
intersection = len(set1.intersection(set2))
union = len(set1.union(set2))
return intersection / union if union > 0 else 0

## 负载均衡策略
### 负载均衡算法
# load_balancing.py - 负载均衡策略
import time
import random
from typing import List, Dict, Any
import heapq
import hashlib
class LoadBalancer:
"""
负载均衡器
"""
def __init__(self, nodes: List[Dict[str, Any]]):
self.nodes = nodes
self.node_stats = {node['id']: {'load': 0, 'success_rate': 1.0, 'response_time': 0.1}
for node in nodes}
self.algorithm = 'weighted_round_robin' # 默认算法
def select_node(self, request_info: Dict[str, Any] = None) -> str:
"""
选择最优节点
"""
if self.algorithm == 'random':
return self._random_selection()
elif self.algorithm == 'round_robin':
return self._round_robin_selection()
elif self.algorithm == 'weighted_round_robin':
return self._weighted_round_robin_selection()
elif self.algorithm == 'least_connections':
return self._least_connections_selection()
elif self.algorithm == 'response_time':
return self._response_time_selection()
elif self.algorithm == 'adaptive':
return self._adaptive_selection(request_info)
else:
return self._round_robin_selection()
def _random_selection(self) -> str:
"""
随机选择
"""
return random.choice(self.nodes)['id']
def _round_robin_selection(self) -> str:
"""
轮询选择
"""
# 简单实现,实际中需要维护轮询状态
active_nodes = [node for node in self.nodes if node.get('status') == 'active']
if not active_nodes:
return self.nodes[0]['id'] if self.nodes else None
return active_nodes[hash(time.time()) % len(active_nodes)]['id']
def _weighted_round_robin_selection(self) -> str:
"""
加权轮询选择
"""
# 根据节点能力和当前负载计算权重
weighted_nodes = []
for node in self.nodes:
if node.get('status') != 'active':
continue
# 计算综合权重
capability_weight = node.get('cpu_cores', 1) * node.get('memory_gb', 1)
load_factor = 1.0 / (1.0 + self.node_stats[node['id']]['load'])
success_weight = self.node_stats[node['id']]['success_rate']
weight = capability_weight * load_factor * success_weight
weighted_nodes.append((weight, node['id']))
if not weighted_nodes:
return self.nodes[0]['id'] if self.nodes else None
# 根据权重随机选择
total_weight = sum(weight for weight, _ in weighted_nodes)
rand_num = random.uniform(0, total_weight)
cumulative_weight = 0
for weight, node_id in weighted_nodes:
cumulative_weight += weight
if rand_num <= cumulative_weight:
return node_id
return weighted_nodes[-1][1]
def _least_connections_selection(self) -> str:
"""
最少连接数选择
"""
active_nodes = [node for node in self.nodes if node.get('status') == 'active']
if not active_nodes:
return self.nodes[0]['id'] if self.nodes else None
min_load_node = min(active_nodes,
key=lambda n: self.node_stats[n['id']]['load'])
return min_load_node['id']
def _response_time_selection(self) -> str:
"""
响应时间选择
"""
active_nodes = [node for node in self.nodes if node.get('status') == 'active']
if not active_nodes:
return self.nodes[0]['id'] if self.nodes else None
# 选择平均响应时间最短的节点
best_node = min(active_nodes,
key=lambda n: self.node_stats[n['id']]['response_time'])
return best_node['id']
def _adaptive_selection(self, request_info: Dict[str, Any]) -> str:
"""
自适应选择
"""
# 根据请求特征和节点能力进行智能匹配
if request_info:
# 如果请求包含特定要求,优先匹配有能力的节点
required_resources = request_info.get('resources', {})
for node in self.nodes:
if node.get('status') != 'active':
continue
# 检查资源匹配度
resource_match = self._check_resource_match(node, required_resources)
if resource_match:
return node['id']
# 回退到加权轮询
return self._weighted_round_robin_selection()
def _check_resource_match(self, node: dict, required_resources: dict) -> bool:
"""
检查资源匹配度
"""
for resource, required_value in required_resources.items():
node_value = node.get(resource, 0)
if node_value < required_value:
return False
return True
def update_node_stats(self, node_id: str, load_change: int = 0,
success: bool = True, response_time: float = 0.0):
"""
更新节点统计信息
"""
if node_id not in self.node_stats:
return
stats = self.node_stats[node_id]
stats['load'] += load_change
# 更新成功率(指数移动平均)
alpha = 0.1 # 平滑因子
current_success_rate = 1.0 if success else 0.0
stats['success_rate'] = (alpha * current_success_rate +
(1 - alpha) * stats['success_rate'])
# 更新响应时间(指数移动平均)
current_response_time = response_time or stats['response_time']
stats['response_time'] = (alpha * current_response_time +
(1 - alpha) * stats['response_time'])
class ConsistentHashBalancer:
"""
一致性哈希负载均衡器
"""
def __init__(self, nodes: List[str], virtual_nodes: int = 150):
self.nodes = nodes
self.virtual_nodes = virtual_nodes
self.ring = {} # 哈希环
self.sorted_keys = [] # 排序的哈希值
self._initialize_ring()
def _initialize_ring(self):
"""
初始化哈希环
"""
for node in self.nodes:
for i in range(self.virtual_nodes):
virtual_key = f"{node}:{i}"
hash_key = hash(virtual_key) % (2**32)
self.ring[hash_key] = node
self.sorted_keys.append(hash_key)
self.sorted_keys.sort()
def get_node(self, key: str) -> str:
"""
根据键获取节点
"""
if not self.ring:
return None
hash_key = int(hashlib.md5(key.encode()).hexdigest(), 16) % (2**32)
# 在排序的键中找到第一个大于等于hash_key的位置
for ring_key in self.sorted_keys:
if hash_key <= ring_key:
return self.ring[ring_key]
# 如果没找到,返回第一个节点(环形结构)
return self.ring[self.sorted_keys[0]]
class TaskAffinityBalancer:
"""
任务亲和性负载均衡器
"""
def __init__(self, nodes: List[Dict[str, Any]]):
self.nodes = nodes
self.task_affinities = {} # 任务-节点亲和性映射
self.node_loads = {node['id']: 0 for node in nodes}
self.max_affinity_duration = 3600 # 亲和性持续时间(秒)
def assign_task(self, task_id: str, task_attributes: Dict[str, Any]) -> str:
"""
分配任务到节点
"""
# 检查是否有亲和性节点
affinity_node = self._get_affinity_node(task_id)
if affinity_node:
return affinity_node
# 根据任务属性选择最优节点
optimal_node = self._select_optimal_node(task_attributes)
# 设置亲和性(如果适用)
if self._should_create_affinity(task_attributes):
self._create_affinity(task_id, optimal_node)
# 更新节点负载
self.node_loads[optimal_node] += 1
return optimal_node
def _get_affinity_node(self, task_id: str) -> str:
"""
获取亲和性节点
"""
if task_id in self.task_affinities:
node_id, timestamp = self.task_affinities[task_id]
if time.time() - timestamp < self.max_affinity_duration:
return node_id
else:
# 亲和性过期,删除
del self.task_affinities[task_id]
return None
def _select_optimal_node(self, task_attributes: Dict[str, Any]) -> str:
"""
选择最优节点
"""
# 根据任务属性和节点能力进行匹配
domain = task_attributes.get('domain', '')
content_type = task_attributes.get('content_type', 'html')
# 优先选择处理过相同域名的节点
for node_id in self.node_loads.keys():
if self._has_domain_experience(node_id, domain):
if self.node_loads[node_id] < max(self.node_loads.values()) * 0.8:
return node_id
# 使用加权负载均衡选择
min_load = min(self.node_loads.values())
candidates = [node_id for node_id, load in self.node_loads.items()
if load == min_load]
return random.choice(candidates) if candidates else list(self.node_loads.keys())[0]
def _has_domain_experience(self, node_id: str, domain: str) -> bool:
"""
检查节点是否有域名处理经验
"""
# 这里可以实现具体的域名经验检查逻辑
experience_key = f"node_experience:{node_id}:{domain}"
# 实际实现中可能需要查询Redis或其他存储
return False
def _should_create_affinity(self, task_attributes: Dict[str, Any]) -> bool:
"""
是否应该创建亲和性
"""
# 对于需要保持会话或状态的任务创建亲和性
return task_attributes.get('requires_session', False)
def _create_affinity(self, task_id: str, node_id: str):
"""
创建任务-节点亲和性
"""
self.task_affinities[task_id] = (node_id, time.time())
def release_task(self, task_id: str, node_id: str):
"""
释放任务
"""
self.node_loads[node_id] -= 1
# 删除亲和性映射
if task_id in self.task_affinities:
del self.task_affinities[task_id]
class LoadBalancerManager:
"""
负载均衡管理器
"""
def __init__(self):
self.balancers = {}
self.performance_monitor = PerformanceMonitor()
def register_balancer(self, name: str, balancer):
"""
注册负载均衡器
"""
self.balancers[name] = balancer
def get_balancer(self, name: str):
"""
获取负载均衡器
"""
return self.balancers.get(name)
def switch_algorithm(self, name: str, algorithm: str):
"""
切换负载均衡算法
"""
if name in self.balancers:
balancer = self.balancers[name]
if hasattr(balancer, 'algorithm'):
balancer.algorithm = algorithm
def get_optimal_balancer(self, traffic_pattern: str) -> str:
"""
根据流量模式选择最优负载均衡器
"""
performance_stats = self.performance_monitor.get_stats()
if traffic_pattern == 'uniform':
return 'round_robin'
elif traffic_pattern == 'bursty':
return 'least_connections'
elif traffic_pattern == 'sticky':
return 'consistent_hash'
else:
return 'adaptive'
class PerformanceMonitor:
"""
性能监控器
"""
def __init__(self):
self.metrics = {}
self.history = {}
def record_metric(self, balancer_name: str, node_id: str,
response_time: float, success: bool):
"""
记录性能指标
"""
key = f"{balancer_name}:{node_id}"
if key not in self.metrics:
self.metrics[key] = {
'response_times': [],
'success_rates': [],
'throughput': 0
}
metrics = self.metrics[key]
metrics['response_times'].append(response_time)
metrics['success_rates'].append(1 if success else 0)
metrics['throughput'] += 1
def get_stats(self) -> Dict[str, Any]:
"""
获取统计信息
"""
stats = {}
for key, metrics in self.metrics.items():
response_times = metrics['response_times']
success_rates = metrics['success_rates']
if response_times:
avg_response_time = sum(response_times) / len(response_times)
success_rate = sum(success_rates) / len(success_rates) if success_rates else 0
stats[key] = {
'avg_response_time': avg_response_time,
'success_rate': success_rate,
'throughput': metrics['throughput']
}
return stats

## 集群部署方案
### 集群架构设计
# docker-compose-cluster.yml - 分布式爬虫集群
version: '3.8'
services:
# Redis集群
redis-master:
image: redis:6.2-alpine
container_name: redis-master
ports:
- "6379:6379"
command: redis-server --appendonly yes --maxmemory 2gb --maxmemory-policy allkeys-lru
volumes:
- redis-data:/data
networks:
- crawler-network
redis-slave:
image: redis:6.2-alpine
container_name: redis-slave
ports:
- "6380:6379"
command: redis-server --slaveof redis-master 6379
depends_on:
- redis-master
networks:
- crawler-network
# 监控服务
redis-commander:
image: rediscommander/redis-commander:latest
container_name: redis-commander
environment:
- REDIS_HOSTS=local:redis-master:6379
ports:
- "8081:8081"
depends_on:
- redis-master
networks:
- crawler-network
# 爬虫节点1
spider-node-1:
build: .
container_name: spider-node-1
environment:
- REDIS_URL=redis://redis-master:6379
- NODE_ID=node-1
volumes:
- ./output:/app/output
- ./logs:/app/logs
depends_on:
- redis-master
networks:
- crawler-network
deploy:
resources:
limits:
cpus: '2'
memory: 2G
# 爬虫节点2
spider-node-2:
build: .
container_name: spider-node-2
environment:
- REDIS_URL=redis://redis-master:6379
- NODE_ID=node-2
volumes:
- ./output:/app/output
- ./logs:/app/logs
depends_on:
- redis-master
networks:
- crawler-network
deploy:
resources:
limits:
cpus: '2'
memory: 2G
# 爬虫节点3
spider-node-3:
build: .
container_name: spider-node-3
environment:
- REDIS_URL=redis://redis-master:6379
- NODE_ID=node-3
volumes:
- ./output:/app/output
- ./logs:/app/logs
depends_on:
- redis-master
networks:
- crawler-network
deploy:
resources:
limits:
cpus: '2'
memory: 2G
# 数据处理节点
processor-node:
build:
context: .
dockerfile: Dockerfile.processor
container_name: processor-node
environment:
- REDIS_URL=redis://redis-master:6379
- MONGODB_URI=mongodb://mongo:27017/crawler_db
depends_on:
- redis-master
- mongo
networks:
- crawler-network
# 数据库
mongo:
image: mongo:4.4
container_name: mongodb
ports:
- "27017:27017"
volumes:
- mongo-data:/data/db
networks:
- crawler-network
# 监控面板
grafana:
image: grafana/grafana-enterprise
container_name: grafana
ports:
- "3000:3000"
environment:
- GF_SECURITY_ADMIN_PASSWORD=admin
volumes:
- grafana-storage:/var/lib/grafana
networks:
- crawler-network
depends_on:
- prometheus
# 指标收集
prometheus:
image: prom/prometheus
container_name: prometheus
ports:
- "9090:9090"
volumes:
- ./prometheus.yml:/etc/prometheus/prometheus.yml
networks:
- crawler-network
volumes:
redis-data:
mongo-data:
grafana-storage:
networks:
crawler-network:
driver: bridge

### 部署配置文件
# cluster_config.py - 集群配置
import os
from dataclasses import dataclass
from typing import List, Dict, Any
@dataclass
class NodeConfig:
"""
节点配置
"""
node_id: str
role: str # spider, processor, coordinator
cpu_cores: int
memory_gb: int
bandwidth_mbps: int
max_concurrent_requests: int
download_delay: float
retry_times: int
@dataclass
class ClusterConfig:
"""
集群配置
"""
cluster_name: str
redis_config: Dict[str, Any]
nodes: List[NodeConfig]
scheduling_policy: str
load_balancing_algorithm: str
monitoring_enabled: bool
auto_scaling_enabled: bool
class ClusterManager:
"""
集群管理器
"""
def __init__(self, config: ClusterConfig):
self.config = config
self.nodes = {}
self.cluster_state = 'initialized'
def start_cluster(self):
"""
启动集群
"""
print(f"Starting cluster: {self.config.cluster_name}")
# 启动Redis
self._start_redis()
# 启动各节点
for node_config in self.config.nodes:
self._start_node(node_config)
# 启动监控
if self.config.monitoring_enabled:
self._start_monitoring()
self.cluster_state = 'running'
print("Cluster started successfully")
def _start_redis(self):
"""
启动Redis服务
"""
redis_config = self.config.redis_config
print(f"Starting Redis: {redis_config['host']}:{redis_config['port']}")
# 实际实现中会启动Redis服务
def _start_node(self, node_config: NodeConfig):
"""
启动节点
"""
print(f"Starting node {node_config.node_id} with role {node_config.role}")
# 根据节点角色启动相应的服务
if node_config.role == 'spider':
self._start_spider_node(node_config)
elif node_config.role == 'processor':
self._start_processor_node(node_config)
elif node_config.role == 'coordinator':
self._start_coordinator_node(node_config)
def _start_spider_node(self, node_config: NodeConfig):
"""
启动爬虫节点
"""
# 配置爬虫参数
spider_config = {
'CONCURRENT_REQUESTS': node_config.max_concurrent_requests,
'DOWNLOAD_DELAY': node_config.download_delay,
'RETRY_TIMES': node_config.retry_times,
}
print(f"Spider node {node_config.node_id} configured with: {spider_config}")
def _start_processor_node(self, node_config: NodeConfig):
"""
启动处理节点
"""
print(f"Processor node {node_config.node_id} started")
def _start_coordinator_node(self, node_config: NodeConfig):
"""
启动协调节点
"""
print(f"Coordinator node {node_config.node_id} started")
def _start_monitoring(self):
"""
启动监控
"""
print("Starting monitoring services...")
# 启动Prometheus, Grafana等监控服务
def scale_up(self, additional_nodes: List[NodeConfig]):
"""
扩容集群
"""
for node_config in additional_nodes:
self._start_node(node_config)
self.config.nodes.append(node_config)
print(f"Scaled up cluster with {len(additional_nodes)} nodes")
def scale_down(self, node_ids: List[str]):
"""
缩容集群
"""
nodes_to_remove = []
for node_id in node_ids:
node = next((n for n in self.config.nodes if n.node_id == node_id), None)
if node:
self._graceful_shutdown_node(node)
nodes_to_remove.append(node)
for node in nodes_to_remove:
self.config.nodes.remove(node)
print(f"Scaled down cluster by removing {len(nodes_to_remove)} nodes")
def _graceful_shutdown_node(self, node_config: NodeConfig):
"""
优雅关闭节点
"""
print(f"Gracefully shutting down node {node_config.node_id}")
# 实现节点优雅关闭逻辑
def get_cluster_status(self) -> Dict[str, Any]:
"""
获取集群状态
"""
return {
'state': self.cluster_state,
'total_nodes': len(self.config.nodes),
'active_nodes': len([n for n in self.config.nodes if n.role != 'inactive']),
'nodes': [
{
'id': node.node_id,
'role': node.role,
'cpu_cores': node.cpu_cores,
'memory_gb': node.memory_gb
} for node in self.config.nodes
]
}
class AutoScaler:
"""
自动扩缩容器
"""
def __init__(self, cluster_manager: ClusterManager):
self.cluster_manager = cluster_manager
self.metrics_collector = MetricsCollector()
self.scaling_policies = {
'cpu_threshold': 80, # CPU使用率阈值
'memory_threshold': 85, # 内存使用率阈值
'queue_length_threshold': 1000, # 队列长度阈值
'response_time_threshold': 5.0, # 响应时间阈值(秒)
}
def check_scaling_requirements(self) -> Dict[str, Any]:
"""
检查扩缩容需求
"""
metrics = self.metrics_collector.get_cluster_metrics()
scaling_decision = {
'scale_up': False,
'scale_down': False,
'reasons': [],
'recommended_nodes': 0
}
# 检查CPU使用率
avg_cpu = sum(metrics.get('cpu_usage', [])) / len(metrics.get('cpu_usage', [1])) if metrics.get('cpu_usage') else 0
if avg_cpu > self.scaling_policies['cpu_threshold']:
scaling_decision['scale_up'] = True
scaling_decision['reasons'].append(f"High CPU usage: {avg_cpu}%")
scaling_decision['recommended_nodes'] += 2
# 检查内存使用率
avg_memory = sum(metrics.get('memory_usage', [])) / len(metrics.get('memory_usage', [1])) if metrics.get('memory_usage') else 0
if avg_memory > self.scaling_policies['memory_threshold']:
scaling_decision['scale_up'] = True
scaling_decision['reasons'].append(f"High memory usage: {avg_memory}%")
scaling_decision['recommended_nodes'] += 1
# 检查队列长度
queue_length = metrics.get('queue_length', 0)
if queue_length > self.scaling_policies['queue_length_threshold']:
scaling_decision['scale_up'] = True
scaling_decision['reasons'].append(f"Long queue: {queue_length} items")
scaling_decision['recommended_nodes'] += max(1, queue_length // 1000)
# 检查响应时间
avg_response_time = metrics.get('avg_response_time', 0)
if avg_response_time > self.scaling_policies['response_time_threshold']:
scaling_decision['scale_up'] = True
scaling_decision['reasons'].append(f"Slow response: {avg_response_time}s")
scaling_decision['recommended_nodes'] += 1
return scaling_decision
def perform_scaling(self):
"""
执行扩缩容
"""
decision = self.check_scaling_requirements()
if decision['scale_up']:
# 计算需要增加的节点数
current_nodes = len(self.cluster_manager.config.nodes)
target_nodes = current_nodes + decision['recommended_nodes']
# 创建新的节点配置
new_nodes = []
for i in range(decision['recommended_nodes']):
new_node = NodeConfig(
node_id=f"auto_node_{current_nodes + i + 1}",
role='spider',
cpu_cores=2,
memory_gb=4,
bandwidth_mbps=100,
max_concurrent_requests=16,
download_delay=1.0,
retry_times=3
)
new_nodes.append(new_node)
# 扩容集群
self.cluster_manager.scale_up(new_nodes)
print(f"Auto scaled up cluster by {decision['recommended_nodes']} nodes")
elif decision['scale_down']:
# 实现缩容逻辑(略)
pass
class MetricsCollector:
"""
指标收集器
"""
def __init__(self):
self.metrics = {}
def get_cluster_metrics(self) -> Dict[str, Any]:
"""
获取集群指标
"""
# 这里会从各个节点收集指标
# 实际实现中会连接到Prometheus或其他监控系统
return {
'cpu_usage': [75, 80, 65], # 各节点CPU使用率
'memory_usage': [80, 70, 85], # 各节点内存使用率
'queue_length': 1200, # 队列长度
'avg_response_time': 3.2, # 平均响应时间
'requests_per_second': 50, # 每秒请求数
}
## 性能优化技巧 {#性能优化技巧}
### Redis性能优化
```python
# redis_optimization.py - Redis性能优化
import redis
from redis.sentinel import Sentinel
import time
class RedisOptimizer:
"""
Redis性能优化器
"""
def __init__(self, redis_config):
self.redis_config = redis_config
self.connection_pool = None
self.optimization_strategies = {
'connection_pooling': True,
'pipelining': True,
'compression': True,
'serialization': 'pickle'
}
def create_optimized_connection(self):
"""
创建优化的Redis连接
"""
self.connection_pool = redis.ConnectionPool(
host=self.redis_config['host'],
port=self.redis_config['port'],
db=self.redis_config['db'],
password=self.redis_config['password'],
max_connections=self.redis_config.get('max_connections', 20),
retry_on_timeout=True,
health_check_interval=30,
socket_keepalive=True,
socket_keepalive_options={},
encoding='utf-8',
decode_responses=False
)
return redis.Redis(connection_pool=self.connection_pool)
def optimize_for_crawling(self):
"""
针对爬虫场景优化Redis
"""
redis_client = self.create_optimized_connection()
# 设置适合爬虫的Redis配置
optimization_commands = [
"CONFIG SET maxmemory 2gb",
"CONFIG SET maxmemory-policy allkeys-lru",
"CONFIG SET tcp-keepalive 300",
"CONFIG SET timeout 300",
"CONFIG SET hz 10",
"CONFIG SET notify-keyspace-events KEA"
]
for command in optimization_commands:
try:
redis_client.execute_command(*command.split())
except redis.RedisError:
# 某些配置可能不支持,忽略错误
pass
return redis_client
def batch_operations(self, redis_client, operations):
"""
批量操作优化
"""
pipeline = redis_client.pipeline()
for operation in operations:
op_type, key, value = operation
if op_type == 'set':
pipeline.set(key, value)
elif op_type == 'sadd':
pipeline.sadd(key, value)
elif op_type == 'lpush':
pipeline.lpush(key, value)
elif op_type == 'zadd':
score, member = value
pipeline.zadd(key, {member: score})
return pipeline.execute()
def compression_enabled_operations(self, redis_client, key, value):
"""
启用压缩的操作
"""
import zlib
compressed_value = zlib.compress(str(value).encode('utf-8'))
return redis_client.set(key, compressed_value)
def decompression_enabled_get(self, redis_client, key):
"""
启用解压的获取操作
"""
import zlib
compressed_value = redis_client.get(key)
if compressed_value:
return zlib.decompress(compressed_value).decode('utf-8')
return None
class QueuePerformanceOptimizer:
"""
队列性能优化器
"""
def __init__(self, redis_client):
self.redis_client = redis_client
self.queue_optimization_strategies = {
'batch_processing': True,
'async_operations': True,
'connection_pooling': True,
'pipeline_usage': True
}
def optimize_queue_operations(self, queue_key, batch_size=100):
"""
优化队列操作性能
"""
# 批量获取请求
def batch_pop():
pipeline = self.redis_client.pipeline()
# 获取多个请求
for _ in range(batch_size):
pipeline.lpop(queue_key)
results = pipeline.execute()
return [result for result in results if result is not None]
# 批量推送请求
def batch_push(items):
pipeline = self.redis_client.pipeline()
for item in items:
pipeline.rpush(queue_key, item)
return pipeline.execute()
return batch_pop, batch_push
def async_queue_operations(self):
"""
异步队列操作
"""
import asyncio
import aioredis
async def get_async_redis():
redis = await aioredis.from_url(
f"redis://{self.redis_client.connection_pool.connection_kwargs['host']}:"
f"{self.redis_client.connection_pool.connection_kwargs['port']}"
)
return redis
class ConnectionPoolOptimizer:
"""
连接池优化器
"""
def __init__(self):
self.pool_config = {
'max_connections': 20,
'timeout': 30,
'retry_on_timeout': True,
'health_check_interval': 30
}
def create_optimized_pool(self, **overrides):
"""
创建优化的连接池
"""
config = {**self.pool_config, **overrides}
return redis.ConnectionPool(
max_connections=config['max_connections'],
socket_timeout=config['timeout'],
socket_connect_timeout=config['timeout'],
retry_on_timeout=config['retry_on_timeout'],
health_check_interval=config['health_check_interval'],
**config.get('extra_kwargs', {})
)
```

### 爬虫性能优化
```python
# crawler_optimization.py - 爬虫性能优化
import asyncio
import aiohttp
from concurrent.futures import ThreadPoolExecutor
import threading
import time
from functools import wraps
class CrawlerPerformanceOptimizer:
"""
爬虫性能优化器
"""
def __init__(self):
self.executor = ThreadPoolExecutor(max_workers=10)
self.session = None
self.performance_metrics = {
'requests_per_second': 0,
'average_response_time': 0,
'success_rate': 0,
'memory_usage': 0
}
def async_http_client(self):
"""
异步HTTP客户端
"""
connector = aiohttp.TCPConnector(
limit=100, # 并发连接数
limit_per_host=30, # 每个主机的并发连接数
ttl_dns_cache=300, # DNS缓存时间
use_dns_cache=True,
keepalive_timeout=75,
)
timeout = aiohttp.ClientTimeout(total=60, connect=10)
self.session = aiohttp.ClientSession(
connector=connector,
timeout=timeout,
headers={
'User-Agent': 'Mozilla/5.0 (compatible; DistributedBot/1.0)'
}
)
return self.session
def optimize_middleware(self):
"""
优化中间件性能
"""
middleware_config = {
'enable_retry_middleware': True,
'enable_proxy_middleware': True,
'enable_cache_middleware': False, # 分布式环境下谨慎使用
'enable_rate_limit_middleware': True,
'middleware_order': [
'custom_user_agent',
'proxy_middleware',
'retry_middleware',
'rate_limit_middleware'
]
}
return middleware_config
def memory_efficient_parsing(self, response_text, selectors):
"""
内存高效的解析
"""
from lxml import html
import cProfile
# 使用lxml进行高效解析
tree = html.fromstring(response_text)
results = {}
for field, selector in selectors.items():
elements = tree.xpath(selector)
results[field] = [elem.strip() for elem in elements if elem.strip()]
return results
def async_item_processing(self, items):
"""
异步处理项目
"""
async def process_item(item):
# 异步处理单个项目
# 可以包括数据清洗、验证、存储等操作
processed_item = await self._process_single_item(item)
return processed_item
async def process_all():
tasks = [process_item(item) for item in items]
return await asyncio.gather(*tasks)
return asyncio.run(process_all())
async def _process_single_item(self, item):
"""
处理单个项目
"""
# 模拟异步处理
await asyncio.sleep(0.01) # 模拟处理时间
return item
def performance_monitor(func):
"""
性能监控装饰器
"""
@wraps(func)
async def wrapper(*args, **kwargs):
start_time = time.time()
start_memory = 0 # 可以集成内存监控
try:
result = await func(*args, **kwargs)
success = True
except Exception as e:
result = None
success = False
print(f"Function {func.__name__} failed: {str(e)}")
end_time = time.time()
duration = end_time - start_time
# 记录性能指标
print(f"Function {func.__name__} took {duration:.2f}s, Success: {success}")
return result
return wrapper
class LoadTestingTool:
"""
负载测试工具
"""
def __init__(self, target_url, concurrency_level=10):
self.target_url = target_url
self.concurrency_level = concurrency_level
self.results = []
async def single_request(self, session, url):
"""
发送单个请求
"""
start_time = time.time()
try:
async with session.get(url) as response:
content = await response.text()
status = response.status
except Exception as e:
status = 0
content = str(e)
end_time = time.time()
return {
'url': url,
'status': status,
'response_time': end_time - start_time,
'content_length': len(content) if content else 0
}
async def load_test(self, num_requests=100):
"""
执行负载测试
"""
connector = aiohttp.TCPConnector(limit=self.concurrency_level)
timeout = aiohttp.ClientTimeout(total=30)
async with aiohttp.ClientSession(connector=connector, timeout=timeout) as session:
tasks = []
for _ in range(num_requests):
task = self.single_request(session, self.target_url)
tasks.append(task)
results = await asyncio.gather(*tasks, return_exceptions=True)
self.results = [r for r in results if not isinstance(r, Exception)]
return self.analyze_results()
def analyze_results(self):
"""
分析测试结果
"""
if not self.results:
return {}
response_times = [r['response_time'] for r in self.results]
statuses = [r['status'] for r in self.results]
analysis = {
'total_requests': len(self.results),
'successful_requests': len([s for s in statuses if 200 <= s < 400]),
'failed_requests': len([s for s in statuses if s == 0 or s >= 400]),
'average_response_time': sum(response_times) / len(response_times),
'min_response_time': min(response_times),
'max_response_time': max(response_times),
'requests_per_second': len(self.results) / max(response_times)
}
return analysis
```

## 监控与运维
### 监控系统设计
# monitoring_system.py - 监控系统
import psutil
import time
import json
from datetime import datetime
import redis
from typing import Dict, List, Any
class DistributedCrawlerMonitor:
"""
分布式爬虫监控系统
"""
def __init__(self, redis_client, namespace='crawler_monitor'):
self.redis_client = redis_client
self.namespace = namespace
self.metrics = {}
self.alerts = []
self.health_check_interval = 30
def collect_system_metrics(self) -> Dict[str, Any]:
"""
收集系统指标
"""
return {
'cpu_percent': psutil.cpu_percent(interval=1),
'memory_percent': psutil.virtual_memory().percent,
'disk_usage': psutil.disk_usage('/').percent,
'network_io': psutil.net_io_counters()._asdict(),
'process_count': len(psutil.pids()),
'timestamp': datetime.now().isoformat()
}
def collect_redis_metrics(self) -> Dict[str, Any]:
"""
收集Redis指标
"""
info = self.redis_client.info()
return {
'used_memory': info.get('used_memory_human'),
'connected_clients': info.get('connected_clients'),
'total_commands_processed': info.get('total_commands_processed'),
'instantaneous_ops_per_sec': info.get('instantaneous_ops_per_sec'),
'keyspace_hits': info.get('keyspace_hits'),
'keyspace_misses': info.get('keyspace_misses'),
'hit_rate': info.get('keyspace_hits', 0) / max(info.get('keyspace_hits', 0) + info.get('keyspace_misses', 0), 1) * 100,
'timestamp': datetime.now().isoformat()
}
def collect_crawler_metrics(self) -> Dict[str, Any]:
"""
收集爬虫指标
"""
# 从Redis获取爬虫统计数据
stats_keys = self.redis_client.keys(f"{self.namespace}:stats:*")
stats = {}
for key in stats_keys:
node_stats = self.redis_client.hgetall(key)
stats[key.decode()] = {k.decode(): v.decode() for k, v in node_stats.items()}
return stats
def check_health(self) -> Dict[str, Any]:
"""
检查健康状态
"""
system_metrics = self.collect_system_metrics()
redis_metrics = self.collect_redis_metrics()
health_status = {
'system_healthy': (
system_metrics['cpu_percent'] < 80 and
system_metrics['memory_percent'] < 85
),
'redis_healthy': (
redis_metrics['connected_clients'] > 0 and
redis_metrics['hit_rate'] > 80
),
'crawler_healthy': self._check_crawler_health(),
'overall_status': 'healthy'
}
if not health_status['system_healthy']:
health_status['overall_status'] = 'degraded'
self._add_alert('System resources overloaded')
if not health_status['redis_healthy']:
health_status['overall_status'] = 'critical'
self._add_alert('Redis performance issues')
return health_status
def _check_crawler_health(self) -> bool:
"""
检查爬虫健康状态
"""
# 检查队列长度是否正常
queue_keys = self.redis_client.keys('*:requests')
for key in queue_keys:
if self.redis_client.llen(key) > 10000: # 队列过长
self._add_alert(f'Queue {key.decode()} too long')
return False
# 检查去重集合大小
dupefilter_keys = self.redis_client.keys('*:dupefilter')
for key in dupefilter_keys:
if self.redis_client.scard(key) > 1000000: # 去重集合过大
self._add_alert(f'Dupefilter {key.decode()} too large')
return False
return True
def _add_alert(self, message: str):
"""
添加告警
"""
alert = {
'message': message,
'timestamp': datetime.now().isoformat(),
'severity': 'warning'
}
self.alerts.append(alert)
# 存储到Redis
self.redis_client.lpush(f"{self.namespace}:alerts", json.dumps(alert))
self.redis_client.ltrim(f"{self.namespace}:alerts", 0, 99) # 只保留最近100个告警
class AlertManager:
"""
告警管理器
"""
def __init__(self, redis_client):
self.redis_client = redis_client
self.alert_handlers = []
self.severity_levels = {
'info': 0,
'warning': 1,
'error': 2,
'critical': 3
}
def add_alert_handler(self, handler):
"""
添加告警处理器
"""
self.alert_handlers.append(handler)
def trigger_alert(self, message: str, severity: str = 'warning', **context):
"""
触发告警
"""
alert = {
'message': message,
'severity': severity,
'timestamp': datetime.now().isoformat(),
'context': context
}
# 存储告警
self.redis_client.lpush('crawler_alerts', json.dumps(alert))
self.redis_client.ltrim('crawler_alerts', 0, 99) # 只保留最近100个告警
# 执行告警处理器
for handler in self.alert_handlers:
try:
handler(alert)
except Exception as e:
print(f"Alert handler failed: {str(e)}")
def email_alert_handler(self, smtp_config):
"""
邮件告警处理器
"""
import smtplib
from email.mime.text import MIMEText
from email.mime.multipart import MIMEMultipart
def send_email(alert):
msg = MIMEMultipart()
msg['Subject'] = f"Crawler Alert: {alert['severity'].upper()}"
msg['From'] = smtp_config['from']
msg['To'] = smtp_config['to']
body = f"""
Crawler Alert
Message: {alert['message']}
Severity: {alert['severity']}
Time: {alert['timestamp']}
Context: {json.dumps(alert['context'], indent=2)}
"""
msg.attach(MIMEText(body, 'plain'))
with smtplib.SMTP(smtp_config['server'], smtp_config['port']) as server:
server.starttls()
server.login(smtp_config['username'], smtp_config['password'])
server.send_message(msg)
return send_email
def webhook_alert_handler(self, webhook_url: str):
"""
Webhook告警处理器
"""
import requests
def send_webhook(alert):
try:
response = requests.post(webhook_url, json=alert)
response.raise_for_status()
except Exception as e:
print(f"Webhook alert failed: {str(e)}")
return send_webhook
class DashboardGenerator:
"""
仪表板生成器
"""
def __init__(self, redis_client):
self.redis_client = redis_client
def generate_overview_dashboard(self) -> Dict[str, Any]:
"""
生成概览仪表板
"""
dashboard = {
'summary': self._get_summary_stats(),
'performance_trends': self._get_performance_trends(),
'health_status': self._get_health_status(),
'recent_alerts': self._get_recent_alerts(),
'node_status': self._get_node_status(),
'generated_at': datetime.now().isoformat()
}
return dashboard
def _get_summary_stats(self) -> Dict[str, Any]:
"""
获取汇总统计
"""
# 获取总的请求数量
total_requests = 0
queue_keys = self.redis_client.keys('*:requests')
for key in queue_keys:
# 按键类型取长度:优先级队列为zset,FIFO/LIFO队列为list
total_requests += self.redis_client.zcard(key) if self.redis_client.type(key) == b'zset' else self.redis_client.llen(key)
# 获取总的去重请求数量
total_processed = 0
stats_keys = self.redis_client.keys('*:stats')
for key in stats_keys:
stats = self.redis_client.hgetall(key)
if b'response_count' in stats:
total_processed += int(stats[b'response_count'])
# 获取活跃节点数
node_keys = self.redis_client.keys('distributed_spider:nodes:*')
active_nodes = len(node_keys)
return {
'total_requests_in_queue': total_requests,
'total_processed_requests': total_processed,
'active_nodes': active_nodes,
'uptime_hours': self._calculate_uptime()
}
def _get_performance_trends(self) -> Dict[str, Any]:
"""
获取性能趋势
"""
# 这里可以实现历史数据的趋势分析
# 从Redis的时间序列数据中获取
return {
'requests_per_minute': self._get_requests_per_minute(),
'average_response_time': self._get_average_response_time(),
'success_rate_trend': self._get_success_rate_trend()
}
def _get_health_status(self) -> Dict[str, str]:
"""
获取健康状态
"""
# 检查各个组件的健康状态
redis_ping = self.redis_client.ping()
return {
'redis': 'healthy' if redis_ping else 'unhealthy',
'nodes': self._assess_node_health(),
'queues': self._assess_queue_health()
}
def _get_recent_alerts(self) -> List[Dict[str, Any]]:
"""
获取最近的告警
"""
alert_data = self.redis_client.lrange('crawler_alerts', 0, 9)
alerts = []
for data in alert_data:
try:
alert = json.loads(data.decode())
alerts.append(alert)
except (json.JSONDecodeError, UnicodeDecodeError):
continue
return alerts
def _get_node_status(self) -> Dict[str, Any]:
"""
获取节点状态
"""
node_keys = self.redis_client.keys('distributed_spider:nodes:*')
node_status = {}
for key in node_keys:
node_info = self.redis_client.hgetall(key)
node_id = key.decode().split(':')[-1]
node_status[node_id] = {
'status': node_info.get(b'status', b'unknown').decode(),
'registered_at': node_info.get(b'registered_at', b'').decode(),
'capabilities': json.loads(node_info.get(b'capabilities', b'{}').decode())
}
return node_status
def _calculate_uptime(self) -> float:
"""
计算运行时间
"""
# 可以从Redis中存储的启动时间计算
startup_time = self.redis_client.get('crawler_startup_time')
if startup_time:
startup_timestamp = float(startup_time.decode())
uptime_seconds = time.time() - startup_timestamp
return uptime_seconds / 3600 # 转换为小时
return 0
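上述监控组件可以组合成一个简单的守护进程。下面是一个示意性的使用草图(假设Redis运行在本地默认端口,模块路径monitoring_system与控制台处理器均为示例假设):周期性执行健康检查,状态异常时通过AlertManager触发告警。

```python
# monitor_runner.py - 监控守护进程示意(假设性用法示例)
import time
import redis

from monitoring_system import DistributedCrawlerMonitor, AlertManager  # 模块路径为假设

def main():
    client = redis.Redis(host='localhost', port=6379, db=0)
    monitor = DistributedCrawlerMonitor(client)
    alert_manager = AlertManager(client)

    # 注册一个最简单的控制台告警处理器(示例)
    alert_manager.add_alert_handler(
        lambda alert: print(f"[ALERT] {alert['severity']}: {alert['message']}")
    )

    while True:
        health = monitor.check_health()
        if health['overall_status'] != 'healthy':
            alert_manager.trigger_alert(
                f"Cluster status: {health['overall_status']}",
                severity='critical' if health['overall_status'] == 'critical' else 'warning'
            )
        time.sleep(monitor.health_check_interval)

if __name__ == '__main__':
    main()
```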
### 自动运维
```python
# automation.py - 自动运维
import json
import schedule
import time
import subprocess
from datetime import datetime, timedelta
from typing import Any, Dict, List
from monitoring_system import DashboardGenerator  # 复用上文monitoring_system.py中的仪表板生成器(模块路径为假设)
class AutoOpsManager:
"""
自动运维管理器
"""
def __init__(self, monitor, redis_client):
self.monitor = monitor
self.redis_client = redis_client
self.automation_rules = []
self.job_scheduler = schedule.Scheduler()
def setup_regular_tasks(self):
"""
设置定期任务
"""
# 每分钟检查系统健康
self.job_scheduler.every(1).minutes.do(self._regular_health_check)
# 每5分钟清理过期数据
self.job_scheduler.every(5).minutes.do(self._cleanup_expired_data)
# 每小时生成报告
self.job_scheduler.every(1).hour.do(self._generate_hourly_report)
# 每天清理日志
self.job_scheduler.every().day.at("02:00").do(self._cleanup_logs)
def _regular_health_check(self):
"""
定期健康检查
"""
health_status = self.monitor.check_health()
if health_status['overall_status'] == 'critical':
self._trigger_emergency_procedures()
elif health_status['overall_status'] == 'degraded':
self._adjust_performance_settings()
def _cleanup_expired_data(self):
"""
清理过期数据
"""
# 清理超过7天的统计数据
week_ago = (datetime.now() - timedelta(days=7)).isoformat()
# 这里可以实现具体的数据清理逻辑
print(f"[{datetime.now()}] Cleaned up expired data")
def _generate_hourly_report(self):
"""
生成小时报告
"""
dashboard = DashboardGenerator(self.redis_client).generate_overview_dashboard()  # 仪表板生成器见monitoring_system.py
# 存储报告
report_key = f"crawler_reports:{datetime.now().strftime('%Y%m%d_%H')}"
self.redis_client.setex(report_key, 86400 * 30, json.dumps(dashboard))
print(f"[{datetime.now()}] Generated hourly report")
def _cleanup_logs(self):
"""
清理日志
"""
# 实现日志清理逻辑
print(f"[{datetime.now()}] Cleaned up old logs")
def _trigger_emergency_procedures(self):
"""
触发应急程序
"""
print(f"[{datetime.now()}] Emergency procedures triggered")
# 可以实现降级、暂停爬虫等操作
self._pause_crawling_if_needed()
def _adjust_performance_settings(self):
"""
调整性能设置
"""
print(f"[{datetime.now()}] Adjusting performance settings")
# 根据负载动态调整并发数等参数
self._adjust_concurrency_based_on_load()
def _pause_crawling_if_needed(self):
"""
根据需要暂停爬虫
"""
# 设置暂停标志
self.redis_client.setex('crawler_pause_flag', 300, 'paused') # 暂停5分钟
def _adjust_concurrency_based_on_load(self):
"""
根据负载调整并发数
"""
system_metrics = self.monitor.collect_system_metrics()
if system_metrics['cpu_percent'] > 80:
# 降低并发数
self._reduce_concurrency()
elif system_metrics['cpu_percent'] < 50:
# 增加并发数
self._increase_concurrency()
def _reduce_concurrency(self):
"""
降低并发数
"""
# 向所有节点发送降低并发数的指令
self.redis_client.publish('crawler_control', json.dumps({
'action': 'reduce_concurrency',
'factor': 0.8
}))
def _increase_concurrency(self):
"""
增加并发数
"""
# 向所有节点发送增加并发数的指令
self.redis_client.publish('crawler_control', json.dumps({
'action': 'increase_concurrency',
'factor': 1.2
}))
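# 补充示意:上面的方法只是向 'crawler_control' 频道发布指令,真正生效还需要
# 节点侧有订阅者消费指令。以下是一个假设性的最小订阅者草图(scrapy-redis并未
# 内置该机制,函数签名与并发调整方式均为示例假设),可在节点的后台线程中运行:
def listen_control_channel(redis_client, local_settings):
    """订阅 crawler_control 频道,按 factor 调整本地并发上限(示例)"""
    pubsub = redis_client.pubsub()
    pubsub.subscribe('crawler_control')
    for message in pubsub.listen():
        if message['type'] != 'message':
            continue
        command = json.loads(message['data'])
        current = local_settings.get('CONCURRENT_REQUESTS', 16)
        if command['action'] == 'reduce_concurrency':
            local_settings['CONCURRENT_REQUESTS'] = max(1, int(current * command['factor']))
        elif command['action'] == 'increase_concurrency':
            local_settings['CONCURRENT_REQUESTS'] = int(current * command['factor'])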
class DeploymentManager:
"""
部署管理器
"""
def __init__(self, redis_client):
self.redis_client = redis_client
self.deployment_configs = {}
def deploy_new_version(self, version: str, nodes: List[str] = None):
"""
部署新版本
"""
deployment_plan = {
'version': version,
'nodes': nodes or self._get_all_nodes(),
'strategy': 'rolling_update',
'rollback_point': self._create_rollback_point(),
'start_time': datetime.now().isoformat()
}
# 执行滚动更新
self._execute_rolling_update(deployment_plan)
return deployment_plan
def _get_all_nodes(self) -> List[str]:
"""
获取所有节点
"""
# 从Redis获取所有注册的节点
node_keys = self.redis_client.keys('distributed_spider:nodes:*')
return [key.decode().split(':')[-1] for key in node_keys]
def _create_rollback_point(self) -> str:
"""
创建回滚点
"""
rollback_id = f"rollback_{datetime.now().strftime('%Y%m%d_%H%M%S')}"
# 这里可以保存当前配置的状态
return rollback_id
def _execute_rolling_update(self, plan: Dict[str, Any]):
"""
执行滚动更新
"""
print(f"Starting rolling update for version {plan['version']}")
for node in plan['nodes']:
print(f"Updating node: {node}")
# 停止节点
self._stop_node(node)
# 更新代码
self._update_node_code(node, plan['version'])
# 启动节点
self._start_node(node)
# 等待节点健康检查通过
if not self._wait_for_node_healthy(node):
print(f"Node {node} failed to become healthy, initiating rollback")
self._rollback_deployment(plan)
return
print("Rolling update completed successfully")
def _stop_node(self, node: str):
"""
停止节点
"""
# 发送停止指令到节点
self.redis_client.publish(f'node_control:{node}', json.dumps({'action': 'stop'}))
def _update_node_code(self, node: str, version: str):
"""
更新节点代码
"""
# 这里实现具体的代码更新逻辑
# 可以通过git pull、scp等方式更新代码
pass
def _start_node(self, node: str):
"""
启动节点
"""
# 发送启动指令到节点
self.redis_client.publish(f'node_control:{node}', json.dumps({'action': 'start'}))
def _wait_for_node_healthy(self, node: str, timeout: int = 60) -> bool:
"""
等待节点健康
"""
start_time = time.time()
while time.time() - start_time < timeout:
# 检查节点状态
node_status = self.redis_client.hgetall(f'distributed_spider:nodes:{node}')
if node_status and node_status.get(b'status') == b'active':
return True
time.sleep(1)
return False
def _rollback_deployment(self, plan: Dict[str, Any]):
"""
回滚部署
"""
print(f"Initiating rollback to {plan['rollback_point']}")
# 实现回滚逻辑
```
#最佳实践总结
#设计原则
"""
分布式爬虫系统设计最佳实践:
1. 高可用性原则:
- 多节点部署,避免单点故障
- 健康检查和自动恢复机制
- 数据备份和恢复策略
2. 可扩展性原则:
- 水平扩展能力
- 模块化设计
- 配置驱动的扩展
3. 性能优化原则:
- 连接池复用
- 批量操作
- 缓存策略
- 异步处理
4. 安全性原则:
- 认证授权机制
- 数据加密传输
- 访问控制策略
5. 监控治理原则:
- 全链路监控
- 告警通知
- 性能分析
- 日志审计
"""
### 性能调优建议
```python
"""
性能调优最佳实践:
1. Redis优化:
- 合理设置内存限制和淘汰策略
- 使用连接池避免频繁连接创建
- 批量操作减少网络开销
- 启用压缩节省存储空间
2. 爬虫配置优化:
- 合理设置并发数避免过度负载
- 适当的延迟策略避免被封禁
- 有效的重试机制处理临时故障
- 智能的请求调度算法
3. 网络优化:
- 使用连接复用减少握手开销
- 启用HTTP/2提升传输效率
- 合理的超时设置避免阻塞
- DNS缓存减少解析时间
4. 数据处理优化:
- 流式处理大数据集
- 内存复用减少GC压力
- 异步IO提升吞吐量
- 数据分区并行处理
"""
### 部署运维最佳实践
```python
"""
部署运维最佳实践:
1. 容器化部署:
- 使用Docker标准化环境
- Docker Compose简化多服务编排
- Kubernetes实现自动化运维
2. 监控告警:
- 实时监控系统状态
- 自定义业务指标监控
- 多渠道告警通知
- 告警分级处理
3. 日志管理:
- 结构化日志输出
- 集中化日志收集
- 日志轮转防止单文件过大
- 日志分析挖掘价值信息
4. 安全防护:
- 网络隔离和访问控制
- 敏感信息加密存储
- 定期安全扫描和漏洞修复
- 安全审计和合规检查
"""
通过本章的学习,你应该已经掌握了Scrapy-Redis分布式架构的核心技术和实现方法。分布式爬虫系统不仅能显著提升数据采集效率,还能提供高可用性和可扩展性。在实际项目中,需要根据具体需求和环境特点,灵活运用这些技术和最佳实践,构建稳定高效的分布式爬虫系统。
