分布式去重与调度 - 高效去重算法与分布式协调机制详解

📂 所属阶段:第五阶段 — 战力升级(分布式与进阶篇)
🔗 相关章节:Scrapy-Redis分布式架构 · Spider中间件深度定制 · 大规模爬虫优化

目录


分布式去重的核心挑战与选型

在多节点爬虫中,去重是核心环节:避免资源浪费,保证数据质量。

三大核心挑战

维度痛点说明
数据一致性多节点需共享去重状态,防止跨节点重复抓取
性能与存储海量URL下,需兼顾低延迟判断和低内存/存储占用
可扩展性支持节点动态增减,无需重构去重架构

主流去重方案对比

方案类型优势劣势适用场景
本地Set/布隆速度极快,无网络开销无法跨节点去重单机/小规模测试
Redis Set精确去重,天然分布式内存消耗大,海量URL不适用中小规模(<1000万URL)
Redis 布隆过滤器空间效率高,分布式共享存在低概率假阳性大规模(>1亿URL)首选

布隆过滤器:空间效率之王

布隆过滤器是一种概率型数据结构:判断“不存在”100%准确,判断“存在”有低概率误判,但可以通过参数调整误判率。

简化版本地布隆过滤器(跳过参数数学推导)

import mmh3
from bitarray import bitarray

class SimpleBloomFilter:
    def __init__(self, capacity=1000000, error_rate=0.01):
        """
        初始化布隆过滤器
        capacity: 预期插入元素数
        error_rate: 可接受的假阳性率(默认1%)
        """
        # 预配置常用参数
        self.configs = {
            (1000000, 0.01): (14377587, 10),
            (10000000, 0.001): (143775871, 10),
        }
        # 找不到配置则用默认近似
        default_m, default_k = 14377587, 10
        self.m, self.k = self.configs.get((capacity, error_rate), (default_m, default_k))
        
        self.bit_array = bitarray(self.m)
        self.bit_array.setall(0)
    
    def _get_hashes(self, item):
        """用不同种子生成多个哈希值"""
        return [mmh3.hash(item, i) % self.m for i in range(self.k)]
    
    def add(self, item):
        """添加元素"""
        for idx in self._get_hashes(item):
            self.bit_array[idx] = 1
    
    def contains(self, item):
        """检查元素是否可能存在"""
        return all(self.bit_array[idx] for idx in self._get_hashes(item))

# 使用示例
bf = SimpleBloomFilter()
urls = ["https://a.com", "https://b.com", "https://a.com"]
for url in urls:
    if not bf.contains(url):
        print(f"✅ 新URL: {url}")
        bf.add(url)
    else:
        print(f"❌ 可能重复: {url}")

Redis分布式去重落地

将本地布隆过滤器迁移到Redis,实现多节点共享去重状态。

Redis版布隆过滤器(单实例)

import redis
import mmh3
from bitarray import bitarray

class RedisBloomFilter:
    def __init__(self, redis_client, key_prefix="crawler_bloom", capacity=1000000, error_rate=0.01):
        self.redis = redis_client
        self.key = f"{key_prefix}:bitmap"
        # 复用本地布隆的预配置参数
        configs = {(1000000, 0.01): (14377587, 10)}
        self.m, self.k = configs.get((capacity, error_rate), (14377587, 10))
    
    def _get_hashes(self, item):
        return [mmh3.hash(item, i) % self.m for i in range(self.k)]
    
    def add(self, item):
        """批量Redis操作提高性能"""
        hashes = self._get_hashes(item)
        pipe = self.redis.pipeline()
        for idx in hashes:
            pipe.setbit(self.key, idx, 1)
        pipe.execute()
    
    def contains(self, item):
        hashes = self._get_hashes(item)
        return all(self.redis.getbit(self.key, idx) for idx in hashes)

# 连接Redis并初始化
redis_client = redis.Redis(host="localhost", port=6379, db=0, decode_responses=False)
rbf = RedisBloomFilter(redis_client)

分布式锁:保证并发安全

分布式场景中,多节点可能同时操作共享资源(比如更新种子队列),需要用Redis分布式锁保证原子性。

最简实用Redis分布式锁

import uuid
import time

