大规模爬虫优化 - 内存管理、网络优化与性能调优详解

📂 所属阶段:第五阶段 — 战力升级(分布式与进阶篇)
🔗 相关章节:自动限速AutoThrottle · 数据去重与增量更新 · 分布式去重与调度

今天道满给大家拆解真正企业级大规模爬虫的核心优化逻辑——别只靠堆配置堆CPU,「合理控制资源、智能容错、高效数据流转」才是王道!全文精简到3000字以内,全是可复制的代码和避坑点👇


1. 避坑前先定「核心优化目标」

很多人一开始就改配置,最后要么被封要么内存炸。先抓这4个核心目标:

  • 稳定性:7x24无人工干预,出错自动恢复
  • 效率性:单位时间爬取最多数据,资源利用率不浪费
  • 可扩展性:后续加机器、加节点不用大改代码
  • 可观测性:一眼知道哪里慢、哪里崩、哪里被封

2. 最常崩的环节:内存管理优化

堆千万URL、存几十GB缓存?内存必炸!道满给3个立竿见影的方案👇

2.1 Scrapy原生内存「三板斧」配置

# scrapy_settings.py
import psutil

# 【避坑】先取本机可用内存的70%,别写死4GB!
available_mem = psutil.virtual_memory().available // 1024 // 1024
MEMUSAGE_ENABLED = True
MEMUSAGE_LIMIT_MB = min(int(available_mem * 0.7), 8192)  # 本机70%或8GB封顶
MEMUSAGE_WARNING_MB = int(MEMUSAGE_LIMIT_MB * 0.75)       # 75%时发警告
CLOSESPIDER_MEMUSAGE = MEMUSAGE_LIMIT_MB                   # 超限自动关闭

# 【核心】大文件不要全部放内存!
DOWNLOAD_MAXSIZE = 50 * 1024 * 1024  # 50MB(按需调)
DOWNLOAD_WARNSIZE = 10 * 1024 * 1024
RESPONSE_ENCODING = 'utf-8'           # 避免gbk转码内存泄漏

# 【优化】用磁盘队列存大URL池
SCHEDULER_PRIORITY_QUEUE = 'scrapy.pqueues.ScrapyPriorityQueue'
SCHEDULER_DISK_QUEUE = 'scrapy.squeues.PickleFifoDiskQueue'
SCHEDULER_MEMORY_QUEUE = 'scrapy.squeues.FifoMemoryQueue'

2.2 中间件层面「实时清理+弱引用」

# memory_optimization_middleware.py
import gc
import weakref
from scrapy import signals
from itemadapter import ItemAdapter

class MemoryOptimizationMiddleware:
    def __init__(self):
        self.item_weakref = weakref.WeakSet()  # 不用的Item自动被GC回收
        self.response_tmp = {}                   # 临时缓存,只留最近1000条
        self.tmp_limit = 1000

    @classmethod
    def from_crawler(cls, crawler):
        ext = cls()
        crawler.signals.connect(ext.spider_closed, signal=signals.spider_closed)
        return ext

    def process_spider_input(self, response, spider):
        # 清理临时缓存
        if len(self.response_tmp) >= self.tmp_limit:
            del self.response_tmp[list(self.response_tmp.keys())[0]]
        return None

    def process_spider_output(self, response, result, spider):
        for item_or_req in result:
            # 如果是Item,加弱引用,处理完立即丢弃
            if hasattr(item_or_req, 'fields'):
                adapter = ItemAdapter(item_or_req)
                # 截断超长文本(按需调)
                for k, v in adapter.items():
                    if isinstance(v, str) and len(v) > 10000:
                        adapter[k] = v[:10000] + "..."
                self.item_weakref.add(item_or_req)
            yield item_or_req

    def spider_closed(self, spider):
        # 最后强制GC一次
        gc.collect()
        spider.logger.info("Memory optimized, GC collected")

3. 爬得慢的核心:网络优化与自适应限速

别一开始就开100并发!道满教你「智能限速+连接复用」,爬得快又不被封👇

3.1 网络配置「基础版」

# scrapy_settings.py
# 【复用连接】HTTP/1.1 keep-alive+连接池,减少握手开销
CONCURRENT_REQUESTS = 32                # 总并发
CONCURRENT_REQUESTS_PER_DOMAIN = 8      # 单个域名(别超目标网站阈值!)
CONCURRENT_REQUESTS_PER_IP = 4           # 单个IP(有代理池可加)

# 【DNS优化】缓存10000条,减少DNS请求
DNSCACHE_ENABLED = True
DNSCACHE_SIZE = 10000
DNS_TIMEOUT = 30

# 【重试优化】只重试可恢复的状态码
RETRY_TIMES = 3
RETRY_HTTP_CODES = [500, 502, 503, 504, 408, 429]
RETRY_PRIORITY_ADJUST = -1  # 重试请求优先级降低

3.2 自适应限速「进阶版」

比Scrapy原生AutoThrottle多了成功率和响应时间趋势判断

