Distributed Deduplication and Scheduling - Efficient Deduplication Algorithms and Distributed Coordination in Depth

📂 Stage: Stage 5 - Leveling Up (Distributed & Advanced Topics)
🔗 Related chapters: Scrapy-Redis Distributed Architecture · Deep Customization of Spider Middleware · Large-Scale Crawler Optimization

The Importance of Distributed Deduplication

In a distributed crawler system, deduplication is a critical concern. With multiple crawler nodes working concurrently, the same URL can easily be fetched more than once, which not only wastes network resources but can also degrade data quality.

Challenges of Deduplication

"""
分布式去重面临的主要挑战:

1. 数据一致性:
   - 多节点间的去重数据同步
   - 避免重复抓取同一URL
   - 保证去重数据的准确性

2. 性能要求:
   - 高并发下的快速去重判断
   - 低延迟的去重操作
   - 内存和存储效率

3. 可扩展性:
   - 支持节点动态增减
   - 处理海量URL去重
   - 负载均衡考虑
"""

Comparison of Deduplication Strategies

"""
常见的去重策略及其特点:

1. 本地去重:
   - 优点:速度快,内存占用少
   - 缺点:无法处理跨节点重复
   - 适用:单机爬虫或小规模分布式

2. 中心化去重:
   - 优点:全局一致性好
   - 缺点:单点故障风险,性能瓶颈
   - 适用:中小规模爬虫集群

3. 分布式去重:
   - 优点:高可用,高性能,可扩展
   - 缺点:实现复杂,一致性难保证
   - 适用:大规模分布式爬虫
"""

Bloom Filters in Depth

A Bloom filter is an extremely space-efficient probabilistic data structure for testing whether an element is a member of a set.

A Basic Bloom Filter Implementation

import math

import mmh3  # MurmurHash3; pip install mmh3
from bitarray import bitarray  # pip install bitarray

class BloomFilter:
    """
    A basic Bloom filter.
    """
    
    def __init__(self, capacity: int, error_rate: float = 0.01):
        """
        Initialize the Bloom filter.
        
        Args:
            capacity: expected number of inserted elements
            error_rate: target false-positive rate
        """
        self.capacity = capacity
        self.error_rate = error_rate
        
        # Derive the bit-array size and the number of hash functions
        self.bit_array_size = self._get_bit_array_size()
        self.hash_count = self._get_hash_count()
        
        # Initialize the bit array
        self.bit_array = bitarray(self.bit_array_size)
        self.bit_array.setall(0)
        
        # Bookkeeping
        self.inserted_count = 0
    
    def _get_bit_array_size(self) -> int:
        """
        Bit-array size: m = -(n * ln(p)) / (ln(2)^2)
        """
        m = -(self.capacity * math.log(self.error_rate)) / (math.log(2) ** 2)
        return int(m)
    
    def _get_hash_count(self) -> int:
        """
        Number of hash functions: k = (m / n) * ln(2)
        """
        k = (self.bit_array_size / self.capacity) * math.log(2)
        return max(1, int(k))
    
    def _hash(self, item: str, seed: int) -> int:
        """
        Derive one of k hash functions by varying the seed.
        """
        return mmh3.hash(item, seed) % self.bit_array_size
    
    def add(self, item: str):
        """
        Insert an element.
        """
        for i in range(self.hash_count):
            index = self._hash(item, i)
            self.bit_array[index] = 1
        
        self.inserted_count += 1
    
    def contains(self, item: str) -> bool:
        """
        Membership test.
        Note: may return false positives, but never false negatives.
        """
        for i in range(self.hash_count):
            index = self._hash(item, i)
            if not self.bit_array[index]:
                return False
        return True
    
    def estimate_size(self) -> int:
        """
        Estimate the true number of inserted elements using
        n = -(m / k) * ln(1 - X / m),
        where X is the number of set bits.
        """
        x = self.bit_array.count(1)
        if x == 0:
            return 0
        
        estimated = -self.bit_array_size / self.hash_count * math.log(1 - x / self.bit_array_size)
        return int(estimated)
    
    def load_factor(self) -> float:
        """
        Ratio of inserted elements to the designed capacity.
        """
        return self.inserted_count / self.capacity if self.capacity > 0 else 0

# Usage example
bf = BloomFilter(capacity=1000000, error_rate=0.01)
urls = [
    'https://example.com/page1',
    'https://example.com/page2',
    'https://example.com/page3'
]

for url in urls:
    if not bf.contains(url):
        print(f"Processing: {url}")
        bf.add(url)
    else:
        print(f"Duplicate detected: {url}")

A Redis-Backed Bloom Filter

import math

import mmh3
import redis

class RedisBloomFilter:
    """
    A Bloom filter backed by a Redis bitmap, shared by all crawler nodes.
    """
    
    def __init__(self, redis_client, key_prefix: str, capacity: int = 1000000, error_rate: float = 0.01):
        """
        Initialize the Redis-backed Bloom filter.
        
        Args:
            redis_client: a Redis client instance
            key_prefix: prefix for the Redis keys
            capacity: expected number of elements
            error_rate: target false-positive rate
        """
        self.redis_client = redis_client
        self.key_prefix = key_prefix
        self.capacity = capacity
        self.error_rate = error_rate
        
        # Same sizing formulas as the in-memory filter
        self.bit_array_size = int(-(capacity * math.log(error_rate)) / (math.log(2) ** 2))
        self.hash_count = max(1, int((self.bit_array_size / capacity) * math.log(2)))
        
        # Key of the Redis bitmap
        self.bitmap_key = f"{key_prefix}:bloom_bitmap"
    
    def _get_hashes(self, item: str) -> list:
        """
        Compute the k bit positions for an item.
        """
        return [mmh3.hash(item, i) % self.bit_array_size for i in range(self.hash_count)]
    
    def add(self, item: str):
        """
        Insert an element (one pipelined round trip).
        """
        hashes = self._get_hashes(item)
        pipe = self.redis_client.pipeline()
        
        for hash_val in hashes:
            pipe.setbit(self.bitmap_key, hash_val, 1)
        
        pipe.execute()
    
    def contains(self, item: str) -> bool:
        """
        Membership test against the shared bitmap.
        """
        hashes = self._get_hashes(item)
        
        for hash_val in hashes:
            if not self.redis_client.getbit(self.bitmap_key, hash_val):
                return False
        
        return True
    
    def clear(self):
        """
        Drop the bitmap entirely.
        """
        self.redis_client.delete(self.bitmap_key)
    
    def info(self) -> dict:
        """
        Return the filter's configuration.
        (Counting set bits would require scanning the whole bitmap,
        e.g. with BITCOUNT, so it is omitted here.)
        """
        info = {
            'capacity': self.capacity,
            'error_rate': self.error_rate,
            'bit_array_size': self.bit_array_size,
            'hash_count': self.hash_count,
            'bitmap_key': self.bitmap_key
        }
        return info

# Usage example
redis_client = redis.Redis(host='localhost', port=6379, db=0)
redis_bf = RedisBloomFilter(redis_client, 'crawler_bloom', capacity=1000000, error_rate=0.001)

# Dedup test
test_urls = [
    'https://example.com/test1',
    'https://example.com/test2',
    'https://example.com/test1'  # duplicate
]

for url in test_urls:
    if not redis_bf.contains(url):
        print(f"Processing: {url}")
        redis_bf.add(url)
    else:
        print(f"Duplicate detected: {url}")

Advanced Bloom Filter Features

import time
from typing import Tuple

class AdvancedBloomFilter:
    """
    A Bloom filter augmented with timestamps and a small local cache,
    so entries can be treated as expired after a configurable TTL.
    """
    
    def __init__(self, capacity: int, error_rate: float = 0.01, expiration_time: int = 86400):
        """
        Initialize the advanced Bloom filter.
        
        Args:
            capacity: expected number of elements
            error_rate: target false-positive rate
            expiration_time: entry TTL in seconds
        """
        self.capacity = capacity
        self.error_rate = error_rate
        self.expiration_time = expiration_time
        
        # Underlying Bloom filter
        self.main_bf = BloomFilter(capacity, error_rate)
        
        # Insertion timestamps
        self.timestamps = {}
        
        # Local cache (cuts repeated lookups)
        self.local_cache = {}
        self.cache_size_limit = 10000
    
    def add(self, item: str, timestamp: float = None):
        """
        Insert an element and record when it was added.
        """
        if timestamp is None:
            timestamp = time.time()
        
        self.main_bf.add(item)
        self.timestamps[item] = timestamp
        
        # Update the local cache
        self._update_local_cache(item, True)
    
    def contains(self, item: str) -> Tuple[bool, bool]:
        """
        Membership test that also reports possible expiry.
        
        Returns:
            (exists, possibly_expired)
        """
        # Check the local cache first
        if item in self.local_cache:
            cached_time, cached_exists = self.local_cache[item]
            if time.time() - cached_time < 300:  # 5-minute cache window
                return cached_exists, False
        
        # Fall back to the Bloom filter
        exists = self.main_bf.contains(item)
        
        # Check for expiry
        expired = False
        if exists and item in self.timestamps:
            if time.time() - self.timestamps[item] > self.expiration_time:
                expired = True
        
        # Refresh the local cache
        self._update_local_cache(item, exists)
        
        return exists, expired
    
    def _update_local_cache(self, item: str, exists: bool):
        """
        Insert into the local cache, evicting half of it when full.
        """
        if len(self.local_cache) >= self.cache_size_limit:
            # Evict half of the cache
            keys_to_remove = list(self.local_cache.keys())[:self.cache_size_limit // 2]
            for key in keys_to_remove:
                del self.local_cache[key]
        
        self.local_cache[item] = (time.time(), exists)
    
    def cleanup_expired(self):
        """
        Drop expired bookkeeping.
        (Bits cannot be removed from a plain Bloom filter, so only the
        timestamps and cache entries are cleaned up here.)
        """
        current_time = time.time()
        expired_keys = []
        
        for item, timestamp in self.timestamps.items():
            if current_time - timestamp > self.expiration_time:
                expired_keys.append(item)
        
        for key in expired_keys:
            del self.timestamps[key]
            # Drop from the local cache too
            if key in self.local_cache:
                del self.local_cache[key]
# Usage example
adv_bf = AdvancedBloomFilter(capacity=100000, error_rate=0.01, expiration_time=3600)  # 1-hour TTL

# Add URLs
adv_bf.add('https://example.com/page1')
time.sleep(2)
adv_bf.add('https://example.com/page2')

# Check membership
exists, expired = adv_bf.contains('https://example.com/page1')
print(f"URL exists: {exists}, Expired: {expired}")

Distributed Deduplication with Redis

Redis offers several data structures that support efficient distributed deduplication.

Deduplication with Redis Sets

import hashlib
from typing import List

import redis

class RedisSetDeduplicator:
    """
    Exact deduplication backed by a Redis Set.
    """
    
    def __init__(self, redis_client, set_key: str = 'dedup_set'):
        self.redis_client = redis_client
        self.set_key = set_key
    
    def add(self, item: str) -> bool:
        """
        Add an element.
        
        Returns:
            True if the element is new, False if it was already present.
        """
        # Store a fixed-size hash of the item as the set member
        item_hash = hashlib.md5(item.encode()).hexdigest()
        
        added = self.redis_client.sadd(self.set_key, item_hash)
        return bool(added)
    
    def contains(self, item: str) -> bool:
        """
        Membership test.
        """
        item_hash = hashlib.md5(item.encode()).hexdigest()
        return bool(self.redis_client.sismember(self.set_key, item_hash))
    
    def batch_add(self, items: List[str]) -> int:
        """
        Add elements in a single pipelined round trip.
        
        Returns:
            The number of elements that were actually new.
        """
        item_hashes = [hashlib.md5(item.encode()).hexdigest() for item in items]
        pipe = self.redis_client.pipeline()
        
        for item_hash in item_hashes:
            pipe.sadd(self.set_key, item_hash)
        
        # SADD returns 1 for each newly added member
        results = pipe.execute()
        return sum(results)
    
    def get_size(self) -> int:
        """
        Size of the dedup set.
        """
        return self.redis_client.scard(self.set_key)
    
    def clear(self):
        """
        Drop the dedup set.
        """
        self.redis_client.delete(self.set_key)

# Usage example
redis_client = redis.Redis(host='localhost', port=6379, db=0)
dedup = RedisSetDeduplicator(redis_client, 'crawler_dedup_set')

urls = [
    'https://example.com/page1',
    'https://example.com/page2',
    'https://example.com/page1',  # duplicate
    'https://example.com/page3'
]

for url in urls:
    is_new = dedup.add(url)
    print(f"URL: {url}, Is New: {is_new}, Set Size: {dedup.get_size()}")

Cardinality Estimation with Redis HyperLogLog

class RedisHyperLogLogDeduplicator:
    """
    Approximate distinct counting backed by Redis HyperLogLog.
    HLL estimates cardinality in roughly 12 KB per key, but it cannot
    answer per-item membership queries.
    """
    
    def __init__(self, redis_client, hll_key: str = 'dedup_hll'):
        self.redis_client = redis_client
        self.hll_key = hll_key
    
    def add(self, item: str):
        """
        Add an element to the HyperLogLog.
        """
        item_hash = hashlib.md5(item.encode()).hexdigest()
        self.redis_client.pfadd(self.hll_key, item_hash)
    
    def batch_add(self, items: List[str]):
        """
        Add elements in a single PFADD call.
        """
        item_hashes = [hashlib.md5(item.encode()).hexdigest() for item in items]
        self.redis_client.pfadd(self.hll_key, *item_hashes)
    
    def get_count(self) -> int:
        """
        Estimated number of distinct elements.
        """
        return self.redis_client.pfcount(self.hll_key)
    
    def merge(self, other_hll_key: str):
        """
        Merge another HyperLogLog into this one.
        """
        self.redis_client.pfmerge(self.hll_key, self.hll_key, other_hll_key)

# Usage example
hll_dedup = RedisHyperLogLogDeduplicator(redis_client, 'crawler_hll_dedup')

# Add a large batch of URLs
test_urls = [f'https://example.com/page{i}' for i in range(1000)]
# Mix in some duplicates
test_urls.extend([f'https://example.com/page{i}' for i in range(100)])

hll_dedup.batch_add(test_urls)
print(f"Estimated unique URLs: {hll_dedup.get_count()}")

Deduplication with Redis Sorted Sets (Timestamped)

class RedisSortedSetDeduplicator:
    """
    Deduplication backed by a Redis Sorted Set, with timestamps as scores.
    """
    
    def __init__(self, redis_client, zset_key: str = 'dedup_zset', expiration_days: int = 7):
        self.redis_client = redis_client
        self.zset_key = zset_key
        self.expiration_seconds = expiration_days * 24 * 3600  # convert to seconds
    
    def add(self, item: str, timestamp: float = None) -> bool:
        """
        Add an element.
        
        Args:
            item: the element to add
            timestamp: score to store; defaults to the current time
            
        Returns:
            True if the element is new, False if it was already present.
        """
        if timestamp is None:
            timestamp = time.time()
        
        item_hash = hashlib.md5(item.encode()).hexdigest()
        
        # ZADD with nx=True only inserts new members and returns how many
        # were added, making the check-and-insert atomic (a separate
        # zscore check followed by zadd would race with other nodes).
        added = self.redis_client.zadd(self.zset_key, {item_hash: timestamp}, nx=True)
        
        if added:
            # Refresh the TTL of the whole set
            self.redis_client.expire(self.zset_key, self.expiration_seconds)
            return True
        
        return False
    
    def contains(self, item: str) -> bool:
        """
        Membership test. zscore returns None for missing members, so
        compare against None rather than relying on truthiness (a score
        of 0 would otherwise be misread as "absent").
        """
        item_hash = hashlib.md5(item.encode()).hexdigest()
        score = self.redis_client.zscore(self.zset_key, item_hash)
        return score is not None
    
    def get_recent_items(self, count: int = 100) -> List[tuple]:
        """
        Return the most recently added members.
        
        Returns:
            [(item_hash, timestamp), ...]
        """
        items = self.redis_client.zrevrange(self.zset_key, 0, count - 1, withscores=True)
        return [(item.decode(), score) for item, score in items]
    
    def cleanup_old_items(self, older_than_days: int = 30):
        """
        Remove members older than the given number of days.
        """
        cutoff_time = time.time() - (older_than_days * 24 * 3600)
        self.redis_client.zremrangebyscore(self.zset_key, 0, cutoff_time)

# Usage example
zset_dedup = RedisSortedSetDeduplicator(redis_client, 'crawler_zset_dedup', expiration_days=14)

urls = [
    'https://example.com/page1',
    'https://example.com/page2',
    'https://example.com/page1',  # duplicate
    'https://example.com/page3'
]

for url in urls:
    is_new = zset_dedup.add(url)
    print(f"URL: {url}, Is New: {is_new}")

# Fetch the most recently added members
recent = zset_dedup.get_recent_items(10)
print(f"Recent items: {len(recent)}")

Distributed Locking

In a distributed environment, a distributed lock is needed to coordinate access across multiple nodes.

A Basic Distributed Lock

import uuid
import time
from typing import Optional

class BasicDistributedLock:
    """
    A basic Redis distributed lock (SET NX EX plus a Lua release script).
    """
    
    def __init__(self, redis_client, lock_key: str, default_timeout: int = 30):
        self.redis_client = redis_client
        self.lock_key = lock_key
        self.default_timeout = default_timeout  # lock TTL in seconds
        self.identifier = str(uuid.uuid4())  # token so we only release our own lock
    
    def acquire(self, timeout: int = None) -> bool:
        """
        Try to acquire the lock, retrying until `timeout` seconds have passed.
        
        Args:
            timeout: how long to keep retrying, in seconds
            
        Returns:
            True if the lock was acquired, False otherwise.
        """
        if timeout is None:
            timeout = self.default_timeout
        
        end_time = time.time() + timeout
        
        while time.time() < end_time:
            # SET with NX and EX is atomic: set only if the key does not
            # exist, with an expiry so a crashed holder cannot block
            # everyone forever.
            if self.redis_client.set(
                self.lock_key,
                self.identifier,
                nx=True,                 # only set if the key does not exist
                ex=self.default_timeout  # lock TTL, independent of the retry budget
            ):
                return True
            
            # Back off briefly before retrying
            time.sleep(0.01)
        
        return False
    
    def release(self) -> bool:
        """
        Release the lock.
        
        Returns:
            True on success, False if the lock was no longer ours
            (e.g. it expired and another client took it).
        """
        # GET + DEL must be atomic, otherwise we might delete a lock that
        # expired and was re-acquired by another client; a Lua script
        # runs atomically inside Redis.
        lua_script = """
        if redis.call("get", KEYS[1]) == ARGV[1] then
            return redis.call("del", KEYS[1])
        else
            return 0
        end
        """
        
        result = self.redis_client.eval(
            lua_script,
            1,               # one KEYS argument
            self.lock_key,   # KEYS[1]
            self.identifier  # ARGV[1]
        )
        
        return bool(result)
    
    def is_locked(self) -> bool:
        """
        Check whether anyone currently holds the lock.
        """
        return self.redis_client.exists(self.lock_key) == 1

# Usage example
lock = BasicDistributedLock(redis_client, 'crawler_lock', default_timeout=30)

if lock.acquire(timeout=10):
    try:
        print("Lock acquired, performing critical operation...")
        time.sleep(2)  # simulate the critical section
        print("Critical operation completed.")
    finally:
        lock.release()
        print("Lock released.")
else:
    print("Failed to acquire lock")

Implementing the Redlock Algorithm

import random
import time
from typing import List

class Redlock:
    """
    The Redlock algorithm: acquire the same lock on a majority of
    independent Redis instances.
    """
    
    def __init__(self, redis_clients: List[redis.Redis], retry_count: int = 3, retry_delay: float = 200):
        """
        Initialize Redlock.
        
        Args:
            redis_clients: clients for several independent Redis instances
            retry_count: number of acquisition attempts
            retry_delay: delay between attempts, in milliseconds
        """
        self.servers = redis_clients
        self.quorum = len(redis_clients) // 2 + 1  # majority of instances
        self.retry_count = retry_count
        self.retry_delay = retry_delay / 1000.0  # convert to seconds
        self.clock_drift_factor = 0.01  # allowance for clock drift
        self.lock_timeout = 10000  # default lock TTL in milliseconds
    
    def lock(self, resource: str, ttl: int = None) -> Optional[str]:
        """
        Acquire the distributed lock.
        
        Args:
            resource: name of the resource to lock
            ttl: lock TTL in milliseconds
            
        Returns:
            The lock token, or None if acquisition failed.
        """
        if ttl is None:
            ttl = self.lock_timeout
        
        token = str(uuid.uuid4())
        retry = 0
        
        while retry < self.retry_count:
            retry += 1
            servers_locked = 0
            start_time = time.time() * 1000  # millisecond timestamp
            
            # Try to take the lock on every instance
            for server in self.servers:
                try:
                    if server.set(resource, token, nx=True, px=ttl):
                        servers_locked += 1
                except Exception:
                    pass  # a single unreachable instance is tolerated
            
            # Subtract elapsed time plus a drift allowance from the TTL
            drift = int(self.clock_drift_factor * ttl) + 2
            elapsed_time = (time.time() * 1000) - start_time - drift
            
            validity_time = ttl - elapsed_time
            
            # Success requires a quorum AND remaining validity time
            if servers_locked >= self.quorum and validity_time > 0:
                return token
            else:
                # Roll back any partial acquisitions
                self.unlock(resource, token)
            
            # Randomized back-off before retrying
            time.sleep(random.uniform(0, self.retry_delay))
        
        return None
    
    def unlock(self, resource: str, token: str) -> bool:
        """
        Release the distributed lock on every instance.
        
        Args:
            resource: name of the locked resource
            token: the token returned by lock()
            
        Returns:
            True if a majority of instances released the lock.
        """
        servers_unlocked = 0
        
        for server in self.servers:
            try:
                # Same atomic compare-and-delete as the single-instance lock
                lua_script = """
                if redis.call("get", KEYS[1]) == ARGV[1] then
                    return redis.call("del", KEYS[1])
                else
                    return 0
                end
                """
                
                result = server.eval(lua_script, 1, resource, token)
                servers_unlocked += 1 if result else 0
            except Exception:
                pass  # a single unreachable instance is tolerated
        
        return servers_unlocked >= self.quorum

# Usage example (requires several independent Redis instances)
# redis_servers = [
#     redis.Redis(host='localhost', port=6379, db=0),
#     redis.Redis(host='localhost', port=6380, db=0),
#     redis.Redis(host='localhost', port=6381, db=0)
# ]
# redlock = Redlock(redis_servers)

# token = redlock.lock('my_resource', ttl=10000)
# if token:
#     try:
#         print("Redlock acquired, performing critical operation...")
#         time.sleep(2)
#         print("Critical operation completed.")
#     finally:
#         redlock.unlock('my_resource', token)
#         print("Redlock released.")
# else:
#     print("Failed to acquire redlock")

A Reentrant Distributed Lock

class ReentrantDistributedLock:
    """
    A reentrant distributed lock: the same owner may acquire it multiple
    times, tracked by a per-owner re-entry counter.
    """
    
    def __init__(self, redis_client, lock_key: str, owner_id: str = None):
        self.redis_client = redis_client
        self.lock_key = lock_key
        self.owner_id = owner_id or str(uuid.uuid4())
        self.reentrancy_key = f"{lock_key}:reentrancy:{self.owner_id}"
        self.timeout = 30
    
    def acquire(self) -> bool:
        """
        Acquire the lock (reentrant).
        """
        # Already the holder? Just bump the re-entry count.
        current_owner = self.redis_client.get(self.lock_key)
        if current_owner and current_owner.decode() == self.owner_id:
            self.redis_client.incr(self.reentrancy_key)
            # Refresh both TTLs so re-entry does not outlive the lock
            self.redis_client.expire(self.lock_key, self.timeout)
            self.redis_client.expire(self.reentrancy_key, self.timeout)
            return True
        
        # Otherwise try to take the lock
        if self.redis_client.set(
            self.lock_key,
            self.owner_id,
            nx=True,
            ex=self.timeout
        ):
            # Got it: initialize the re-entry count to 1
            self.redis_client.set(self.reentrancy_key, 1)
            self.redis_client.expire(self.reentrancy_key, self.timeout)
            return True
        
        return False
    
    def release(self) -> bool:
        """
        Release the lock (reentrant).
        """
        # Only the holder may release
        current_owner = self.redis_client.get(self.lock_key)
        if not current_owner or current_owner.decode() != self.owner_id:
            return False
        
        # Decrement the re-entry count
        reentrancy_count = self.redis_client.decr(self.reentrancy_key)
        
        if reentrancy_count <= 0:
            # Count reached zero: actually release the lock
            self.redis_client.delete(self.lock_key)
            self.redis_client.delete(self.reentrancy_key)
        
        return True
    
    def is_held_by_current_owner(self) -> bool:
        """
        Check whether this owner currently holds the lock.
        """
        current_owner = self.redis_client.get(self.lock_key)
        return bool(current_owner) and current_owner.decode() == self.owner_id

# Usage example
reentrant_lock = ReentrantDistributedLock(redis_client, 'reentrant_lock', 'client_1')

def recursive_function(depth=0):
    if depth >= 3:
        return
    
    if reentrant_lock.acquire():
        print(f"Acquired lock at depth {depth}")
        recursive_function(depth + 1)
        reentrant_lock.release()
        print(f"Released lock at depth {depth}")

recursive_function()

Consistent Hashing

Consistent hashing distributes keys evenly across the nodes of a distributed system, while minimizing how many keys move when nodes join or leave.

A Consistent Hash Ring Implementation

import hashlib
import bisect
from typing import List, Dict, Optional

class ConsistentHashRing:
    """
    A consistent hash ring with virtual nodes.
    """
    
    def __init__(self, nodes: List[str] = None, replicas: int = 150):
        """
        Initialize the hash ring.
        
        Args:
            nodes: initial node list
            replicas: number of virtual nodes per physical node
                      (more replicas means a smoother key distribution)
        """
        self.replicas = replicas
        self.ring = {}  # hash value -> node
        self.sorted_keys = []  # sorted list of hash values
        
        if nodes:
            for node in nodes:
                self.add_node(node)
    
    def add_node(self, node: str):
        """
        Add a node (and its virtual nodes) to the ring.
        """
        for i in range(self.replicas):
            virtual_key = f"{node}:{i}"
            hash_key = self._hash(virtual_key)
            self.ring[hash_key] = node
            bisect.insort(self.sorted_keys, hash_key)
    
    def remove_node(self, node: str):
        """
        Remove a node (and its virtual nodes) from the ring.
        """
        for i in range(self.replicas):
            virtual_key = f"{node}:{i}"
            hash_key = self._hash(virtual_key)
            if hash_key in self.ring:
                del self.ring[hash_key]
                self.sorted_keys.remove(hash_key)  # list stays sorted
    
    def get_node(self, key: str) -> Optional[str]:
        """
        Map a key to its node.
        
        Args:
            key: the key to place
            
        Returns:
            The responsible node, or None if the ring is empty.
        """
        if not self.ring:
            return None
        
        hash_key = self._hash(key)
        
        # Find the first ring position at or after the key's hash
        idx = bisect.bisect_right(self.sorted_keys, hash_key)
        
        if idx == len(self.sorted_keys):
            # Wrap around: the ring is circular
            idx = 0
        
        return self.ring[self.sorted_keys[idx]]
    
    def _hash(self, key: str) -> int:
        """
        Hash a string to an integer position on the ring.
        """
        return int(hashlib.md5(key.encode()).hexdigest(), 16)
    
    def get_nodes_for_key(self, key: str, count: int) -> List[str]:
        """
        Return several distinct nodes for a key (for replication).
        
        Args:
            key: the key to place
            count: number of distinct nodes wanted
            
        Returns:
            A list of nodes, walking clockwise from the key's position.
        """
        if not self.ring or count <= 0:
            return []
        
        hash_key = self._hash(key)
        start_idx = bisect.bisect_right(self.sorted_keys, hash_key)
        
        nodes = []
        visited = set()
        
        for i in range(len(self.sorted_keys)):
            idx = (start_idx + i) % len(self.sorted_keys)
            node = self.ring[self.sorted_keys[idx]]
            
            if node not in visited:
                nodes.append(node)
                visited.add(node)
                
                if len(nodes) >= count:
                    break
        
        return nodes

class DistributedScheduler:
    """
    A task scheduler built on the consistent hash ring.
    """
    
    def __init__(self, nodes: List[str]):
        self.consistent_hash = ConsistentHashRing(nodes)
        self.node_loads = {node: 0 for node in nodes}
        self.task_assignments = {}  # task id -> node
    
    def assign_task(self, task_id: str, task_data: dict = None) -> str:
        """
        Assign a task to a node.
        
        Args:
            task_id: the task identifier
            task_data: optional task payload
            
        Returns:
            The node the task was assigned to.
        """
        node = self.consistent_hash.get_node(task_id)
        
        if node:
            # Track the node's load
            self.node_loads[node] += 1
            # Remember the assignment
            self.task_assignments[task_id] = node
        
        return node
    
    def get_node_for_task(self, task_id: str) -> str:
        """
        Look up the node a task was assigned to.
        """
        return self.task_assignments.get(task_id)
    
    def release_task(self, task_id: str):
        """
        Release a task (decrement the node's load).
        """
        node = self.task_assignments.get(task_id)
        if node and node in self.node_loads:
            self.node_loads[node] = max(0, self.node_loads[node] - 1)
        
        if task_id in self.task_assignments:
            del self.task_assignments[task_id]
    
    def get_load_distribution(self) -> Dict[str, int]:
        """
        Snapshot of per-node load.
        """
        return self.node_loads.copy()

# Usage example
nodes = ['node1', 'node2', 'node3', 'node4']
scheduler = DistributedScheduler(nodes)

# Assign tasks
for i in range(100):
    task_id = f"task_{i}"
    assigned_node = scheduler.assign_task(task_id)
    print(f"Task {task_id} assigned to {assigned_node}")

# Inspect the load distribution
print("Load distribution:", scheduler.get_load_distribution())

Task Scheduling Strategies

A Load-Balancing Scheduler

import heapq
import time
from enum import Enum
from typing import Dict, List

class TaskPriority(Enum):
    LOW = 1
    MEDIUM = 2
    HIGH = 3
    CRITICAL = 4

class Task:
    """
    A schedulable task.
    """
    
    def __init__(self, task_id: str, priority: TaskPriority, task_type: str, 
                 data: dict = None, created_at: float = None):
        self.task_id = task_id
        self.priority = priority
        self.task_type = task_type
        self.data = data or {}
        self.created_at = created_at or time.time()
        self.assigned_node = None
        self.started_at = None
        self.completed_at = None
    
    def __lt__(self, other):
        """
        Ordering for the priority queue: higher priority first, then FIFO.
        heapq is a min-heap, so the priority value is negated.
        """
        return (-self.priority.value, self.created_at) < (-other.priority.value, other.created_at)

class LoadBalancedScheduler:
    """
    A scheduler that assigns the highest-priority task to the least-loaded node.
    """
    
    def __init__(self, nodes: List[str]):
        self.nodes = nodes
        self.node_stats = {
            node: {
                'load': 0,
                'active_tasks': 0,
                'completed_tasks': 0,
                'avg_response_time': 0.0,
                'last_activity': time.time()
            } for node in nodes
        }
        self.task_queue = []  # priority queue (heapq)
        self.pending_tasks = {}  # queued, not yet assigned
        self.assigned_tasks = {}  # currently running
        self.task_history = []  # completed tasks
    
    def submit_task(self, task: Task) -> bool:
        """
        Queue a task.
        """
        heapq.heappush(self.task_queue, task)
        self.pending_tasks[task.task_id] = task
        return True
    
    def assign_next_task(self) -> tuple:
        """
        Pop and assign the next task.
        
        Returns:
            (task, node), or (None, None) if nothing can be assigned.
        """
        if not self.task_queue:
            return None, None
        
        # Pick the best node
        best_node = self._select_best_node()
        if not best_node:
            return None, None
        
        # Pop the highest-priority task
        task = heapq.heappop(self.task_queue)
        del self.pending_tasks[task.task_id]
        
        # Record the assignment
        task.assigned_node = best_node
        task.started_at = time.time()
        self.assigned_tasks[task.task_id] = task
        
        # Update node statistics
        self.node_stats[best_node]['load'] += 1
        self.node_stats[best_node]['active_tasks'] += 1
        self.node_stats[best_node]['last_activity'] = time.time()
        
        return task, best_node
    
    def _select_best_node(self) -> str:
        """
        Node selection policy: the node with the fewest active tasks wins.
        """
        min_load = float('inf')
        best_node = None
        
        for node, stats in self.node_stats.items():
            current_load = stats['active_tasks']
            if current_load < min_load:
                min_load = current_load
                best_node = node
        
        return best_node
    
    def mark_task_completed(self, task_id: str, node: str, response_time: float = None):
        """
        Record a task completion and update node statistics.
        """
        if task_id in self.assigned_tasks:
            task = self.assigned_tasks[task_id]
            task.completed_at = time.time()
            
            # Update node statistics
            self.node_stats[node]['active_tasks'] -= 1
            self.node_stats[node]['completed_tasks'] += 1
            
            if response_time:
                # Exponential moving average of response time
                old_avg = self.node_stats[node]['avg_response_time']
                alpha = 0.1
                new_avg = alpha * response_time + (1 - alpha) * old_avg
                self.node_stats[node]['avg_response_time'] = new_avg
            
            # Drop the assignment record
            del self.assigned_tasks[task_id]
            
            # Archive the task
            self.task_history.append(task)
    
    def get_node_status(self) -> Dict[str, dict]:
        """
        Snapshot of per-node statistics.
        """
        return self.node_stats.copy()
    
    def get_queue_size(self) -> int:
        """
        Number of queued (unassigned) tasks.
        """
        return len(self.task_queue)

# Usage example
scheduler = LoadBalancedScheduler(['node1', 'node2', 'node3'])

# Submit tasks of varying priority
tasks = [
    Task('task1', TaskPriority.HIGH, 'crawl', {'url': 'https://example.com'}),
    Task('task2', TaskPriority.MEDIUM, 'parse', {'data': 'html_content'}),
    Task('task3', TaskPriority.LOW, 'index', {'doc_id': '123'}),
    Task('task4', TaskPriority.CRITICAL, 'crawl', {'url': 'https://priority-site.com'})
]

for task in tasks:
    scheduler.submit_task(task)

# 分配任务
while scheduler.get_queue_size() > 0:
    task, node = scheduler.assign_next_task()
    if task and node:
        print(f"Assigned {task.task_id} to {node}")
        # 模拟任务完成
        scheduler.mark_task_completed(task.task_id, node, 2.5)

print("Node status:", scheduler.get_node_status())
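上面 mark_task_completed 里用指数移动平均(EMA)更新平均响应时间,其平滑效果可以用一个独立的小例子验证(纯演示代码,与调度器实现无关):

```python
def ema(values, alpha=0.1):
    """指数移动平均:新值权重 alpha,历史均值权重 1 - alpha"""
    avg = values[0]
    for v in values[1:]:
        avg = alpha * v + (1 - alpha) * avg
    return avg

# 响应时间从 1.0s 跳到 2.0s,EMA 只缓慢跟随,抑制了瞬时抖动
print(round(ema([1.0, 2.0, 2.0]), 4))     # 1.19
print(round(ema([1.0] + [2.0] * 50), 2))  # 1.99,逐步收敛到新水平
```

alpha 越小,历史权重越大、曲线越平滑,但对真实负载变化的响应也越慢。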

智能调度策略

import statistics
from collections import defaultdict, deque
from typing import List

class IntelligentScheduler:
    """
    智能调度器 - 基于历史性能的调度
    """
    
    def __init__(self, nodes: List[str]):
        self.nodes = nodes
        self.node_performance = defaultdict(lambda: deque(maxlen=100))  # 性能历史
        self.task_type_preferences = defaultdict(dict)  # 任务类型偏好
        self.current_loads = {node: 0 for node in nodes}
        self.task_assignment_history = defaultdict(list)
    
    def record_task_performance(self, task_id: str, node: str, task_type: str, 
                              execution_time: float, success: bool = True):
        """
        记录任务执行性能
        """
        # 记录节点性能
        self.node_performance[node].append(execution_time)
        
        # 记录任务类型偏好
        if task_type not in self.task_type_preferences[node]:
            self.task_type_preferences[node][task_type] = []
        self.task_type_preferences[node][task_type].append(execution_time)
        
        # 更新负载
        if success:
            self.current_loads[node] = max(0, self.current_loads[node] - 1)
    
    def predict_execution_time(self, node: str, task_type: str) -> float:
        """
        预测在指定节点执行指定类型任务的执行时间
        """
        if task_type in self.task_type_preferences[node]:
            times = self.task_type_preferences[node][task_type]
            if times:
                return statistics.mean(times)
        
        # 如果没有历史数据,返回节点平均执行时间
        if self.node_performance[node]:
            return statistics.mean(self.node_performance[node])
        
        return 1.0  # 默认执行时间
    
    def calculate_node_score(self, node: str, task_type: str) -> float:
        """
        计算节点分数(分数越低越好)
        """
        # 基于预测执行时间的分数
        predicted_time = self.predict_execution_time(node, task_type)
        
        # 基于当前负载的分数
        load_factor = self.current_loads[node] / 10.0  # 假设最大负载为10
        
        # 综合分数
        score = predicted_time * (1 + load_factor)
        
        return score
    
    def assign_task_intelligent(self, task_type: str) -> str:
        """
        智能分配任务到最优节点
        """
        best_node = None
        best_score = float('inf')
        
        for node in self.nodes:
            score = self.calculate_node_score(node, task_type)
            if score < best_score:
                best_score = score
                best_node = node
        
        if best_node:
            self.current_loads[best_node] += 1
        
        return best_node

# 使用示例
intelligent_scheduler = IntelligentScheduler(['node1', 'node2', 'node3'])

# 模拟任务执行历史
execution_history = [
    ('node1', 'crawl', 1.2, True),
    ('node1', 'crawl', 1.1, True),
    ('node1', 'parse', 0.8, True),
    ('node2', 'crawl', 1.5, True),
    ('node2', 'parse', 0.6, True),
    ('node3', 'index', 2.0, True),
    ('node3', 'index', 1.8, True),
]

for node, task_type, exec_time, success in execution_history:
    intelligent_scheduler.record_task_performance(f'task_{len(intelligent_scheduler.node_performance[node])}', 
                                                  node, task_type, exec_time, success)

# 智能分配新任务
new_task_type = 'crawl'
assigned_node = intelligent_scheduler.assign_task_intelligent(new_task_type)
print(f"Intelligently assigned {new_task_type} task to {assigned_node}")

# 检查预测时间
for node in ['node1', 'node2', 'node3']:
    pred_time = intelligent_scheduler.predict_execution_time(node, 'crawl')
    print(f"Predicted time for crawl on {node}: {pred_time:.2f}s")
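calculate_node_score 的效果可以用具体数字感受一下:历史上更快但当前积压任务多的节点,得分可能反而劣于较慢的空闲节点(独立演示,数值为假设):

```python
def node_score(predicted_time: float, active_tasks: int, max_load: float = 10.0) -> float:
    """分数 = 预测执行时间 × (1 + 负载因子),分数越低越好"""
    return predicted_time * (1 + active_tasks / max_load)

# node1 历史上更快(1.0s)但积压 5 个任务;node2 较慢(1.2s)但空闲
print(node_score(1.0, 5))  # 1.5
print(node_score(1.2, 0))  # 1.2,空闲的 node2 胜出
```

这正是该评分公式的意图:在"预测快"与"当前闲"之间做权衡,避免任务全部涌向历史表现最好的节点。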

去重性能优化

批量去重优化

import hashlib
import random
from typing import List, Tuple

class BatchDeduplicator:
    """
    批量去重优化器
    """
    
    def __init__(self, deduplicator, batch_size: int = 1000):
        self.deduplicator = deduplicator
        self.batch_size = batch_size
        self.local_cache = set()  # 本地缓存,减少远程调用
        self.cache_size_limit = 10000
    
    def add_batch(self, items: List[str]) -> List[Tuple[str, bool]]:
        """
        批量添加项目
        
        Returns:
            [(item, is_new), ...]
        """
        results = []
        remaining_items = []
        
        # 首先检查本地缓存
        for item in items:
            item_hash = hashlib.md5(item.encode()).hexdigest()
            
            if item_hash in self.local_cache:
                results.append((item, False))
            else:
                remaining_items.append(item)
        
        # 对剩余项目进行去重检查
        if remaining_items:
            batch_results = self._check_remote_batch(remaining_items)
            results.extend(batch_results)
            
            # 更新本地缓存
            for item, is_new in batch_results:
                if is_new:
                    item_hash = hashlib.md5(item.encode()).hexdigest()
                    self.local_cache.add(item_hash)
                    
                    # 控制缓存大小
                    if len(self.local_cache) > self.cache_size_limit:
                        # 随机移除一半缓存项
                        items_to_remove = random.sample(list(self.local_cache), 
                                                      len(self.local_cache) // 2)
                        for item_hash in items_to_remove:
                            self.local_cache.discard(item_hash)
        
        return results
    
    def _check_remote_batch(self, items: List[str]) -> List[Tuple[str, bool]]:
        """
        批量检查远程去重器
        """
        results = []
        
        # 分批处理,避免单次请求过大
        for i in range(0, len(items), self.batch_size):
            batch = items[i:i + self.batch_size]
            batch_results = self._process_batch(batch)
            results.extend(batch_results)
        
        return results
    
    def _process_batch(self, batch: List[str]) -> List[Tuple[str, bool]]:
        """
        处理单个批次
        """
        results = []
        
        for item in batch:
            is_new = self.deduplicator.add(item)
            results.append((item, is_new))
        
        return results

# 使用示例
batch_dedup = BatchDeduplicator(dedup, batch_size=100)

test_items = [f"https://example.com/page{i}" for i in range(1000)]
# 添加一些重复项
test_items.extend([f"https://example.com/page{i}" for i in range(100)])

batch_results = batch_dedup.add_batch(test_items)
new_count = sum(1 for _, is_new in batch_results if is_new)
print(f"Processed {len(test_items)} items, {new_count} were new")
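上面的本地缓存在超限时随机丢弃一半,实现简单但可能淘汰掉热点项;一个常见的替代方案是 LRU(最近最少使用)淘汰。下面是基于 OrderedDict 的独立示意(非原实现的一部分):

```python
from collections import OrderedDict

class LRUCache:
    """简单的 LRU 缓存:超出容量时淘汰最久未访问的项"""

    def __init__(self, capacity: int):
        self.capacity = capacity
        self._data = OrderedDict()

    def contains(self, key: str) -> bool:
        if key in self._data:
            self._data.move_to_end(key)  # 访问即刷新为"最近使用"
            return True
        return False

    def add(self, key: str):
        self._data[key] = True
        self._data.move_to_end(key)
        if len(self._data) > self.capacity:
            self._data.popitem(last=False)  # 淘汰最久未使用的项

cache = LRUCache(2)
cache.add('a')
cache.add('b')
cache.contains('a')  # 访问 a,使 b 成为最久未使用
cache.add('c')       # 超出容量,b 被淘汰
print(cache.contains('b'))  # False
```

相比随机淘汰,LRU 在 URL 访问具有局部性(同一站点短时间内反复出现)时命中率更高,代价是每次访问多一次字典移动操作。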

内存优化的去重器

class MemoryEfficientDeduplicator:
    """
    内存优化的去重器
    """
    
    def __init__(self, max_memory_mb: float = 100):
        self.max_memory_bytes = max_memory_mb * 1024 * 1024
        self.small_items = set()  # 小项目直接存储
        self.bloom_filter = BloomFilter(capacity=100000, error_rate=0.01)
        self.memory_usage = 0
        self.item_sizes = {}  # 记录项目大小
    
    def add(self, item: str) -> bool:
        """
        添加项目
        """
        item_hash = hashlib.md5(item.encode()).hexdigest()
        
        # 检查是否已存在
        if item_hash in self.small_items or self.bloom_filter.contains(item_hash):
            return False
        
        # 计算项目大小
        item_size = len(item.encode('utf-8'))
        
        # 对于小项目,直接存储
        if item_size < 100:  # 小于100字节
            self.small_items.add(item_hash)
            self.item_sizes[item_hash] = item_size
            self.memory_usage += item_size
        else:
            # 对于大项目,只存储在布隆过滤器中
            self.bloom_filter.add(item_hash)
        
        # 如果内存使用超过限制,清理一些数据
        if self.memory_usage > self.max_memory_bytes * 0.8:  # 80%阈值
            self._cleanup_memory()
        
        return True
    
    def contains(self, item: str) -> bool:
        """
        检查项目是否存在
        """
        item_hash = hashlib.md5(item.encode()).hexdigest()
        return item_hash in self.small_items or self.bloom_filter.contains(item_hash)
    
    def _cleanup_memory(self):
        """
        清理内存:把占用最大的项降级到布隆过滤器后再从精确存储中移除,
        避免清理后同一项被误判为新项
        """
        # 按项目大小从大到小排序
        sorted_items = sorted(
            [(size, item_hash) for item_hash, size in self.item_sizes.items()],
            reverse=True
        )
        
        items_removed = 0
        for size, item_hash in sorted_items:
            if self.memory_usage <= self.max_memory_bytes * 0.7:  # 降到70%
                break
            
            # 先记入布隆过滤器,保留"已见过"的信息,再释放精确存储
            self.bloom_filter.add(item_hash)
            self.small_items.discard(item_hash)
            self.memory_usage -= self.item_sizes.pop(item_hash)
            
            items_removed += 1
        
        print(f"Cleaned up {items_removed} items to reduce memory usage")

# 使用示例
mem_efficient_dedup = MemoryEfficientDeduplicator(max_memory_mb=50)

urls = [f"https://example.com/page{i}" for i in range(1000)]
for url in urls:
    is_new = mem_efficient_dedup.add(url)
    if not is_new:
        print(f"Duplicate found: {url}")

容错与一致性

多层去重机制

class MultiLayerDeduplicator:
    """
    多层去重机制
    """
    
    def __init__(self, redis_client):
        self.local_bloom = BloomFilter(capacity=10000, error_rate=0.01)
        self.redis_bloom = RedisBloomFilter(redis_client, 'multi_layer_bloom', 
                                          capacity=1000000, error_rate=0.001)
        self.redis_set = RedisSetDeduplicator(redis_client, 'multi_layer_set')
        
        # 统计信息
        self.stats = {
            'local_hits': 0,
            'redis_bloom_hits': 0,
            'redis_set_hits': 0,
            'misses': 0,
            'false_positives_avoided': 0
        }
    
    def is_duplicate(self, item: str) -> bool:
        """
        检查是否为重复项(多层检查)
        """
        item_hash = hashlib.md5(item.encode()).hexdigest()
        
        # 第一层:本地布隆过滤器(最快)。直接信任其结果,
        # 以可控的误判率换取零网络开销
        if self.local_bloom.contains(item_hash):
            self.stats['local_hits'] += 1
            return True
        
        # 第二层:Redis布隆过滤器(较快,可能存在误判),
        # 命中后需要用精确存储确认
        if self.redis_bloom.contains(item_hash):
            if self.redis_set.contains(item_hash):
                self.stats['redis_bloom_hits'] += 1
                return True
            # 布隆过滤器误判,实际不是重复项
            self.stats['false_positives_avoided'] += 1
        else:
            # 第三层:Redis Set精确检查(兜底布隆过滤器数据不完整的情况)
            if self.redis_set.contains(item_hash):
                self.stats['redis_set_hits'] += 1
                return True
        
        # 确认为新项,添加到所有层
        self.stats['misses'] += 1
        self.local_bloom.add(item_hash)
        self.redis_bloom.add(item_hash)
        self.redis_set.add(item_hash)
        return False
    
    def get_stats(self) -> dict:
        """
        获取统计信息
        """
        total_checks = sum([
            self.stats['local_hits'],
            self.stats['redis_bloom_hits'],
            self.stats['redis_set_hits'],
            self.stats['misses']
        ])
        
        hit_rate = (self.stats['local_hits'] + self.stats['redis_bloom_hits'] + self.stats['redis_set_hits']) / total_checks if total_checks > 0 else 0
        
        return {
            **self.stats,
            'hit_rate': hit_rate,
            'total_checks': total_checks
        }

# 使用示例
multi_dedup = MultiLayerDeduplicator(redis_client)

test_urls = [
    'https://example.com/page1',
    'https://example.com/page2',
    'https://example.com/page1',  # 重复
    'https://example.com/page3',
    'https://example.com/page2',  # 重复
] * 100  # 重复测试

for url in test_urls:
    is_dup = multi_dedup.is_duplicate(url)
    if not is_dup:
        print(f"New URL: {url}")

print("Statistics:", multi_dedup.get_stats())

一致性检查机制

class ConsistencyChecker:
    """
    一致性检查机制
    """
    
    def __init__(self, redis_client, primary_dedup, backup_dedup):
        self.redis_client = redis_client
        self.primary_dedup = primary_dedup
        self.backup_dedup = backup_dedup
        self.inconsistency_log = []
        self.check_interval = 300  # 5分钟检查一次
    
    def periodic_check(self):
        """
        定期一致性检查
        """
        inconsistencies = []
        
        # 检查两套去重系统的差异
        # 这里简化实现,实际应用中可能需要更复杂的比较逻辑
        
        # 检查Redis中的数据一致性
        try:
            # 检查布隆过滤器和Set的一致性
            bloom_count = self._estimate_bloom_count()
            set_count = self.primary_dedup.get_size() if hasattr(self.primary_dedup, 'get_size') else 0
            
            if abs(bloom_count - set_count) > set_count * 0.1:  # 差异超过10%
                inconsistencies.append({
                    'type': 'count_mismatch',
                    'bloom_count': bloom_count,
                    'set_count': set_count,
                    'timestamp': time.time()
                })
        
        except Exception as e:
            inconsistencies.append({
                'type': 'check_error',
                'error': str(e),
                'timestamp': time.time()
            })
        
        if inconsistencies:
            self.inconsistency_log.extend(inconsistencies)
            # 可以触发告警或自动修复机制
            print(f"Found {len(inconsistencies)} inconsistencies")
        
        return inconsistencies
    
    def _estimate_bloom_count(self) -> int:
        """
        估算布隆过滤器中的元素数量
        """
        # 简化实现,返回占位值。
        # 精确做法:统计位数组中置位比特数 X,用 n ≈ -(m/k) * ln(1 - X/m) 估算
        return 0
    
    def repair_inconsistency(self, inconsistency_type: str):
        """
        修复不一致性
        """
        if inconsistency_type == 'count_mismatch':
            # 重建去重数据结构
            print("Rebuilding deduplication structures...")
            # 实际实现中可能需要重新处理数据
        elif inconsistency_type == 'data_corruption':
            # 数据修复逻辑
            print("Repairing corrupted data...")

# 使用示例
# consistency_checker = ConsistencyChecker(redis_client, dedup, backup_dedup)
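_estimate_bloom_count 在上面留作占位。布隆过滤器的元素数量有一个标准估算公式:设位数组大小为 m、哈希函数数量为 k、置位比特数为 X,则 n ≈ -(m/k)·ln(1 - X/m)。下面用合成数据独立验证该公式(示意代码,并非 ConsistencyChecker 的实际实现):

```python
import math
import random

def estimate_bloom_count(m: int, k: int, bits_set: int) -> float:
    """根据置位比特数估算布隆过滤器中的元素数量"""
    return -(m / k) * math.log(1 - bits_set / m)

# 合成实验:m=10000 位、k=3,插入 1000 个元素后统计置位比特,再反推数量
m, k, n_true = 10000, 3, 1000
rng = random.Random(42)
bits = set()
for _ in range(n_true):
    for _ in range(k):
        bits.add(rng.randrange(m))  # 用均匀随机比特位模拟 k 次哈希

estimate = estimate_bloom_count(m, k, len(bits))
print(round(estimate))  # 应接近真实值 1000
```

一致性检查时,可以把这个估算值与 Redis Set 的精确大小(SCARD)对比,差异过大即提示两套结构失配。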

监控与调试

去重性能监控

class DeduplicationMonitor:
    """
    去重性能监控器
    """
    
    def __init__(self):
        self.metrics = {
            'requests_total': 0,
            'duplicates_found': 0,
            'unique_items_added': 0,
            'false_positive_estimates': 0,
            'response_times': deque(maxlen=1000),
            'memory_usage': 0,
            'start_time': time.time()
        }
        self.performance_history = deque(maxlen=100)
    
    def record_request(self, is_duplicate: bool, response_time: float = 0):
        """
        记录请求指标
        """
        self.metrics['requests_total'] += 1
        
        if is_duplicate:
            self.metrics['duplicates_found'] += 1
        else:
            self.metrics['unique_items_added'] += 1
        
        if response_time > 0:
            self.metrics['response_times'].append(response_time)
    
    def get_current_metrics(self) -> dict:
        """
        获取当前指标
        """
        if self.metrics['response_times']:
            avg_response_time = statistics.mean(self.metrics['response_times'])
            p95_response_time = statistics.quantiles(self.metrics['response_times'], n=20)[-1]  # 95th percentile
        else:
            avg_response_time = 0
            p95_response_time = 0
        
        duplicate_rate = (
            self.metrics['duplicates_found'] / self.metrics['requests_total'] 
            if self.metrics['requests_total'] > 0 else 0
        )
        
        current_time = time.time()
        uptime = current_time - self.metrics['start_time']
        
        return {
            **self.metrics,
            'duplicate_rate': duplicate_rate,
            'avg_response_time': avg_response_time,
            'p95_response_time': p95_response_time,
            'uptime_seconds': uptime,
            'requests_per_second': self.metrics['requests_total'] / uptime if uptime > 0 else 0
        }
    
    def export_for_prometheus(self) -> str:
        """
        导出Prometheus格式的指标
        """
        metrics = self.get_current_metrics()
        prometheus_format = f"""
# HELP dedup_requests_total Total number of deduplication requests
# TYPE dedup_requests_total counter
dedup_requests_total {metrics['requests_total']}

# HELP dedup_duplicates_found Total number of duplicates found
# TYPE dedup_duplicates_found counter
dedup_duplicates_found {metrics['duplicates_found']}

# HELP dedup_unique_items_added Total number of unique items added
# TYPE dedup_unique_items_added counter
dedup_unique_items_added {metrics['unique_items_added']}

# HELP dedup_duplicate_rate Duplicate detection rate
# TYPE dedup_duplicate_rate gauge
dedup_duplicate_rate {metrics['duplicate_rate']}

# HELP dedup_avg_response_time Average response time in seconds
# TYPE dedup_avg_response_time gauge
dedup_avg_response_time {metrics['avg_response_time']}

# HELP dedup_uptime_seconds Uptime in seconds
# TYPE dedup_uptime_seconds gauge
dedup_uptime_seconds {metrics['uptime_seconds']}
"""
        return prometheus_format

# 使用示例
monitor = DeduplicationMonitor()

# 模拟一些请求
import random
for i in range(1000):
    is_duplicate = random.random() < 0.3  # 30% 重复率
    response_time = random.uniform(0.01, 0.1)  # 10-100ms 响应时间
    monitor.record_request(is_duplicate, response_time)

print("Current metrics:", monitor.get_current_metrics())
print("\nPrometheus format:")
print(monitor.export_for_prometheus())
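上面 get_current_metrics 中用 statistics.quantiles(data, n=20)[-1] 取 P95:n=20 会把数据切成 20 等份并返回 19 个分位点,最后一个即第 95 百分位。独立验证如下:

```python
import statistics

data = list(range(1, 101))  # 1..100
cut_points = statistics.quantiles(data, n=20)

print(len(cut_points))  # 19 个分位点
print(cut_points[-1])   # 95.95,即 P95(默认 exclusive 插值法)
```

注意 quantiles 要求至少两个数据点,监控器里已经通过先判断 response_times 非空来规避这一限制(样本极少时的分位数本身也没有统计意义)。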

常见问题与解决方案

问题1: 布隆过滤器误判率过高

现象: 去重效果不佳,很多新项目被误判为重复

解决方案:

# 调整布隆过滤器参数
def optimize_bloom_filter(expected_items, desired_error_rate):
    """
    优化布隆过滤器参数
    """
    # 计算所需的位数组大小
    bit_array_size = int(-(expected_items * math.log(desired_error_rate)) / (math.log(2) ** 2))
    
    # 计算哈希函数数量
    hash_count = int((bit_array_size / expected_items) * math.log(2))
    
    print(f"Recommended bit array size: {bit_array_size}")
    print(f"Recommended hash count: {hash_count}")
    
    return BloomFilter(expected_items, desired_error_rate)

# 使用更大的容量和更低的误判率
optimized_bf = optimize_bloom_filter(expected_items=10000000, desired_error_rate=0.0001)
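代入具体数字可以直观感受参数的量级(直接套用上面的两个公式,与具体实现无关):对 100 万个 URL、1% 误判率,大约需要 960 万比特(约 1.14 MB)和 6~7 个哈希函数:

```python
import math

n, p = 1_000_000, 0.01
m = -(n * math.log(p)) / (math.log(2) ** 2)  # 位数组大小(比特)
k = (m / n) * math.log(2)                    # 哈希函数数量

print(int(m))                            # 9585058 位
print(f"{m / 8 / 1024 / 1024:.2f} MB")   # 约 1.14 MB
print(round(k))                          # 7(向下取整则为 6)
```

可见误判率每降低一个数量级,位数组大致只线性增长,这正是布隆过滤器空间效率高的原因。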

问题2: 分布式锁竞争激烈

现象: 大量客户端竞争同一把锁,性能下降

解决方案:

# 使用分段锁策略
class SegmentedDistributedLock:
    """
    分段分布式锁,减少竞争
    """
    
    def __init__(self, redis_client, base_key: str, num_segments: int = 100):
        self.redis_client = redis_client
        self.base_key = base_key
        self.num_segments = num_segments
    
    def get_segment_key(self, resource: str) -> str:
        """
        根据资源获取对应的锁段
        """
        # 使用 md5 而非内置 hash():内置 hash() 受 PYTHONHASHSEED 影响,
        # 不同进程/节点会把同一资源映射到不同的锁段
        hash_val = int(hashlib.md5(resource.encode()).hexdigest(), 16) % self.num_segments
        return f"{self.base_key}:segment:{hash_val}"
    
    def acquire_for_resource(self, resource: str, timeout: int = 30) -> bool:
        """
        为特定资源获取锁
        """
        segment_key = self.get_segment_key(resource)
        lock = BasicDistributedLock(self.redis_client, segment_key, timeout)
        return lock.acquire(timeout)
    
    def release_for_resource(self, resource: str) -> bool:
        """
        释放特定资源的锁
        """
        segment_key = self.get_segment_key(resource)
        lock = BasicDistributedLock(self.redis_client, segment_key)
        return lock.release()
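分段锁能分散竞争的前提是:同一资源在所有节点上必须映射到同一个段。内置 hash() 受 PYTHONHASHSEED 影响,跨进程不稳定,因此应使用 md5 之类的确定性哈希。下面独立演示稳定的分段映射(纯示意,与上面的锁类无关):

```python
import hashlib

def segment_for(resource: str, num_segments: int = 100) -> int:
    """用 md5 做确定性映射:同一资源在任何进程/节点都得到同一段号"""
    digest = hashlib.md5(resource.encode()).hexdigest()
    return int(digest, 16) % num_segments

urls = [f"https://example.com/page{i}" for i in range(1000)]
segments = [segment_for(u) for u in urls]

# 同一资源永远落在同一段;1000 个资源大致均匀分散到 100 个段
assert segment_for(urls[0]) == segments[0]
print(len(set(segments)))  # 接近 100,说明竞争被摊薄了约 100 倍
```

段数的选择是权衡:段越多竞争越小,但 Redis 中的锁键也越多;通常取并发客户端数的若干倍即可。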

问题3: 内存使用过高

现象: 去重系统内存占用过大

解决方案:

# 实现内存使用监控和自动清理
class MemoryManagedDeduplicator:
    """
    内存管理的去重器
    """
    
    def __init__(self, redis_client, max_memory_mb: float = 100):
        self.max_memory_bytes = max_memory_mb * 1024 * 1024
        self.deduplicator = RedisSetDeduplicator(redis_client, 'memory_managed_set')  # 键名为示例
        self.size_estimator = RedisHyperLogLogDeduplicator(redis_client)
        self.cleanup_threshold = 0.8  # 80%阈值
    
    def add_with_memory_check(self, item: str) -> bool:
        """
        添加项目并检查内存使用
        """
        result = self.deduplicator.add(item)
        
        # 估算当前内存使用
        estimated_count = self.size_estimator.get_count()
        estimated_memory = estimated_count * 100  # 假设每个项目约100字节
        
        if estimated_memory > self.max_memory_bytes * self.cleanup_threshold:
            print(f"Memory usage high ({estimated_memory/1024/1024:.2f}MB), considering cleanup")
            # 可以实现数据老化策略
        
        return result
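上面提到"可以实现数据老化策略"。最简单的老化方式是按时间戳过期:超过 TTL 的去重记录被清除,允许旧 URL 在一段时间后重新抓取。下面是纯内存的独立示意(假设性实现,生产环境通常直接用 Redis 的 EXPIRE):

```python
class AgingDeduplicator:
    """带 TTL 的去重器:记录超过 ttl_seconds 后视为过期,可被重新抓取"""

    def __init__(self, ttl_seconds: float):
        self.ttl = ttl_seconds
        self.seen = {}  # item -> 最近一次见到的时间戳

    def add(self, item: str, now: float) -> bool:
        """返回 True 表示是新项(或旧记录已过期)"""
        last = self.seen.get(item)
        self.seen[item] = now
        return last is None or now - last > self.ttl

    def cleanup(self, now: float):
        """移除所有过期记录,回收内存"""
        self.seen = {k: t for k, t in self.seen.items() if now - t <= self.ttl}

aging_dedup = AgingDeduplicator(ttl_seconds=60)
print(aging_dedup.add("https://example.com", now=0))    # True,新项
print(aging_dedup.add("https://example.com", now=30))   # False,60 秒内重复
print(aging_dedup.add("https://example.com", now=100))  # True,已过期可重抓
```

把时间 now 作为参数注入(而非在内部调 time.time())便于测试,也方便与调度器共用同一时钟源。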

最佳实践总结

设计原则

  1. 分层去重: 结合本地缓存、布隆过滤器和精确存储
  2. 性能优先: 优先使用快速的去重方法,逐步深入
  3. 容错设计: 考虑单点故障,设计备份方案
  4. 监控完备: 实现全面的监控和告警机制

性能优化策略

  1. 批量处理: 减少网络调用次数
  2. 本地缓存: 缓存热点数据
  3. 异步处理: 避免阻塞主线程
  4. 参数调优: 根据实际负载调整参数

安全考虑

  1. 访问控制: 限制对去重数据的访问
  2. 数据保护: 加密敏感的去重标识
  3. 审计日志: 记录去重操作日志
  4. 容量规划: 防止存储溢出

💡 核心要点: 分布式去重和调度是大规模爬虫系统的核心技术。通过合理的算法选择、架构设计和性能优化,可以实现高效、可靠的去重和调度系统。

