# 大规模爬虫优化 - 内存管理、网络优化与性能调优详解
📂 所属阶段:第五阶段 — 战力升级(分布式与进阶篇)
🔗 相关章节:自动限速AutoThrottle · 数据去重与增量更新 · 分布式去重与调度
## 大规模爬虫优化概述
大规模爬虫系统面临着诸多挑战,包括内存使用、网络带宽、并发控制、数据处理效率等问题。优化这些方面对于确保爬虫系统的稳定性和高效性至关重要。
### 大规模爬虫面临的挑战

大规模爬虫面临的主要挑战:

1. 内存压力:
   - 大量URL队列存储
   - 响应数据缓存
   - 中间数据处理
   - 状态跟踪维护
2. 网络瓶颈:
   - 连接池管理
   - DNS解析效率
   - 带宽利用率
   - 超时和重试策略
3. 并发控制:
   - 请求频率限制
   - 线程/协程管理
   - 资源竞争协调
   - 负载均衡策略
4. 数据处理:
   - 实时处理能力
   - 存储效率优化
   - 数据质量保障
   - 错误处理机制

### 优化目标与原则
大规模爬虫优化的核心目标:

1. 稳定性:
   - 7x24小时稳定运行
   - 容错和恢复机制
   - 异常处理完善
2. 效率性:
   - 最大化吞吐量
   - 最小化资源消耗
   - 降低响应延迟
3. 可扩展性:
   - 支持动态扩容
   - 水平扩展能力
   - 负载均衡支持
4. 可维护性:
   - 详细的监控指标
   - 清晰的日志记录
   - 易于调试和优化

## 内存管理优化
内存管理是大规模爬虫优化的核心环节,直接影响系统的稳定性和性能。
### 内存配置优化

```python
# advanced_settings.py - 高级内存配置
import psutil
import gc
from scrapy.utils.reactor import install_reactor
# 内存使用限制
MEMUSAGE_ENABLED = True
MEMUSAGE_LIMIT_MB = 2048 # 限制内存使用为2GB
MEMUSAGE_WARNING_MB = 1536 # 1.5GB警告
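# 补充示例(假设性写法):也可以按机器实际内存动态推导上述两个阈值,
# 下面的 0.6 / 0.45 只是示意比例,启用后会覆盖上面的静态值
_total_mb = psutil.virtual_memory().total / 1024 / 1024
MEMUSAGE_LIMIT_MB = int(_total_mb * 0.6)
MEMUSAGE_WARNING_MB = int(_total_mb * 0.45)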
# 内存调试
MEMDEBUG_ENABLED = True
MEMDEBUG_NOTIFY = ['admin@example.com'] # 内存泄露通知邮箱
# 内存超过限制时自动关闭爬虫
CLOSESPIDER_MEMUSAGE = 2048 # 2GB
# 优化内存使用的配置
FEED_EXPORTERS = {
'json': 'scrapy.exporters.JsonItemExporter',
'csv': 'scrapy.exporters.CsvItemExporter',
}
FEED_STORAGES = {
'file': 'scrapy.extensions.feedexport.FileFeedStorage',
}
# 限制并发项目处理数量
CONCURRENT_ITEMS = 100
# 优化下载器内存使用
DOWNLOAD_HANDLERS = {
'http': 'scrapy.core.downloader.handlers.http.HTTPDownloadHandler',
'https': 'scrapy.core.downloader.handlers.http.HTTPDownloadHandler',
}
# 限制下载响应大小
DOWNLOAD_MAXSIZE = 50 * 1024 * 1024 # 50MB
DOWNLOAD_WARNSIZE = 10 * 1024 * 1024 # 10MB警告
# 优化请求队列内存使用
SCHEDULER_MEMORY_QUEUE = 'scrapy.squeues.PickleLifoMemoryQueue'
```

### 内存监控与管理

```python
# memory_manager.py - 内存管理器
import psutil
import gc
import time
import logging
from scrapy import signals
from scrapy.exceptions import CloseSpider
from pydispatch import dispatcher
class MemoryManager:
"""
内存管理器
"""
def __init__(self, crawler):
self.crawler = crawler
self.stats = crawler.stats
self.max_memory_mb = crawler.settings.getint('MEMUSAGE_LIMIT_MB', 2048)
self.warning_memory_mb = crawler.settings.getint('MEMUSAGE_WARNING_MB', 1536)
self.logger = logging.getLogger(__name__)
# 连接信号
dispatcher.connect(self.spider_opened, signal=signals.spider_opened)
dispatcher.connect(self.spider_closed, signal=signals.spider_closed)
def spider_opened(self, spider):
"""
爬虫开启时的内存监控
"""
self.logger.info(f"Memory manager started for spider: {spider.name}")
# 启动内存监控
self.start_memory_monitoring()
def spider_closed(self, spider, reason):
"""
爬虫关闭时的内存清理
"""
self.logger.info(f"Memory manager stopped for spider: {spider.name}")
# 执行垃圾回收
collected = gc.collect()
self.logger.info(f"Garbage collected: {collected} objects")
def start_memory_monitoring(self):
"""
开始内存监控
"""
self.last_check_time = time.time()
def check_memory_usage(self) -> bool:
"""
检查内存使用情况
Returns:
True: 内存使用正常,False: 内存超限需要关闭爬虫
"""
process = psutil.Process()
memory_info = process.memory_info()
memory_mb = memory_info.rss / 1024 / 1024
# 更新统计信息
self.stats.set_value('memusage/rss', memory_mb)
# 检查是否达到警告阈值
if memory_mb > self.warning_memory_mb:
self.logger.warning(f"Memory usage warning: {memory_mb:.2f}MB / {self.warning_memory_mb}MB")
# 检查是否达到限制阈值
if memory_mb > self.max_memory_mb:
self.logger.error(f"Memory usage exceeded limit: {memory_mb:.2f}MB / {self.max_memory_mb}MB")
return False
return True
def get_memory_info(self) -> dict:
"""
获取内存信息
"""
process = psutil.Process()
memory_info = process.memory_info()
return {
'rss': memory_info.rss / 1024 / 1024, # MB
'vms': memory_info.vms / 1024 / 1024, # MB
'percent': process.memory_percent(),
'max_limit_mb': self.max_memory_mb,
'warning_limit_mb': self.warning_memory_mb
}
class MemoryUsageExtension:
"""
内存使用扩展
"""
def __init__(self, crawler):
self.crawler = crawler
self.memory_manager = MemoryManager(crawler)
@classmethod
def from_crawler(cls, crawler):
ext = cls(crawler)
# 监听spider_idle信号来检查内存使用
crawler.signals.connect(ext.spider_idle, signal=signals.spider_idle)
return ext
def spider_idle(self, spider):
"""
爬虫空闲时检查内存使用
"""
if not self.memory_manager.check_memory_usage():
# 内存超限,关闭爬虫
raise CloseSpider('memory_exceeded')
```
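上面的内存管理器通过 MemoryUsageExtension 在 spider_idle 信号触发时做检查。下面给出一种启用方式的示意(假设以上代码保存在 myproject/extensions.py 中,模块路径与优先级数值均为示例):

```python
# settings.py(示意)
EXTENSIONS = {
    'myproject.extensions.MemoryUsageExtension': 500,
}
MEMUSAGE_LIMIT_MB = 2048
MEMUSAGE_WARNING_MB = 1536
```

需要注意,spider_idle 只有在调度队列空闲时才会触发,请求持续繁忙的大型任务里该检查可能很少执行,必要时可以结合下载中间件或定时任务做周期性检查。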
### 内存优化中间件

```python
# memory_optimization_middleware.py - 内存优化中间件
import gc
import weakref
from scrapy.http import Response
from itemadapter import ItemAdapter
class MemoryOptimizationMiddleware:
"""
内存优化中间件
"""
def __init__(self):
self.item_cache = weakref.WeakSet() # 使用弱引用避免内存泄漏
self.response_cache = {}
self.cache_size_limit = 1000
def process_spider_input(self, response, spider):
"""
处理输入响应的内存优化
"""
# 检查响应大小
if len(response.body) > 10 * 1024 * 1024: # 10MB
spider.logger.warning(f"Large response detected: {len(response.body)} bytes for {response.url}")
# 清理缓存
if len(self.response_cache) > self.cache_size_limit:
# 清理一半缓存
keys_to_remove = list(self.response_cache.keys())[:self.cache_size_limit // 2]
for key in keys_to_remove:
del self.response_cache[key]
return None
def process_spider_output(self, response, result, spider):
"""
处理输出结果的内存优化
"""
for item_or_request in result:
if hasattr(item_or_request, 'fields'): # Item
# 优化Item内存使用
adapter = ItemAdapter(item_or_request)
# 限制大字段大小
for field_name, value in adapter.items():
if isinstance(value, str) and len(value) > 10000: # 10KB
adapter[field_name] = value[:10000] + "...[TRUNCATED]"
# 添加到弱引用缓存
self.item_cache.add(item_or_request)
yield item_or_request
def spider_closed(self, spider):
"""
爬虫关闭时的内存清理
"""
# 执行垃圾回收
collected = gc.collect()
spider.logger.info(f"Memory optimization middleware cleanup: collected {collected} objects")
```

## 网络性能优化
网络性能优化是提高爬虫效率的关键因素。
### 网络配置优化

```python
# network_optimization_settings.py - 网络优化配置
import socket
# DNS优化
DNSCACHE_ENABLED = True
DNSCACHE_SIZE = 10000 # 增大DNS缓存
DNS_TIMEOUT = 60 # DNS超时时间
# 连接池优化
CONCURRENT_REQUESTS = 32 # 并发请求数
CONCURRENT_REQUESTS_PER_DOMAIN = 8 # 每个域名的并发请求数
CONCURRENT_REQUESTS_PER_IP = 4 # 每个IP的并发请求数
# 下载超时设置
DOWNLOAD_TIMEOUT = 30 # 下载超时时间
DOWNLOAD_DELAY = 1 # 下载延迟
RANDOMIZE_DOWNLOAD_DELAY = 0.5 # 随机延迟比例
# 重试设置
RETRY_TIMES = 3 # 重试次数
RETRY_HTTP_CODES = [500, 502, 503, 504, 408, 429, 403] # 需要重试的HTTP状态码
RETRY_PRIORITY_ADJUST = -1 # 重试请求优先级调整
# 下载器设置
DOWNLOADER_STATS = True # 启用下载器统计
DOWNLOAD_FAIL_ON_DATALOSS = False # 数据丢失时不失败
# TCP连接优化
REACTOR_THREADPOOL_MAXSIZE = 20 # Reactor线程池最大大小
TCP_DEFERRED_LISTENING_DELAY = 0.1 # TCP延迟监听
# HTTP缓存(通常在大规模爬虫中关闭)
HTTPCACHE_ENABLED = False # 关闭HTTP缓存以节省内存
HTTPCACHE_EXPIRATION_SECS = 0
# User-Agent优化
USER_AGENT = 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36'
# 下载器中间件配置
DOWNLOADER_MIDDLEWARES = {
'scrapy.downloadermiddlewares.useragent.UserAgentMiddleware': None,
'scrapy.downloadermiddlewares.retry.RetryMiddleware': 90,
'scrapy.downloadermiddlewares.redirect.MetaRefreshMiddleware': 80,
'scrapy.downloadermiddlewares.httpcompression.HttpCompressionMiddleware': 81,
'scrapy.downloadermiddlewares.stats.DownloaderStats': 82,
}
```

### 连接池管理

```python
# connection_pool_manager.py - 连接池管理器
from scrapy.core.downloader.handlers.http11 import TunnelError
from twisted.internet import reactor
from twisted.internet.endpoints import TCP4ClientEndpoint
from twisted.web.client import Agent, HTTPConnectionPool
from scrapy.core.downloader.contextfactory import ScrapyClientContextFactory
import logging
class OptimizedHTTPPool:
"""
优化的HTTP连接池
"""
def __init__(self, maxsize=20, persistent=True):
self.pool = HTTPConnectionPool(reactor, persistent=persistent)
self.pool.maxPersistentPerHost = maxsize
self.pool.cachedConnectionTimeout = 30 # 连接超时
self.pool.retryAutomatically = True # 自动重试
def get_agent(self, crawler):
"""
获取优化的HTTP代理
"""
context_factory = ScrapyClientContextFactory.from_settings(crawler.settings)
agent = Agent(reactor, contextFactory=context_factory, pool=self.pool)
return agent
def close_pool(self):
"""
关闭连接池
"""
d = self.pool.closeCachedConnections()
return d
class NetworkOptimizationMiddleware:
"""
网络优化中间件
"""
def __init__(self, crawler):
self.crawler = crawler
self.connection_pool = OptimizedHTTPPool(maxsize=crawler.settings.getint('CONCURRENT_REQUESTS_PER_DOMAIN', 8))
self.logger = logging.getLogger(__name__)
@classmethod
def from_crawler(cls, crawler):
return cls(crawler)
def process_request(self, request, spider):
"""
处理请求的网络优化
"""
# 添加网络优化头部
request.headers.setdefault('Accept-Encoding', 'gzip, deflate')
request.headers.setdefault('Connection', 'keep-alive')
request.headers.setdefault('Keep-Alive', 'timeout=30, max=100')
# 根据URL优化请求策略
if self._is_api_request(request.url):
# API请求可能需要不同的策略
request.meta.setdefault('download_timeout', 15)
elif self._is_static_resource(request.url):
# 静态资源可能需要不同的策略
request.meta.setdefault('download_timeout', 10)
return None
def process_response(self, request, response, spider):
"""
处理响应的网络优化
"""
# 检查响应头信息
content_length = response.headers.get('Content-Length')
if content_length:
try:
size = int(content_length)
if size > 50 * 1024 * 1024: # 50MB
spider.logger.warning(f"Large response: {size} bytes from {response.url}")
except ValueError:
pass
# 检查压缩情况
content_encoding = response.headers.get('Content-Encoding', b'').decode('utf-8')
if 'gzip' not in content_encoding and len(response.body) > 1024: # 1KB
spider.logger.debug(f"Response not compressed: {response.url}")
return response
def _is_api_request(self, url: str) -> bool:
"""
判断是否为API请求
"""
api_patterns = ['/api/', '/v1/', '/v2/', '/json/', '.json', '/rest/']
return any(pattern in url.lower() for pattern in api_patterns)
def _is_static_resource(self, url: str) -> bool:
"""
判断是否为静态资源
"""
static_extensions = ['.css', '.js', '.jpg', '.jpeg', '.png', '.gif', '.ico', '.svg', '.woff', '.woff2', '.ttf']
return any(url.lower().endswith(ext) for ext in static_extensions)
def spider_closed(self, spider):
"""
爬虫关闭时清理连接池
"""
self.connection_pool.close_pool()
self.logger.info("Connection pool closed")
```
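要让上面的网络优化中间件生效,可以在 settings.py 中注册为下载器中间件(模块路径与优先级仅为示意,需与自己的项目结构对应):

```python
# settings.py(示意)
DOWNLOADER_MIDDLEWARES = {
    'myproject.middlewares.NetworkOptimizationMiddleware': 543,
}
```

另外,示例中的 OptimizedHTTPPool 并没有接管 Scrapy 默认下载器的连接管理;如果确实要替换连接池行为,还需要自定义 DOWNLOAD_HANDLERS,这里不展开。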
### 自适应限速优化

```python
# adaptive_throttling.py - 自适应限速
import time
import statistics
from collections import deque, defaultdict
from scrapy.extensions.throttle import AutoThrottle
class AdaptiveAutoThrottle(AutoThrottle):
"""
自适应自动限速
"""
def __init__(self, crawler):
super().__init__(crawler)
self.response_times = defaultdict(lambda: deque(maxlen=100)) # 响应时间记录
self.success_rates = defaultdict(lambda: deque(maxlen=50)) # 成功率记录
self.adjustment_history = defaultdict(list) # 调整历史
self.min_delay = crawler.settings.getfloat('AUTOTHROTTLE_MIN_DELAY', 1)
self.max_delay = crawler.settings.getfloat('AUTOTHROTTLE_MAX_DELAY', 60)
self.target_concurrency = crawler.settings.getfloat('AUTOTHROTTLE_TARGET_CONCURRENCY', 1.0)
def _adjust_delay(self, slot, latency, response=None):
"""
自适应调整延迟
"""
# 记录响应时间
slot_key = slot.key
self.response_times[slot_key].append(latency)
# 计算平均响应时间和变化趋势
if len(self.response_times[slot_key]) >= 10:
recent_avg = statistics.mean(list(self.response_times[slot_key])[-10:])
overall_avg = statistics.mean(self.response_times[slot_key])
# 根据响应时间趋势调整延迟
if recent_avg > overall_avg * 1.2: # 响应时间变慢
# 增加延迟
new_delay = min(slot.delay * 1.2, self.max_delay)
slot.delay = new_delay
elif recent_avg < overall_avg * 0.8: # 响应时间变快
# 减少延迟
new_delay = max(slot.delay * 0.9, self.min_delay)
slot.delay = new_delay
# 根据成功率调整(如果可用)
if response and hasattr(response, 'status'):
success = 200 <= response.status < 400
self.success_rates[slot_key].append(1 if success else 0)
if len(self.success_rates[slot_key]) >= 10:
recent_success_rate = sum(list(self.success_rates[slot_key])[-10:]) / 10
if recent_success_rate < 0.8: # 成功率低,可能是被限制
new_delay = min(slot.delay * 1.5, self.max_delay)
slot.delay = new_delay
elif recent_success_rate > 0.95: # 成功率很高,可以加快
new_delay = max(slot.delay * 0.9, self.min_delay)
slot.delay = new_delay
def _get_slot_key(self, request, spider):
"""
获取槽键(可根据域名或其他因素)
"""
return request.meta.get('slot_key') or super()._get_slot_key(request, spider)
class AdvancedThrottleMiddleware:
"""
高级限速中间件
"""
def __init__(self, crawler):
self.crawler = crawler
self.throttle = AdaptiveAutoThrottle(crawler)
self.domain_configs = {}
@classmethod
def from_crawler(cls, crawler):
return cls(crawler)
def process_request(self, request, spider):
"""
处理请求时应用限速
"""
# 获取域名配置
domain = self._get_domain(request.url)
config = self.domain_configs.get(domain, {})
# 应用自定义限速规则
custom_delay = config.get('delay')
if custom_delay:
time.sleep(custom_delay)
# 应用自适应限速
self.throttle.process_request(request, spider)
return None
def process_response(self, request, response, spider):
"""
处理响应后更新限速参数
"""
# 更新限速参数
latency = response.meta.get('download_latency', 0)
self.throttle._adjust_delay(self.throttle._get_slot(request, spider), latency, response)
return response
def _get_domain(self, url: str) -> str:
"""
从URL提取域名
"""
from urllib.parse import urlparse
parsed = urlparse(url)
return parsed.netloc
```
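AdaptiveAutoThrottle 继承自 Scrapy 的 AutoThrottle 扩展。一种可行的启用方式是停用内置扩展、注册这个子类(模块路径为示意;AUTOTHROTTLE_MIN_DELAY 是本章代码自定义的设置名,并非 Scrapy 内置设置):

```python
# settings.py(示意)
AUTOTHROTTLE_ENABLED = True
AUTOTHROTTLE_START_DELAY = 1
AUTOTHROTTLE_MAX_DELAY = 60
AUTOTHROTTLE_TARGET_CONCURRENCY = 2.0
AUTOTHROTTLE_MIN_DELAY = 0.5  # 本章自定义的延迟下限

EXTENSIONS = {
    'scrapy.extensions.throttle.AutoThrottle': None,   # 停用内置扩展
    'myproject.throttle.AdaptiveAutoThrottle': 0,
}
```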
## 并发控制策略

合理的并发控制是大规模爬虫性能的关键。
### 并发配置优化

```python
# concurrency_settings.py - 并发配置
# Scrapy内置并发设置
CONCURRENT_REQUESTS = 64 # 总并发请求数
CONCURRENT_REQUESTS_PER_DOMAIN = 16 # 每个域名的并发请求数
CONCURRENT_REQUESTS_PER_IP = 8 # 每个IP的并发请求数
# 下载器设置
DOWNLOAD_DELAY = 0.5 # 下载延迟
RANDOMIZE_DOWNLOAD_DELAY = 0.3 # 随机延迟比例
# 自动限速设置
AUTOTHROTTLE_ENABLED = True
AUTOTHROTTLE_START_DELAY = 0.5
AUTOTHROTTLE_MAX_DELAY = 10
AUTOTHROTTLE_TARGET_CONCURRENCY = 4.0
AUTOTHROTTLE_DEBUG = True
# 线程池设置
REACTOR_THREADPOOL_MAXSIZE = 20
# 队列设置
SCHEDULER_PRIORITY_QUEUE = 'scrapy.pqueues.ScrapyPriorityQueue'
SCHEDULER_DISK_QUEUE = 'scrapy.squeues.PickleLifoDiskQueue'
SCHEDULER_MEMORY_QUEUE = 'scrapy.squeues.PickleLifoMemoryQueue'
# 处理器设置
CONCURRENT_ITEMS = 200 # 并发处理的项目数
```

### 动态并发控制器

```python
# dynamic_concurrency_controller.py - 动态并发控制器
import time
import threading
from collections import defaultdict, deque
from scrapy import signals
from pydispatch import dispatcher
import psutil
class DynamicConcurrencyController:
"""
动态并发控制器
"""
def __init__(self, crawler):
self.crawler = crawler
self.stats = crawler.stats
self.original_concurrent_requests = crawler.settings.getint('CONCURRENT_REQUESTS', 16)
self.original_per_domain = crawler.settings.getint('CONCURRENT_REQUESTS_PER_DOMAIN', 8)
# 性能监控
self.response_times = deque(maxlen=100)
self.success_counts = deque(maxlen=100)
self.error_counts = deque(maxlen=100)
# 系统资源监控
self.cpu_threshold = crawler.settings.getfloat('CONCURRENCY_CPU_THRESHOLD', 80.0)
self.memory_threshold = crawler.settings.getfloat('CONCURRENCY_MEMORY_THRESHOLD', 80.0)
# 控制参数
self.adjustment_interval = crawler.settings.getint('CONCURRENCY_ADJUSTMENT_INTERVAL', 30)
self.last_adjustment = time.time()
# 连接信号
dispatcher.connect(self.response_received, signal=signals.response_received)
dispatcher.connect(self.request_dropped, signal=signals.request_dropped)
def response_received(self, response, request, spider):
"""
响应接收时更新统计信息
"""
latency = response.meta.get('download_latency', 0)
if latency > 0:
self.response_times.append(latency)
success = 200 <= response.status < 400
if success:
self.success_counts.append(1)
else:
self.error_counts.append(1)
def request_dropped(self, request, spider):
"""
请求被丢弃时更新统计信息
"""
self.error_counts.append(1)
def should_adjust_concurrency(self) -> bool:
"""
判断是否应该调整并发数
"""
return time.time() - self.last_adjustment > self.adjustment_interval
def get_system_load(self) -> dict:
"""
获取系统负载信息
"""
cpu_percent = psutil.cpu_percent(interval=1)
memory_percent = psutil.virtual_memory().percent
return {
'cpu_percent': cpu_percent,
'memory_percent': memory_percent,
'cpu_threshold_exceeded': cpu_percent > self.cpu_threshold,
'memory_threshold_exceeded': memory_percent > self.memory_threshold
}
def calculate_optimal_concurrency(self) -> tuple:
"""
计算最优并发数
Returns:
(总并发数, 每域名并发数)
"""
system_load = self.get_system_load()
# 基于系统负载调整
if system_load['cpu_threshold_exceeded'] or system_load['memory_threshold_exceeded']:
# 负载过高,降低并发
factor = 0.7
else:
# 负载正常,可以适当增加并发
factor = 1.2
# 基于成功率调整
if len(self.success_counts) >= 20:
recent_success_rate = sum(list(self.success_counts)[-20:]) / 20
if recent_success_rate < 0.7: # 成功率低
factor *= 0.8
elif recent_success_rate > 0.95: # 成功率高
factor *= 1.1
# 基于响应时间调整
if len(self.response_times) >= 20:
avg_response_time = sum(list(self.response_times)[-20:]) / 20
if avg_response_time > 5.0: # 响应时间过长
factor *= 0.8
elif avg_response_time < 1.0: # 响应时间很短
factor *= 1.1
# 计算新的并发数
new_concurrent_requests = int(self.original_concurrent_requests * factor)
new_per_domain = int(self.original_per_domain * factor)
# 限制在合理范围内
new_concurrent_requests = max(4, min(new_concurrent_requests, 128))
new_per_domain = max(2, min(new_per_domain, 32))
return new_concurrent_requests, new_per_domain
def apply_concurrency_adjustment(self, spider):
"""
应用并发数调整
"""
if not self.should_adjust_concurrency():
return
new_total, new_per_domain = self.calculate_optimal_concurrency()
# 更新爬虫设置
current_total = self.crawler.engine.slot.scheduler.concurrent_queue_count
current_per_domain = getattr(self.crawler.engine.slot.scheduler, 'per_domain_concurrent', 8)
if new_total != current_total or new_per_domain != current_per_domain:
spider.logger.info(
f"Adjusting concurrency: total={new_total}(was {current_total}), "
f"per_domain={new_per_domain}(was {current_per_domain})"
)
# 更新统计信息
self.stats.set_value('concurrency/total', new_total)
self.stats.set_value('concurrency/per_domain', new_per_domain)
self.stats.set_value('concurrency/last_adjustment', time.time())
self.last_adjustment = time.time()
class ConcurrencyControlMiddleware:
"""
并发控制中间件
"""
def __init__(self, crawler):
self.controller = DynamicConcurrencyController(crawler)
@classmethod
def from_crawler(cls, crawler):
return cls(crawler)
def process_request(self, request, spider):
"""
处理请求时检查并发控制
"""
# 应用动态并发调整
self.controller.apply_concurrency_adjustment(spider)
return None
```
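下面是这套动态并发控制的一种接入示意(模块路径、优先级与阈值都是示例,设置名与上面代码读取的一致):

```python
# settings.py(示意)
DOWNLOADER_MIDDLEWARES = {
    'myproject.middlewares.ConcurrencyControlMiddleware': 400,
}

CONCURRENCY_CPU_THRESHOLD = 80.0       # CPU 使用率上限(%)
CONCURRENCY_MEMORY_THRESHOLD = 80.0    # 内存使用率上限(%)
CONCURRENCY_ADJUSTMENT_INTERVAL = 30   # 两次调整的最小间隔(秒)
```

还要注意,示例控制器目前只是计算并记录建议的并发数,真正修改运行中的并发还需要操作下载器的 slot,这部分要结合所用的 Scrapy 版本自行实现。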
## 数据处理优化

数据处理优化对于大规模爬虫的效率至关重要。
### 高效数据处理管道

```python
# efficient_pipeline.py - 高效数据处理管道
import json
import time
import pickle
import gzip
import hashlib
from itemadapter import ItemAdapter
from scrapy.exceptions import DropItem
from scrapy.pipelines.files import FilesPipeline
from scrapy.pipelines.images import ImagesPipeline
import logging
class EfficientDataPipeline:
"""
高效数据处理管道
"""
def __init__(self, settings):
self.settings = settings
self.batch_size = settings.getint('PIPELINE_BATCH_SIZE', 100)
self.batch_buffer = []
self.dropped_count = 0
self.processed_count = 0
self.logger = logging.getLogger(__name__)
@classmethod
def from_crawler(cls, crawler):
return cls(crawler.settings)
def open_spider(self, spider):
"""
爬虫开启时的初始化
"""
self.start_time = time.time()
self.logger.info("Efficient data pipeline opened")
def close_spider(self, spider):
"""
爬虫关闭时处理剩余数据
"""
if self.batch_buffer:
self._process_batch(spider)
processing_time = time.time() - self.start_time
self.logger.info(
f"Pipeline stats - Processed: {self.processed_count}, "
f"Dropped: {self.dropped_count}, Time: {processing_time:.2f}s"
)
def process_item(self, item, spider):
"""
处理单个项目
"""
adapter = ItemAdapter(item)
# 数据验证和清洗
if not self._validate_item(adapter):
self.dropped_count += 1
raise DropItem(f"Invalid item: {dict(adapter)}")
# 数据标准化
self._normalize_item(adapter)
# 添加处理元数据
adapter['processed_at'] = time.time()
adapter['source_spider'] = spider.name
# 添加到批处理缓冲区
self.batch_buffer.append(dict(adapter))
self.processed_count += 1
# 检查是否需要处理批次
if len(self.batch_buffer) >= self.batch_size:
self._process_batch(spider)
return item
def _validate_item(self, adapter) -> bool:
"""
验证项目数据
"""
# 检查必需字段
required_fields = self.settings.getlist('REQUIRED_FIELDS', [])
for field in required_fields:
if not adapter.get(field):
return False
# 检查数据长度限制
max_lengths = self.settings.getdict('FIELD_MAX_LENGTHS', {})
for field, max_len in max_lengths.items():
value = adapter.get(field)
if value and isinstance(value, str) and len(value) > max_len:
return False
return True
def _normalize_item(self, adapter):
"""
标准化项目数据
"""
# 清理文本字段
for field, value in adapter.items():
if isinstance(value, str):
# 去除多余空白
adapter[field] = ' '.join(value.split())
# 标准化URL
if 'url' in adapter:
adapter['url'] = adapter['url'].strip().lower()
def _process_batch(self, spider):
"""
处理一批数据
"""
if not self.batch_buffer:
return
# 批量处理逻辑
processed_batch = self._batch_process_logic(self.batch_buffer)
# 输出处理结果
self._output_batch(processed_batch, spider)
# 清空缓冲区
self.batch_buffer = []
def _batch_process_logic(self, batch):
"""
批量处理逻辑
"""
# 这里可以实现具体的批量处理逻辑
# 如:批量数据库插入、批量文件写入等
return batch
def _output_batch(self, batch, spider):
"""
输出批处理结果
"""
# 根据配置选择输出方式
output_method = self.settings.get('PIPELINE_OUTPUT_METHOD', 'json')
if output_method == 'json':
self._output_json(batch, spider)
elif output_method == 'csv':
self._output_csv(batch, spider)
elif output_method == 'database':
self._output_database(batch, spider)
def _output_json(self, batch, spider):
"""
JSON格式输出
"""
filename = f"items_{spider.name}_{int(time.time())}.json.gz"
# 压缩输出
json_data = json.dumps(batch, ensure_ascii=False, indent=2)
compressed_data = gzip.compress(json_data.encode('utf-8'))
with open(filename, 'wb') as f:
f.write(compressed_data)
class DeduplicationPipeline:
"""
去重处理管道
"""
def __init__(self, settings):
self.settings = settings
self.use_redis = settings.getbool('DEDUPLICATION_USE_REDIS', True)
self.dedup_field = settings.get('DEDUPLICATION_FIELD', 'url')
if self.use_redis:
import redis
self.redis_client = redis.from_url(settings.get('REDIS_URL', 'redis://localhost:6379'))
self.redis_key = settings.get('DEDUPLICATION_REDIS_KEY', 'scrapy:duplicates')
else:
self.seen_items = set()
@classmethod
def from_crawler(cls, crawler):
return cls(crawler.settings)
def process_item(self, item, spider):
"""
处理去重逻辑
"""
adapter = ItemAdapter(item)
item_value = adapter.get(self.dedup_field)
if item_value is None:
return item # 没有去重字段,直接通过
# 生成去重标识
dedup_id = self._generate_dedup_id(item_value)
if self._is_duplicate(dedup_id):
raise DropItem(f"Duplicate item: {item_value}")
# 标记为已见过
self._mark_seen(dedup_id)
# 添加去重标识到项目
adapter['dedup_id'] = dedup_id
return item
def _generate_dedup_id(self, value) -> str:
"""
生成去重标识
"""
return hashlib.md5(str(value).encode()).hexdigest()
def _is_duplicate(self, dedup_id: str) -> bool:
"""
检查是否为重复项
"""
if self.use_redis:
return bool(self.redis_client.sismember(self.redis_key, dedup_id))
else:
return dedup_id in self.seen_items
def _mark_seen(self, dedup_id: str):
"""
标记项目为已见过
"""
if self.use_redis:
self.redis_client.sadd(self.redis_key, dedup_id)
else:
self.seen_items.add(dedup_id)
```
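启用上面两个管道时,可以参考下面的配置示意(模块路径与优先级为示例,其余设置名与代码中读取的一致,取值按业务调整):

```python
# settings.py(示意)
ITEM_PIPELINES = {
    'myproject.pipelines.DeduplicationPipeline': 100,   # 先去重
    'myproject.pipelines.EfficientDataPipeline': 300,   # 再批量处理
}

PIPELINE_BATCH_SIZE = 100
PIPELINE_OUTPUT_METHOD = 'json'           # 'json' / 'csv' / 'database'
REQUIRED_FIELDS = ['url', 'title']        # 示例字段,按自己的 Item 调整
FIELD_MAX_LENGTHS = {'title': 512}

DEDUPLICATION_USE_REDIS = True
DEDUPLICATION_FIELD = 'url'
DEDUPLICATION_REDIS_KEY = 'scrapy:duplicates'
REDIS_URL = 'redis://localhost:6379'
```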
### 内存高效的数据处理

```python
# memory_efficient_processing.py - 内存高效数据处理
import io
import csv
import time
from contextlib import contextmanager
from itemadapter import ItemAdapter
from scrapy.utils.serialize import ScrapyJSONEncoder
class MemoryEfficientPipeline:
"""
内存高效处理管道
"""
def __init__(self, settings):
self.settings = settings
self.buffer_size = settings.getint('PROCESSING_BUFFER_SIZE', 1024 * 1024) # 1MB
self.temp_storage = settings.get('TEMP_STORAGE_PATH', '/tmp/scrapy_temp')
self.active_files = {} # 活跃的临时文件
@classmethod
def from_crawler(cls, crawler):
return cls(crawler.settings)
@contextmanager
def _get_temp_file(self, prefix: str):
"""
获取临时文件上下文管理器
"""
import tempfile
temp_file = tempfile.NamedTemporaryFile(
mode='w+b',
prefix=f"{prefix}_",
suffix='.tmp',
dir=self.temp_storage,
delete=False
)
try:
yield temp_file
finally:
temp_file.close()
def process_item(self, item, spider):
"""
处理项目(内存高效)
"""
adapter = ItemAdapter(item)
# 序列化项目到字节
item_bytes = self._serialize_item(adapter)
# 检查是否需要写入临时文件
if len(item_bytes) > self.buffer_size // 4: # 单个项目过大
# 直接写入临时文件
self._write_to_temp_file(item_bytes, spider.name)
else:
# 添加到内存缓冲区
self._add_to_buffer(item_bytes, spider.name)
return item
def _serialize_item(self, adapter) -> bytes:
"""
序列化项目为字节
"""
json_str = ScrapyJSONEncoder().encode(dict(adapter))
return json_str.encode('utf-8')
def _add_to_buffer(self, item_bytes: bytes, spider_name: str):
"""
添加到内存缓冲区
"""
buffer_key = f"{spider_name}_buffer"
if buffer_key not in self.__dict__:
self.__dict__[buffer_key] = io.BytesIO()
self.__dict__[f"{buffer_key}_size"] = 0
buffer = self.__dict__[buffer_key]
buffer_size = self.__dict__[f"{buffer_key}_size"]
if buffer_size + len(item_bytes) > self.buffer_size:
# 缓冲区满了,写入文件
self._flush_buffer(buffer_key, spider_name)
buffer.write(item_bytes)
buffer.write(b'\n') # 分隔符
self.__dict__[f"{buffer_key}_size"] += len(item_bytes) + 1
def _flush_buffer(self, buffer_key: str, spider_name: str):
"""
刷新缓冲区到文件
"""
buffer = self.__dict__.pop(buffer_key)
buffer_size = self.__dict__.pop(f"{buffer_key}_size")
if buffer.tell() > 0:
buffer.seek(0)
temp_filename = f"{self.temp_storage}/{spider_name}_{int(time.time())}_{id(buffer)}.tmp"
with open(temp_filename, 'wb') as f:
while True:
chunk = buffer.read(8192) # 8KB chunks
if not chunk:
break
f.write(chunk)
# 重置缓冲区
self.__dict__[buffer_key] = io.BytesIO()
self.__dict__[f"{buffer_key}_size"] = 0
def _write_to_temp_file(self, item_bytes: bytes, spider_name: str):
"""
直接写入临时文件
"""
temp_filename = f"{self.temp_storage}/{spider_name}_large_{int(time.time())}_{len(item_bytes)}.tmp"
with open(temp_filename, 'wb') as f:
f.write(item_bytes)
def close_spider(self, spider):
"""
爬虫关闭时处理剩余数据
"""
# 处理所有剩余的缓冲区
for attr_name in list(self.__dict__.keys()):
if attr_name.endswith('_buffer'):
spider_name = attr_name.replace('_buffer', '')
self._flush_buffer(attr_name, spider_name)
```

## 调度与队列优化
调度和队列优化对于大规模爬虫的性能至关重要。
### 高性能调度器

```python
# high_performance_scheduler.py - 高性能调度器
from scrapy.core.scheduler import Scheduler
from scrapy.utils.misc import load_object
from scrapy.utils.job import job_dir
from scrapy.utils.deprecate import create_deprecated_class
import pickle
import gzip
import os
from queuelib import PriorityQueue
from collections import deque
import time
class HighPerformanceScheduler(Scheduler):
"""
高性能调度器
"""
def __init__(self, crawler):
super().__init__(crawler)
self.crawler = crawler
self.stats = crawler.stats
self.dupefilter = self.df_class.from_settings(crawler.settings)
self.jobdir = job_dir(crawler.settings)
# 队列配置
self.queue_cls = load_object(crawler.settings['SCHEDULER_QUEUE_CLASS'])
self.serializer = load_object(crawler.settings['SCHEDULER_SERIALIZER'])
# 性能优化配置
self.batch_size = crawler.settings.getint('SCHEDULER_BATCH_SIZE', 10)
self.prefetch_multiplier = crawler.settings.getint('SCHEDULER_PREFETCH_MULTIPLIER', 3)
# 统计信息
self.enqueued_count = 0
self.dequeue_count = 0
self.start_time = time.time()
# 初始化队列
self._init_queue()
def _init_queue(self):
"""
初始化队列
"""
self.mqs = self.pqclass(self._newmq)
self.df = self.dupefilter
if self.jobdir:
self._restore()
def enqueue_request(self, request):
"""
入队请求(优化版本)
"""
if not request.dont_filter and self.df.request_seen(request):
self.df.log(request, self.spider)
return False
dqok = self._dqappend(request)
if dqok:
self.stats.inc_value('scheduler/enqueued/disk', spider=self.spider)
else:
self._mqpush(request)
self.stats.inc_value('scheduler/enqueued/memory', spider=self.spider)
self.stats.inc_value('scheduler/enqueued', spider=self.spider)
self.enqueued_count += 1
return True
def next_request(self):
"""
获取下一个请求(优化版本)
"""
# 批量预取请求以提高效率
requests = []
for _ in range(self.batch_size):
request = self._next_request_from_queue()
if request:
requests.append(request)
else:
break
if requests:
self.dequeue_count += len(requests)
self.stats.inc_value('scheduler/dequeued', len(requests), spider=self.spider)
# 返回第一个请求,其余的可以缓存供下次使用
current_request = requests[0]
if len(requests) > 1:
# 这里可以实现请求缓存逻辑
pass
return current_request
return None
def _next_request_from_queue(self):
"""
从队列获取下一个请求
"""
request = super().next_request()
return request
def has_pending_requests(self):
"""
检查是否有待处理请求(优化版本)
"""
result = super().has_pending_requests()
# 更新统计信息
self.stats.set_value('scheduler/pending_count', self._get_pending_count())
return result
def _get_pending_count(self) -> int:
"""
获取待处理请求数量
"""
# 这里可以实现更高效的计数逻辑
return len(self.mqs) if hasattr(self, 'mqs') else 0
def close(self, reason):
"""
关闭调度器
"""
if self.jobdir:
self._persist()
# 输出性能统计
total_time = time.time() - self.start_time
self.stats.set_value('scheduler/enqueued_count', self.enqueued_count)
self.stats.set_value('scheduler/dequeued_count', self.dequeue_count)
self.stats.set_value('scheduler/efficiency_ratio',
self.dequeue_count / max(self.enqueued_count, 1))
self.stats.set_value('scheduler/processing_time', total_time)
return super().close(reason)
class PriorityBasedScheduler(HighPerformanceScheduler):
"""
基于优先级的调度器
"""
def __init__(self, crawler):
super().__init__(crawler)
self.priority_weights = crawler.settings.getdict('PRIORITY_WEIGHTS', {
'high': 10,
'medium': 5,
'low': 1
})
def enqueue_request(self, request):
"""
根据优先级入队请求
"""
# 计算优先级
priority_factor = self._calculate_priority(request)
request.priority = request.priority * priority_factor
return super().enqueue_request(request)
def _calculate_priority(self, request) -> float:
"""
计算请求优先级因子
"""
# 根据URL模式设置优先级
url = request.url.lower()
if any(keyword in url for keyword in ['urgent', 'priority', 'important']):
return self.priority_weights.get('high', 10)
elif any(keyword in url for keyword in ['normal', 'standard']):
return self.priority_weights.get('medium', 5)
else:
return self.priority_weights.get('low', 1)
```
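如果要试验这个调度器,可以把 SCHEDULER 指向自定义类(路径为示意)。需要说明的是,SCHEDULER_QUEUE_CLASS、SCHEDULER_SERIALIZER 是上面代码自行读取的设置名,且示例调度器的构造方式与内置 Scheduler 并不完全一致,实际使用前要按所用 Scrapy 版本做适配:

```python
# settings.py(示意)
SCHEDULER = 'myproject.scheduler.PriorityBasedScheduler'
SCHEDULER_BATCH_SIZE = 10
PRIORITY_WEIGHTS = {'high': 10, 'medium': 5, 'low': 1}
# SCHEDULER_QUEUE_CLASS / SCHEDULER_SERIALIZER 需要给出可被 load_object() 加载的点分路径
```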
### 队列性能监控

```python
# queue_monitor.py - 队列监控器
import time
from collections import deque
from scrapy import signals
from pydispatch import dispatcher
import logging
class QueuePerformanceMonitor:
"""
队列性能监控器
"""
def __init__(self, crawler):
self.crawler = crawler
self.stats = crawler.stats
self.logger = logging.getLogger(__name__)
# 性能指标
self.enqueue_times = deque(maxlen=1000)
self.dequeue_times = deque(maxlen=1000)
self.queue_sizes = deque(maxlen=100)
# 统计信息
self.enqueued_count = 0
self.dequeued_count = 0
self.failed_count = 0
# 连接信号(Scrapy 并没有 request_enqueued / request_dequeued 信号,
# 这里用 request_scheduled / request_reached_downloader 近似表示入队与出队)
dispatcher.connect(self.request_enqueued, signal=signals.request_scheduled)
dispatcher.connect(self.request_dequeued, signal=signals.request_reached_downloader)
dispatcher.connect(self.request_dropped, signal=signals.request_dropped)
def request_enqueued(self, request, spider):
"""
请求入队(request_scheduled)时的监控
"""
self.enqueued_count += 1
self.enqueue_times.append(time.time())
# 更新统计
self.stats.inc_value('queue_monitor/enqueued', spider=spider)
self._update_queue_stats(spider)
def request_dequeued(self, request, spider):
"""
请求出队时的监控
"""
self.dequeued_count += 1
self.dequeue_times.append(time.time())
# 更新统计
self.stats.inc_value('queue_monitor/dequeued', spider=spider)
self._update_queue_stats(spider)
def request_dropped(self, request, spider):
"""
请求被丢弃时的监控
"""
self.failed_count += 1
# 更新统计
self.stats.inc_value('queue_monitor/failed', spider=spider)
self._update_queue_stats(spider)
def _update_queue_stats(self, spider):
"""
更新队列统计信息
"""
# 获取队列大小(需要访问调度器)
if hasattr(self.crawler.engine, 'slot') and self.crawler.engine.slot:
scheduler = self.crawler.engine.slot.scheduler
if hasattr(scheduler, 'has_pending_requests'):
pending = scheduler.has_pending_requests()
self.stats.set_value('queue_monitor/pending', pending, spider=spider)
# 计算性能指标
if self.enqueue_times and self.dequeue_times:
current_time = time.time()
# 计算请求速率
recent_enqueued = sum(1 for t in self.enqueue_times if current_time - t < 60)
recent_dequeued = sum(1 for t in self.dequeue_times if current_time - t < 60)
self.stats.set_value('queue_monitor/rate/enqueued_per_minute', recent_enqueued, spider=spider)
self.stats.set_value('queue_monitor/rate/dequeued_per_minute', recent_dequeued, spider=spider)
def get_performance_report(self) -> dict:
"""
获取性能报告
"""
current_time = time.time()
return {
'enqueued_count': self.enqueued_count,
'dequeued_count': self.dequeued_count,
'failed_count': self.failed_count,
'success_rate': self.dequeued_count / max(self.enqueued_count, 1),
'average_enqueue_rate': len(self.enqueue_times) / 60 if self.enqueue_times else 0,
'average_dequeue_rate': len(self.dequeue_times) / 60 if self.dequeue_times else 0,
'uptime_minutes': (current_time - self.crawler.stats.start_time.timestamp()) / 60
}
class QueueOptimizationMiddleware:
"""
队列优化中间件
"""
def __init__(self, crawler):
self.monitor = QueuePerformanceMonitor(crawler)
@classmethod
def from_crawler(cls, crawler):
return cls(crawler)
def process_spider_output(self, response, result, spider):
"""
处理蜘蛛输出时优化队列
"""
for item_or_request in result:
if hasattr(item_or_request, 'url'): # Request object
# 根据响应情况调整请求优先级
if response.status == 429: # Too Many Requests
# 降低优先级,延后处理
item_or_request.priority -= 10
elif response.status == 200: # Success
# 可以提高相关请求的优先级
pass
yield item_or_request
```

## 监控与诊断
完善的监控体系是大规模爬虫稳定运行的保障。
### 系统监控组件

```python
# system_monitor.py - 系统监控组件
import psutil
import time
import logging
from collections import deque
from scrapy import signals
from pydispatch import dispatcher
from prometheus_client import Counter, Gauge, Histogram, start_http_server
# Prometheus指标定义
REQUESTS_TOTAL = Counter('scrapy_requests_total', 'Total requests processed', ['spider', 'status'])
ITEMS_SCRAPED = Counter('scrapy_items_scraped_total', 'Total items scraped', ['spider'])
RESPONSE_TIME = Histogram('scrapy_response_time_seconds', 'Response time in seconds', ['spider'])
MEMORY_USAGE = Gauge('scrapy_memory_usage_bytes', 'Memory usage in bytes', ['spider'])
CPU_USAGE = Gauge('scrapy_cpu_usage_percent', 'CPU usage percentage', ['spider'])
class SystemMonitor:
"""
系统监控器
"""
def __init__(self, crawler):
self.crawler = crawler
self.stats = crawler.stats
self.logger = logging.getLogger(__name__)
# 系统资源监控
self.process = psutil.Process()
self.cpu_samples = deque(maxlen=100)
self.memory_samples = deque(maxlen=100)
# 性能指标
self.start_time = time.time()
self.last_sample_time = time.time()
# Prometheus监控
self.enable_prometheus = crawler.settings.getbool('MONITOR_PROMETHEUS_ENABLED', False)
if self.enable_prometheus:
prometheus_port = crawler.settings.getint('MONITOR_PROMETHEUS_PORT', 8000)
start_http_server(prometheus_port)
# 连接信号
dispatcher.connect(self.spider_opened, signal=signals.spider_opened)
dispatcher.connect(self.spider_closed, signal=signals.spider_closed)
dispatcher.connect(self.response_received, signal=signals.response_received)
dispatcher.connect(self.item_scraped, signal=signals.item_scraped)
def spider_opened(self, spider):
"""
爬虫开启监控
"""
self.logger.info(f"System monitoring started for spider: {spider.name}")
spider.crawler.stats.set_value('monitor/start_time', time.time())
def spider_closed(self, spider, reason):
"""
爬虫关闭监控
"""
total_time = time.time() - self.start_time
self.logger.info(f"System monitoring stopped for spider: {spider.name}, runtime: {total_time:.2f}s")
# 输出最终统计
self._log_final_stats(spider)
def response_received(self, response, request, spider):
"""
响应接收监控
"""
# 记录响应时间
download_latency = response.meta.get('download_latency', 0)
if download_latency > 0:
RESPONSE_TIME.labels(spider=spider.name).observe(download_latency)
# 记录请求状态
REQUESTS_TOTAL.labels(spider=spider.name, status=response.status).inc()
# 采样系统资源
self._sample_resources(spider)
def item_scraped(self, item, response, spider):
"""
项目抓取监控
"""
ITEMS_SCRAPED.labels(spider=spider.name).inc()
# 采样系统资源
self._sample_resources(spider)
def _sample_resources(self, spider):
"""
采样系统资源使用情况
"""
current_time = time.time()
# 每秒采样一次
if current_time - self.last_sample_time >= 1.0:
try:
# CPU使用率
cpu_percent = self.process.cpu_percent()
self.cpu_samples.append(cpu_percent)
CPU_USAGE.labels(spider=spider.name).set(cpu_percent)
# 内存使用
memory_info = self.process.memory_info()
self.memory_samples.append(memory_info.rss)
MEMORY_USAGE.labels(spider=spider.name).set(memory_info.rss)
# 更新统计数据
spider.crawler.stats.set_value('monitor/cpu_percent', cpu_percent)
spider.crawler.stats.set_value('monitor/memory_rss', memory_info.rss)
self.last_sample_time = current_time
except Exception as e:
self.logger.error(f"Error sampling resources: {e}")
def _log_final_stats(self, spider):
"""
记录最终统计信息
"""
if self.cpu_samples:
avg_cpu = sum(self.cpu_samples) / len(self.cpu_samples)
max_cpu = max(self.cpu_samples) if self.cpu_samples else 0
spider.crawler.stats.set_value('monitor/avg_cpu_percent', avg_cpu)
spider.crawler.stats.set_value('monitor/max_cpu_percent', max_cpu)
if self.memory_samples:
avg_memory = sum(self.memory_samples) / len(self.memory_samples)
max_memory = max(self.memory_samples) if self.memory_samples else 0
spider.crawler.stats.set_value('monitor/avg_memory_bytes', avg_memory)
spider.crawler.stats.set_value('monitor/max_memory_bytes', max_memory)
def get_current_status(self) -> dict:
"""
获取当前状态
"""
try:
cpu_percent = self.process.cpu_percent()
memory_info = self.process.memory_info()
return {
'cpu_percent': cpu_percent,
'memory_rss': memory_info.rss,
'memory_vms': memory_info.vms,
'num_threads': self.process.num_threads(),
'connections': len(self.process.connections()),
'open_files': len(self.process.open_files()) if self.process.is_running() else 0,
'uptime': time.time() - self.start_time
}
except Exception as e:
self.logger.error(f"Error getting current status: {e}")
return {}
class DetailedStatsCollector:
"""
详细统计收集器
"""
def __init__(self, crawler):
self.crawler = crawler
self.stats = crawler.stats
# 详细统计指标
self.response_status_counts = {}
self.domain_request_counts = {}
self.download_sizes = deque(maxlen=1000)
self.processing_times = deque(maxlen=1000)
def collect_response_stats(self, response, request):
"""
收集响应统计
"""
# 状态码统计
status = response.status
self.response_status_counts[status] = self.response_status_counts.get(status, 0) + 1
# 域名请求统计
from urllib.parse import urlparse
domain = urlparse(request.url).netloc
self.domain_request_counts[domain] = self.domain_request_counts.get(domain, 0) + 1
# 下载大小统计
content_length = len(response.body)
self.download_sizes.append(content_length)
# 更新统计数据
self.stats.set_value(f'response_stats/status/{status}',
self.response_status_counts[status])
self.stats.set_value(f'domain_stats/{domain}',
self.domain_request_counts[domain])
self.stats.set_value('download/avg_size',
sum(self.download_sizes) / len(self.download_sizes) if self.download_sizes else 0)
def collect_processing_stats(self, start_time, end_time):
"""
收集处理时间统计
"""
processing_time = end_time - start_time
self.processing_times.append(processing_time)
avg_time = sum(self.processing_times) / len(self.processing_times) if self.processing_times else 0
self.stats.set_value('processing/avg_time', avg_time)
self.stats.set_value('processing/current_time', processing_time)
def get_detailed_report(self) -> dict:
"""
获取详细报告
"""
return {
'response_statuses': dict(self.response_status_counts),
'domain_requests': dict(self.domain_request_counts),
'avg_download_size': sum(self.download_sizes) / len(self.download_sizes) if self.download_sizes else 0,
'max_download_size': max(self.download_sizes) if self.download_sizes else 0,
'avg_processing_time': sum(self.processing_times) / len(self.processing_times) if self.processing_times else 0,
'total_requests': sum(self.response_status_counts.values()),
'successful_requests': sum(v for k, v in self.response_status_counts.items() if 200 <= k < 400)
}
```
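SystemMonitor 是否暴露 Prometheus 指标由自定义设置 MONITOR_PROMETHEUS_ENABLED、MONITOR_PROMETHEUS_PORT 控制(见上面的代码)。它没有实现 from_crawler,需要在自定义扩展或中间件里以 SystemMonitor(crawler) 的方式实例化。部署后可以用类似下面的小脚本确认指标端点可用(端口为示例值):

```python
# metrics_check.py - 指标端点自检(示意)
from urllib.request import urlopen

def check_metrics(port: int = 8000) -> None:
    """抓取一次指标文本,打印本章定义的 scrapy_ 开头的指标行"""
    body = urlopen(f'http://localhost:{port}/metrics', timeout=5).read().decode('utf-8')
    for line in body.splitlines():
        if line.startswith('scrapy_'):
            print(line)

if __name__ == '__main__':
    check_metrics()
```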
### 告警与通知系统

```python
# alert_system.py - 告警系统
import smtplib
import json
import time
from email.mime.text import MIMEText
from email.mime.multipart import MIMEMultipart
from datetime import datetime, timedelta
import logging
class AlertSystem:
"""
告警系统
"""
def __init__(self, crawler):
self.crawler = crawler
self.settings = settings = crawler.settings  # 同名局部变量,供下面直接用 settings 读取配置
self.logger = logging.getLogger(__name__)
# 告警配置
self.email_enabled = settings.getbool('ALERT_EMAIL_ENABLED', False)
self.email_host = settings.get('ALERT_EMAIL_HOST', 'localhost')
self.email_port = settings.getint('ALERT_EMAIL_PORT', 587)
self.email_user = settings.get('ALERT_EMAIL_USER')
self.email_password = settings.get('ALERT_EMAIL_PASSWORD')
self.alert_recipients = settings.getlist('ALERT_RECIPIENTS', [])
# 告警阈值
self.error_rate_threshold = settings.getfloat('ALERT_ERROR_RATE_THRESHOLD', 0.1) # 10%错误率
self.slow_response_threshold = settings.getfloat('ALERT_SLOW_RESPONSE_THRESHOLD', 10.0) # 10秒
self.memory_usage_threshold = settings.getfloat('ALERT_MEMORY_USAGE_THRESHOLD', 90.0) # 90%内存使用
self.cpu_usage_threshold = settings.getfloat('ALERT_CPU_USAGE_THRESHOLD', 90.0) # 90% CPU使用
# 告警历史
self.alert_history = {}
self.suppression_window = settings.getint('ALERT_SUPPRESSION_WINDOW', 300) # 5分钟抑制窗口
def check_and_alert(self, spider, current_stats: dict):
"""
检查指标并发送告警
"""
alerts = []
# 检查错误率
error_rate = self._calculate_error_rate(current_stats)
if error_rate > self.error_rate_threshold:
alerts.append({
'type': 'high_error_rate',
'message': f'High error rate detected: {error_rate:.2%}',
'severity': 'HIGH',
'value': error_rate
})
# 检查响应时间
avg_response_time = current_stats.get('avg_response_time', 0)
if avg_response_time > self.slow_response_threshold:
alerts.append({
'type': 'slow_responses',
'message': f'Slow average response time: {avg_response_time:.2f}s',
'severity': 'MEDIUM',
'value': avg_response_time
})
# 检查内存使用
memory_percent = current_stats.get('memory_percent', 0)
if memory_percent > self.memory_usage_threshold:
alerts.append({
'type': 'high_memory',
'message': f'High memory usage: {memory_percent:.1f}%',
'severity': 'HIGH',
'value': memory_percent
})
# 检查CPU使用
cpu_percent = current_stats.get('cpu_percent', 0)
if cpu_percent > self.cpu_usage_threshold:
alerts.append({
'type': 'high_cpu',
'message': f'High CPU usage: {cpu_percent:.1f}%',
'severity': 'MEDIUM',
'value': cpu_percent
})
# 发送告警
for alert in alerts:
self._send_alert(spider, alert)
def _calculate_error_rate(self, stats: dict) -> float:
"""
计算错误率
"""
total_requests = stats.get('scheduler/enqueued', 0)
if total_requests == 0:
return 0.0
error_count = stats.get('downloader/exception_count', 0)
return error_count / total_requests
def _should_send_alert(self, alert_type: str) -> bool:
"""
检查是否应该发送告警(避免重复告警)
"""
current_time = time.time()
last_alert_time = self.alert_history.get(alert_type, 0)
return current_time - last_alert_time > self.suppression_window
def _send_alert(self, spider, alert: dict):
"""
发送告警
"""
alert_type = alert['type']
if not self._should_send_alert(alert_type):
return
# 更新告警历史
self.alert_history[alert_type] = time.time()
# 构建告警消息
subject = f"[SCRAPY ALERT] {alert['type'].upper()} - Spider: {spider.name}"
message = self._build_alert_message(spider, alert)
# 发送告警
if self.email_enabled:
self._send_email_alert(subject, message)
# 记录日志
self.logger.warning(f"Alert sent: {alert['message']}")
def _build_alert_message(self, spider, alert: dict) -> str:
"""
构建告警消息
"""
current_time = datetime.now().strftime('%Y-%m-%d %H:%M:%S')
message = f"""
Scrapy Spider Alert
Time: {current_time}
Spider: {spider.name}
Alert Type: {alert['type']}
Severity: {alert['severity']}
Message: {alert['message']}
Value: {alert['value']}
Current Stats:
- Requests Enqueued: {self.crawler.stats.get_value('scheduler/enqueued', 0)}
- Items Scraped: {self.crawler.stats.get_value('item_scraped_count', 0)}
- Error Count: {self.crawler.stats.get_value('downloader/exception_count', 0)}
- Memory Usage: {self.crawler.stats.get_value('monitor/memory_percent', 0):.1f}%
- CPU Usage: {self.crawler.stats.get_value('monitor/cpu_percent', 0):.1f}%
This is an automated alert from your Scrapy monitoring system.
"""
return message
def _send_email_alert(self, subject: str, message: str):
"""
发送邮件告警
"""
if not self.alert_recipients:
self.logger.warning("No alert recipients configured")
return
try:
msg = MIMEMultipart()
msg['From'] = self.email_user
msg['To'] = ', '.join(self.alert_recipients)
msg['Subject'] = subject
msg.attach(MIMEText(message, 'plain', 'utf-8'))
server = smtplib.SMTP(self.email_host, self.email_port)
server.starttls()
server.login(self.email_user, self.email_password)
text = msg.as_string()
server.sendmail(self.email_user, self.alert_recipients, text)
server.quit()
self.logger.info(f"Alert email sent to: {self.alert_recipients}")
except Exception as e:
self.logger.error(f"Failed to send alert email: {e}")
class HealthCheckMiddleware:
"""
健康检查中间件
"""
def __init__(self, crawler):
self.alert_system = AlertSystem(crawler)
self.check_interval = crawler.settings.getint('HEALTH_CHECK_INTERVAL', 60) # 60秒检查一次
self.last_check_time = time.time()
@classmethod
def from_crawler(cls, crawler):
return cls(crawler)
def process_spider_output(self, response, result, spider):
"""
处理蜘蛛输出时进行健康检查
"""
current_time = time.time()
# 定期进行健康检查
if current_time - self.last_check_time > self.check_interval:
current_stats = dict(spider.crawler.stats.get_stats())
self.alert_system.check_and_alert(spider, current_stats)
self.last_check_time = current_time
for item_or_request in result:
yield item_or_request
```
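告警相关的自定义设置都在上面的 AlertSystem 中读取,下面给出一份配置示意(SMTP 主机、账号与收件人均为占位值,中间件模块路径也只是示例):

```python
# settings.py(示意)
SPIDER_MIDDLEWARES = {
    'myproject.middlewares.HealthCheckMiddleware': 800,
}
HEALTH_CHECK_INTERVAL = 60             # 健康检查周期(秒)

ALERT_EMAIL_ENABLED = True
ALERT_EMAIL_HOST = 'smtp.example.com'
ALERT_EMAIL_PORT = 587
ALERT_EMAIL_USER = 'alert@example.com'
ALERT_EMAIL_PASSWORD = '********'
ALERT_RECIPIENTS = ['ops@example.com']

ALERT_ERROR_RATE_THRESHOLD = 0.1       # 错误率超过10%告警
ALERT_SLOW_RESPONSE_THRESHOLD = 10.0   # 平均响应超过10秒告警
ALERT_MEMORY_USAGE_THRESHOLD = 90.0    # 内存使用率超过90%告警
ALERT_CPU_USAGE_THRESHOLD = 90.0       # CPU使用率超过90%告警
ALERT_SUPPRESSION_WINDOW = 300         # 同类告警5分钟内只发一次
```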
## 故障恢复与容错

健壮的容错机制是大规模爬虫系统可靠性的基础。
### 容错处理机制

```python
# fault_tolerance.py - 容错处理机制
import os
import json
import time
import random
import logging
from scrapy.exceptions import IgnoreRequest, DropItem
from scrapy.http import Request
from twisted.internet import reactor
from twisted.internet.defer import Deferred
from twisted.internet.task import deferLater
class FaultToleranceMiddleware:
"""
容错中间件
"""
def __init__(self, crawler):
self.crawler = crawler
self.settings = settings = crawler.settings  # 同名局部变量,供下面直接用 settings 读取配置
self.logger = logging.getLogger(__name__)
# 重试配置
self.max_retries = settings.getint('FAULT_TOLERANCE_MAX_RETRIES', 5)
self.retry_http_codes = set(settings.getlist('FAULT_TOLERANCE_RETRY_CODES',
[500, 502, 503, 504, 408, 429]))
self.retry_factor = settings.getfloat('FAULT_TOLERANCE_RETRY_FACTOR', 1.5)
self.retry_max_delay = settings.getfloat('FAULT_TOLERANCE_RETRY_MAX_DELAY', 60)
# 熔断器配置
self.circuit_breaker_enabled = settings.getbool('FAULT_TOLERANCE_CIRCUIT_BREAKER', True)
self.failure_threshold = settings.getint('FAULT_TOLERANCE_FAILURE_THRESHOLD', 5)
self.reset_timeout = settings.getint('FAULT_TOLERANCE_RESET_TIMEOUT', 60)
# 域名级别熔断器
self.domain_circuit_breakers = {}
@classmethod
def from_crawler(cls, crawler):
return cls(crawler)
def process_request(self, request, spider):
"""
处理请求时的容错检查
"""
# 检查熔断器状态
if self.circuit_breaker_enabled:
domain = self._get_domain(request.url)
circuit_breaker = self._get_circuit_breaker(domain)
if circuit_breaker.is_open():
# 熔断器打开,跳过请求
spider.logger.warning(f"Circuit breaker open for domain {domain}, skipping request: {request.url}")
raise IgnoreRequest(f"Circuit breaker open for {domain}")
# 添加重试元数据
if 'fault_tolerance_retry_times' not in request.meta:
request.meta['fault_tolerance_retry_times'] = 0
request.meta['fault_tolerance_original_url'] = request.url
return None
def process_response(self, request, response, spider):
"""
处理响应时的容错逻辑
"""
domain = self._get_domain(request.url)
# 检查是否需要重试
if response.status in self.retry_http_codes:
retry_times = request.meta.get('fault_tolerance_retry_times', 0)
if retry_times < self.max_retries:
# 计算重试延迟
delay = self._calculate_retry_delay(retry_times)
spider.logger.warning(
f"Retrying {request.url} after {response.status} (attempt {retry_times + 1}/{self.max_retries}), "
f"delay: {delay}s"
)
# 标记域名为失败(更新熔断器)
self._record_failure(domain)
# 创建重试请求
retry_req = request.copy()
retry_req.meta['fault_tolerance_retry_times'] = retry_times + 1
# 添加重试延迟
reactor.callLater(delay, lambda: self._schedule_retry(spider, retry_req))
return response.replace(status=200) # 防止进一步处理原始响应
else:
spider.logger.error(f"Max retries exceeded for {request.url}")
# 成功响应,关闭熔断器(如果需要)
self._record_success(domain)
return response
def process_exception(self, request, exception, spider):
"""
处理异常时的容错逻辑
"""
domain = self._get_domain(request.url)
# 记录失败
self._record_failure(domain)
retry_times = request.meta.get('fault_tolerance_retry_times', 0)
if retry_times < self.max_retries:
delay = self._calculate_retry_delay(retry_times)
spider.logger.warning(
f"Retrying {request.url} after exception {type(exception).__name__} "
f"(attempt {retry_times + 1}/{self.max_retries}), delay: {delay}s"
)
# 创建重试请求
retry_req = request.copy()
retry_req.meta['fault_tolerance_retry_times'] = retry_times + 1
# 添加重试延迟
reactor.callLater(delay, lambda: self._schedule_retry(spider, retry_req))
return None  # 重试已延迟调度;process_exception 中拿不到 response,返回 None 让异常按默认流程继续处理
# 达到最大重试次数,记录错误
spider.logger.error(f"Max retries exceeded for {request.url} due to {exception}")
return None
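# 示例:retry_factor=1.5 时,第0~4次重试的延迟约为 1.0s、1.5s、2.25s、3.38s、5.06s,
# 超过 retry_max_delay 的部分会被截断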
def _calculate_retry_delay(self, retry_times: int) -> float:
"""
计算重试延迟时间(指数退避)
"""
base_delay = 1.0
delay = base_delay * (self.retry_factor ** retry_times)
return min(delay, self.retry_max_delay)
def _schedule_retry(self, spider, request):
"""
调度重试请求
"""
spider.crawler.engine.schedule(request, spider)
def _get_domain(self, url: str) -> str:
"""
从URL获取域名
"""
from urllib.parse import urlparse
parsed = urlparse(url)
return parsed.netloc
def _get_circuit_breaker(self, domain: str):
"""
获取域名的熔断器
"""
if domain not in self.domain_circuit_breakers:
self.domain_circuit_breakers[domain] = CircuitBreaker(
failure_threshold=self.failure_threshold,
reset_timeout=self.reset_timeout
)
return self.domain_circuit_breakers[domain]
def _record_failure(self, domain: str):
"""
记录失败
"""
if self.circuit_breaker_enabled:
circuit_breaker = self._get_circuit_breaker(domain)
circuit_breaker.record_failure()
def _record_success(self, domain: str):
"""
记录成功(成功时复位对应域名的熔断器)
"""
if self.circuit_breaker_enabled:
circuit_breaker = self._get_circuit_breaker(domain)
circuit_breaker.record_success()
class CircuitBreaker:
"""
熔断器实现
"""
def __init__(self, failure_threshold: int = 5, reset_timeout: int = 60):
self.failure_threshold = failure_threshold
self.reset_timeout = reset_timeout
self.failure_count = 0
self.last_failure_time = None
self.state = 'CLOSED' # CLOSED, OPEN, HALF_OPEN
self.last_attempt_time = None
def is_open(self) -> bool:
"""
检查熔断器是否打开
"""
if self.state == 'OPEN':
# 检查重置超时
if time.time() - self.last_failure_time >= self.reset_timeout:
self.state = 'HALF_OPEN'
self.last_attempt_time = time.time()
return False # 半开状态,允许一次尝试
return True
elif self.state == 'HALF_OPEN':
# 半开状态下,如果尝试间隔太短,仍然返回True
if (time.time() - self.last_attempt_time) < 5: # 5秒内不允许重复尝试
return True
return False
else: # CLOSED
return False
def record_failure(self):
"""
记录失败
"""
self.failure_count += 1
self.last_failure_time = time.time()
if self.failure_count >= self.failure_threshold:
self.state = 'OPEN'
def record_success(self):
"""
记录成功
"""
self.failure_count = 0
self.state = 'CLOSED'
def allow_request(self) -> bool:
"""
是否允许请求
"""
return not self.is_open()
class CheckpointManager:
"""
检查点管理器 - 实现断点续传功能
"""
def __init__(self, checkpoint_dir: str = './checkpoints'):
self.checkpoint_dir = checkpoint_dir
self.checkpoint_file = os.path.join(checkpoint_dir, 'checkpoint.json')
self.ensure_checkpoint_dir()
def ensure_checkpoint_dir(self):
"""
确保检查点目录存在
"""
os.makedirs(self.checkpoint_dir, exist_ok=True)
def save_checkpoint(self, spider_name: str, stats: dict, queue_state: dict = None):
"""
保存检查点
"""
checkpoint_data = {
'spider_name': spider_name,
'timestamp': time.time(),
'stats': dict(stats),
'queue_state': queue_state or {},
'version': '1.0'
}
try:
with open(self.checkpoint_file, 'w', encoding='utf-8') as f:
json.dump(checkpoint_data, f, ensure_ascii=False, indent=2)
except Exception as e:
logging.error(f"Failed to save checkpoint: {e}")
def load_checkpoint(self) -> dict:
"""
加载检查点
"""
if not os.path.exists(self.checkpoint_file):
return {}
try:
with open(self.checkpoint_file, 'r', encoding='utf-8') as f:
return json.load(f)
except Exception as e:
logging.error(f"Failed to load checkpoint: {e}")
return {}
def has_valid_checkpoint(self, max_age_hours: int = 24) -> bool:
"""
检查是否有有效的检查点
"""
checkpoint = self.load_checkpoint()
if not checkpoint:
return False
age_hours = (time.time() - checkpoint.get('timestamp', 0)) / 3600
return age_hours <= max_age_hours
class FaultTolerantSpider:
"""
容错爬虫基类
"""
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
self.checkpoint_manager = CheckpointManager()
self.restart_count = 0
self.max_restarts = 5
def start_requests(self):
"""
开始请求 - 支持断点续传
"""
# 尝试从检查点恢复
checkpoint = self.checkpoint_manager.load_checkpoint()
if (checkpoint and
checkpoint.get('spider_name') == self.name and
self.checkpoint_manager.has_valid_checkpoint()):
# 从检查点恢复
queue_state = checkpoint.get('queue_state', {})
logging.info(f"Resuming from checkpoint: {checkpoint}")
# 这里可以根据具体需求实现从检查点恢复逻辑
for request in self._resume_from_checkpoint(queue_state):
yield request
else:
# 正常开始请求
for request in self._initial_requests():
yield request
def _resume_from_checkpoint(self, queue_state):
"""
从检查点恢复请求
"""
# 子类需要实现具体的恢复逻辑
pass
def _initial_requests(self):
"""
初始请求
"""
# 子类需要实现具体的初始请求逻辑
pass
def closed(self, reason):
"""
爬虫关闭时保存检查点
"""
if hasattr(self, 'crawler'):
stats = dict(self.crawler.stats.get_stats())
queue_state = self._get_queue_state()
self.checkpoint_manager.save_checkpoint(self.name, stats, queue_state)
def _get_queue_state(self) -> dict:
"""
获取队列状态
"""
# 这里需要根据具体调度器实现
if hasattr(self.crawler.engine, 'slot') and self.crawler.engine.slot:
scheduler = self.crawler.engine.slot.scheduler
# 获取待处理请求等状态信息
return {
'pending_requests': getattr(scheduler, 'has_pending_requests', lambda: 0)()
}
return {}
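# 使用示意(假设性的例子):FaultTolerantSpider 本身不是 scrapy.Spider,
# 实际使用时与 Spider 组合,并实现 _initial_requests / _resume_from_checkpoint
import scrapy

class ExampleFaultTolerantSpider(FaultTolerantSpider, scrapy.Spider):
    name = 'example_fault_tolerant'

    def _initial_requests(self):
        # 冷启动时的入口请求,URL 仅为占位
        yield scrapy.Request('https://example.com/list?page=1', callback=self.parse)

    def _resume_from_checkpoint(self, queue_state):
        # 最简化的恢复策略:根据检查点状态重新生成入口请求
        yield scrapy.Request('https://example.com/list?page=1',
                             callback=self.parse, dont_filter=True)

    def parse(self, response):
        yield {'url': response.url, 'title': response.css('title::text').get()}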
```

## 性能调优实战 \{#性能调优实战}
在实际的大规模爬虫项目中,我们需要综合运用各种优化技术来达到最佳性能。
### 性能基准测试
```python
# performance_benchmark.py - 性能基准测试
import time
from datetime import datetime
import threading
from concurrent.futures import ThreadPoolExecutor
from scrapy.crawler import CrawlerRunner
from scrapy.utils.project import get_project_settings
class PerformanceBenchmark:
"""
性能基准测试工具
"""
def __init__(self):
self.results = []
self.test_results = {}
def benchmark_single_spider(self, spider_class, urls: list, duration: int = 60):
"""
测试单个爬虫性能
Args:
spider_class: 爬虫类
urls: 测试URL列表
duration: 测试持续时间(秒)
"""
start_time = time.time()
result = {
'spider': spider_class.__name__,
'urls_count': len(urls),
'duration': duration,
'requests_sent': 0,
'responses_received': 0,
'items_scraped': 0,
'error_count': 0,
'avg_response_time': 0,
'requests_per_second': 0,
'items_per_second': 0
}
# 这里需要实现具体的基准测试逻辑
# 通常需要创建一个特殊的测试环境
end_time = time.time()
actual_duration = end_time - start_time
result['actual_duration'] = actual_duration
result['requests_per_second'] = result['requests_sent'] / actual_duration if actual_duration > 0 else 0
result['items_per_second'] = result['items_scraped'] / actual_duration if actual_duration > 0 else 0
self.results.append(result)
return result
def benchmark_concurrent_spiders(self, spider_classes: list, url_sets: list):
"""
测试并发爬虫性能
Args:
spider_classes: 爬虫类列表
url_sets: URL集合列表,每个爬虫对应一组URL
"""
results = []
with ThreadPoolExecutor(max_workers=len(spider_classes)) as executor:
futures = []
for spider_class, urls in zip(spider_classes, url_sets):
future = executor.submit(self.benchmark_single_spider, spider_class, urls)
futures.append(future)
for future in futures:
result = future.result()
results.append(result)
return results
def compare_configurations(self, configs: list, spider_class, test_urls: list):
"""
比较不同配置的性能
Args:
configs: 配置列表
spider_class: 测试爬虫类
test_urls: 测试URL列表
"""
comparison_results = []
for i, config in enumerate(configs):
print(f"Testing configuration {i+1}/{len(configs)}")
# 应用配置并测试
result = self.benchmark_single_spider(spider_class, test_urls)
result['configuration'] = f"config_{i+1}"
result['settings'] = config
comparison_results.append(result)
# 分析比较结果
self.analyze_comparison_results(comparison_results)
return comparison_results
def analyze_comparison_results(self, results: list):
"""
分析比较结果
"""
if not results:
return
print("\n=== 性能比较分析 ===")
# 按请求速率排序
sorted_by_rps = sorted(results, key=lambda x: x['requests_per_second'], reverse=True)
print("\n按请求速率排序:")
for i, result in enumerate(sorted_by_rps[:3]): # 显示前三名
print(f"{i+1}. {result['configuration']}: {result['requests_per_second']:.2f} RPS")
# 按项目速率排序
sorted_by_ips = sorted(results, key=lambda x: x['items_per_second'], reverse=True)
print("\n按项目速率排序:")
for i, result in enumerate(sorted_by_ips[:3]): # 显示前三名
print(f"{i+1}. {result['configuration']}: {result['items_per_second']:.2f} IPS")
# 按错误率排序
sorted_by_error_rate = sorted(results,
key=lambda x: x['error_count'] / max(x['responses_received'], 1),
reverse=False)
print("\n按错误率排序(越低越好):")
for i, result in enumerate(sorted_by_error_rate[:3]): # 显示前三名
error_rate = (result['error_count'] / max(result['responses_received'], 1)) * 100
print(f"{i+1}. {result['configuration']}: {error_rate:.2f}% errors")
class OptimizationRecommendationEngine:
"""
优化建议引擎
"""
def __init__(self):
self.knowledge_base = self._build_knowledge_base()
def _build_knowledge_base(self) -> dict:
"""
构建知识库
"""
return {
'high_memory_usage': {
'symptoms': ['memory_usage > 80%', 'frequent_gc', 'slow_processing'],
'causes': ['large_response_handling', 'item_accumulation', 'poor_caching'],
'solutions': [
'enable_response_compression',
'implement_streaming_processing',
'reduce_concurrent_items',
'use_disk_queues'
]
},
'low_throughput': {
'symptoms': ['rps < threshold', 'idle_time_high', 'bandwidth_underutilized'],
'causes': ['excessive_delays', 'connection_pool_limitations', 'rate_limiting'],
'solutions': [
'increase_concurrent_requests',
'optimize_network_settings',
'implement_connection_pooling',
'adjust_autothrottle_settings'
]
},
'high_error_rate': {
'symptoms': ['error_rate > 5%', 'frequent_timeouts', 'blocked_by_antibot'],
'causes': ['aggressive_settings', 'insufficient_delay', 'missing_headers'],
'solutions': [
'increase_download_delay',
'rotate_user_agents',
'implement_proxy_rotation',
'add_retry_middleware'
]
}
}
def analyze_performance_data(self, stats: dict) -> list:
"""
分析性能数据并给出建议
Args:
stats: 性能统计数据
Returns:
优化建议列表
"""
recommendations = []
# 检查内存使用
memory_percent = stats.get('monitor/memory_percent', 0)
if memory_percent > 80:
recommendations.append({
'issue': 'high_memory_usage',
'severity': 'HIGH',
'description': f'High memory usage detected: {memory_percent:.1f}%',
'suggestions': self.knowledge_base['high_memory_usage']['solutions']
})
# 检查吞吐量
rps = stats.get('queue_monitor/rate/enqueued_per_minute', 0) / 60
if rps < 10: # 假设阈值为10 RPS
recommendations.append({
'issue': 'low_throughput',
'severity': 'MEDIUM',
'description': f'Low throughput detected: {rps:.2f} RPS',
'suggestions': self.knowledge_base['low_throughput']['solutions']
})
# 检查错误率
total_requests = max(stats.get('scheduler/enqueued', 0), 1)  # guard against division by zero
error_count = stats.get('downloader/exception_count', 0)
error_rate = error_count / total_requests
if error_rate > 0.05: # 5%错误率阈值
recommendations.append({
'issue': 'high_error_rate',
'severity': 'HIGH',
'description': f'High error rate detected: {error_rate:.2%}',
'suggestions': self.knowledge_base['high_error_rate']['solutions']
})
return recommendations
def generate_optimization_report(self, spider_name: str, stats: dict) -> str:
"""
生成优化报告
"""
recommendations = self.analyze_performance_data(stats)
report = f"""
# 性能优化报告 - {spider_name}
## 概述
- 分析时间: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}
- 总请求数: {stats.get('scheduler/enqueued', 0)}
- 成功响应: {stats.get('response_received_count', 0)}
- 错误数量: {stats.get('downloader/exception_count', 0)}
## 性能指标
- 请求速率: {stats.get('queue_monitor/rate/enqueued_per_minute', 0) / 60:.2f} RPS
- 内存使用: {stats.get('monitor/memory_percent', 0):.1f}%
- CPU使用: {stats.get('monitor/cpu_percent', 0):.1f}%
- 平均响应大小: {stats.get('downloader/response_bytes', 0) / max(stats.get('response_received_count', 1), 1) / 1024:.2f} KB
## 优化建议
"""
for rec in recommendations:
report += f"\n### {rec['issue']} ({rec['severity']})\n"
report += f"- **问题**: {rec['description']}\n"
report += "- **建议方案**:\n"
for suggestion in rec['suggestions']:
report += f" - {suggestion}\n"
if not recommendations:
report += "\n系统运行良好,暂无优化建议。\n"
return report
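A short, hypothetical example of wiring the engine above into a `spider_closed` handler. How the callback is registered (signal, extension, or pipeline) depends on your project, and the output filename is only an example:

```python
# Hypothetical spider_closed handler that turns crawler stats into a report.
# Note: generate_optimization_report() uses datetime.now(), so the module defining
# OptimizationRecommendationEngine needs "from datetime import datetime".

def on_spider_closed(spider, reason):
    stats = spider.crawler.stats.get_stats()            # snapshot of Scrapy's stats collector
    engine = OptimizationRecommendationEngine()
    report = engine.generate_optimization_report(spider.name, stats)

    # Write the report to disk; the filename is just an example
    with open(f"optimization_report_{spider.name}.md", "w", encoding="utf-8") as f:
        f.write(report)
```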
## Common Problems and Solutions
### Problem 1: Memory leaks
**Symptom**: memory usage keeps growing the longer the spider runs
**Solution**:
```python
# Memory monitoring and cleanup
from collections import deque

class MemoryLeakDetector:
"""
内存泄漏检测器
"""
def __init__(self, crawler):
self.crawler = crawler
self.memory_snapshots = deque(maxlen=100)
self.object_trackers = {}
def take_snapshot(self):
"""
获取内存快照
"""
import tracemalloc
snapshot = tracemalloc.take_snapshot()
self.memory_snapshots.append(snapshot)
# 分析内存使用情况
if len(self.memory_snapshots) >= 2:
top_stats = snapshot.compare_to(self.memory_snapshots[0], 'lineno')
for stat in top_stats[:5]:
print(f"Memory growth: {stat}")
def detect_leaks(self) -> list:
"""
检测内存泄漏
"""
if len(self.memory_snapshots) < 2:
return []
# 比较最新快照与初始快照
initial = self.memory_snapshots[0]
current = self.memory_snapshots[-1]
# 计算内存增长
initial_size = sum(stat.size for stat in initial.statistics('lineno'))
current_size = sum(stat.size for stat in current.statistics('lineno'))
growth_rate = (current_size - initial_size) / initial_size if initial_size > 0 else 0
if growth_rate > 0.5: # 50%增长认为是潜在泄漏
return [f"Potential memory leak detected: {growth_rate:.2%} growth"]
return []
# 在设置中启用tracemalloc
import tracemalloc
tracemalloc.start()
```

### Problem 2: Unstable request rate
**Symptom**: the request rate fluctuates heavily, sometimes fast and sometimes slow
**Solution**:
```python
# Rate smoothing controller
from collections import deque

class RateSmoothController:
"""
速率平滑控制器
"""
def __init__(self, target_rate: float, smoothing_factor: float = 0.1):
self.target_rate = target_rate
self.smoothing_factor = smoothing_factor
self.current_rate = target_rate
self.rate_history = deque(maxlen=50)
self.time_window = 10 # 10秒窗口
def adjust_rate(self, actual_rate: float) -> float:
"""
调整目标速率
"""
# 计算平滑后的速率
if self.rate_history:
smoothed_rate = self.smoothing_factor * actual_rate + \
(1 - self.smoothing_factor) * self.rate_history[-1]
else:
smoothed_rate = actual_rate
self.rate_history.append(smoothed_rate)
# 限制调整幅度
adjustment = min(abs(smoothed_rate - self.current_rate),
self.current_rate * 0.2) # 最大调整20%
if smoothed_rate > self.current_rate:
self.current_rate = min(self.current_rate + adjustment, self.target_rate * 1.5)
else:
self.current_rate = max(self.current_rate - adjustment, self.target_rate * 0.5)
return self.current_rate
```

### Problem 3: Getting banned frequently
**Symptom**: the spider frequently hits anti-bot status codes such as 403 and 429
**Solution**:
```python
# Adaptive anti-ban strategy.
# Note: HeaderRotator and DelayAdaptor are assumed helper classes (not shown here);
# SessionRotator is defined below.
import time

class AntiBanStrategy:
"""
反反爬策略
"""
def __init__(self):
self.session_rotator = SessionRotator()
self.header_rotator = HeaderRotator()
self.delay_adaptor = DelayAdaptor()
def adapt_to_detection(self, response, request):
"""
根据响应自适应调整策略
"""
status = response.status
if status == 403: # Forbidden
# 立即更换session和headers
self.session_rotator.rotate_session()
self.header_rotator.rotate_headers()
# 增加延迟
self.delay_adaptor.increase_delay(factor=2.0)
elif status == 429: # Too Many Requests
# 增加延迟,可能需要等待更长时间
retry_after = response.headers.get('Retry-After')
if retry_after:
delay = int(retry_after)
time.sleep(delay)
else:
self.delay_adaptor.increase_delay(factor=1.5)
elif status == 404: # 可能IP被暂时封禁
self.session_rotator.rotate_session(immediate=True)
# 更新策略
self.update_strategy(response, request)
def update_strategy(self, response, request):
"""
更新反爬策略
"""
# 基于响应内容、状态码等更新策略
content = response.text.lower()
if 'captcha' in content or 'verify' in content:
# 需要处理验证码,暂停并人工介入
pass
class SessionRotator:
"""
Session轮换器
"""
def __init__(self):
self.sessions = []
self.current_session_index = 0
self.create_sessions()
def create_sessions(self):
"""
创建多个session
"""
import requests
for i in range(10): # 创建10个session
session = requests.Session()
# 配置session
session.headers.update({
'User-Agent': self._random_user_agent(),
'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8',
'Accept-Language': 'en-US,en;q=0.5',
'Accept-Encoding': 'gzip, deflate',
'Connection': 'keep-alive',
})
self.sessions.append(session)
def rotate_session(self, immediate: bool = False):
"""
轮换session
"""
if immediate:
# 立即轮换
self.current_session_index = (self.current_session_index + 1) % len(self.sessions)
else:
# 随机轮换
import random
self.current_session_index = random.randint(0, len(self.sessions) - 1)
def get_current_session(self):
"""
获取当前session
"""
return self.sessions[self.current_session_index]
def _random_user_agent(self):
"""
获取随机User-Agent
"""
user_agents = [
'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36',
'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36',
'Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36',
'Mozilla/5.0 (Windows NT 10.0; Win64; x64; rv:89.0) Gecko/20100101 Firefox/89.0',
]
import random
return random.choice(user_agents)
```

## Best Practices Summary
### Configuration best practices
- Memory settings:
```python
# Recommended production settings
MEMUSAGE_ENABLED = True
MEMUSAGE_LIMIT_MB = 4096       # 4GB hard limit
MEMUSAGE_WARNING_MB = 3072     # warn at 3GB

# Concurrency / queue settings
CONCURRENT_REQUESTS = 32
CONCURRENT_REQUESTS_PER_DOMAIN = 8
DOWNLOAD_DELAY = 1
RANDOMIZE_DOWNLOAD_DELAY = True   # randomize delay between 0.5x and 1.5x of DOWNLOAD_DELAY

# Scheduler settings
SCHEDULER_PRIORITY_QUEUE = 'scrapy.pqueues.DownloaderAwarePriorityQueue'
SCHEDULER_DISK_QUEUE = 'scrapy.squeues.PickleFifoDiskQueue'
SCHEDULER_MEMORY_QUEUE = 'scrapy.squeues.FifoMemoryQueue'
```
- Network settings:
```python
# DNS optimization
DNSCACHE_ENABLED = True
DNSCACHE_SIZE = 10000

# Download timeouts and size limits
DOWNLOAD_TIMEOUT = 60
DOWNLOAD_MAXSIZE = 50 * 1024 * 1024   # 50MB
DOWNLOAD_WARNSIZE = 10 * 1024 * 1024  # warn at 10MB

# Retry settings
RETRY_TIMES = 3
RETRY_HTTP_CODES = [500, 502, 503, 504, 408, 429, 403]
```
### Monitoring best practices
- Key metrics to monitor:
  - Request success rate
  - Response time distribution
  - Memory usage
  - CPU usage
  - Error rate trend
- Alerting rules (a minimal alerting extension sketch follows this list):
  - Alert when memory usage exceeds 85%
  - Alert when the error rate exceeds 5%
  - Alert when response time exceeds 10 seconds
  - Alert when the spider stops running
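The alert thresholds above can be enforced with a small Scrapy extension. The sketch below is assumption-laden: the class name `AlertExtension`, the 60-second check interval, and the log-based alerting are all illustrative; swap in e-mail or IM notifications as needed.

```python
# alert_extension.py - minimal alerting extension sketch (names and thresholds are illustrative)
import logging

import psutil
from scrapy import signals
from twisted.internet import task

logger = logging.getLogger(__name__)


class AlertExtension:
    def __init__(self, crawler, interval=60.0):
        self.crawler = crawler
        self.interval = interval   # seconds between checks
        self.task = None

    @classmethod
    def from_crawler(cls, crawler):
        ext = cls(crawler)
        crawler.signals.connect(ext.spider_opened, signal=signals.spider_opened)
        crawler.signals.connect(ext.spider_closed, signal=signals.spider_closed)
        return ext

    def spider_opened(self, spider):
        self.task = task.LoopingCall(self.check, spider)
        self.task.start(self.interval, now=False)

    def spider_closed(self, spider, reason):
        if self.task and self.task.running:
            self.task.stop()

    def check(self, spider):
        stats = self.crawler.stats
        # Alert when this process uses more than 85% of system memory
        mem_percent = psutil.Process().memory_percent()
        if mem_percent > 85:
            logger.warning("[ALERT] %s memory usage %.1f%% > 85%%", spider.name, mem_percent)
        # Alert when the error rate exceeds 5%
        responses = stats.get_value('response_received_count', 0) or 0
        errors = stats.get_value('downloader/exception_count', 0) or 0
        total = responses + errors
        if total and errors / total > 0.05:
            logger.warning("[ALERT] %s error rate %.2f%% > 5%%",
                           spider.name, 100 * errors / total)
```

To enable it, register the extension in `EXTENSIONS` in `settings.py`, e.g. `EXTENSIONS = {'myproject.extensions.AlertExtension': 500}` (the module path is a placeholder).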
### Fault-tolerance best practices
- Graceful degradation:
  - Automatically lower the request rate for a domain that becomes unreachable
  - Automatically reduce concurrency when memory usage gets too high
  - Switch to a more conservative request strategy when the error rate is too high
- Recovery mechanisms (a minimal resume-from-checkpoint configuration follows this list):
  - Checkpoint/resume support
  - Automatic restart of failed spiders
  - Persistent state storage
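For checkpoint/resume, the simplest built-in option in Scrapy is the `JOBDIR` setting, which persists the request queue and dedupe state to disk so an interrupted crawl can continue. The directory path below is only an example:

```python
# settings.py - enable checkpoint/resume via JOBDIR (the path is an example)
JOBDIR = 'crawls/myspider-job-1'

# Equivalent command-line form:
#   scrapy crawl myspider -s JOBDIR=crawls/myspider-job-1
# Restarting with the same JOBDIR resumes from the persisted request queue.
```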
💡 Key takeaway: Optimizing a large-scale crawler is a systems-engineering effort that spans memory, networking, concurrency, and data processing. With sensible configuration, thorough monitoring, and robust fault-tolerance mechanisms, you can build an efficient and stable crawling system.
🔗 Related tutorials
- 自动限速AutoThrottle - intelligent rate limiting
- 数据去重与增量更新 - data-processing optimization
- 分布式去重与调度 - distributed optimization
- Scrapyd与ScrapydWeb - system monitoring
🏷️ Tags: large-scale crawling, performance optimization, memory management, network optimization, concurrency control, fault tolerance, monitoring, checkpoint/resume, spider tuning, system stability

