Scrapy数据去重与增量更新完全指南

📂 所属阶段:第二阶段 — 数据流转(数据处理篇)
🔗 相关章节:Pipeline管道实战 · 数据清洗与校验

目录


快速入门场景

做爬虫时最常遇到的两个痛点:

  1. 重复爬取同一页面/内容,浪费带宽、时间、服务器资源
  2. 全量重爬,无法第一时间捕获新增或变更的数据

本文会覆盖从简单单机去重带智能调度的增量爬取的完整方案,所有代码均可直接在项目中复用。


Redis指纹去重实现

基础可复用Pipeline

这是最通用的Redis指纹去重方案,支持自定义指纹字段、过期时间:

import hashlib
import redis
from scrapy.exceptions import DropItem
from itemadapter import ItemAdapter

class RedisFingerprintPipeline:
    """基于Redis SHA256指纹的通用去重Pipeline"""
    
    def __init__(self, redis_conf):
        self.redis_conf = redis_conf
        self.redis_conn = None
        self.expire_days = redis_conf.get("expire_days", 7)  # 默认指纹保留7天
        self.dedup_fields = redis_conf.get("dedup_fields", ["url"])  # 默认按URL去重

    @classmethod
    def from_crawler(cls, crawler):
        """从Scrapy配置中读取Redis参数"""
        redis_conf = {
            "host": crawler.settings.get("REDIS_HOST", "localhost"),
            "port": crawler.settings.getint("REDIS_PORT", 6379),
            "db": crawler.settings.getint("REDIS_DB", 0),
            "password": crawler.settings.get("REDIS_PASSWORD"),
            "expire_days": crawler.settings.getint("DUPLICATE_EXPIRE_DAYS", 7),
            "dedup_fields": crawler.settings.getlist("DUPLICATE_DEDUP_FIELDS", ["url"])
        }
        return cls(redis_conf)

    def open_spider(self, spider):
        """爬虫启动时建立Redis连接"""
        self.redis_conn = redis.Redis(
            host=self.redis_conf["host"],
            port=self.redis_conf["port"],
            db=self.redis_conf["db"],
            password=self.redis_conf["password"],
            decode_responses=False  # 哈希值存字节更节省空间
        )

    def process_item(self, item, spider):
        """核心去重逻辑"""
        # 1. 生成数据指纹
        fp = self._gen_fingerprint(item)
        # 2. 检查Redis中是否已存在
        if self.redis_conn.exists(fp):
            spider.logger.warning(f"检测到重复数据,指纹前缀: {fp.decode()[:16]}")
            raise DropItem(f"Duplicate item: {fp.decode()[:16]}")
        # 3. 存入新指纹并设置过期
        self.redis_conn.setex(fp, 86400 * self.expire_days, b"1")
        return item

    def close_spider(self, spider):
        """爬虫关闭时断开连接"""
        if self.redis_conn:
            self.redis_conn.close()

    def _gen_fingerprint(self, item):
        """生成SHA256数据指纹"""
        adapter = ItemAdapter(item)
        # 提取配置的去重字段并排序(避免字段顺序不同导致指纹不同)
        parts = []
        for field in sorted(self.dedup_fields):
            if field in adapter:
                val = adapter[field]
                if val is not None:
                    parts.append(f"{field}={str(val)}")
        # 如果没有配置字段,用整个item的排序后的字典生成
        if not parts:
            item_sorted = str(sorted(adapter.asdict().items()))
            parts.append(item_sorted)
        # 拼接并加密
        return hashlib.sha256("|".join(parts).encode("utf-8")).hexdigest().encode("utf-8")

配置文件示例

settings.py 中添加以下配置:

# Pipeline启用
ITEM_PIPELINES = {
    "myproject.pipelines.RedisFingerprintPipeline": 300,  # 300代表优先级,数字越小越先执行
}

# Redis去重配置
REDIS_HOST = "127.0.0.1"
REDIS_PORT = 6379
REDIS_DB = 0
REDIS_PASSWORD = None
DUPLICATE_EXPIRE_DAYS = 7  # 7天后指纹自动清理,防止Redis内存溢出
DUPLICATE_DEDUP_FIELDS = ["url", "title"]  # 按URL+标题组合去重,提高准确性

URL标准化与请求级去重

URL标准化Pipeline

同一页面可能有多个URL形式(例如有无www、有无末尾斜杠、追踪参数不同),需要先标准化再去重:

from urllib.parse import urlparse, parse_qs, urlencode