class SimpleRedisLock:
    def __init__(self, redis_client, lock_key, timeout=30):
        self.redis = redis_client
        self.lock_key = lock_key
        self.timeout = timeout  # 锁自动过期时间,防止死锁
        self.identifier = str(uuid.uuid4())  # 唯一标识,防止误删别人的锁
    
    def acquire(self):
        """获取锁:原子性操作,仅在锁不存在时设置"""
        return self.redis.set(
            self.lock_key,
            self.identifier,
            nx=True,
            ex=self.timeout
        )
    
    def release(self):
        """释放锁:用Lua脚本保证原子性,先检查标识再删除"""
        lua_script = """
        if redis.call("get", KEYS[1]) == ARGV[1] then
            return redis.call("del", KEYS[1])
        else
            return 0
        end
        """
        return self.redis.eval(lua_script, 1, self.lock_key, self.identifier)

# 使用示例
lock = SimpleRedisLock(redis_client, "crawler:seed:lock")
if lock.acquire():
    try:
        print("🔒 获得锁,操作共享资源")
        # 这里写操作种子队列的代码
    finally:
        lock.release()
        print("🔓 释放锁")
else:
    print("⚠️ 获取锁失败,稍后重试")

一致性哈希:负载均衡神器

普通哈希取模在节点增减时会导致大量任务重分配,一致性哈希可以将重分配的任务数降到最低。

简化版一致性哈希环

import hashlib
import bisect

class ConsistentHashRing:
    def __init__(self, nodes=None, virtual_nodes=150):
        """
        初始化一致性哈希环
        nodes: 真实节点列表
        virtual_nodes: 虚拟节点数(默认150,解决节点分布不均匀问题)
        """
        self.virtual_nodes = virtual_nodes
        self.ring = {}
        self.sorted_keys = []
        if nodes:
            for node in nodes:
                self.add_node(node)
    
    def _hash(self, key):
        """生成哈希值"""
        return int(hashlib.md5(key.encode()).hexdigest(), 16)
    
    def add_node(self, node):
        """添加真实节点(同时添加虚拟节点)"""
        for i in range(self.virtual_nodes):
            virtual_key = f"{node}:vnode:{i}"
            hash_key = self._hash(virtual_key)
            self.ring[hash_key] = node
            self.sorted_keys.append(hash_key)
        self.sorted_keys.sort()
    
    def get_node(self, key):
        """根据任务ID获取对应节点"""
        if not self.ring:
            return None
        hash_key = self._hash(key)
        # 找到第一个大于等于hash_key的位置
        idx = bisect.bisect_right(self.sorted_keys, hash_key)
        # 环形结构,超出边界返回第一个节点
        if idx == len(self.sorted_keys):
            idx = 0
        return self.ring[self.sorted_keys[idx]]

# 使用示例
ring = ConsistentHashRing(["node1", "node2", "node3"])
tasks = [f"task_{i}" for i in range(10)]
for task in tasks:
    node = ring.get_node(task)
    print(f"📋 任务 {task} -> 节点 {node}")

核心优化策略

1. 批量处理

减少Redis网络调用次数,比如批量添加布隆过滤器位:

# 上述RedisBloomFilter.add()已经使用了pipeline批量操作

2. 本地缓存

缓存热点URL的去重结果,减少远程布隆过滤器的访问:

# 可以用LRU缓存库(如cachetools)实现本地热点缓存
from cachetools import LRUCache

class CachedRedisBloomFilter(RedisBloomFilter):
    def __init__(self, *args, cache_size=10000, **kwargs):
        super().__init__(*args, **kwargs)
        self.cache = LRUCache(maxsize=cache_size)
    
    def contains(self, item):
        if item in self.cache:
            return self.cache[item]
        res = super().contains(item)
        self.cache[item] = res
        return res
    
    def add(self, item):
        super().add(item)
        self.cache[item] = True  # 新增的直接标记为可能存在

3. 分段设计

  • 分段锁:将共享资源分成多段,减少锁竞争
  • 分段布隆:单个Redis布隆可能成为瓶颈,可拆分为多个布隆分段

最佳实践总结

设计原则

  1. 分层去重:本地LRU缓存 -> Redis布隆过滤器 -> Redis Set(误判兜底)
  2. 性能优先:优先使用快速的概率型结构,误判兜底用精确结构
  3. 容错设计:锁加自动过期时间,布隆过滤器定期备份
  4. 监控完备:监控去重命中率、响应时间、Redis内存/布隆负载

常见问题快速解决

问题现象解决方向
布隆过滤器假阳性率过高增加容量或降低error_rate参数
Redis内存占用过大用Redis布隆替代Redis Set
分布式锁竞争激烈分段锁或优化锁持有时间
节点增减任务重分配过多使用一致性哈希环

🔗 相关教程推荐

🏷️ 标签云: 分布式去重 布隆过滤器 分布式锁 一致性哈希 Redis 爬虫优化