#分布式去重与调度 - 高效去重算法与分布式协调机制详解
📂 所属阶段:第五阶段 — 战力升级(分布式与进阶篇)
🔗 相关章节:Scrapy-Redis分布式架构 · Spider中间件深度定制 · 大规模爬虫优化
#目录
#分布式去重的重要性
在分布式爬虫系统中,去重是一个至关重要的环节。由于多个爬虫节点同时工作,很容易出现重复抓取同一URL的情况,这不仅浪费网络资源,还可能导致数据质量问题。
#去重挑战分析
"""
分布式去重面临的主要挑战:
1. 数据一致性:
- 多节点间的去重数据同步
- 避免重复抓取同一URL
- 保证去重数据的准确性
2. 性能要求:
- 高并发下的快速去重判断
- 低延迟的去重操作
- 内存和存储效率
3. 可扩展性:
- 支持节点动态增减
- 处理海量URL去重
- 负载均衡考虑
"""#去重策略对比
"""
常见的去重策略及其特点:
1. 本地去重:
- 优点:速度快,内存占用少
- 缺点:无法处理跨节点重复
- 适用:单机爬虫或小规模分布式
2. 中心化去重:
- 优点:全局一致性好
- 缺点:单点故障风险,性能瓶颈
- 适用:中小规模爬虫集群
3. 分布式去重:
- 优点:高可用,高性能,可扩展
- 缺点:实现复杂,一致性难保证
- 适用:大规模分布式爬虫
"""#布隆过滤器详解
布隆过滤器是一种空间效率极高的概率型数据结构,用于判断一个元素是否存在于集合中。
#基础布隆过滤器实现
import hashlib
import math
from bitarray import bitarray
class BloomFilter:
    """In-memory Bloom filter for probabilistic membership tests.

    Improvements over the previous version:
    * stdlib-only: the third-party ``bitarray`` and ``mmh3`` packages are
      replaced by a packed ``bytearray`` and MD5-based double hashing
      (Kirsch-Mitzenmacher), so the class has no extra dependencies;
    * the redundant per-method ``import math`` statements are gone
      (``math`` is imported once at module level);
    * invalid constructor arguments raise ``ValueError`` instead of
      failing later with a cryptic math/division error.
    """

    def __init__(self, capacity: int, error_rate: float = 0.01):
        """
        Args:
            capacity: expected number of inserted elements (> 0).
            error_rate: target false-positive probability, in (0, 1).

        Raises:
            ValueError: if capacity or error_rate is out of range.
        """
        if capacity <= 0:
            raise ValueError("capacity must be positive")
        if not 0 < error_rate < 1:
            raise ValueError("error_rate must be in (0, 1)")
        self.capacity = capacity
        self.error_rate = error_rate
        # Optimal sizing for the requested capacity / error rate.
        self.bit_array_size = self._get_bit_array_size()
        self.hash_count = self._get_hash_count()
        # Bit store: one bit per slot, packed 8 slots per byte.
        self.bit_array = bytearray((self.bit_array_size + 7) // 8)
        # Number of add() calls (not distinct items).
        self.inserted_count = 0

    def _get_bit_array_size(self) -> int:
        """Optimal bit count: m = -(n * ln(p)) / (ln 2)^2."""
        m = -(self.capacity * math.log(self.error_rate)) / (math.log(2) ** 2)
        return max(1, int(m))

    def _get_hash_count(self) -> int:
        """Optimal number of hash functions: k = (m / n) * ln 2."""
        k = (self.bit_array_size / self.capacity) * math.log(2)
        return max(1, int(k))

    def _hash(self, item: str, seed: int) -> int:
        """seed-th hash of *item* via double hashing: (h1 + seed*h2) % m."""
        digest = hashlib.md5(item.encode("utf-8")).digest()
        h1 = int.from_bytes(digest[:8], "big")
        # Force h2 odd so the stride cycles through every slot.
        h2 = int.from_bytes(digest[8:], "big") | 1
        return (h1 + seed * h2) % self.bit_array_size

    def add(self, item: str):
        """Insert *item* by setting its hash_count bits."""
        for i in range(self.hash_count):
            idx = self._hash(item, i)
            self.bit_array[idx >> 3] |= 1 << (idx & 7)
        self.inserted_count += 1

    def contains(self, item: str) -> bool:
        """Return True if *item* may be present.

        False positives are possible; false negatives are not.
        """
        for i in range(self.hash_count):
            idx = self._hash(item, i)
            if not (self.bit_array[idx >> 3] >> (idx & 7)) & 1:
                return False
        return True

    def estimate_size(self) -> int:
        """Estimate the number of distinct inserted items.

        Uses n ≈ -(m / k) * ln(1 - X / m), where X is the number of set
        bits in the array.
        """
        x = sum(bin(b).count("1") for b in self.bit_array)
        if x == 0:
            return 0
        estimated = -self.bit_array_size / self.hash_count * math.log(
            1 - x / self.bit_array_size)
        return int(estimated)

    def load_factor(self) -> float:
        """Fraction of the nominal capacity consumed by insertions."""
        return self.inserted_count / self.capacity
# Usage example
bf = BloomFilter(capacity=1000000, error_rate=0.01)
urls = [
    'https://example.com/page1',
    'https://example.com/page2',
    'https://example.com/page3'
]
for url in urls:
    if bf.contains(url):
        print(f"Duplicate detected: {url}")
    else:
        print(f"Processing: {url}")
        bf.add(url)

# Redis Bloom filter
import redis
import mmh3
import math
from bitarray import bitarray
class RedisBloomFilter:
    """Bloom filter whose bit array lives in a shared Redis bitmap.

    Every crawler node pointed at the same Redis instance shares a
    single global membership view.
    """

    def __init__(self, redis_client, key_prefix: str, capacity: int = 1000000, error_rate: float = 0.01):
        """
        Args:
            redis_client: connected Redis client instance.
            key_prefix: namespace prefix for the bitmap key.
            capacity: expected number of elements.
            error_rate: target false-positive probability.
        """
        self.redis_client = redis_client
        self.key_prefix = key_prefix
        self.capacity = capacity
        self.error_rate = error_rate
        # Optimal bit and hash-function counts for (capacity, error_rate).
        self.bit_array_size = int(-(capacity * math.log(error_rate)) / (math.log(2) ** 2))
        self.hash_count = int((self.bit_array_size / capacity) * math.log(2))
        # All bits live under one Redis bitmap key.
        self.bitmap_key = f"{key_prefix}:bloom_bitmap"

    def _get_hashes(self, item: str) -> list:
        """Bit offsets for *item*, one per hash function (seeded mmh3)."""
        return [mmh3.hash(item, seed) % self.bit_array_size
                for seed in range(self.hash_count)]

    def add(self, item: str):
        """Set every bit for *item* in one pipelined round trip."""
        pipe = self.redis_client.pipeline()
        for offset in self._get_hashes(item):
            pipe.setbit(self.bitmap_key, offset, 1)
        pipe.execute()

    def contains(self, item: str) -> bool:
        """True if *item* may have been added (false positives possible)."""
        return all(
            self.redis_client.getbit(self.bitmap_key, offset)
            for offset in self._get_hashes(item)
        )

    def clear(self):
        """Drop the whole bitmap."""
        self.redis_client.delete(self.bitmap_key)

    def info(self) -> dict:
        """Return the filter's configuration parameters."""
        return {
            'capacity': self.capacity,
            'error_rate': self.error_rate,
            'bit_array_size': self.bit_array_size,
            'hash_count': self.hash_count,
            'bitmap_key': self.bitmap_key
        }
# Usage example
redis_client = redis.Redis(host='localhost', port=6379, db=0)
redis_bf = RedisBloomFilter(redis_client, 'crawler_bloom', capacity=1000000, error_rate=0.001)

# Exercise deduplication
test_urls = [
    'https://example.com/test1',
    'https://example.com/test2',
    'https://example.com/test1'  # duplicate
]
for url in test_urls:
    if redis_bf.contains(url):
        print(f"Duplicate detected: {url}")
    else:
        print(f"Processing: {url}")
        redis_bf.add(url)

# Advanced Bloom filter features
import time
from typing import Tuple
class AdvancedBloomFilter:
    """Bloom filter wrapper that adds per-item timestamps (for expiry
    reporting) and a small local lookup cache.

    NOTE(review): bits can never be removed from the underlying Bloom
    filter, so "expiry" is only reported via the timestamp map; the
    filter itself keeps answering positively.
    """

    def __init__(self, capacity: int, error_rate: float = 0.01, expiration_time: int = 86400):
        """
        Args:
            capacity: expected number of elements.
            error_rate: target false-positive probability.
            expiration_time: item lifetime in seconds (default: 1 day).
        """
        self.capacity = capacity
        self.error_rate = error_rate
        self.expiration_time = expiration_time
        # Underlying probabilistic membership structure.
        self.main_bf = BloomFilter(capacity, error_rate)
        # item -> insertion timestamp (exact, unlike the filter itself).
        self.timestamps = {}
        # Local result cache (avoids repeated filter/Redis lookups).
        self.local_cache = {}
        self.cache_size_limit = 10000

    def add(self, item: str, timestamp: float = None):
        """Insert *item*, recording *timestamp* (default: now)."""
        if timestamp is None:
            timestamp = time.time()
        self.main_bf.add(item)
        self.timestamps[item] = timestamp
        # Cache the positive result for subsequent lookups.
        self._update_local_cache(item, True)

    def contains(self, item: str) -> Tuple[bool, bool]:
        """Check membership and staleness.

        Returns:
            (exists, maybe_expired).  Answers served from the local
            cache (entries younger than 5 minutes) always report
            expired=False without consulting the timestamps.
        """
        # Serve from the local cache when the entry is fresh enough.
        if item in self.local_cache:
            cached_time, cached_exists = self.local_cache[item]
            if time.time() - cached_time < 300:  # 5-minute cache TTL
                return cached_exists, False
        # Probabilistic membership check.
        exists = self.main_bf.contains(item)
        # Exact expiry check via the recorded timestamp, when available.
        expired = False
        if exists and item in self.timestamps:
            if time.time() - self.timestamps[item] > self.expiration_time:
                expired = True
        # Refresh the cache with this lookup's outcome.
        self._update_local_cache(item, exists)
        return exists, expired

    def _update_local_cache(self, item: str, exists: bool):
        """Record a lookup result, evicting half the cache when full."""
        if len(self.local_cache) >= self.cache_size_limit:
            # Evict an arbitrary half (dict insertion order) to make room.
            keys_to_remove = list(self.local_cache.keys())[:self.cache_size_limit // 2]
            for key in keys_to_remove:
                del self.local_cache[key]
        self.local_cache[item] = (time.time(), exists)

    def cleanup_expired(self):
        """Drop timestamp/cache entries older than expiration_time.

        The Bloom filter bits cannot be cleared, so cleaned items may
        still test positive in the filter afterwards.
        """
        current_time = time.time()
        expired_keys = []
        for item, timestamp in self.timestamps.items():
            if current_time - timestamp > self.expiration_time:
                expired_keys.append(item)
        # Only the timestamps (and cache entries) can actually be removed.
        for key in expired_keys:
            del self.timestamps[key]
            # Invalidate any cached lookup result for the same key.
            if key in self.local_cache:
                del self.local_cache[key]
# Usage example
adv_bf = AdvancedBloomFilter(capacity=100000, error_rate=0.01, expiration_time=3600)  # 1-hour expiry

# Register two URLs a moment apart
adv_bf.add('https://example.com/page1')
time.sleep(2)
adv_bf.add('https://example.com/page2')

# Query existence and staleness
exists, expired = adv_bf.contains('https://example.com/page1')
print(f"URL exists: {exists}, Expired: {expired}")

# Redis distributed deduplication
Redis提供了多种数据结构来实现高效的分布式去重。
#Redis Set去重
import redis
import hashlib
from typing import List, Union
class RedisSetDeduplicator:
    """Exact deduplicator backed by a Redis SET of MD5 fingerprints."""

    def __init__(self, redis_client, set_key: str = 'dedup_set'):
        self.redis_client = redis_client
        self.set_key = set_key

    def _digest(self, item: str) -> str:
        """MD5 hex digest used as the set member for *item*."""
        return hashlib.md5(item.encode()).hexdigest()

    def add(self, item: str) -> bool:
        """Add *item*.

        Returns:
            True when the item was new, False for a duplicate.
        """
        return bool(self.redis_client.sadd(self.set_key, self._digest(item)))

    def contains(self, item: str) -> bool:
        """Exact membership test."""
        return bool(self.redis_client.sismember(self.set_key, self._digest(item)))

    def batch_add(self, items: List[str]) -> int:
        """Pipeline-add many items at once.

        Returns:
            Number of items that were actually new.
        """
        pipe = self.redis_client.pipeline()
        for item in items:
            pipe.sadd(self.set_key, self._digest(item))
        return sum(pipe.execute())

    def get_size(self) -> int:
        """Number of distinct fingerprints stored."""
        return self.redis_client.scard(self.set_key)

    def clear(self):
        """Remove the backing set entirely."""
        self.redis_client.delete(self.set_key)
# Usage example
redis_client = redis.Redis(host='localhost', port=6379, db=0)
dedup = RedisSetDeduplicator(redis_client, 'crawler_dedup_set')
urls = [
    'https://example.com/page1',
    'https://example.com/page2',
    'https://example.com/page1',  # duplicate
    'https://example.com/page3'
]
for url in urls:
    is_new = dedup.add(url)
    print(f"URL: {url}, Is New: {is_new}, Set Size: {dedup.get_size()}")

# Redis HyperLogLog deduplication
class RedisHyperLogLogDeduplicator:
    """Approximate distinct counter backed by Redis HyperLogLog.

    HLL cannot answer membership queries; it only estimates the number
    of distinct fingerprints within a small, bounded memory footprint.
    """

    def __init__(self, redis_client, hll_key: str = 'dedup_hll'):
        self.redis_client = redis_client
        self.hll_key = hll_key

    def add(self, item: str):
        """Register one item with the counter."""
        fingerprint = hashlib.md5(item.encode()).hexdigest()
        self.redis_client.pfadd(self.hll_key, fingerprint)

    def batch_add(self, items: List[str]):
        """Register many items with a single PFADD call."""
        fingerprints = [hashlib.md5(item.encode()).hexdigest() for item in items]
        self.redis_client.pfadd(self.hll_key, *fingerprints)

    def get_count(self) -> int:
        """Estimated number of distinct items seen so far."""
        return self.redis_client.pfcount(self.hll_key)

    def merge(self, other_hll_key: str):
        """Fold another HLL into this one (union of both item sets)."""
        self.redis_client.pfmerge(self.hll_key, self.hll_key, other_hll_key)
# Usage example
hll_dedup = RedisHyperLogLogDeduplicator(redis_client, 'crawler_hll_dedup')

# Load a large batch of URLs...
test_urls = [f'https://example.com/page{i}' for i in range(1000)]
# ...including some duplicates
test_urls.extend(f'https://example.com/page{i}' for i in range(100))
hll_dedup.batch_add(test_urls)
print(f"Estimated unique URLs: {hll_dedup.get_count()}")

# Redis sorted-set deduplication (with timestamps)
class RedisSortedSetDeduplicator:
    """Deduplicator backed by a Redis sorted set, scored by timestamp.

    Storing the insertion time as the member's score enables recency
    queries and age-based cleanup on top of exact deduplication.
    """

    def __init__(self, redis_client, zset_key: str = 'dedup_zset', expiration_days: int = 7):
        """
        Args:
            redis_client: connected Redis client.
            zset_key: key of the backing sorted set.
            expiration_days: TTL re-applied to the whole set on insert.
        """
        self.redis_client = redis_client
        self.zset_key = zset_key
        self.expiration_seconds = expiration_days * 24 * 3600  # days -> seconds

    def add(self, item: str, timestamp: float = None) -> bool:
        """Add *item* scored with *timestamp* (default: now).

        Bug fix: the previous existence check used ``bool(zscore(...))``,
        which mis-classified an existing member whose score is 0 (e.g. a
        zero timestamp) as new; ZSCORE returns None only for absent
        members, so compare against None — consistent with contains().

        Returns:
            True when the item was new, False for a duplicate.
        """
        if timestamp is None:
            timestamp = time.time()
        item_hash = hashlib.md5(item.encode()).hexdigest()
        # A score of 0.0 is falsy but still means "present".
        if self.redis_client.zscore(self.zset_key, item_hash) is not None:
            return False
        self.redis_client.zadd(self.zset_key, {item_hash: timestamp})
        # Refresh the TTL of the whole set on every successful insert.
        self.redis_client.expire(self.zset_key, self.expiration_seconds)
        return True

    def contains(self, item: str) -> bool:
        """Exact membership test."""
        item_hash = hashlib.md5(item.encode()).hexdigest()
        return self.redis_client.zscore(self.zset_key, item_hash) is not None

    def get_recent_items(self, count: int = 100) -> List[tuple]:
        """Return up to *count* newest entries.

        Returns:
            [(item_hash, timestamp), ...] ordered newest first.
        """
        entries = self.redis_client.zrevrange(self.zset_key, 0, count - 1, withscores=True)
        return [(member.decode(), score) for member, score in entries]

    def cleanup_old_items(self, older_than_days: int = 30):
        """Drop members older than *older_than_days* days."""
        cutoff_time = time.time() - (older_than_days * 24 * 3600)
        self.redis_client.zremrangebyscore(self.zset_key, 0, cutoff_time)
# Usage example
zset_dedup = RedisSortedSetDeduplicator(redis_client, 'crawler_zset_dedup', expiration_days=14)
urls = [
    'https://example.com/page1',
    'https://example.com/page2',
    'https://example.com/page1',  # duplicate
    'https://example.com/page3'
]
for url in urls:
    is_new = zset_dedup.add(url)
    print(f"URL: {url}, Is New: {is_new}")

# Inspect the most recently added entries
recent = zset_dedup.get_recent_items(10)
print(f"Recent items: {len(recent)}")

# Distributed locking
在分布式环境中,需要使用分布式锁来协调多个节点的访问。
#基础分布式锁
import uuid
import time
from typing import Optional
class BasicDistributedLock:
    """Minimal distributed lock on a single Redis instance.

    Acquisition uses SET NX EX (atomic create-with-TTL); release runs a
    Lua script so that only the owning client can delete the key.
    """

    def __init__(self, redis_client, lock_key: str, default_timeout: int = 30):
        self.redis_client = redis_client
        self.lock_key = lock_key
        self.default_timeout = default_timeout
        # Unique per-instance token: release() only succeeds for the owner.
        self.identifier = str(uuid.uuid4())

    def acquire(self, timeout: int = None) -> bool:
        """Poll for the lock until *timeout* seconds elapse.

        Returns:
            True when the lock was obtained, False on timeout.
        """
        if timeout is None:
            timeout = self.default_timeout
        deadline = time.time() + timeout
        while time.time() < deadline:
            # NX: create only if absent; EX: auto-expire so a crashed
            # owner cannot hold the lock forever.
            acquired = self.redis_client.set(
                self.lock_key,
                self.identifier,
                nx=True,
                ex=timeout
            )
            if acquired:
                return True
            time.sleep(0.01)  # brief back-off before retrying
        return False

    def release(self) -> bool:
        """Release the lock if we still own it.

        Returns:
            True on release, False if the key expired or is now owned
            by another client.
        """
        lua_script = """
        if redis.call("get", KEYS[1]) == ARGV[1] then
            return redis.call("del", KEYS[1])
        else
            return 0
        end
        """
        outcome = self.redis_client.eval(
            lua_script,
            1,                 # one KEYS argument
            self.lock_key,     # KEYS[1]
            self.identifier    # ARGV[1]
        )
        return bool(outcome)

    def is_locked(self) -> bool:
        """True while any client holds the lock."""
        return self.redis_client.exists(self.lock_key) == 1
# Usage example
lock = BasicDistributedLock(redis_client, 'crawler_lock', default_timeout=30)
if lock.acquire(timeout=10):
    try:
        print("Lock acquired, performing critical operation...")
        time.sleep(2)  # simulate work inside the critical section
        print("Critical operation completed.")
    finally:
        lock.release()
        print("Lock released.")
else:
    print("Failed to acquire lock")

# Redlock algorithm implementation
import random
import time
from typing import List
class Redlock:
    """Redlock distributed-lock algorithm over several independent Redis
    instances.

    A lock is considered held only when it was acquired on a majority
    (quorum) of instances and the acquisition finished with a positive
    remaining validity window.

    Fixes over the previous version:
    * bare ``except:`` clauses (which also swallow KeyboardInterrupt /
      SystemExit) narrowed to ``except Exception``;
    * the clock-drift allowance is now subtracted from the validity
      window instead of being subtracted from the elapsed time (which
      incorrectly *extended* the window).
    """

    # Owner-checked delete, atomic on each individual instance.
    _UNLOCK_SCRIPT = """
    if redis.call("get", KEYS[1]) == ARGV[1] then
        return redis.call("del", KEYS[1])
    else
        return 0
    end
    """

    def __init__(self, redis_clients: list, retry_count: int = 3, retry_delay: float = 200):
        """
        Args:
            redis_clients: independent Redis client instances.
            retry_count: acquisition attempts before giving up.
            retry_delay: maximum random back-off between attempts (ms).
        """
        self.servers = redis_clients
        self.quorum = len(redis_clients) // 2 + 1  # majority of instances
        self.retry_count = retry_count
        self.retry_delay = retry_delay / 1000.0  # ms -> seconds
        self.clock_drift_factor = 0.01  # allowance for clock drift
        self.lock_timeout = 10000  # default lock TTL (ms)

    def lock(self, resource: str, ttl: int = None) -> Optional[str]:
        """Try to acquire a distributed lock on *resource*.

        Args:
            resource: key to lock.
            ttl: lock TTL in milliseconds (default: self.lock_timeout).

        Returns:
            The lock token on success, None when no attempt reached a
            quorum with positive validity.
        """
        if ttl is None:
            ttl = self.lock_timeout
        token = str(uuid.uuid4())
        for _ in range(self.retry_count):
            servers_locked = 0
            start_ms = time.time() * 1000
            for server in self.servers:
                try:
                    if server.set(resource, token, nx=True, px=ttl):
                        servers_locked += 1
                except Exception:
                    # One unreachable instance must not abort the round;
                    # the quorum requirement handles partial failure.
                    pass
            # Remaining validity: TTL minus acquisition latency, further
            # reduced (not extended) by the clock-drift allowance.
            elapsed_ms = (time.time() * 1000) - start_ms
            drift = int(self.clock_drift_factor * ttl) + 2
            validity_ms = ttl - elapsed_ms - drift
            if servers_locked >= self.quorum and validity_ms > 0:
                return token
            # Failed round: roll back partial acquisitions, back off, retry.
            self.unlock(resource, token)
            time.sleep(random.uniform(0, self.retry_delay))
        return None

    def unlock(self, resource: str, token: str) -> bool:
        """Release *resource* on every instance.

        Returns:
            True when a majority of instances confirmed the deletion.
        """
        servers_unlocked = 0
        for server in self.servers:
            try:
                if server.eval(self._UNLOCK_SCRIPT, 1, resource, token):
                    servers_unlocked += 1
            except Exception:
                pass  # tolerate individual instance failures
        return servers_unlocked >= self.quorum
# 使用示例(需要多个Redis实例)
# redis_servers = [
# redis.Redis(host='localhost', port=6379, db=0),
# redis.Redis(host='localhost', port=6380, db=0),
# redis.Redis(host='localhost', port=6381, db=0)
# ]
# redlock = Redlock(redis_servers)
# token = redlock.lock('my_resource', ttl=10000)
# if token:
# try:
# print("Redlock acquired, performing critical operation...")
# time.sleep(2)
# print("Critical operation completed.")
# finally:
# redlock.unlock('my_resource', token)
# print("Redlock released.")
# else:
# print("Failed to acquire redlock")#可重入分布式锁
class ReentrantDistributedLock:
    """Redis lock that the same owner may acquire repeatedly.

    A separate counter key tracks the re-entry depth; the lock key is
    deleted only once the depth returns to zero.
    """

    def __init__(self, redis_client, lock_key: str, owner_id: str = None):
        self.redis_client = redis_client
        self.lock_key = lock_key
        self.owner_id = owner_id or str(uuid.uuid4())
        # Per-owner counter key recording the re-entry depth.
        self.reentrancy_key = f"{lock_key}:reentrancy:{self.owner_id}"
        self.timeout = 30

    def _owned(self) -> bool:
        """True when the lock key currently stores our owner id."""
        holder = self.redis_client.get(self.lock_key)
        return bool(holder and holder.decode() == self.owner_id)

    def acquire(self) -> bool:
        """Take the lock, or bump the re-entry count if already held."""
        if self._owned():
            # Re-entry: just increase the depth counter.
            self.redis_client.incr(self.reentrancy_key)
            return True
        # First acquisition: NX create with TTL, depth starts at 1.
        if self.redis_client.set(
            self.lock_key,
            self.owner_id,
            nx=True,
            ex=self.timeout
        ):
            self.redis_client.set(self.reentrancy_key, 1)
            self.redis_client.expire(self.reentrancy_key, self.timeout)
            return True
        return False

    def release(self) -> bool:
        """Drop one re-entry level; free the lock at depth zero."""
        if not self._owned():
            return False
        depth = self.redis_client.decr(self.reentrancy_key)
        if depth <= 0:
            # Fully released: remove both the lock and its counter.
            self.redis_client.delete(self.lock_key)
            self.redis_client.delete(self.reentrancy_key)
        return True

    def is_held_by_current_owner(self) -> bool:
        """Whether this owner currently holds the lock."""
        return self._owned()
# Usage example
reentrant_lock = ReentrantDistributedLock(redis_client, 'reentrant_lock', 'client_1')

def recursive_function(depth=0):
    """Re-acquire the same lock at each recursion level (depth < 3)."""
    if depth >= 3:
        return
    if reentrant_lock.acquire():
        print(f"Acquired lock at depth {depth}")
        recursive_function(depth + 1)
        reentrant_lock.release()
        print(f"Released lock at depth {depth}")

recursive_function()

# Consistent hashing
一致性哈希用于在分布式系统中均匀分配请求到不同节点。
#一致性哈希实现
import hashlib
import bisect
from typing import List, Dict, Optional
class ConsistentHashRing:
    """Consistent hash ring with virtual nodes.

    Each physical node is hashed ``replicas`` times onto the ring so
    that keys spread evenly and only ~1/N of the keys move when a node
    joins or leaves.
    """

    def __init__(self, nodes: List[str] = None, replicas: int = 150):
        """
        Args:
            nodes: initial physical nodes.
            replicas: virtual nodes per physical node.
        """
        self.replicas = replicas
        self.ring = {}          # ring position -> physical node
        self.sorted_keys = []   # ring positions, kept sorted
        for node in nodes or []:
            self.add_node(node)

    def _hash(self, key: str) -> int:
        """Ring position of *key* (full MD5 digest as an integer)."""
        return int(hashlib.md5(key.encode()).hexdigest(), 16)

    def add_node(self, node: str):
        """Place all virtual replicas of *node* onto the ring."""
        for replica in range(self.replicas):
            position = self._hash(f"{node}:{replica}")
            self.ring[position] = node
            self.sorted_keys.append(position)
        self.sorted_keys.sort()

    def remove_node(self, node: str):
        """Remove all virtual replicas of *node* from the ring."""
        for replica in range(self.replicas):
            position = self._hash(f"{node}:{replica}")
            if position in self.ring:
                del self.ring[position]
                self.sorted_keys.remove(position)
        self.sorted_keys.sort()

    def get_node(self, key: str) -> Optional[str]:
        """Node responsible for *key*: its clockwise successor.

        Returns:
            The owning node, or None when the ring is empty.
        """
        if not self.ring:
            return None
        position = self._hash(key)
        idx = bisect.bisect_right(self.sorted_keys, position)
        if idx == len(self.sorted_keys):
            idx = 0  # wrap around the ring
        return self.ring[self.sorted_keys[idx]]

    def get_nodes_for_key(self, key: str, count: int) -> List[str]:
        """Up to *count* distinct nodes clockwise from *key*.

        Used for replica placement: the first entry is the primary
        owner, the rest are successors.
        """
        if not self.ring or count <= 0:
            return []
        start = bisect.bisect_right(self.sorted_keys, self._hash(key))
        chosen = []
        seen = set()
        for step in range(len(self.sorted_keys)):
            position = self.sorted_keys[(start + step) % len(self.sorted_keys)]
            node = self.ring[position]
            if node not in seen:
                seen.add(node)
                chosen.append(node)
                if len(chosen) >= count:
                    break
        return chosen
class DistributedScheduler:
    """Task scheduler that routes task ids to nodes via consistent hashing."""

    def __init__(self, nodes: List[str]):
        self.consistent_hash = ConsistentHashRing(nodes)
        self.node_loads = {node: 0 for node in nodes}  # tasks per node
        self.task_assignments = {}                     # task_id -> node

    def assign_task(self, task_id: str, task_data: dict = None) -> str:
        """Route *task_id* to its ring node and record the assignment.

        Returns:
            The chosen node, or None when the ring is empty.
        """
        target = self.consistent_hash.get_node(task_id)
        if target:
            self.node_loads[target] += 1
            self.task_assignments[task_id] = target
        return target

    def get_node_for_task(self, task_id: str) -> str:
        """Node previously assigned to *task_id*, or None."""
        return self.task_assignments.get(task_id)

    def release_task(self, task_id: str):
        """Forget *task_id* and decrement its node's load."""
        node = self.task_assignments.pop(task_id, None)
        if node in self.node_loads:
            self.node_loads[node] = max(0, self.node_loads[node] - 1)

    def get_load_distribution(self) -> Dict[str, int]:
        """Snapshot of per-node task counts."""
        return self.node_loads.copy()
# Usage example
nodes = ['node1', 'node2', 'node3', 'node4']
scheduler = DistributedScheduler(nodes)

# Route a batch of tasks
for i in range(100):
    task_id = f"task_{i}"
    assigned_node = scheduler.assign_task(task_id)
    print(f"Task {task_id} assigned to {assigned_node}")

# Inspect how evenly the tasks spread
print("Load distribution:", scheduler.get_load_distribution())

# Task scheduling strategies
#负载均衡调度器
import heapq
import time
from enum import Enum
from typing import Dict, List, Callable, Any
class TaskPriority(Enum):
    """Scheduling priority levels; a larger value means more urgent."""
    LOW = 1
    MEDIUM = 2
    HIGH = 3
    CRITICAL = 4
class Task:
"""
任务类
"""
def __init__(self, task_id: str, priority: TaskPriority, task_type: str,
data: dict = None, created_at: float = None):
self.task_id = task_id
self.priority = priority
self.task_type = task_type
self.data = data or {}
self.created_at = created_at or time.time()
self.assigned_node = None
self.started_at = None
self.completed_at = None
def __lt__(self, other):
"""
优先级队列比较方法
"""
# 优先级数字越大优先级越高,所以使用负数进行比较
return (-self.priority.value, self.created_at) < (-other.priority.value, other.created_at)
class LoadBalancedScheduler:
    """Priority-queue scheduler that assigns tasks to the least-loaded node."""

    def __init__(self, nodes: List[str]):
        self.nodes = nodes
        # Per-node runtime statistics.
        self.node_stats = {
            node: {
                'load': 0,
                'active_tasks': 0,
                'completed_tasks': 0,
                'avg_response_time': 0.0,
                'last_activity': time.time()
            }
            for node in nodes
        }
        self.task_queue = []      # heapq of pending tasks
        self.pending_tasks = {}   # task_id -> queued task
        self.assigned_tasks = {}  # task_id -> running task
        self.task_history = []    # finished tasks

    def submit_task(self, task: "Task") -> bool:
        """Queue *task* for later assignment."""
        heapq.heappush(self.task_queue, task)
        self.pending_tasks[task.task_id] = task
        return True

    def assign_next_task(self) -> tuple:
        """Pop the highest-priority task and hand it to the best node.

        Returns:
            (task, node), or (None, None) when nothing can be assigned.
        """
        if not self.task_queue:
            return None, None
        node = self._select_best_node()
        if not node:
            return None, None
        task = heapq.heappop(self.task_queue)
        del self.pending_tasks[task.task_id]
        # Record the assignment on the task itself.
        task.assigned_node = node
        task.started_at = time.time()
        self.assigned_tasks[task.task_id] = task
        stats = self.node_stats[node]
        stats['load'] += 1
        stats['active_tasks'] += 1
        stats['last_activity'] = time.time()
        return task, node

    def _select_best_node(self) -> str:
        """Node with the fewest active tasks (first wins on ties)."""
        if not self.node_stats:
            return None
        return min(self.node_stats,
                   key=lambda n: self.node_stats[n]['active_tasks'])

    def mark_task_completed(self, task_id: str, node: str, response_time: float = None):
        """Record completion of *task_id* on *node* and update stats."""
        if task_id not in self.assigned_tasks:
            return
        task = self.assigned_tasks.pop(task_id)
        task.completed_at = time.time()
        stats = self.node_stats[node]
        stats['active_tasks'] -= 1
        stats['completed_tasks'] += 1
        if response_time:
            # Exponential moving average keeps a smoothed response time.
            alpha = 0.1
            stats['avg_response_time'] = (
                alpha * response_time + (1 - alpha) * stats['avg_response_time']
            )
        self.task_history.append(task)

    def get_node_status(self) -> Dict[str, dict]:
        """Shallow copy of all node statistics."""
        return self.node_stats.copy()

    def get_queue_size(self) -> int:
        """Number of tasks still waiting in the queue."""
        return len(self.task_queue)
# Usage example
scheduler = LoadBalancedScheduler(['node1', 'node2', 'node3'])

# Queue a mix of task types and priorities
tasks = [
    Task('task1', TaskPriority.HIGH, 'crawl', {'url': 'https://example.com'}),
    Task('task2', TaskPriority.MEDIUM, 'parse', {'data': 'html_content'}),
    Task('task3', TaskPriority.LOW, 'index', {'doc_id': '123'}),
    Task('task4', TaskPriority.CRITICAL, 'crawl', {'url': 'https://priority-site.com'})
]
for task in tasks:
    scheduler.submit_task(task)

# Drain the queue, simulating completion as we go
while scheduler.get_queue_size() > 0:
    task, node = scheduler.assign_next_task()
    if task and node:
        print(f"Assigned {task.task_id} to {node}")
        scheduler.mark_task_completed(task.task_id, node, 2.5)

print("Node status:", scheduler.get_node_status())

# Intelligent scheduling strategies
import statistics
from collections import defaultdict, deque
class IntelligentScheduler:
    """Scheduler that picks nodes using observed per-task-type performance."""

    def __init__(self, nodes: List[str]):
        self.nodes = nodes
        # Rolling window of recent execution times per node.
        self.node_performance = defaultdict(lambda: deque(maxlen=100))
        # node -> {task_type: [execution times]}
        self.task_type_preferences = defaultdict(dict)
        self.current_loads = {node: 0 for node in nodes}
        self.task_assignment_history = defaultdict(list)

    def record_task_performance(self, task_id: str, node: str, task_type: str,
                                execution_time: float, success: bool = True):
        """Feed one finished task's timing back into the model."""
        self.node_performance[node].append(execution_time)
        # Track timings per (node, task_type) for type-specific predictions.
        self.task_type_preferences[node].setdefault(task_type, []).append(execution_time)
        if success:
            # A successfully finished task frees one slot on its node.
            self.current_loads[node] = max(0, self.current_loads[node] - 1)

    def predict_execution_time(self, node: str, task_type: str) -> float:
        """Expected runtime of *task_type* on *node*.

        Falls back to the node's overall average when there is no
        type-specific history, and to 1.0 when there is no data at all.
        """
        per_type = self.task_type_preferences[node].get(task_type)
        if per_type:
            return statistics.mean(per_type)
        if self.node_performance[node]:
            return statistics.mean(self.node_performance[node])
        return 1.0  # neutral default

    def calculate_node_score(self, node: str, task_type: str) -> float:
        """Lower is better: predicted runtime scaled by current load."""
        predicted = self.predict_execution_time(node, task_type)
        load_factor = self.current_loads[node] / 10.0  # assume max load ~10
        return predicted * (1 + load_factor)

    def assign_task_intelligent(self, task_type: str) -> str:
        """Send the task to the lowest-scoring node and bump its load."""
        target = min(self.nodes,
                     key=lambda n: self.calculate_node_score(n, task_type),
                     default=None)
        if target:
            self.current_loads[target] += 1
        return target
# Usage example
intelligent_scheduler = IntelligentScheduler(['node1', 'node2', 'node3'])

# Replay a small execution history to warm up the model
execution_history = [
    ('node1', 'crawl', 1.2, True),
    ('node1', 'crawl', 1.1, True),
    ('node1', 'parse', 0.8, True),
    ('node2', 'crawl', 1.5, True),
    ('node2', 'parse', 0.6, True),
    ('node3', 'index', 2.0, True),
    ('node3', 'index', 1.8, True),
]
for node, task_type, exec_time, success in execution_history:
    intelligent_scheduler.record_task_performance(
        f'task_{len(intelligent_scheduler.node_performance[node])}',
        node, task_type, exec_time, success)

# Pick the best node for a fresh crawl task
new_task_type = 'crawl'
assigned_node = intelligent_scheduler.assign_task_intelligent(new_task_type)
print(f"Intelligently assigned {new_task_type} task to {assigned_node}")

# Show the per-node predictions backing the decision
for node in ['node1', 'node2', 'node3']:
    pred_time = intelligent_scheduler.predict_execution_time(node, 'crawl')
    print(f"Predicted time for crawl on {node}: {pred_time:.2f}s")

# Deduplication performance tuning
#批量去重优化
import asyncio
from typing import Iterator, Tuple
class BatchDeduplicator:
    """Batches dedup checks and keeps a local fingerprint cache to cut
    down on remote round trips to the wrapped deduplicator."""

    def __init__(self, deduplicator, batch_size: int = 1000):
        self.deduplicator = deduplicator
        self.batch_size = batch_size
        self.local_cache = set()  # MD5 fingerprints already known
        self.cache_size_limit = 10000

    def add_batch(self, items: List[str]) -> List[Tuple[str, bool]]:
        """Deduplicate *items* in bulk.

        Known duplicates are answered from the local cache; everything
        else is forwarded to the wrapped deduplicator in chunks.

        Returns:
            [(item, is_new), ...]
        """
        resolved = []
        unknown = []
        # First pass: answer from the local fingerprint cache.
        for item in items:
            fingerprint = hashlib.md5(item.encode()).hexdigest()
            if fingerprint in self.local_cache:
                resolved.append((item, False))
            else:
                unknown.append(item)
        if unknown:
            remote = self._check_remote_batch(unknown)
            resolved.extend(remote)
            # Remember newly-seen fingerprints locally.
            for item, is_new in remote:
                if is_new:
                    self.local_cache.add(hashlib.md5(item.encode()).hexdigest())
            if len(self.local_cache) > self.cache_size_limit:
                # Evict a random half to stay under the cap.
                victims = random.sample(list(self.local_cache),
                                        len(self.local_cache) // 2)
                for fingerprint in victims:
                    self.local_cache.discard(fingerprint)
        return resolved

    def _check_remote_batch(self, items: List[str]) -> List[Tuple[str, bool]]:
        """Forward *items* to the deduplicator in batch_size chunks."""
        outcome = []
        for start in range(0, len(items), self.batch_size):
            chunk = items[start:start + self.batch_size]
            outcome.extend(self._process_batch(chunk))
        return outcome

    def _process_batch(self, batch: List[str]) -> List[Tuple[str, bool]]:
        """Run one chunk through the wrapped deduplicator."""
        return [(item, self.deduplicator.add(item)) for item in batch]
# Usage example
batch_dedup = BatchDeduplicator(dedup, batch_size=100)
test_items = [f"https://example.com/page{i}" for i in range(1000)]
# Inject some duplicates
test_items.extend(f"https://example.com/page{i}" for i in range(100))
batch_results = batch_dedup.add_batch(test_items)
new_count = sum(1 for _, is_new in batch_results if is_new)
print(f"Processed {len(test_items)} items, {new_count} were new")

# Memory-efficient deduplicator
class MemoryEfficientDeduplicator:
    """Hybrid deduplicator that bounds its own memory footprint.

    Small items are tracked exactly (a set of MD5 digests); large items
    go only into a Bloom filter (probabilistic, constant memory).
    """

    def __init__(self, max_memory_mb: float = 100):
        """
        Args:
            max_memory_mb: soft cap for the exact-storage budget.
        """
        self.max_memory_bytes = max_memory_mb * 1024 * 1024
        self.small_items = set()  # exact digests of small items
        self.bloom_filter = BloomFilter(capacity=100000, error_rate=0.01)
        self.memory_usage = 0
        # digest -> original item size.  NOTE(review): accounting uses the
        # original item's byte length although only the 32-char digest is
        # actually stored — confirm this is intentional.
        self.item_sizes = {}

    def add(self, item: str) -> bool:
        """Add *item*; True when it was not seen before.

        The answer is exact for small items and probabilistic (Bloom
        filter) for large ones.
        """
        item_hash = hashlib.md5(item.encode()).hexdigest()
        # Duplicate if either storage tier already knows the digest.
        if item_hash in self.small_items or self.bloom_filter.contains(item_hash):
            return False
        # Size of the original item, used for the storage decision.
        item_size = len(item.encode('utf-8'))
        # Small items (< 100 bytes) are stored exactly.
        if item_size < 100:
            self.small_items.add(item_hash)
            self.item_sizes[item_hash] = item_size
            self.memory_usage += item_size
        else:
            # Large items only touch the Bloom filter.
            self.bloom_filter.add(item_hash)
        # Trim exact storage once usage crosses 80% of the budget.
        if self.memory_usage > self.max_memory_bytes * 0.8:
            self._cleanup_memory()
        return True

    def contains(self, item: str) -> bool:
        """Membership test across both storage tiers."""
        item_hash = hashlib.md5(item.encode()).hexdigest()
        return item_hash in self.small_items or self.bloom_filter.contains(item_hash)

    def _cleanup_memory(self):
        """Evict the largest exact entries until usage drops to ~70%.

        NOTE(review): evicted digests are NOT re-inserted into the Bloom
        filter, so those items would be reported as new again — confirm
        this trade-off is acceptable.
        """
        # Largest entries first, so fewer evictions free the most memory.
        sorted_items = sorted(
            [(size, item_hash) for item_hash, size in self.item_sizes.items()],
            reverse=True
        )
        items_to_remove = 0
        for size, item_hash in sorted_items:
            if self.memory_usage <= self.max_memory_bytes * 0.7:  # 70% target
                break
            self.small_items.discard(item_hash)
            if item_hash in self.item_sizes:
                self.memory_usage -= self.item_sizes[item_hash]
                del self.item_sizes[item_hash]
            items_to_remove += 1
        print(f"Cleaned up {items_to_remove} items to reduce memory usage")
# Usage example
mem_efficient_dedup = MemoryEfficientDeduplicator(max_memory_mb=50)
urls = [f"https://example.com/page{i}" for i in range(1000)]
for url in urls:
    is_new = mem_efficient_dedup.add(url)
    if not is_new:
        print(f"Duplicate found: {url}")

# Fault tolerance and consistency
#多层去重机制
class MultiLayerDeduplicator:
    """Three-tier duplicate detector: local Bloom filter, shared Redis
    Bloom filter, then an exact Redis set as the authority.

    Fixes over the previous version:
    * ``redis_bloom_hits`` is now actually incremented (it was declared
      but never updated);
    * ``false_positives_avoided`` counts genuine avoided false positives
      (Bloom filter said yes, exact set said no) instead of confirmed
      duplicates;
    * the exact set is no longer queried twice on the same code path.

    Assumes every writer goes through this class, so a miss in the
    shared Bloom filter implies absence (Bloom filters have no false
    negatives).
    """

    def __init__(self, redis_client):
        # Tier 1: tiny per-process filter — fastest, no network.
        self.local_bloom = BloomFilter(capacity=10000, error_rate=0.01)
        # Tier 2: shared probabilistic filter across all nodes.
        self.redis_bloom = RedisBloomFilter(redis_client, 'multi_layer_bloom',
                                            capacity=1000000, error_rate=0.001)
        # Tier 3: exact, authoritative membership set.
        self.redis_set = RedisSetDeduplicator(redis_client, 'multi_layer_set')
        # Per-tier counters; exactly one of local_hits / redis_set_hits /
        # misses is incremented per is_duplicate() call.
        self.stats = {
            'local_hits': 0,
            'redis_bloom_hits': 0,
            'redis_set_hits': 0,
            'misses': 0,
            'false_positives_avoided': 0
        }

    def is_duplicate(self, item: str) -> bool:
        """Return True if *item* was seen before; register it otherwise."""
        item_hash = hashlib.md5(item.encode()).hexdigest()
        # Tier 1: a local hit needs no network round trip at all.
        if self.local_bloom.contains(item_hash):
            self.stats['local_hits'] += 1
            return True
        # Tier 2: shared filter; positives must be confirmed exactly.
        if self.redis_bloom.contains(item_hash):
            self.stats['redis_bloom_hits'] += 1
            if self.redis_set.contains(item_hash):
                self.stats['redis_set_hits'] += 1
                return True
            # Bloom-filter false positive caught by the exact set.
            self.stats['false_positives_avoided'] += 1
        # New item: record it in every tier (warms the fast layers).
        self.stats['misses'] += 1
        self.local_bloom.add(item_hash)
        self.redis_bloom.add(item_hash)
        self.redis_set.add(item_hash)
        return False

    def get_stats(self) -> dict:
        """Raw counters plus derived hit rate.

        total_checks sums the three mutually-exclusive outcome counters,
        so it equals the number of is_duplicate() calls.
        """
        total_checks = (self.stats['local_hits']
                        + self.stats['redis_set_hits']
                        + self.stats['misses'])
        hit_rate = ((self.stats['local_hits'] + self.stats['redis_set_hits'])
                    / total_checks if total_checks > 0 else 0)
        return {
            **self.stats,
            'hit_rate': hit_rate,
            'total_checks': total_checks
        }
# Usage example
multi_dedup = MultiLayerDeduplicator(redis_client)
test_urls = [
    'https://example.com/page1',
    'https://example.com/page2',
    'https://example.com/page1',  # duplicate
    'https://example.com/page3',
    'https://example.com/page2',  # duplicate
] * 100  # repeat to exercise the cache layers
for url in test_urls:
    if not multi_dedup.is_duplicate(url):
        print(f"New URL: {url}")
print("Statistics:", multi_dedup.get_stats())

# Consistency checking
class ConsistencyChecker:
    """Periodically compares primary and backup dedup stores for drift."""

    def __init__(self, redis_client, primary_dedup, backup_dedup):
        self.redis_client = redis_client
        self.primary_dedup = primary_dedup
        self.backup_dedup = backup_dedup
        self.inconsistency_log = []   # accumulated findings from periodic_check()
        self.check_interval = 300     # seconds between checks (5 minutes)

    def periodic_check(self):
        """Run one consistency pass and return the list of findings (may be empty)."""
        findings = []
        # Compare the Bloom-filter estimate against the exact set size.
        # Simplified here — production code would diff actual membership.
        try:
            bloom_count = self._estimate_bloom_count()
            if hasattr(self.primary_dedup, 'get_size'):
                set_count = self.primary_dedup.get_size()
            else:
                set_count = 0
            # Flag a mismatch when the counts diverge by more than 10%.
            if abs(bloom_count - set_count) > set_count * 0.1:
                findings.append({
                    'type': 'count_mismatch',
                    'bloom_count': bloom_count,
                    'set_count': set_count,
                    'timestamp': time.time()
                })
        except Exception as e:
            findings.append({
                'type': 'check_error',
                'error': str(e),
                'timestamp': time.time()
            })
        if findings:
            self.inconsistency_log.extend(findings)
            # Hook point: alerting or automatic repair could be triggered here.
            print(f"Found {len(findings)} inconsistencies")
        return findings

    def _estimate_bloom_count(self) -> int:
        """Estimate how many elements the Bloom filter holds (placeholder)."""
        # Simplified stand-in; a real implementation would apply the
        # fill-ratio formula n = -m/k * ln(1 - X/m).
        return 0

    def repair_inconsistency(self, inconsistency_type: str):
        """Dispatch a repair routine for the given inconsistency type."""
        if inconsistency_type == 'count_mismatch':
            # Rebuild the deduplication structures from scratch.
            print("Rebuilding deduplication structures...")
        elif inconsistency_type == 'data_corruption':
            # Data-repair logic would go here.
            print("Repairing corrupted data...")
# 使用示例
# consistency_checker = ConsistencyChecker(redis_client, dedup, backup_dedup)
#监控与调试
#去重性能监控
class DeduplicationMonitor:
    """Collects request/latency counters for a dedup system and exports metrics."""

    def __init__(self):
        self.metrics = {
            'requests_total': 0,
            'duplicates_found': 0,
            'unique_items_added': 0,
            'false_positive_estimates': 0,
            # Keep only the most recent latency samples to bound memory.
            'response_times': deque(maxlen=1000),
            'memory_usage': 0,
            'start_time': time.time()
        }
        self.performance_history = deque(maxlen=100)

    def record_request(self, is_duplicate: bool, response_time: float = 0):
        """Record one dedup lookup; response_time <= 0 means "not measured"."""
        self.metrics['requests_total'] += 1
        if is_duplicate:
            self.metrics['duplicates_found'] += 1
        else:
            self.metrics['unique_items_added'] += 1
        if response_time > 0:
            self.metrics['response_times'].append(response_time)

    def get_current_metrics(self) -> dict:
        """Return raw counters plus derived rates and latency percentiles."""
        times = self.metrics['response_times']
        if len(times) >= 2:
            avg_response_time = statistics.mean(times)
            # n=20 yields 19 cut points at 5% steps; the last one is the
            # 95th percentile.
            p95_response_time = statistics.quantiles(times, n=20)[-1]
        elif times:
            # Fixed: statistics.quantiles raises StatisticsError with fewer
            # than two samples; with a single sample, mean == p95 == sample.
            avg_response_time = p95_response_time = times[0]
        else:
            avg_response_time = 0
            p95_response_time = 0
        duplicate_rate = (
            self.metrics['duplicates_found'] / self.metrics['requests_total']
            if self.metrics['requests_total'] > 0 else 0
        )
        current_time = time.time()
        uptime = current_time - self.metrics['start_time']
        return {
            **self.metrics,
            'duplicate_rate': duplicate_rate,
            'avg_response_time': avg_response_time,
            'p95_response_time': p95_response_time,
            'uptime_seconds': uptime,
            'requests_per_second': self.metrics['requests_total'] / uptime if uptime > 0 else 0
        }

    def export_for_prometheus(self) -> str:
        """Render the current metrics in Prometheus text exposition format."""
        metrics = self.get_current_metrics()
        prometheus_format = f"""
# HELP dedup_requests_total Total number of deduplication requests
# TYPE dedup_requests_total counter
dedup_requests_total {metrics['requests_total']}
# HELP dedup_duplicates_found Total number of duplicates found
# TYPE dedup_duplicates_found counter
dedup_duplicates_found {metrics['duplicates_found']}
# HELP dedup_unique_items_added Total number of unique items added
# TYPE dedup_unique_items_added counter
dedup_unique_items_added {metrics['unique_items_added']}
# HELP dedup_duplicate_rate Duplicate detection rate
# TYPE dedup_duplicate_rate gauge
dedup_duplicate_rate {metrics['duplicate_rate']}
# HELP dedup_avg_response_time Average response time in seconds
# TYPE dedup_avg_response_time gauge
dedup_avg_response_time {metrics['avg_response_time']}
# HELP dedup_uptime_seconds Uptime in seconds
# TYPE dedup_uptime_seconds gauge
dedup_uptime_seconds {metrics['uptime_seconds']}
"""
        return prometheus_format
# Example usage: feed the monitor with simulated traffic.
monitor = DeduplicationMonitor()

import random

for _ in range(1000):
    is_duplicate = random.random() < 0.3           # ~30% duplicate rate
    response_time = random.uniform(0.01, 0.1)      # 10-100 ms latency
    monitor.record_request(is_duplicate, response_time)

print("Current metrics:", monitor.get_current_metrics())
print("\nPrometheus format:")
print(monitor.export_for_prometheus())
# 常见问题与解决方案 (next section: common problems and solutions)
#问题1: 布隆过滤器误判率过高
现象: 去重效果不佳,很多新项目被误判为重复 解决方案:
# 调整布隆过滤器参数
def optimize_bloom_filter(expected_items, desired_error_rate):
"""
优化布隆过滤器参数
"""
# 计算所需的位数组大小
bit_array_size = int(-(expected_items * math.log(desired_error_rate)) / (math.log(2) ** 2))
# 计算哈希函数数量
hash_count = int((bit_array_size / expected_items) * math.log(2))
print(f"Recommended bit array size: {bit_array_size}")
print(f"Recommended hash count: {hash_count}")
return BloomFilter(expected_items, desired_error_rate)
# Use a larger capacity and a lower false-positive rate.
# Fixed: the original call passed capacity=/error_rate= keywords, but the
# function's parameters are named expected_items/desired_error_rate, which
# raised TypeError at runtime.
optimized_bf = optimize_bloom_filter(expected_items=10000000, desired_error_rate=0.0001)
# 问题2: 分布式锁竞争激烈 (Problem 2: heavy distributed-lock contention)
现象: 大量客户端竞争同一把锁,性能下降 解决方案:
# 使用分段锁策略
class SegmentedDistributedLock:
    """Distributed lock split into hash segments to reduce contention.

    Each resource maps deterministically to one of *num_segments* Redis lock
    keys, so unrelated resources rarely compete for the same lock.
    """

    def __init__(self, redis_client, base_key: str, num_segments: int = 100):
        self.redis_client = redis_client
        self.base_key = base_key
        self.num_segments = num_segments

    def get_segment_key(self, resource: str) -> str:
        """Map *resource* to its segment lock key (stable across processes).

        Fixed: the built-in hash() is randomized per process
        (PYTHONHASHSEED), so different worker nodes would pick different
        segments for the same resource, defeating the lock entirely.
        A stable digest is required for a distributed key space.
        """
        digest = hashlib.md5(resource.encode('utf-8')).hexdigest()
        segment = int(digest, 16) % self.num_segments
        return f"{self.base_key}:segment:{segment}"

    def acquire_for_resource(self, resource: str, timeout: int = 30) -> bool:
        """Acquire the segment lock covering *resource*."""
        segment_key = self.get_segment_key(resource)
        lock = BasicDistributedLock(self.redis_client, segment_key, timeout)
        return lock.acquire(timeout)

    def release_for_resource(self, resource: str) -> bool:
        """Release the segment lock covering *resource*.

        NOTE(review): this builds a fresh BasicDistributedLock, so release
        only works if BasicDistributedLock does not track a per-instance
        owner token — confirm against its implementation.
        """
        segment_key = self.get_segment_key(resource)
        lock = BasicDistributedLock(self.redis_client, segment_key)
        return lock.release()
现象: 去重系统内存占用过大 解决方案:
# 实现内存使用监控和自动清理
class MemoryManagedDeduplicator:
    """Dedup wrapper that watches estimated memory use and warns near the cap."""

    def __init__(self, max_memory_mb: float = 100):
        self.max_memory_bytes = max_memory_mb * 1024 * 1024
        # NOTE(review): depends on a module-level `redis_client`; consider
        # passing the client in explicitly.
        self.deduplicator = RedisSetDeduplicator(redis_client)
        # HyperLogLog gives a cheap approximate cardinality for sizing.
        self.size_estimator = RedisHyperLogLogDeduplicator(redis_client)
        self.cleanup_threshold = 0.8  # warn at 80% of the memory budget

    def add_with_memory_check(self, item: str) -> bool:
        """Add *item* to the dedup set and warn when estimated memory is high."""
        result = self.deduplicator.add(item)
        # Fixed: the item must also be fed to the cardinality estimator,
        # otherwise get_count() never grows and the check below never fires.
        # (Assumes the HLL wrapper exposes add() like the set wrapper —
        # TODO confirm against its definition.)
        self.size_estimator.add(item)
        estimated_count = self.size_estimator.get_count()
        # Rough sizing: assume ~100 bytes per stored item.
        estimated_memory = estimated_count * 100
        if estimated_memory > self.max_memory_bytes * self.cleanup_threshold:
            print(f"Memory usage high ({estimated_memory/1024/1024:.2f}MB), considering cleanup")
            # A data-aging/eviction policy could be triggered here.
        return result
#设计原则
- 分层去重: 结合本地缓存、布隆过滤器和精确存储
- 性能优先: 优先使用快速的去重方法,逐步深入
- 容错设计: 考虑单点故障,设计备份方案
- 监控完备: 实现全面的监控和告警机制
#性能优化策略
- 批量处理: 减少网络调用次数
- 本地缓存: 缓存热点数据
- 异步处理: 避免阻塞主线程
- 参数调优: 根据实际负载调整参数
#安全考虑
- 访问控制: 限制对去重数据的访问
- 数据保护: 加密敏感的去重标识
- 审计日志: 记录去重操作日志
- 容量规划: 防止存储溢出
💡 核心要点: 分布式去重和调度是大规模爬虫系统的核心技术。通过合理的算法选择、架构设计和性能优化,可以实现高效、可靠的去重和调度系统。
#SEO优化策略
- 关键词优化: 在标题、内容中合理布局"分布式去重", "布隆过滤器", "分布式锁", "一致性哈希", "任务调度"等关键词
- 内容结构: 使用清晰的标题层级(H1-H6),便于搜索引擎理解内容结构
- 内部链接: 建立与其他相关教程的内部链接,提升页面权重
- 元数据优化: 在页面头部包含描述性的标题、描述和标签
🔗 相关教程推荐
- Scrapy-Redis分布式架构 - 分布式爬虫实现
- Spider中间件深度定制 - 数据处理中间件
- 大规模爬虫优化 - 性能优化策略
- 数据去重与增量更新 - 本地去重技术
🏷️ 标签云: 分布式去重 布隆过滤器 分布式锁 一致性哈希 任务调度 Redis 爬虫优化 性能优化 容错设计 监控系统