class URLNormalizationPipeline:
    """URL标准化前置Pipeline,放在去重Pipeline之前"""
    
    def process_item(self, item, spider):
        if "url" not in item:
            return item
        item["url"] = self._normalize(item["url"])
        return item

    def _normalize(self, url):
        """标准化URL"""
        parsed = urlparse(url)
        # 1. 协议/域名转小写
        scheme = parsed.scheme.lower()
        netloc = parsed.netloc.lower()
        # 2. 移除末尾斜杠(首页除外)
        path = parsed.path.rstrip("/") if parsed.path != "/" else "/"
        # 3. 按字母排序查询参数
        qs = parse_qs(parsed.query, keep_blank_values=True)
        # 4. 移除常见追踪参数
        tracking = ["utm_*", "gclid", "fbclid", "ref", "from", "via"]
        filtered_qs = {k: v for k, v in qs.items() if not any(k.startswith(t.replace("*", "")) for t in tracking)}
        sorted_qs = urlencode(sorted(filtered_qs.items()), doseq=True)
        # 5. 重建URL
        normalized = f"{scheme}://{netloc}{path}"
        if sorted_qs:
            normalized += f"?{sorted_qs}"
        return normalized

注意优先级调整:在 settings.py 中把标准化Pipeline的优先级设为 200,比去重Pipeline的300高:

ITEM_PIPELINES = {
    "myproject.pipelines.URLNormalizationPipeline": 200,
    "myproject.pipelines.RedisFingerprintPipeline": 300,
}

本地/Redis布隆过滤器优化

布隆过滤器是一种高效的概率型数据结构,用于快速判断元素是否存在,适合超大规模数据去重(百万/千万级),缺点是有极低的误判率(不存在的元素可能被误判为存在,但存在的绝不会误判为不存在)。

本地布隆过滤器(适合单机千万级以下)

依赖 mmh3bitarray 库:

pip install mmh3 bitarray
import mmh3
from bitarray import bitarray
import time
from scrapy.exceptions import DropItem

class LocalBloomFilterPipeline:
    """本地内存布隆过滤器Pipeline"""
    
    def __init__(self, capacity=10_000_000, error_rate=0.001):
        self.capacity = capacity  # 预期存储的指纹数量
        self.error_rate = error_rate  # 预期误判率(0.1%)
        self._init_bloom()

    def _init_bloom(self):
        """计算位数组大小和哈希函数数量并初始化"""
        # 简化公式(无数学公式版说明:根据容量和误判率算出合适的内存大小和哈希次数)
        m = int(-self.capacity * (0.0014426950408889634 if self.error_rate == 0.001 else 0.007213475204444817))  # 预填0.1%和1%的常用系数
        k = int(0.6931471805599453 * m / self.capacity)  # ln2≈0.693的简化系数
        self.m = m
        self.k = k
        self.bitarray = bitarray(m)
        self.bitarray.setall(0)

    @classmethod
    def from_crawler(cls, crawler):
        return cls(
            capacity=crawler.settings.getint("BLOOM_CAPACITY", 10_000_000),
            error_rate=crawler.settings.getfloat("BLOOM_ERROR_RATE", 0.001)
        )

    def process_item(self, item, spider):
        fp = item.get("url", str(sorted(ItemAdapter(item).asdict().items())))
        # 快速判断是否可能存在
        if all(self.bitarray[mmh3.hash(fp, i) % self.m] for i in range(self.k)):
            spider.logger.warning(f"布隆过滤器可能检测到重复: {fp[:50]}")
            raise DropItem(f"Possible duplicate: {fp[:50]}")
        # 添加到布隆过滤器
        for i in range(self.k):
            self.bitarray[mmh3.hash(fp, i) % self.m] = 1
        return item

时间戳+智能增量抓取

增量抓取只抓取上次爬取后新增或变更的数据,大幅提升效率。这里介绍最通用的时间戳方案,以及简单的智能频率调整

时间戳增量Spider

import scrapy
import redis
from datetime import datetime
from myproject.items import MyItem

