Distributed deduplication and scheduling: efficient deduplication algorithms and distributed coordination mechanisms explained

📂 Stage: Stage 5 - Combat Power Upgrade (Distributed and Advanced) 🔗 Related chapters: Scrapy-Redis Distributed Architecture · Deep Customization of Spider Middleware · Large-Scale Crawler Optimization



Core challenges and solution choices for distributed deduplication

When a crawler grows from a single machine to a cluster of cooperating nodes, repeated crawling becomes the biggest hidden cost. A single node must never process the same URL twice, and different nodes must not collide by fetching the same URL. This raises several real challenges.

Three core challenges

| Challenge dimension | Why it is difficult |
| --- | --- |
| Data consistency | All nodes must share one list of "processed URLs", and it must be clear who changed what and when |
| Performance and storage | Deduplication checks over hundreds of millions of URLs must be fast (millisecond level) without exhausting memory |
| Scalability | Nodes can be added or removed at any time, so the deduplication scheme must adapt smoothly without being redesigned |

Comparison of mainstream deduplication solutions

The following table lists several common solutions. After reading this, you will know when to choose which one.

| Solution type | Advantages | Disadvantages | Applicable scenarios |
| --- | --- | --- | --- |
| Local Set / Bloom filter | Extremely fast, zero network overhead | Cannot share deduplication state across nodes | Single-machine testing, small temporary tasks |
| Redis Set | Exact deduplication, natively shared across nodes | Huge memory overhead at massive URL counts | Medium scale (< 10 million URLs) |
| Redis Bloom filter | Extremely space efficient, shared by all nodes | Very low probability of "false positive" misjudgments | First choice at very large scale (> 100 million URLs) |

What a false positive means: the filter occasionally reports "this URL already exists" when the URL is actually new. At large data volumes this is usually acceptable, and a suspected duplicate can be double-checked by other means.
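Here is a back-of-the-envelope comparison of memory use for 100 million URLs. The 80-byte average URL length is an assumption. The Set figure counts only raw URL bytes, and Redis adds per-member overhead on top, so the real gap is wider still. The Bloom figure uses the standard sizing formula m = -n * ln(p) / (ln 2)^2. The helper name bloom_bits is illustrative.

```python
import math

def bloom_bits(n, p):
    """Optimal Bloom filter size in bits for n items at false positive rate p."""
    return math.ceil(-n * math.log(p) / (math.log(2) ** 2))

n_urls = 100_000_000
avg_url_bytes = 80  # assumption: typical URL length; real crawls vary

set_bytes = n_urls * avg_url_bytes          # lower bound: raw URL bytes only
bloom_bytes = bloom_bits(n_urls, 0.01) / 8  # 1% false positive rate

print(f"Redis Set (raw URLs only): {set_bytes / 1e9:.1f} GB")
print(f"Bloom filter at 1% FPR:    {bloom_bytes / 1e6:.0f} MB")
```

At roughly 9.6 bits per URL for a 1% false positive rate, the Bloom filter is well over an order of magnitude smaller than even the raw URL text.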


Bloom filter: the king of space efficiency

A Bloom filter is a clever probabilistic data structure with two guaranteed properties:

  1. A "does not exist" answer is always correct: if the filter says a URL has not been processed, it definitely has not.
  2. An "exists" answer has a small error rate: occasionally a new URL is wrongly flagged as already seen, but that probability can be controlled.

Conceptually, several hash functions map each URL to several positions in a very long bit array. Adding a URL sets those positions to 1. A query checks whether all of them are 1.
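The mechanism fits in a few lines of standard-library Python. This sketch is an illustration, not the implementation used below. The class name TinyBloom is hypothetical. Instead of k independent hash functions, it derives k positions from one SHA-256 digest using the common double-hashing trick (h1 + i * h2). The bit array is deliberately tiny so both properties above become visible: inserted URLs are always found, while a few percent of never-inserted URLs are reported as present.

```python
import hashlib

class TinyBloom:
    """Minimal Bloom filter: a bit array plus k derived hash positions."""

    def __init__(self, m_bits, k):
        self.m, self.k = m_bits, k
        self.bits = bytearray((m_bits + 7) // 8)  # all bits start at 0

    def _positions(self, item):
        digest = hashlib.sha256(item.encode()).digest()
        h1 = int.from_bytes(digest[:8], "big")
        h2 = int.from_bytes(digest[8:16], "big") | 1  # force an odd, non-zero step
        return [(h1 + i * h2) % self.m for i in range(self.k)]

    def add(self, item):
        for pos in self._positions(item):
            self.bits[pos // 8] |= 1 << (pos % 8)  # set the bit to 1

    def contains(self, item):
        # "Maybe present" only if every one of the k bits is 1
        return all(self.bits[pos // 8] & (1 << (pos % 8)) for pos in self._positions(item))

bf = TinyBloom(m_bits=2_000, k=3)  # intentionally small so false positives show up
inserted = [f"https://example.com/page/{i}" for i in range(300)]
for url in inserted:
    bf.add(url)

# Property 1: no false negatives, every inserted URL is reported as present
missed = sum(not bf.contains(u) for u in inserted)
# Property 2: some never-inserted URLs are wrongly reported as present
probes = [f"https://example.org/other/{i}" for i in range(10_000)]
false_pos = sum(bf.contains(u) for u in probes)
print(f"false negatives: {missed}, false positive rate: {false_pos / len(probes):.3f}")
```

With m = 2,000 bits, k = 3 and 300 items, the expected rate is about (1 - e^(-3*300/2000))^3, roughly 4.8%. A properly sized filter keeps the rate near its configured error_rate.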

Simplified version of Bloom filter

The following implementation skips the parameter derivation. It uses precomputed values (bit array length m and number of hash functions k) for common capacity/error-rate combinations. Think of it as an out-of-the-box tool.

import math

import mmh3
from bitarray import bitarray

class SimpleBloomFilter:
    def __init__(self, capacity=1000000, error_rate=0.01):
        """
        Initialize the Bloom filter
        capacity  : expected number of elements to insert
        error_rate: maximum acceptable false positive rate (default 1%)
        """
        # Precomputed (bit array size m, number of hash functions k) for common settings
        self.configs = {
            (1000000, 0.01):   (9585059, 7),
            (1000000, 0.001):  (14377588, 10),
            (10000000, 0.001): (143775876, 10),
        }
        if (capacity, error_rate) in self.configs:
            self.m, self.k = self.configs[(capacity, error_rate)]
        else:
            # Other combinations: standard sizing formulas, so a large capacity
            # never silently falls back to a filter sized for 1 million URLs
            self.m = math.ceil(-capacity * math.log(error_rate) / math.log(2) ** 2)
            self.k = max(1, round(self.m / capacity * math.log(2)))

        self.bit_array = bitarray(self.m)
        self.bit_array.setall(0)

    def _get_hashes(self, item):
        """Generate k bit positions by seeding MurmurHash3 with 0..k-1"""
        return [mmh3.hash(item, i) % self.m for i in range(self.k)]

    def add(self, item):
        """Mark the URL as processed"""
        for idx in self._get_hashes(item):
            self.bit_array[idx] = 1

    def contains(self, item):
        """Check whether the URL may already exist"""
        return all(self.bit_array[idx] for idx in self._get_hashes(item))

# ---- Usage example ----
bf = SimpleBloomFilter()
urls = ["https://a.com", "https://b.com", "https://a.com"]
for url in urls:
    if not bf.contains(url):
        print(f"✅ New URL: {url}")
        bf.add(url)
    else:
        print(f"❌ Possible duplicate: {url}")

The output looks like this:

✅ New URL: https://a.com
✅ New URL: https://b.com
❌ Possible duplicate: https://a.com
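These numbers come from the standard sizing formulas. For n expected items at false positive rate p, use m = -n * ln(p) / (ln 2)^2 bits and k = (m / n) * ln 2 hash functions. After n insertions, the expected false positive rate is about (1 - e^(-k*n/m))^k. The sketch below evaluates these formulas; the helper names bloom_params and expected_fp_rate are illustrative. It also shows that m = 14,377,587 bits with k = 10 corresponds to about a 0.1% rate at one million items. Roughly 9.6 million bits and 7 hash functions suffice for 1%.

```python
import math

def bloom_params(capacity, error_rate):
    """Return (m bits, k hashes) for the target capacity and false positive rate."""
    m = math.ceil(-capacity * math.log(error_rate) / (math.log(2) ** 2))
    k = max(1, round(m / capacity * math.log(2)))
    return m, k

def expected_fp_rate(m, k, n):
    """Approximate false positive rate after inserting n items."""
    return (1 - math.exp(-k * n / m)) ** k

print(bloom_params(1_000_000, 0.01))    # (9585059, 7)
print(bloom_params(1_000_000, 0.001))   # (14377588, 10)

# A 14,377,587-bit, 10-hash filter evaluated at 1 million items
print(f"{expected_fp_rate(14_377_587, 10, 1_000_000):.4f}")   # about 0.0010
# Overfilling: the same filter after 3 million insertions
print(f"{expected_fp_rate(14_377_587, 10, 3_000_000):.4f}")
```

Tripling the load on the same filter pushes the rate above 25%. This is the "overfilled" symptom listed in the troubleshooting table at the end of this article.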

Implementing distributed deduplication with Redis

A local Bloom filter is very fast, but it lives only in the memory of a single process. To serve the whole crawler cluster, move the bit array into Redis so that every node reads and writes the same Bloom filter.

Redis version of Bloom filter (single instance)

import math

import mmh3
import redis

class RedisBloomFilter:
    def __init__(self, redis_client, key_prefix="crawler_bloom", capacity=1000000, error_rate=0.01):
        self.redis = redis_client
        self.key = f"{key_prefix}:bitmap"
        # Standard sizing: m = -n*ln(p) / (ln 2)^2 bits, k = (m / n) * ln 2 hashes
        self.m = math.ceil(-capacity * math.log(error_rate) / math.log(2) ** 2)
        self.k = max(1, round(self.m / capacity * math.log(2)))
        if self.m > 2 ** 32:
            # A Redis string holds at most 2^32 bits (512 MB); shard larger filters
            raise ValueError("filter too large for one Redis key")

    def _get_hashes(self, item):
        return [mmh3.hash(item, i) % self.m for i in range(self.k)]

    def add(self, item):
        """Send all k SETBIT commands in one pipeline to cut network round trips"""
        pipe = self.redis.pipeline()
        for idx in self._get_hashes(item):
            pipe.setbit(self.key, idx, 1)
        pipe.execute()

    def contains(self, item):
        """Read all k bits in one round trip; "present" only if every bit is 1"""
        pipe = self.redis.pipeline(transaction=False)
        for idx in self._get_hashes(item):
            pipe.getbit(self.key, idx)
        return all(pipe.execute())

# ---- Connect to Redis and initialize ----
redis_client = redis.Redis(host="localhost", port=6379, db=0, decode_responses=False)
rbf = RedisBloomFilter(redis_client)

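SETBIT and GETBIT operate on a plain Redis string used as a bitmap. At a 1% false positive rate, the filter costs only about 9.6 bits per URL, or roughly 1.2 MB per million URLs. A single string can hold at most 2^32 bits (512 MB), which covers around 450 million URLs at that rate. Beyond that, split the filter as described under segmented design below.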
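Calling contains() and then add() takes two separate round trips, so two nodes can both see "new" for the same URL and both crawl it. Here is one possible fix, sketched for a redis-py client. SETBIT replies with the bit's previous value, and a transactional pipeline (MULTI/EXEC) applies all k SETBITs as one atomic unit. As a result, exactly one caller observes a bit that was still 0 (barring a false positive, where no caller does). The function name add_if_new is hypothetical. The positions argument is assumed to be the URL's k offsets, for example rbf._get_hashes(url).

```python
def add_if_new(client, key, positions):
    """Set all k bits in one MULTI/EXEC transaction.

    Returns True if at least one bit was previously 0, meaning the item was
    definitely absent before this call and this caller "wins" it.
    """
    pipe = client.pipeline(transaction=True)
    for idx in positions:
        pipe.setbit(key, idx, 1)
    previous_bits = pipe.execute()  # SETBIT replies with each bit's old value
    return any(bit == 0 for bit in previous_bits)

# Usage sketch against the RedisBloomFilter above (schedule() is hypothetical):
# if add_if_new(redis_client, rbf.key, rbf._get_hashes(url)):
#     schedule(url)
```

The check and the insert also share one round trip, instead of a contains() call followed by a separate add().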


Distributed locks: ensuring concurrency safety

If several crawler nodes add URLs to Redis or fetch new task seeds at the same time, race conditions are easy to hit. For example, two nodes may each believe they were first to claim a seed, and the same URL ends up crawled twice.

This is where a distributed lock comes in. The Redis command SET key value NX EX seconds sets the key only if it does not already exist and attaches an expiration time. It does both in one atomic step.

The simplest and most practical Redis distributed lock

import uuid

class SimpleRedisLock:
    def __init__(self, redis_client, lock_key, timeout=30):
        self.redis = redis_client
        self.lock_key = lock_key
        self.timeout = timeout               # auto-expiry in seconds, prevents deadlock
        self.identifier = str(uuid.uuid4())  # unique token so we never delete another node's lock

    def acquire(self):
        """Atomically take the lock; succeeds only if the key does not exist yet"""
        return self.redis.set(
            self.lock_key,
            self.identifier,
            nx=True,
            ex=self.timeout
        )

    def release(self):
        """Release safely: delete the key only if it still holds our identifier"""
        lua_script = """
        if redis.call("get", KEYS[1]) == ARGV[1] then
            return redis.call("del", KEYS[1])
        else
            return 0
        end
        """
        return self.redis.eval(lua_script, 1, self.lock_key, self.identifier)

# ---- Typical usage ----
lock = SimpleRedisLock(redis_client, "crawler:seed:lock")
if lock.acquire():
    try:
        print("🔒 Lock acquired, working on the shared resource")
        # Critical operations go here: fetching seeds, dispatching tasks, etc.
    finally:
        lock.release()
        print("🔓 Lock released")
else:
    print("⚠️ Failed to acquire the lock, retry later")

Simple as it is, this implementation solves the two core problems:

  1. Atomic locking: only one node can acquire the lock at a time
  2. Automatic expiration: if the node holding the lock crashes, the lock is not held forever

Releasing through a Lua script makes "check the identifier" and "delete" a single atomic step. With two separate commands, the lock could expire between them. Another node could then acquire it, and the DEL would remove that node's lock.
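Because acquire() returns immediately, callers usually wrap it in a bounded retry loop. The sketch below assumes only an object with acquire() and release() methods, such as SimpleRedisLock. The helper names acquire_with_retry and held, the polling interval and the random jitter are illustrative choices, not part of the original design.

```python
import random
import time
from contextlib import contextmanager

def acquire_with_retry(lock, wait_timeout=5.0, interval=0.1):
    """Poll lock.acquire() until it succeeds or wait_timeout seconds pass."""
    deadline = time.monotonic() + wait_timeout
    while True:
        if lock.acquire():
            return True
        if time.monotonic() >= deadline:
            return False
        # Random jitter keeps competing nodes from retrying in lockstep
        time.sleep(interval * (0.5 + random.random()))

@contextmanager
def held(lock, wait_timeout=5.0):
    """Yield True if the lock was acquired (and release it afterwards), else False."""
    acquired = acquire_with_retry(lock, wait_timeout)
    try:
        yield acquired
    finally:
        if acquired:
            lock.release()

# Usage sketch with the SimpleRedisLock above:
# with held(SimpleRedisLock(redis_client, "crawler:seed:lock")) as ok:
#     if ok:
#         ...  # fetch seeds, dispatch tasks
```

Whatever the wrapper, keep the critical section well under the lock's timeout (30 seconds by default in SimpleRedisLock). If the work outlasts the expiry, the key disappears and another node can acquire the lock while the first node is still working.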


Consistent hashing: the load-balancing workhorse

When assigning crawler tasks to nodes, the usual approach is modulo placement: hash(task) % N. But as soon as the node count changes (scaling out, scaling in, or a failure), most tasks are reassigned, N / (N + 1) of them when going from N to N + 1 nodes. This causes large-scale cache invalidation and state migration.
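The cost is easy to measure. This sketch hashes 10,000 illustrative task IDs with MD5 (the same hash the ring below uses). It then counts how many tasks change owners when the cluster grows from 3 to 4 nodes.

```python
import hashlib

def h(key):
    return int(hashlib.md5(key.encode()).hexdigest(), 16)

tasks = [f"task_{i}" for i in range(10_000)]
# A task stays put only if its owner under "% 3" equals its owner under "% 4"
moved = sum(h(t) % 3 != h(t) % 4 for t in tasks)
print(f"modulo placement: {moved / len(tasks):.0%} of tasks change nodes")
```

A task keeps its node only when hash % 3 == hash % 4. That holds for 3 of the 12 residues modulo 12, so about 75% of tasks move.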

Consistent hashing places both nodes and tasks on a ring. Each task is assigned to the first node reached by moving clockwise from the task's position. When a node is added or removed, only the tasks on the ring segment next to that node change owners, and all others stay where they are.

Simplified version of consistent hash ring

import hashlib
import bisect

class ConsistentHashRing:
    def __init__(self, nodes=None, virtual_nodes=150):
        """
        nodes         : list of real nodes (e.g. ["node1", "node2"])
        virtual_nodes : number of virtual nodes per real node, for a more even spread
        """
        self.virtual_nodes = virtual_nodes
        self.ring = {}
        self.sorted_keys = []
        if nodes:
            for node in nodes:
                self.add_node(node)

    def _hash(self, key):
        """Map any string to an integer"""
        return int(hashlib.md5(key.encode()).hexdigest(), 16)

    def add_node(self, node):
        """Add a real node and generate its virtual nodes"""
        for i in range(self.virtual_nodes):
            virtual_key = f"{node}:vnode:{i}"
            hash_key = self._hash(virtual_key)
            self.ring[hash_key] = node
            self.sorted_keys.append(hash_key)
        self.sorted_keys.sort()

    def get_node(self, key):
        """Find the node responsible for a task ID"""
        if not self.ring:
            return None
        hash_key = self._hash(key)
        idx = bisect.bisect_right(self.sorted_keys, hash_key)
        # The ring wraps around: past the last position, go back to the first node
        if idx == len(self.sorted_keys):
            idx = 0
        return self.ring[self.sorted_keys[idx]]

# ---- Demo: task assignment ----
ring = ConsistentHashRing(["node1", "node2", "node3"])
tasks = [f"task_{i}" for i in range(10)]
for task in tasks:
    node = ring.get_node(task)
    print(f"📋 Task {task} -> node {node}")

Output example:

📋 Task task_0 -> node node2
📋 Task task_1 -> node node1
📋 Task task_2 -> node node3
📋 Task task_3 -> node node1
...

With 150 virtual nodes per node, each of the three nodes handles roughly one third of the tasks. When a fourth node joins, only about a quarter of the tasks move, all of them to the new node, which greatly reduces system churn.
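The "about a quarter" figure can be checked empirically. This compact variant (the class name MiniRing is hypothetical) keeps the same MD5 hashing and 150 virtual nodes as the ring above. It maintains the sorted key list with bisect.insort and adds a remove_node method, which the class above does not have.

```python
import bisect
import hashlib

class MiniRing:
    def __init__(self, nodes=(), virtual_nodes=150):
        self.vn = virtual_nodes
        self.ring = {}   # hash position -> real node
        self.keys = []   # sorted hash positions
        for node in nodes:
            self.add_node(node)

    @staticmethod
    def _hash(key):
        return int(hashlib.md5(key.encode()).hexdigest(), 16)

    def add_node(self, node):
        for i in range(self.vn):
            hk = self._hash(f"{node}:vnode:{i}")
            self.ring[hk] = node
            bisect.insort(self.keys, hk)  # keep positions sorted as we insert

    def remove_node(self, node):
        for i in range(self.vn):
            hk = self._hash(f"{node}:vnode:{i}")
            if self.ring.pop(hk, None) is not None:
                self.keys.remove(hk)

    def get_node(self, key):
        if not self.keys:
            return None
        # First position clockwise from the key, wrapping to the start of the ring
        idx = bisect.bisect_right(self.keys, self._hash(key)) % len(self.keys)
        return self.ring[self.keys[idx]]

ring = MiniRing(["node1", "node2", "node3"])
tasks = [f"task_{i}" for i in range(10_000)]
before = {t: ring.get_node(t) for t in tasks}

ring.add_node("node4")
after = {t: ring.get_node(t) for t in tasks}
moved = [t for t in tasks if before[t] != after[t]]
print(f"moved: {len(moved) / len(tasks):.1%}, all to node4: {all(after[t] == 'node4' for t in moved)}")

ring.remove_node("node4")  # removing it again restores the original placement
print(all(ring.get_node(t) == before[t] for t in tasks))  # True
```

Only the new node's arcs change hands, so every moved task lands on node4. Removing node4 again puts every task back on its original node.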


Core Optimization Strategy

With the basic tools in place, here are some optimizations that pay off quickly in production.

1. Batch processing

Every network round trip to Redis costs time. RedisBloomFilter.add() already uses a pipeline to bundle its SETBIT commands into a single round trip. Batch lookups can do the same: queue the GETBIT commands for many URLs in one pipeline and read all the bits back at once.
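A batched lookup might look like the following sketch. It queues the GETBIT commands for every URL in one non-transactional pipeline, so a whole batch of URLs costs one round trip instead of one or more per URL. The function name contains_many is hypothetical. The positions argument is assumed to be the same offset function used when adding, for example rbf._get_hashes from the RedisBloomFilter above.

```python
def contains_many(client, key, urls, positions):
    """Return {url: maybe_present} using a single pipeline round trip.

    positions(url) must return the same k bit offsets used when adding.
    """
    pipe = client.pipeline(transaction=False)
    offsets = [positions(url) for url in urls]
    for idxs in offsets:
        for idx in idxs:
            pipe.getbit(key, idx)
    bits = pipe.execute()  # flat list, in the order the commands were queued

    result, cursor = {}, 0
    for url, idxs in zip(urls, offsets):
        chunk = bits[cursor:cursor + len(idxs)]
        cursor += len(idxs)
        result[url] = all(chunk)  # present only if all k bits are 1
    return result

# Usage sketch:
# flags = contains_many(redis_client, rbf.key, batch, rbf._get_hashes)
# new_urls = [u for u, seen in flags.items() if not seen]
```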

2. Local cache

For URLs with a high recurrence rate (such as list pages and directory page links), you can add a layer of LRU cache in the process to avoid querying the Redis Bloom filter every time.

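from cachetools import LRUCache

class CachedRedisBloomFilter(RedisBloomFilter):
    def __init__(self, *args, cache_size=10000, **kwargs):
        super().__init__(*args, **kwargs)
        self.cache = LRUCache(maxsize=cache_size)

    def contains(self, item):
        if item in self.cache:
            return True   # only positive results are ever cached
        res = super().contains(item)
        if res:
            # Safe to cache: Bloom bits are never cleared, so "present" stays true.
            # A False result is not cached, because another node may add the URL later.
            self.cache[item] = True
        return res

    def add(self, item):
        super().add(item)
        self.cache[item] = True   # newly added URLs are marked as present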

3. Segmented design

  • Segment lock: Split shared resources into multiple segments, each segment has a lock to reduce competition
  • Segmented Bloom: A single large Redis bloom may become a performance bottleneck and can be split into multiple bloom filters by URL prefix or hash value, scattered on different Redis instances
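Segment routing can be as simple as hashing each URL to a fixed shard number and deriving everything else from it. Everything named here is an assumption for illustration: the 16-shard count, the hypothetical host names and the key patterns. Each shard would then be an ordinary RedisBloomFilter, created with key_prefix set to the shard's prefix and sized for about 1/16 of the total capacity. The same shard number can select a segment lock.

```python
import hashlib

NUM_SHARDS = 16                                  # assumption: tune to cluster size
REDIS_HOSTS = ["redis-a:6379", "redis-b:6379"]   # hypothetical instances

def shard_of(url, num_shards=NUM_SHARDS):
    """Stable shard number from SHA-1, independent of the filter's MurmurHash3 positions."""
    digest = hashlib.sha1(url.encode()).digest()
    return int.from_bytes(digest[:8], "big") % num_shards

def route(url):
    """Return (redis host, bloom key prefix, segment lock key) for the URL."""
    shard = shard_of(url)
    host = REDIS_HOSTS[shard % len(REDIS_HOSTS)]  # spread shards over instances
    return host, f"crawler_bloom:{shard}", f"crawler:seed:lock:{shard}"

print(route("https://a.com/list?page=2"))
```

Keep the shard count fixed once data has been written. Changing it remaps URLs to shards that have never seen them, which is the same modulo problem discussed in the consistent hashing section. Using a different hash for shard choice than for bit positions keeps the two independent.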

Best Practice Summary

Design principles at a glance

| Principle | Description |
| --- | --- |
| Hierarchical deduplication | Local LRU cache -> Redis Bloom filter -> Redis Set (exact last line of defense, consulted to settle possible false positives) |
| Performance first | Probabilistic structures absorb the bulk of the load, while exact structures handle only a small number of confirmations |
| Fault-tolerant design | Locks must expire automatically; back up Bloom filters to disk regularly to prevent data loss |
| Complete monitoring | Track key metrics such as deduplication hit rate, response latency, Redis memory usage and Bloom filter fill ratio |
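For the fill-ratio metric, one option is Redis's BITCOUNT command (client.bitcount(key) in redis-py), which returns the number of set bits X. From X you can derive three figures:

- the fill ratio, X / m;
- the current false positive rate, roughly (X / m)^k;
- the number of inserted items, estimated as -(m / k) * ln(1 - X / m).

The function name bloom_health and the 1% alert threshold are illustrative assumptions.

```python
import math

def bloom_health(set_bits, m, k, alert_fp_rate=0.01):
    """Derive monitoring metrics from a set-bit count (e.g. Redis BITCOUNT)."""
    fill = set_bits / m
    fp_now = fill ** k  # current false positive estimate
    est_items = -(m / k) * math.log(1 - fill) if fill < 1 else math.inf
    return {
        "fill_ratio": fill,
        "est_fp_rate": fp_now,
        "est_items": est_items,
        "alert": fp_now > alert_fp_rate,  # time to grow or shard the filter
    }

# Usage sketch: metrics = bloom_health(redis_client.bitcount(rbf.key), rbf.m, rbf.k)
print(bloom_health(set_bits=4_792_530, m=9_585_059, k=7))
```

BITCOUNT is O(N) in the string length, so sample it on a schedule rather than on every request.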

Quick location of frequently asked questions

| Symptom | Possible cause | Solution |
| --- | --- | --- |
| Bloom filter false positive rate is too high | Capacity underestimated, filter overfilled | Rebuild with a larger capacity or a lower error_rate |
| Redis memory usage is too high | A Set is used where a Bloom filter would do | Replace the Set with a Redis Bloom filter |
| Heavy contention on distributed locks | Lock granularity too coarse or locks held too long | Use segmented locks and shrink critical sections |
| Many tasks are reassigned after adding or removing a node | Plain hash-modulo placement | Switch to consistent hashing |

This deduplication-plus-scheduling combination has supported multiple crawler clusters that process an average of hundreds of millions of URLs per day. You can apply it directly to your own business or swap components as needed, for example replacing a single Redis instance with Redis Cluster or an alternative store. The key ideas are layering, combining probabilistic and exact structures, and guaranteeing atomicity in distributed coordination.



🏷️ Tags: distributed deduplication · Bloom filter · distributed lock · consistent hashing · Redis · crawler optimization