# adaptive_throttle.py
import statistics
from collections import defaultdict, deque
from scrapy.downloadermiddlewares.throttle import AutoThrottle

class SmartAutoThrottle(AutoThrottle):
    def __init__(self, crawler):
        super().__init__(crawler)
        self.stats = defaultdict(lambda: deque(maxlen=50))  # 每个域名存50条记录

    def _adjust_delay(self, slot, latency, response=None):
        domain = slot.key
        self.stats[domain].append({
            'latency': latency,
            'success': 200 <= response.status < 400 if response else True
        })

        # 至少有10条记录才调整
        if len(self.stats[domain]) < 10:
            return

        # 1. 基于成功率调整:<80%加延迟,>95%减延迟
        recent = list(self.stats[domain])[-10:]
        success_rate = sum(1 for r in recent if r['success']) / 10
        if success_rate < 0.8:
            slot.delay = min(slot.delay * 1.3, 60)
        elif success_rate > 0.95:
            slot.delay = max(slot.delay * 0.9, 0.5)

        # 2. 基于响应时间趋势调整:变慢加延迟,变快减延迟
        avg_latency = sum(r['latency'] for r in recent) / 10
        overall_latency = statistics.mean(r['latency'] for r in self.stats[domain])
        if avg_latency > overall_latency * 1.2:
            slot.delay = min(slot.delay * 1.2, 60)
        elif avg_latency < overall_latency * 0.8:
            slot.delay = max(slot.delay * 0.9, 0.5)

# 记得在settings.py里替换原生的AutoThrottle!
DOWNLOADER_MIDDLEWARES = {
    'scrapy.downloadermiddlewares.throttle.AutoThrottle': None,
    'your_project.adaptive_throttle.SmartAutoThrottle': 800,  # 原生AutoThrottle是800
}

4. 最后1公里:可观测性+容错

爬了几万条崩了,不知道崩在哪里?加个简单的健康检查+邮件告警就行👇

4.1 健康检查与告警(基于Prometheus+邮件)

# health_check.py
import time
import logging
from scrapy import signals
from pydispatch import dispatcher
from prometheus_client import Gauge, start_http_server

logger = logging.getLogger(__name__)

# Prometheus指标(本地运行8000端口访问)
MEM_GAUGE = Gauge('scrapy_mem_rss_mb', 'Memory RSS usage', ['spider'])
CPU_GAUGE = Gauge('scrapy_cpu_percent', 'CPU usage', ['spider'])
ERR_GAUGE = Gauge('scrapy_error_rate', 'Error rate (last 100 requests)', ['spider'])

class HealthCheckExtension:
    def __init__(self, crawler):
        self.crawler = crawler
        self.stats = crawler.stats
        self.last_errs = deque(maxlen=100)  # 存最近100次请求的成功/失败
        self.check_interval = 60  # 每60秒检查一次
        self.last_check = time.time()
        
        # 启动Prometheus
        start_http_server(8000)
        dispatcher.connect(self.spider_opened, signals.spider_opened)
        dispatcher.connect(self.response_received, signals.response_received)
        dispatcher.connect(self.request_dropped, signals.request_dropped)

    @classmethod
    def from_crawler(cls, crawler):
        return cls(crawler)

    def spider_opened(self, spider):
        self.spider_name = spider.name

    def response_received(self, response, request, spider):
        self.last_errs.append(200 <= response.status < 400)
        self._check_and_alert()

    def request_dropped(self, request, response, spider):
        self.last_errs.append(False)
        self._check_and_alert()

    def _check_and_alert(self):
        current = time.time()
        if current - self.last_check < self.check_interval:
            return
        self.last_check = current

        # 更新Prometheus指标
        import psutil
        p = psutil.Process()
        MEM_GAUGE.labels(spider=self.spider_name).set(p.memory_info().rss // 1024 // 1024)
        CPU_GAUGE.labels(spider=self.spider_name).set(p.cpu_percent(interval=0.1))
        
        # 计算错误率
        if len(self.last_errs) >= 50:
            err_rate = 1 - sum(self.last_errs) / len(self.last_errs)
            ERR_GAUGE.labels(spider=self.spider_name).set(err_rate)
            
            # 错误率>10%发日志告警(可以用smtplib改成邮件)
            if err_rate > 0.1:
                logger.error(f"ALERT: Error rate too high! {err_rate:.2%}")

5. 道满的「最佳实践总结」

生产环境配置模板

# scrapy_settings_prod.py
# 省略具体配置,参考前面的内存+网络+限速配置

避坑3条铁律

  1. 别一开始就堆配置:先开小并发(4总并发,2单域名),观察2小时没被封再慢慢加
  2. 别信任目标网站的表面规则:比如robots.txt说1秒1次,但实际2秒1次才不被封,用SmartAutoThrottle自动调
  3. 别忘记持久化URL池和去重集:哪怕机器断电重启,也能从上次的地方继续爬

🔗 相关教程推荐

🏷️ 标签云: 大规模爬虫 性能优化 内存管理 网络优化 并发控制 Scrapy调优