class IncrementalSpider(scrapy.Spider):
    name = "incremental_spider"
    allowed_domains = ["example.com"]
    start_urls = ["https://example.com/news"]

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        # 建立Redis连接,存上次爬取时间
        self.redis_conn = redis.Redis(
            host="127.0.0.1",
            port=6379,
            db=0,
            decode_responses=True
        )
        self.last_crawl_key = f"last_crawl:{self.name}"

    def parse(self, response):
        """解析列表页,只抓新增内容"""
        # 1. 获取上次爬取时间(如果是首次爬取,设为30天前)
        last_crawl_ts = float(self.redis_conn.get(self.last_crawl_key) or (datetime.now().timestamp() - 86400*30))
        # 2. 解析列表中的每一条新闻
        for news in response.css("div.news-item"):
            # 提取发布时间(根据目标网站格式调整)
            pub_time_str = news.css(".pub-time::text").get().strip()
            pub_time_ts = datetime.strptime(pub_time_str, "%Y-%m-%d %H:%M").timestamp()
            # 只抓发布时间晚于上次爬取的
            if pub_time_ts > last_crawl_ts:
                detail_url = news.css("a.title::attr(href)").get()
                yield response.follow(detail_url, callback=self.parse_detail)
        # 3. 更新本次爬取时间
        self.redis_conn.set(self.last_crawl_key, datetime.now().timestamp())

    def parse_detail(self, response):
        """解析详情页"""
        item = MyItem()
        item["url"] = response.url
        item["title"] = response.css("h1::text").get().strip()
        item["content"] = "".join(response.css("div.content p::text").getall()).strip()
        yield item

核心性能优化

内存LRU缓存 + Redis批量管道

这是最立竿见影的性能优化方案:

  1. 内存LRU缓存:快速过滤最近爬过的热门数据,减少Redis访问
  2. Redis批量管道:一次性发送多个Redis命令,减少网络往返
# 只列出优化后的核心部分,完整代码可参考基础Pipeline
def __init__(self, redis_conf):
    # ... 原有代码
    self.lru_cache = dict()
    self.lru_max = 10000  # LRU缓存最多存10000个指纹
    self.pending_fps = []  # 待批量存入的指纹
    self.batch_size = 100  # 每100个指纹批量存一次

def process_item(self, item, spider):
    fp = self._gen_fingerprint(item)
    # 1. 先查LRU缓存
    if fp in self.lru_cache:
        spider.logger.debug(f"LRU缓存命中,跳过重复")
        raise DropItem("LRU duplicate")
    # 2. LRU未命中,查Redis
    if self.redis_conn.exists(fp):
        spider.logger.debug(f"Redis命中,跳过重复")
        raise DropItem("Redis duplicate")
    # 3. 加入LRU缓存和待批量存入队列
    self.lru_cache[fp] = time.time()
    # LRU淘汰最久未使用的
    if len(self.lru_cache) > self.lru_max:
        self.lru_cache.pop(next(iter(self.lru_cache)))
    self.pending_fps.append(fp)
    # 4. 达到批量大小或爬虫关闭时批量存入
    if len(self.pending_fps) >= self.batch_size:
        self._flush_pending(spider)
    return item

def _flush_pending(self, spider):
    if not self.pending_fps:
        return
    pipe = self.redis_conn.pipeline()
    expire = 86400 * self.expire_days
    for fp in self.pending_fps:
        pipe.setex(fp, expire, b"1")
    pipe.execute()
    spider.logger.debug(f"批量存入 {len(self.pending_fps)} 个指纹到Redis")
    self.pending_fps.clear()

def close_spider(self, spider):
    self._flush_pending(spider)
    # ... 原有代码

2个高频问题解决

问题1:长时间运行内存溢出

原因:LRU缓存或Redis指纹未及时清理 解决

  1. 调整LRU缓存大小
  2. 缩短Redis指纹过期时间
  3. 定期(如每周)手动清空Redis去重DB

问题2:Redis连接超时/断开

原因:网络波动或Redis负载过高 解决: 在基础Pipeline的 open_spider 中配置Redis的重试机制健康检查

from redis.backoff import ExponentialBackoff
from redis.retry import Retry

def open_spider(self, spider):
    self.redis_conn = redis.Redis(
        host=self.redis_conf["host"],
        port=self.redis_conf["port"],
        db=self.redis_conf["db"],
        password=self.redis_conf["password"],
        decode_responses=False,
        socket_connect_timeout=5,
        socket_timeout=5,
        retry_on_timeout=True,
        retry=Retry(ExponentialBackoff(cap=10), 3),  # 指数退避重试,最多3次
        health_check_interval=30  # 每30秒检查一次连接健康
    )

💡 核心总结

  1. 简单场景用 URL标准化 + SHA256 Redis指纹去重
  2. 超大规模场景用 本地布隆过滤器 + Redis批量管道
  3. 增量爬取优先用 时间戳方案,有条件再加智能频率调整
  4. 一定要做好 监控和容错处理,防止Redis故障导致爬虫停止