Complete Guide to Scrapy Data Deduplication and Incremental Updates

Quick Start Scenario

Two pain points come up again and again when building crawlers:

  1. Re-crawling the same page or content wastes bandwidth, time, and server resources;
  2. A full re-crawl cannot pick up new or changed data promptly.

This article walks through a complete solution, from simple single-machine deduplication to timestamp-based incremental crawling. The code is written to be reused directly, so you can put a cost-effective crawling strategy in place quickly.


Redis fingerprint deduplication implementation

Basic reusable Pipeline

This pipeline is a general-purpose Redis fingerprint deduplication solution that supports configurable fingerprint fields and expiration times.

import hashlib
import redis
from scrapy.exceptions import DropItem
from itemadapter import ItemAdapter

class RedisFingerprintPipeline:
    """General-purpose deduplication pipeline based on SHA256 fingerprints stored in Redis."""

    def __init__(self, redis_conf):
        self.redis_conf = redis_conf
        self.redis_conn = None
        self.expire_days = redis_conf.get("expire_days", 7)   # keep fingerprints for 7 days by default
        self.dedup_fields = redis_conf.get("dedup_fields", ["url"])  # deduplicate by URL by default

    @classmethod
    def from_crawler(cls, crawler):
        """Read the Redis connection parameters and deduplication policy from the Scrapy settings."""
        redis_conf = {
            "host": crawler.settings.get("REDIS_HOST", "localhost"),
            "port": crawler.settings.getint("REDIS_PORT", 6379),
            "db": crawler.settings.getint("REDIS_DB", 0),
            "password": crawler.settings.get("REDIS_PASSWORD"),
            "expire_days": crawler.settings.getint("DUPLICATE_EXPIRE_DAYS", 7),
            "dedup_fields": crawler.settings.getlist("DUPLICATE_DEDUP_FIELDS", ["url"])
        }
        return cls(redis_conf)

    def open_spider(self, spider):
        """Open the Redis connection when the spider starts."""
        self.redis_conn = redis.Redis(
            host=self.redis_conf["host"],
            port=self.redis_conf["port"],
            db=self.redis_conf["db"],
            password=self.redis_conf["password"],
            decode_responses=False   # keep keys and values as raw bytes
        )

    def process_item(self, item, spider):
        """Core logic: compute the fingerprint, check whether it exists, then store it or drop the item."""
        fp = self._gen_fingerprint(item)
        # Check whether Redis already holds the same fingerprint
        if self.redis_conn.exists(fp):
            spider.logger.warning(f"Duplicate item detected, fingerprint prefix: {fp.decode()[:16]}")
            raise DropItem(f"Duplicate item: {fp.decode()[:16]}")
        # Store the new fingerprint in Redis with an expiration time
        self.redis_conn.setex(fp, 86400 * self.expire_days, b"1")
        return item

    def close_spider(self, spider):
        """Close the Redis connection when the spider stops."""
        if self.redis_conn:
            self.redis_conn.close()

    def _gen_fingerprint(self, item):
        """Build a SHA256 fingerprint from the configured deduplication fields."""
        adapter = ItemAdapter(item)
        parts = []
        # Sort by field name so that field order cannot change the fingerprint
        for field in sorted(self.dedup_fields):
            if field in adapter:
                val = adapter[field]
                if val is not None:
                    parts.append(f"{field}={str(val)}")
        # If none of the configured fields produced a value, fingerprint the whole item instead
        if not parts:
            item_sorted = str(sorted(adapter.asdict().items()))
            parts.append(item_sorted)
        raw = "|".join(parts).encode("utf-8")
        return hashlib.sha256(raw).hexdigest().encode("utf-8")

Core idea:

  • Combine the item fields you care about (such as url and title) into a string with a fixed field order;
  • Compute its SHA256 hash as the "data fingerprint";
  • Use the Redis SETEX command to keep the fingerprint for a set period, and drop any item whose fingerprint is already stored.
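
The fingerprint step can be tried on its own, without Redis or Scrapy. The following is a minimal sketch using only the standard library; the helper name item_fingerprint and the sample items are illustrative assumptions, not part of the pipeline above.

```python
import hashlib

def item_fingerprint(item: dict, fields: list[str]) -> str:
    """Hypothetical helper: SHA256 fingerprint over the selected fields."""
    # Sort field names so the result does not depend on configuration order
    parts = [
        f"{name}={item[name]}"
        for name in sorted(fields)
        if item.get(name) is not None
    ]
    return hashlib.sha256("|".join(parts).encode("utf-8")).hexdigest()

a = item_fingerprint({"url": "https://example.com/a", "title": "Hello"}, ["url", "title"])
b = item_fingerprint({"title": "Hello", "url": "https://example.com/a"}, ["title", "url"])
c = item_fingerprint({"url": "https://example.com/b", "title": "Hello"}, ["url", "title"])

print(a == b)  # same content, different field order
print(a == c)  # different URL
```

Because the field names are sorted before hashing, the first comparison is expected to match while the second is not.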

Configuration file example

Add the following configuration to settings.py:

# Enable the pipeline (the number is the priority; lower values run first)
ITEM_PIPELINES = {
    "myproject.pipelines.RedisFingerprintPipeline": 300,
}

# Redis connection and deduplication settings
REDIS_HOST = "127.0.0.1"
REDIS_PORT = 6379
REDIS_DB = 0
REDIS_PASSWORD = None
DUPLICATE_EXPIRE_DAYS = 7          # fingerprints expire after 7 days so Redis memory does not grow indefinitely
DUPLICATE_DEDUP_FIELDS = ["url", "title"]  # deduplicate on url + title together for better accuracy

URL standardization and request-level deduplication

The same page may appear under several different URLs:

  • Some include www, some don't;
  • Some end with a trailing slash, some don't;
  • Some carry tracking parameters (utm_source, gclid, etc.).

If these URLs are fingerprinted as-is, they are treated as different pages and the page is crawled repeatedly. Normalizing the URL before computing the fingerprint therefore makes deduplication much more effective.

URL normalization Pipeline

from urllib.parse import urlparse, parse_qs, urlencode

class URLNormalizationPipeline:
    """URL normalization pipeline; it must run before the deduplication pipeline."""

    def process_item(self, item, spider):
        if "url" not in item:
            return item
        item["url"] = self._normalize(item["url"])
        return item

    def _normalize(self, url):
        """Main steps of URL normalization."""
        parsed = urlparse(url)
        # 1. Lowercase the scheme and the host
        scheme = parsed.scheme.lower()
        netloc = parsed.netloc.lower()
        # 2. Path: strip trailing slashes, but keep the root path "/"
        #    (an empty path is also treated as "/")
        path = parsed.path.rstrip("/") or "/"
        # 3. Parse the query string
        qs = parse_qs(parsed.query, keep_blank_values=True)
        # 4. Drop common tracking parameters: prefix match for the utm_ family,
        #    exact match for the rest so that names like "reference" or "from_date" survive
        tracking_prefixes = ("utm_",)
        tracking_names = {"gclid", "fbclid", "ref", "from", "via"}
        filtered_qs = {
            k: v for k, v in qs.items()
            if not k.startswith(tracking_prefixes) and k not in tracking_names
        }
        # 5. Sort the remaining parameters alphabetically and re-encode them
        sorted_qs = urlencode(sorted(filtered_qs.items()), doseq=True)
        # 6. Rebuild the normalized URL
        normalized = f"{scheme}://{netloc}{path}"
        if sorted_qs:
            normalized += f"?{sorted_qs}"
        return normalized
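
To check normalization rules like these outside Scrapy, the same steps can be written as a standalone function. This is a sketch under assumptions: the name normalize_url, the reduced tracking list, and the sample URLs are illustrative only.

```python
from urllib.parse import urlparse, parse_qsl, urlencode

# Assumed tracking rules for this sketch: prefix match for utm_*, exact match for the rest
TRACKING_PREFIXES = ("utm_",)
TRACKING_NAMES = {"gclid", "fbclid"}

def normalize_url(url: str) -> str:
    """Hypothetical standalone version of the normalization steps."""
    parsed = urlparse(url)
    scheme = parsed.scheme.lower()
    netloc = parsed.netloc.lower()
    # Strip trailing slashes; an empty result means the root path
    path = parsed.path.rstrip("/") or "/"
    pairs = [
        (k, v)
        for k, v in parse_qsl(parsed.query, keep_blank_values=True)
        if not k.startswith(TRACKING_PREFIXES) and k not in TRACKING_NAMES
    ]
    query = urlencode(sorted(pairs))
    return f"{scheme}://{netloc}{path}" + (f"?{query}" if query else "")

variants = [
    "HTTPS://Example.com/news/?utm_source=x&b=2&a=1",
    "https://example.com/news?a=1&b=2&gclid=abc",
    "https://example.com/news/?b=2&a=1",
]
print({normalize_url(u) for u in variants})
```

All three variants should collapse to a single normalized URL, which is what lets the fingerprint step treat them as one page.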

Priority adjustment: in settings.py, make sure the normalization pipeline runs before the deduplication pipeline, for example:

ITEM_PIPELINES = {
    "myproject.pipelines.URLNormalizationPipeline": 200,  # normalize first
    "myproject.pipelines.RedisFingerprintPipeline": 300,  # then deduplicate
}

Local/Redis Bloom filter optimization

A Bloom filter is a probabilistic data structure that uses very little memory to test quickly whether an element may already be in a set.

  • Advantages: memory overhead stays small even when deduplicating tens of millions of records, and lookups are very fast;
  • Disadvantages: there is a small false-positive rate, meaning an element that was never added can be reported as present. The reverse never happens: an element that was added is never reported as missing.

Depending on data volume, you can use an in-memory Bloom filter or the Bloom filter module available for Redis.
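
The memory cost follows from two standard sizing formulas: for n expected elements and a target false-positive rate p, the bit array needs about m = -n·ln(p) / (ln 2)² bits, and the matching number of hash functions is about k = (m/n)·ln 2. A minimal sketch of that calculation (the function name bloom_parameters is an assumption for illustration):

```python
import math

def bloom_parameters(capacity: int, error_rate: float) -> tuple[int, int]:
    """Return (number of bits, number of hash functions) for a Bloom filter."""
    # m = -n * ln(p) / (ln 2)^2
    bits = math.ceil(-capacity * math.log(error_rate) / (math.log(2) ** 2))
    # k = (m / n) * ln 2, rounded to the nearest whole hash function
    hashes = max(1, round(bits / capacity * math.log(2)))
    return bits, hashes

for p in (0.01, 0.001):
    m, k = bloom_parameters(10_000_000, p)
    print(f"p={p}: {m / 10_000_000:.2f} bits per element, "
          f"{k} hash functions, {m / 8 / 1024 / 1024:.1f} MiB")
```

For 10 million elements this works out to roughly 9.6 bits per element with 7 hash functions at a 1% error rate, and roughly 14.4 bits per element with 10 hash functions at 0.1%, which is on the order of 17 MiB.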

Local Bloom filter (suitable for a single machine with up to about 10 million records)

This depends on the mmh3 and bitarray libraries:

pip install mmh3 bitarray

import mmh3
from bitarray import bitarray
from itemadapter import ItemAdapter
from scrapy.exceptions import DropItem

class LocalBloomFilterPipeline:
    """In-memory Bloom filter pipeline (note: the filter is emptied when the process restarts)."""

    def __init__(self, capacity=10_000_000, error_rate=0.001):
        self.capacity = capacity         # expected number of fingerprints
        self.error_rate = error_rate     # target false-positive rate, e.g. 0.001 means 0.1%
        self._init_bloom()

    def _init_bloom(self):
        # Size the bit array (m) and choose the number of hash functions (k)
        # from precomputed coefficients for two common error rates
        if self.error_rate == 0.001:
            m = int(self.capacity * 14.38)   # about 14.38 bits per element
            k = 10
        elif self.error_rate == 0.01:
            m = int(self.capacity * 9.59)    # about 9.59 bits per element
            k = 7
        else:
            # Any other value falls back to the stricter 0.1% parameters
            m = int(self.capacity * 14.38)
            k = 10
        self.m = m
        self.k = k
        self.bitarray = bitarray(m)
        self.bitarray.setall(0)

    @classmethod
    def from_crawler(cls, crawler):
        return cls(
            capacity=crawler.settings.getint("BLOOM_CAPACITY", 10_000_000),
            error_rate=crawler.settings.getfloat("BLOOM_ERROR_RATE", 0.001)
        )

    def process_item(self, item, spider):
        # Fingerprint by url; if the item has no url, use the whole item dict
        adapter = ItemAdapter(item)
        fp = adapter.get("url") or str(sorted(adapter.asdict().items()))
        # Compute the k bit positions once, using the loop index as the hash seed
        positions = [mmh3.hash(fp, seed) % self.m for seed in range(self.k)]
        if all(self.bitarray[pos] for pos in positions):
            # Every bit is already set, so this is very likely a duplicate
            spider.logger.warning(f"Bloom filter reports a possible duplicate: {fp[:50]}")
            raise DropItem(f"Possible duplicate: {fp[:50]}")
        # Record the fingerprint in the Bloom filter
        for pos in positions:
            self.bitarray[pos] = 1
        return item

⚠️ Note: The local bloom filter will lose all records after the process is restarted. It is only suitable for short-term large-scale deduplication or temporary scenarios that are not sensitive to restarts. If persistence is required, consider regularly saving the Bloom filter data to disk or using the Redis version of the Bloom filter instead.
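
The "save to disk" option can be illustrated with a small standard-library filter. This is a sketch rather than a drop-in replacement for the pipeline: the class PersistentBloom, its double-hashing scheme based on SHA256, and the file name are all assumptions made for the example.

```python
import hashlib
import os
import tempfile

class PersistentBloom:
    """Hypothetical stdlib-only Bloom filter whose bit array can be saved and reloaded."""

    def __init__(self, num_bits: int, num_hashes: int):
        self.num_bits = num_bits
        self.num_hashes = num_hashes
        self.bits = bytearray((num_bits + 7) // 8)

    def _positions(self, key: str) -> list[int]:
        # Double hashing: derive all positions from two halves of one SHA256 digest
        digest = hashlib.sha256(key.encode("utf-8")).digest()
        h1 = int.from_bytes(digest[:8], "big")
        h2 = int.from_bytes(digest[8:16], "big") | 1  # force an odd, non-zero step
        return [(h1 + i * h2) % self.num_bits for i in range(self.num_hashes)]

    def add(self, key: str) -> None:
        for pos in self._positions(key):
            self.bits[pos // 8] |= 1 << (pos % 8)

    def __contains__(self, key: str) -> bool:
        return all(self.bits[pos // 8] & (1 << (pos % 8)) for pos in self._positions(key))

    def save(self, path: str) -> None:
        # Write to a temporary file first, then rename, so a crash cannot leave a half-written file
        tmp_path = f"{path}.tmp"
        with open(tmp_path, "wb") as fh:
            fh.write(self.bits)
        os.replace(tmp_path, path)

    @classmethod
    def load(cls, path: str, num_bits: int, num_hashes: int) -> "PersistentBloom":
        bloom = cls(num_bits, num_hashes)
        with open(path, "rb") as fh:
            data = fh.read()
        if len(data) != len(bloom.bits):
            raise ValueError("file size does not match num_bits")
        bloom.bits = bytearray(data)
        return bloom

path = os.path.join(tempfile.mkdtemp(), "bloom.bin")
bloom = PersistentBloom(num_bits=1_000_003, num_hashes=7)
bloom.add("https://example.com/a")
bloom.save(path)

restored = PersistentBloom.load(path, num_bits=1_000_003, num_hashes=7)
print("https://example.com/a" in restored)
```

In a pipeline, save could be called from close_spider and load from open_spider. The filter must be reloaded with the same num_bits and num_hashes it was saved with, otherwise the stored bits no longer correspond to the same positions.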


Timestamp + Intelligent Incremental Fetching

The core idea of incremental crawling: fetch only data that is new or has been updated since the last crawl, so historical content is not crawled again.

Timestamp-based incremental spider

The following example keeps a "last crawl timestamp" in Redis. It suits data sources with a clear publication time, such as articles and news.

import scrapy
import redis
from datetime import datetime
from myproject.items import MyItem

class IncrementalSpider(scrapy.Spider):
    name = "incremental_spider"
    allowed_domains = ["example.com"]
    start_urls = ["https://example.com/news"]

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.redis_conn = redis.Redis(
            host="127.0.0.1",
            port=6379,
            db=0,
            decode_responses=True   # store the timestamp as a string for easy comparison
        )
        self.last_crawl_key = f"last_crawl:{self.name}"

    def parse(self, response):
        # 1. Read the last crawl time (on the first run, default to 30 days ago
        #    so that recent history is still picked up)
        last_crawl_ts = float(
            self.redis_conn.get(self.last_crawl_key) or
            (datetime.now().timestamp() - 86400 * 30)
        )
        # 2. Walk through every entry on the list page
        for news in response.css("div.news-item"):
            pub_time_str = news.css(".pub-time::text").get()
            detail_url = news.css("a.title::attr(href)").get()
            if not pub_time_str or not detail_url:
                continue   # skip entries without a publication time or a link
            pub_time_ts = datetime.strptime(pub_time_str.strip(), "%Y-%m-%d %H:%M").timestamp()
            if pub_time_ts > last_crawl_ts:   # only follow entries newer than the last crawl
                yield response.follow(detail_url, callback=self.parse_detail)
        # 3. Once the list page has been processed, record the time of this crawl
        self.redis_conn.set(self.last_crawl_key, datetime.now().timestamp())

    def parse_detail(self, response):
        """Parse the detail page and yield an item."""
        item = MyItem()
        item["url"] = response.url
        item["title"] = (response.css("h1::text").get() or "").strip()
        item["content"] = "".join(response.css("div.content p::text").getall()).strip()
        yield item

Key points:

  • Time baseline: Redis stores the completion time of the last crawl, and the value is updated after each crawl;
  • Filtering logic: only entries published after the baseline are processed;
  • Fallback strategy: on the first run, use an older starting time (such as 30 days ago) so that recently published data is not missed.
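
One variation worth considering is to record the crawl start time instead of the completion time, and to write it only after the run finishes cleanly. That way, entries published while the crawl is in progress are still newer than the stored baseline on the next run, and a failed run does not move the baseline forward. The sketch below shows the idea with a hypothetical CrawlCheckpoint helper; the DictStore class merely stands in for a Redis client in the demo.

```python
import time

class CrawlCheckpoint:
    """Hypothetical helper that tracks the 'last crawl' baseline for one spider.

    `store` is any object with get(key) and set(key, value) methods.
    """

    def __init__(self, store, key: str, default_lookback_days: int = 30):
        self.store = store
        self.key = key
        self.started_at = time.time()  # captured before any page is fetched
        raw = store.get(key)
        if raw is not None:
            self.last_crawl_ts = float(raw)
        else:
            # First run: look back a fixed number of days
            self.last_crawl_ts = self.started_at - 86400 * default_lookback_days

    def is_new(self, published_ts: float) -> bool:
        return published_ts > self.last_crawl_ts

    def commit(self, finished_ok: bool) -> bool:
        # Advance the baseline only after a clean run, and store the start time
        if finished_ok:
            self.store.set(self.key, self.started_at)
        return finished_ok

class DictStore:
    """In-memory stand-in for a key-value store, used only for this demo."""

    def __init__(self):
        self.data = {}

    def get(self, key):
        return self.data.get(key)

    def set(self, key, value):
        self.data[key] = value

store = DictStore()
checkpoint = CrawlCheckpoint(store, "last_crawl:demo")
print(checkpoint.is_new(checkpoint.started_at - 86400))       # published yesterday
print(checkpoint.is_new(checkpoint.started_at - 86400 * 31))  # older than the lookback window
checkpoint.commit(finished_ok=True)
```

In a Scrapy spider, commit could be called from the closed(self, reason) method, where reason is "finished" when the crawl ends normally.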

Core performance optimization

Memory LRU cache + Redis batch pipeline

These two optimizations give the most immediate performance gains:

  1. In-memory LRU cache: filters recently seen, frequently repeated data in memory, which reduces round-trips to Redis;
  2. Redis batch pipeline: sends multiple fingerprint writes in one batch, which reduces the number of network round-trips.

from collections import OrderedDict

from scrapy.exceptions import DropItem

# Extends the basic pipeline with an LRU cache and batched Redis writes
class OptimizedRedisPipeline(RedisFingerprintPipeline):
    def __init__(self, redis_conf):
        super().__init__(redis_conf)
        self.lru_cache = OrderedDict()  # key order doubles as recency order
        self.lru_max = 10000          # cache at most 10000 fingerprints
        self.pending_fps = []         # fingerprints waiting to be written in a batch
        self.batch_size = 100         # flush to Redis every 100 fingerprints

    def process_item(self, item, spider):
        fp = self._gen_fingerprint(item)
        # 1. Fast path: check the LRU cache
        if fp in self.lru_cache:
            self.lru_cache.move_to_end(fp)   # mark as most recently used
            spider.logger.debug("LRU cache hit, skipping duplicate")
            raise DropItem("LRU duplicate")
        # 2. Check Redis
        if self.redis_conn.exists(fp):
            spider.logger.debug("Redis hit, skipping duplicate")
            raise DropItem("Redis duplicate")
        # 3. Add to the LRU cache and evict the least recently used entry if it is full
        self.lru_cache[fp] = True
        if len(self.lru_cache) > self.lru_max:
            self.lru_cache.popitem(last=False)
        # 4. Queue the fingerprint for the next batch write
        self.pending_fps.append(fp)
        if len(self.pending_fps) >= self.batch_size:
            self._flush_pending(spider)
        return item

    def _flush_pending(self, spider):
        """Write the queued fingerprints to Redis in one batch using a Redis pipeline."""
        if not self.pending_fps:
            return
        pipe = self.redis_conn.pipeline()
        expire = 86400 * self.expire_days
        for fp in self.pending_fps:
            pipe.setex(fp, expire, b"1")
        pipe.execute()
        spider.logger.debug(f"Wrote {len(self.pending_fps)} fingerprints to Redis in one batch")
        self.pending_fps.clear()

    def close_spider(self, spider):
        # Make sure all queued fingerprints are written before the spider closes
        self._flush_pending(spider)
        super().close_spider(spider)
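
The eviction policy is easiest to see in isolation. Below is a minimal sketch of an LRU "seen set" built on collections.OrderedDict; the class name LRUSet and its seen method are assumptions for illustration.

```python
from collections import OrderedDict

class LRUSet:
    """Hypothetical bounded set that evicts the least recently used key."""

    def __init__(self, max_size: int):
        self.max_size = max_size
        self._items = OrderedDict()

    def seen(self, key) -> bool:
        """Return True if key was already present; record it either way."""
        if key in self._items:
            self._items.move_to_end(key)   # refresh recency on every hit
            return True
        self._items[key] = None
        if len(self._items) > self.max_size:
            self._items.popitem(last=False)  # drop the oldest entry
        return False

cache = LRUSet(max_size=2)
cache.seen("a")
cache.seen("b")
cache.seen("a")          # hit: "a" becomes the most recently used key
cache.seen("c")          # cache is full, so "b" is evicted
print(cache.seen("a"))   # True
print(cache.seen("b"))   # False
```

A plain dict that always removes its first key behaves as first-in-first-out instead: a frequently repeated fingerprint would still be evicted once enough newer ones arrive, which defeats the purpose of caching popular data.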

Two common problems and solutions

Problem 1: Memory growth during long runs

Cause: the LRU cache is sized too large (or grows without bound), or the Redis fingerprint expiration time is too long, so memory usage keeps climbing.

Solution:

  • Adjust the LRU cache size to fit the server's memory, for example lru_max = 5000;
  • Shorten the Redis fingerprint expiration time, for example DUPLICATE_EXPIRE_DAYS = 3;
  • Clean the Redis deduplication database manually at regular intervals (for example weekly), or use the Redis FLUSHDB command. Note that FLUSHDB deletes every key in the selected database, so keep fingerprints in a dedicated database if you rely on it.
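
If the fingerprints share a Redis database with other data, a narrower cleanup is possible when the keys carry a common prefix. The sketch below assumes fingerprints are written as "dedup:fp:<hash>", which the pipelines in this article do not do by default, so the prefix and the function name purge_fingerprints are hypothetical. It only relies on the scan_iter and delete methods of a redis-py client.

```python
def purge_fingerprints(client, prefix: str = "dedup:fp:", batch_size: int = 500) -> int:
    """Delete all keys under `prefix` in batches and return how many were removed.

    `client` is expected to behave like redis.Redis: scan_iter(match=..., count=...)
    yields matching keys and delete(*names) returns the number of keys deleted.
    """
    batch, deleted = [], 0
    # SCAN walks the keyspace incrementally instead of blocking the server like KEYS
    for key in client.scan_iter(match=f"{prefix}*", count=batch_size):
        batch.append(key)
        if len(batch) >= batch_size:
            deleted += client.delete(*batch)
            batch.clear()
    if batch:
        deleted += client.delete(*batch)
    return deleted
```

With a real connection this could be called as purge_fingerprints(redis.Redis(host="127.0.0.1", db=0)). Keys outside the prefix are left untouched.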

Problem 2: Redis connection timeouts or disconnects

Cause: network jitter or excessive load on the Redis server.

Solution: configure timeouts, retries, and health checks when creating the Redis connection.

import redis
from redis.backoff import ExponentialBackoff
from redis.retry import Retry

# Replacement for open_spider in RedisFingerprintPipeline
def open_spider(self, spider):
    self.redis_conn = redis.Redis(
        host=self.redis_conf["host"],
        port=self.redis_conf["port"],
        db=self.redis_conf["db"],
        password=self.redis_conf["password"],
        decode_responses=False,
        socket_connect_timeout=5,
        socket_timeout=5,
        retry_on_timeout=True,
        retry=Retry(ExponentialBackoff(cap=10, base=1), 3),  # exponential backoff, at most 3 retries
        health_check_interval=30                             # check connection health every 30 seconds
    )

Exponential backoff means the wait roughly doubles after each failed attempt, on the order of 1, 2, then 4 seconds, and never exceeds the cap (10 seconds here). This keeps a momentary connection failure from interrupting the whole crawl. The base delay is set explicitly to 1 second because the library's default base delay is much shorter, on the order of milliseconds.
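
The general shape of a capped exponential backoff schedule can be computed directly. This is an illustrative sketch of the idea, not redis-py's internal implementation; the exact starting exponent and default base used by the library may differ.

```python
def backoff_delays(base: float, cap: float, retries: int) -> list[float]:
    """Delay before each retry: base * 2**attempt, never above cap."""
    return [min(cap, base * 2 ** attempt) for attempt in range(retries)]

print(backoff_delays(base=1, cap=10, retries=3))  # [1, 2, 4]
print(backoff_delays(base=1, cap=10, retries=6))  # [1, 2, 4, 8, 10, 10]
```

The second call shows the cap taking effect: once the doubled delay would exceed 10 seconds, every further retry waits exactly 10 seconds.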


💡 Core Summary:

  1. Simple scenarios: URL normalization plus SHA256 fingerprint deduplication in Redis solves most duplication problems;
  2. Large-scale scenarios: a local Bloom filter combined with batched Redis pipeline writes gives low-cost, high-performance deduplication;
  3. Incremental crawling: start with the timestamp-based approach, and add dynamic scheduling later if conditions permit;
  4. Fault tolerance and monitoring: always configure Redis retries and health checks so that a brief connection failure does not bring down the crawler.