#The Complete Guide to Scrapy Data Deduplication and Incremental Updates - Redis Fingerprint Checking and Smart Incremental Crawling in Depth
📂 Stage: Phase 2 — Data Flow (Data Processing)
🔗 Related chapters: Item Pipelines in Practice · Data Cleaning and Validation
#Deduplication Fundamentals
Data deduplication is a core technique in crawler systems: it identifies and filters out duplicate data so that the same content is neither fetched nor stored twice, improving both crawl efficiency and data quality.
#Why Deduplication Matters
"""
数据去重的重要性:
1. 资源节约:避免重复抓取相同内容
2. 存储优化:减少重复数据存储
3. 效率提升:专注抓取新数据
4. 数据质量:保持数据唯一性
5. 成本控制:降低带宽和计算成本
"""#数据去重的挑战
"""
数据去重面临的主要挑战:
1. 大规模数据:海量URL和数据的去重
2. 实时性要求:快速判断是否重复
3. 存储开销:去重标识的存储成本
4. 分布式协调:多节点间的去重协调
5. 误判控制:平衡准确性和性能
"""#增量更新基础概念
Incremental updating means fetching only new or changed data instead of re-crawling everything; it is an essential capability for any large-scale crawler system.
#Advantages of Incremental Updates
"""
增量更新的主要优势:
1. 效率提升:只处理变化的数据
2. 资源节省:减少网络和计算资源消耗
3. 时效性强:及时获取最新数据
4. 成本控制:降低运营成本
5. 服务友好:减少对目标服务器的压力
"""#增量更新的策略
"""
增量更新的常见策略:
1. 时间戳策略:基于更新时间判断
2. 版本号策略:基于版本号变化判断
3. 内容哈希策略:基于内容变化判断
4. 状态标记策略:基于状态变化判断
5. 混合策略:组合多种策略
"""#Redis指纹去重实现
#Redis Fingerprint Deduplication
#A Basic Redis Deduplication Pipeline
```python
import hashlib

import redis
from scrapy.exceptions import DropItem


class RedisDuplicatesPipeline:
    """Redis-backed deduplication pipeline."""

    def __init__(self, redis_host, redis_port, redis_db, redis_password=None):
        self.redis_host = redis_host
        self.redis_port = redis_port
        self.redis_db = redis_db
        self.redis_password = redis_password
        # Connect to Redis
        self.redis_conn = redis.Redis(
            host=self.redis_host,
            port=self.redis_port,
            db=self.redis_db,
            password=self.redis_password,
            decode_responses=False,  # stay in bytes mode for hash values
        )

    @classmethod
    def from_crawler(cls, crawler):
        """Create the pipeline from the crawler's settings."""
        settings = crawler.settings
        return cls(
            redis_host=settings.get('REDIS_HOST', 'localhost'),
            redis_port=settings.getint('REDIS_PORT', 6379),
            redis_db=settings.getint('REDIS_DB', 0),
            redis_password=settings.get('REDIS_PASSWORD'),
        )

    def process_item(self, item, spider):
        """Drop the item if its fingerprint has been seen before."""
        fingerprint = self.get_fingerprint(item)
        if self.redis_conn.exists(fingerprint):
            spider.logger.info(f"Duplicate item detected: {fingerprint}")
            raise DropItem(f"Duplicate item with fingerprint: {fingerprint}")
        # Store the fingerprint with an expiry (default: 7 days)
        expire_time = spider.crawler.settings.getint('DUPLICATE_EXPIRE_TIME', 86400 * 7)
        self.redis_conn.setex(fingerprint, expire_time, b'1')
        return item

    def get_fingerprint(self, item):
        """Generate the item's fingerprint."""
        # Fingerprint by URL (can be extended to multiple fields)
        url = item.get('url', '')
        if url:
            return hashlib.sha1(url.encode('utf-8')).hexdigest().encode('utf-8')
        # Without a URL, fingerprint the whole item
        item_str = str(dict(sorted(item.items()))).encode('utf-8')
        return hashlib.sha1(item_str).hexdigest().encode('utf-8')

    def close_spider(self, spider):
        """Clean up when the spider closes."""
        self.redis_conn.close()
```
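To activate the pipeline, register it in `settings.py` alongside the Redis settings that `from_crawler` reads. A minimal sketch, assuming your project module is called `myproject` (adjust the import path and priority to your project):

```python
# settings.py (sketch; module path and priority are project-specific)
ITEM_PIPELINES = {
    'myproject.pipelines.RedisDuplicatesPipeline': 300,
}

REDIS_HOST = 'localhost'
REDIS_PORT = 6379
REDIS_DB = 0
REDIS_PASSWORD = None              # set this if your Redis requires auth
DUPLICATE_EXPIRE_TIME = 86400 * 7  # keep fingerprints for 7 days
```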
#An Advanced Redis Deduplication Pipeline
```python
import hashlib
import json
import time

import redis
from itemadapter import ItemAdapter
from scrapy.exceptions import DropItem


class AdvancedRedisDuplicatesPipeline:
    """Redis deduplication pipeline with pluggable fingerprint strategies."""

    def __init__(self, redis_host, redis_port, redis_db, redis_password=None):
        self.redis_host = redis_host
        self.redis_port = redis_port
        self.redis_db = redis_db
        self.redis_password = redis_password
        self.redis_conn = redis.Redis(
            host=self.redis_host,
            port=self.redis_port,
            db=self.redis_db,
            password=self.redis_password,
            decode_responses=False,
        )

    @classmethod
    def from_crawler(cls, crawler):
        settings = crawler.settings
        return cls(
            redis_host=settings.get('REDIS_HOST', 'localhost'),
            redis_port=settings.getint('REDIS_PORT', 6379),
            redis_db=settings.getint('REDIS_DB', 0),
            redis_password=settings.get('REDIS_PASSWORD'),
        )

    def process_item(self, item, spider):
        """Deduplicate using the strategy selected in settings."""
        dedup_strategy = spider.crawler.settings.get('DEDUP_STRATEGY', 'url')
        if dedup_strategy == 'url':
            fingerprint = self.get_url_fingerprint(item)
        elif dedup_strategy == 'content':
            fingerprint = self.get_content_fingerprint(item)
        elif dedup_strategy == 'custom':
            fingerprint = self.get_custom_fingerprint(item, spider)
        else:
            fingerprint = self.get_default_fingerprint(item)
        if self.is_duplicate(fingerprint):
            raise DropItem(f"Duplicate item detected: {fingerprint.decode('utf-8')[:16]}...")
        self.store_fingerprint(fingerprint, item, spider)
        return item

    def get_url_fingerprint(self, item):
        """Fingerprint based on the item's URL."""
        url = item.get('url', item.get('link', ''))
        if url:
            return hashlib.sha256(url.encode('utf-8')).hexdigest().encode('utf-8')
        # No URL available: fall back to the default strategy
        return self.get_default_fingerprint(item)

    def get_content_fingerprint(self, item):
        """Fingerprint based on the item's content fields."""
        adapter = ItemAdapter(item)
        content_fields = ['title', 'description', 'content', 'text']
        content_parts = []
        for field in content_fields:
            if field in adapter:
                value = adapter[field]
                if value:
                    content_parts.append(str(value))
        if content_parts:
            content = '|'.join(content_parts)
            return hashlib.sha256(content.encode('utf-8')).hexdigest().encode('utf-8')
        return self.get_default_fingerprint(item)

    def get_custom_fingerprint(self, item, spider):
        """Fingerprint based on fields declared by the spider (`dedup_fields`)."""
        custom_fields = getattr(spider, 'dedup_fields', ['url'])
        adapter = ItemAdapter(item)
        parts = []
        for field in custom_fields:
            if field in adapter:
                value = adapter[field]
                if value is not None:
                    parts.append(str(value))
        if parts:
            content = '|'.join(parts)
            return hashlib.sha256(content.encode('utf-8')).hexdigest().encode('utf-8')
        return self.get_default_fingerprint(item)

    def get_default_fingerprint(self, item):
        """Default fingerprint: hash of the whole item, serialized deterministically."""
        item_dict = ItemAdapter(item).asdict()
        item_sorted = json.dumps(item_dict, sort_keys=True, ensure_ascii=False)
        return hashlib.sha256(item_sorted.encode('utf-8')).hexdigest().encode('utf-8')

    def is_duplicate(self, fingerprint):
        """Check whether the fingerprint is already stored."""
        return self.redis_conn.exists(fingerprint) == 1

    def store_fingerprint(self, fingerprint, item, spider):
        """Store the fingerprint (and, optionally, some metadata)."""
        expire_time = spider.crawler.settings.getint('DUPLICATE_EXPIRE_TIME', 86400 * 7)  # 7 days
        # Always store the fingerprint itself; otherwise is_duplicate() would never match
        self.redis_conn.setex(fingerprint, expire_time, b'1')
        # Optionally record extra bookkeeping information
        if spider.crawler.settings.getbool('STORE_DEDUP_EXTRA_INFO', False):
            extra_info = {
                'timestamp': time.time(),
                'spider': spider.name,
                'item_keys': list(ItemAdapter(item).asdict().keys()),
            }
            self.redis_conn.hset('dedup_info', fingerprint, json.dumps(extra_info))

    def close_spider(self, spider):
        """Close the Redis connection."""
        self.redis_conn.close()
```
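With `DEDUP_STRATEGY = 'custom'`, the pipeline reads a `dedup_fields` attribute from the spider. A minimal sketch (the spider name and field names here are just examples):

```python
# settings.py
DEDUP_STRATEGY = 'custom'

# spiders/news.py (sketch)
import scrapy


class NewsSpider(scrapy.Spider):
    name = 'news'
    # Items are considered duplicates when title and author match
    dedup_fields = ['title', 'author']
```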
#Optimizing Redis Deduplication
```python
import hashlib

import redis
from scrapy.exceptions import DropItem


class OptimizedRedisDuplicatesPipeline:
    """Redis deduplication pipeline with batched fingerprint writes."""

    def __init__(self, redis_host, redis_port, redis_db, redis_password=None):
        self.redis_host = redis_host
        self.redis_port = redis_port
        self.redis_db = redis_db
        self.redis_password = redis_password
        self.redis_conn = redis.Redis(
            host=self.redis_host,
            port=self.redis_port,
            db=self.redis_db,
            password=self.redis_password,
            decode_responses=False,
            socket_keepalive=True,
            socket_keepalive_options={},
            health_check_interval=30,
        )
        # Batched write optimization
        self.batch_size = 100
        self.pending_fingerprints = []
        self.pending_set = set()  # catches duplicates that have not been flushed yet
        # Statistics
        self.duplicate_count = 0
        self.processed_count = 0

    @classmethod
    def from_crawler(cls, crawler):
        settings = crawler.settings
        return cls(
            redis_host=settings.get('REDIS_HOST', 'localhost'),
            redis_port=settings.getint('REDIS_PORT', 6379),
            redis_db=settings.getint('REDIS_DB', 0),
            redis_password=settings.get('REDIS_PASSWORD'),
        )

    def process_item(self, item, spider):
        """Process an item, writing fingerprints to Redis in batches."""
        fingerprint = self.get_fingerprint(item)
        # Check both Redis and the not-yet-flushed batch
        if fingerprint in self.pending_set or self.redis_conn.exists(fingerprint):
            self.duplicate_count += 1
            raise DropItem(f"Duplicate item: {fingerprint.decode('utf-8')[:16]}...")
        self.pending_fingerprints.append(fingerprint)
        self.pending_set.add(fingerprint)
        # Flush once the batch is full
        if len(self.pending_fingerprints) >= self.batch_size:
            self.flush_pending_fingerprints(spider)
        self.processed_count += 1
        return item

    def flush_pending_fingerprints(self, spider):
        """Write all pending fingerprints in one pipelined round trip."""
        if not self.pending_fingerprints:
            return
        pipe = self.redis_conn.pipeline()
        expire_time = spider.crawler.settings.getint('DUPLICATE_EXPIRE_TIME', 86400 * 7)
        for fingerprint in self.pending_fingerprints:
            pipe.setex(fingerprint, expire_time, b'1')
        pipe.execute()
        self.pending_fingerprints.clear()
        self.pending_set.clear()

    def get_fingerprint(self, item):
        """Generate the fingerprint (swap in whatever strategy you need)."""
        url = item.get('url', '')
        if url:
            return hashlib.sha256(url.encode('utf-8')).hexdigest().encode('utf-8')
        # Default strategy: hash the whole item
        item_str = str(sorted(item.items())).encode('utf-8')
        return hashlib.sha256(item_str).hexdigest().encode('utf-8')

    def close_spider(self, spider):
        """Flush remaining fingerprints and log statistics."""
        self.flush_pending_fingerprints(spider)
        spider.logger.info(f"Duplicates pipeline stats: "
                           f"Processed: {self.processed_count}, "
                           f"Duplicates: {self.duplicate_count}")
        self.redis_conn.close()
```
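Note that `EXISTS` followed by `SETEX` is not atomic: two crawler processes can both pass the `EXISTS` check for the same fingerprint. Redis's `SET ... NX EX ...` performs the check and the write in a single server-side command; a minimal sketch:

```python
from scrapy.exceptions import DropItem


def check_and_mark(redis_conn, fingerprint, ttl=86400 * 7):
    """Atomic duplicate check: SET NX succeeds only for the first writer."""
    was_new = redis_conn.set(fingerprint, b'1', nx=True, ex=ttl)
    if not was_new:
        raise DropItem("Duplicate item (SET NX found an existing fingerprint)")
```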
#Bloom Filter Deduplication
#A Local Bloom Filter
```python
import math

import mmh3
from bitarray import bitarray


class BloomFilter:
    """A local (in-process) Bloom filter."""

    def __init__(self, capacity, error_rate=0.001):
        """
        :param capacity: expected number of elements
        :param error_rate: desired false-positive rate
        """
        self.capacity = capacity
        self.error_rate = error_rate
        # Derive the bit-array size and the number of hash functions
        self.bit_array_size = self.get_size(capacity, error_rate)
        self.hash_count = self.get_hash_count(self.bit_array_size, capacity)
        # Initialize the bit array
        self.bit_array = bitarray(self.bit_array_size)
        self.bit_array.setall(0)
        self.count = 0

    def add(self, item):
        """Add an element to the filter."""
        for i in range(self.hash_count):
            digest = mmh3.hash(item, i) % self.bit_array_size
            self.bit_array[digest] = True
        self.count += 1

    def check(self, item):
        """Return True if the element is *possibly* present, False if definitely absent."""
        for i in range(self.hash_count):
            digest = mmh3.hash(item, i) % self.bit_array_size
            if not self.bit_array[digest]:
                return False
        return True

    def get_size(self, n, p):
        """Optimal bit-array size: m = -n * ln(p) / (ln 2)^2.
        :param n: expected number of inserted elements
        :param p: false-positive rate
        """
        m = -(n * math.log(p)) / (math.log(2) ** 2)
        return int(m)

    def get_hash_count(self, m, n):
        """Optimal number of hash functions: k = (m / n) * ln 2.
        :param m: bit-array size
        :param n: expected number of inserted elements
        """
        k = (m / n) * math.log(2)
        return int(k)
```
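To get a feel for the sizing formulas: with n = 1,000,000 expected elements and p = 0.001, m = -n·ln(0.001)/(ln 2)² ≈ 14.38 million bits (about 1.7 MB), and k = (m/n)·ln 2 ≈ 9.97 hash functions (which `int()` above truncates to 9):

```python
bf = BloomFilter(capacity=1_000_000, error_rate=0.001)
print(bf.bit_array_size)  # ~14,380,000 bits, i.e. roughly 1.7 MB
print(bf.hash_count)      # 9 (int() truncates ~9.97)
```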
```python
import hashlib

from scrapy.exceptions import DropItem


class BloomFilterDuplicatesPipeline:
    """Deduplication pipeline backed by the local Bloom filter."""

    def __init__(self, capacity=1000000, error_rate=0.001):
        self.bloom_filter = BloomFilter(capacity, error_rate)
        self.count = 0

    @classmethod
    def from_crawler(cls, crawler):
        settings = crawler.settings
        return cls(
            capacity=settings.getint('BLOOM_FILTER_CAPACITY', 1000000),
            error_rate=settings.getfloat('BLOOM_FILTER_ERROR_RATE', 0.001),
        )

    def process_item(self, item, spider):
        """Check for duplicates via the Bloom filter."""
        fingerprint = self.get_fingerprint(item)
        if self.bloom_filter.check(fingerprint):
            # Bloom filters can return false positives; verify against an
            # exact store here if dropping real data is unacceptable
            spider.logger.info(f"Possible duplicate detected: {fingerprint[:16]}...")
            raise DropItem(f"Possible duplicate: {fingerprint[:16]}...")
        self.bloom_filter.add(fingerprint)
        self.count += 1
        return item

    def get_fingerprint(self, item):
        """Generate a fingerprint string."""
        url = item.get('url', '')
        if url:
            return url
        # Fingerprint the whole item when no URL is available
        item_str = str(sorted(item.items()))
        return hashlib.sha256(item_str.encode('utf-8')).hexdigest()
```
#A Redis-Based Bloom Filter
```python
import hashlib

import redis
from scrapy.exceptions import DropItem


class RedisBloomFilterDuplicatesPipeline:
    """
    Deduplication pipeline backed by a Redis Bloom filter.
    Requires the RedisBloom module on the Redis server.
    """

    def __init__(self, redis_host, redis_port, redis_db, redis_password=None):
        self.redis_host = redis_host
        self.redis_port = redis_port
        self.redis_db = redis_db
        self.redis_password = redis_password
        self.redis_conn = redis.Redis(
            host=self.redis_host,
            port=self.redis_port,
            db=self.redis_db,
            password=self.redis_password,
        )

    @classmethod
    def from_crawler(cls, crawler):
        settings = crawler.settings
        return cls(
            redis_host=settings.get('REDIS_HOST', 'localhost'),
            redis_port=settings.getint('REDIS_PORT', 6379),
            redis_db=settings.getint('REDIS_DB', 0),
            redis_password=settings.get('REDIS_PASSWORD'),
        )

    def process_item(self, item, spider):
        """Deduplicate via the server-side Bloom filter."""
        fingerprint = self.get_fingerprint(item)
        try:
            # BF.EXISTS / BF.ADD are provided by the RedisBloom module
            exists = self.redis_conn.execute_command('BF.EXISTS', 'dedup_bloom', fingerprint)
            if exists:
                raise DropItem(f"Duplicate item: {fingerprint[:16]}...")
            self.redis_conn.execute_command('BF.ADD', 'dedup_bloom', fingerprint)
        except redis.exceptions.ResponseError as e:
            # RedisBloom is not installed: fall back to an exact Redis set
            spider.logger.warning(f"Redis Bloom Filter not available: {e}")
            if self.redis_conn.sismember('dedup_set', fingerprint):
                raise DropItem(f"Duplicate item: {fingerprint[:16]}...")
            self.redis_conn.sadd('dedup_set', fingerprint)
        return item

    def get_fingerprint(self, item):
        """Generate the fingerprint."""
        url = item.get('url', '')
        if url:
            return hashlib.sha256(url.encode('utf-8')).hexdigest()
        item_str = str(sorted(item.items()))
        return hashlib.sha256(item_str.encode('utf-8')).hexdigest()
```
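By default `BF.ADD` creates the filter with server-default sizing. RedisBloom also offers `BF.RESERVE`, which pre-creates the filter with an explicit error rate and capacity; a sketch to run once, e.g. from `open_spider` (the key name and sizing below are illustrative):

```python
import redis


def reserve_bloom_filter(redis_conn, key='dedup_bloom'):
    """Pre-create the Bloom filter with explicit sizing."""
    try:
        # BF.RESERVE key error_rate capacity
        redis_conn.execute_command('BF.RESERVE', key, 0.001, 1_000_000)
    except redis.exceptions.ResponseError:
        pass  # filter already exists (or RedisBloom is missing)
```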
#URL Deduplication Strategies
#URL Normalization Deduplication
```python
import hashlib
from urllib.parse import urlparse, parse_qs, urlencode

from scrapy.exceptions import DropItem


class URLNormalizationDuplicatesPipeline:
    """Deduplication pipeline that normalizes URLs before comparing them."""

    def __init__(self):
        self.seen_urls = set()

    def process_item(self, item, spider):
        """Normalize the item's URL and deduplicate on the normalized form."""
        url = item.get('url', '')
        if not url:
            return item
        normalized_url = self.normalize_url(url)
        url_hash = hashlib.sha256(normalized_url.encode('utf-8')).hexdigest()
        if url_hash in self.seen_urls:
            raise DropItem(f"Duplicate URL: {url}")
        self.seen_urls.add(url_hash)
        return item

    def normalize_url(self, url):
        """Normalize a URL."""
        parsed = urlparse(url)
        # Lowercase the scheme and host
        scheme = parsed.scheme.lower()
        netloc = parsed.netloc.lower()
        # Normalize the path (strip trailing slashes)
        path = parsed.path.rstrip('/')
        # Normalize query parameters (sort alphabetically)
        query_params = parse_qs(parsed.query, keep_blank_values=True)
        sorted_query = sorted(query_params.items())
        query = urlencode(sorted_query, doseq=True)
        # Rebuild the URL
        normalized = f"{scheme}://{netloc}{path}"
        if query:
            normalized += f"?{query}"
        # Strip common tracking parameters
        return self.remove_common_tracking_params(normalized)

    def remove_common_tracking_params(self, url):
        """Remove common tracking parameters."""
        tracking_params = [
            'utm_source', 'utm_medium', 'utm_campaign', 'utm_term', 'utm_content',
            'gclid', 'fbclid', 'ref', 'source', 'from', 'via', 'tracking',
            'session_id', 'token', 'timestamp',
        ]
        parsed = urlparse(url)
        query_params = parse_qs(parsed.query, keep_blank_values=True)
        filtered_params = {k: v for k, v in query_params.items()
                           if k.lower() not in tracking_params}
        query = urlencode(sorted(filtered_params.items()), doseq=True)
        base = f"{parsed.scheme}://{parsed.netloc}{parsed.path}"
        # Return the URL *without* tracking params even when nothing else remains
        return f"{base}?{query}" if query else base
```
#Request-Level Deduplication
```python
import hashlib

from scrapy.dupefilters import RFPDupeFilter


class CustomRFPDupeFilter(RFPDupeFilter):
    """Request dupefilter with a custom fingerprint function."""

    def request_seen(self, request):
        """Return True if an equivalent request has been seen before."""
        fp = self.request_fingerprint(request)
        if fp in self.fingerprints:  # in-memory set kept by RFPDupeFilter
            return True
        self.fingerprints.add(fp)
        if self.file:  # persisted to disk when JOBDIR is configured
            self.file.write(fp + '\n')
        return False

    def request_fingerprint(self, request):
        """Build the request fingerprint; customize the strategy here."""
        # Default-style strategy: hash of method, URL and body
        fingerprint_data = (
            request.method.encode('utf-8'),
            request.url.encode('utf-8'),
            request.body or b'',
        )
        return hashlib.sha1(b''.join(fingerprint_data)).hexdigest()

    def close(self, reason):
        """Close the dupefilter (the base class also closes self.file)."""
        if self.file:
            self.file.close()


class AdvancedRequestDuplicatesMiddleware:
    """
    More elaborate request-level deduplication.
    Note: `process_request` is a downloader-middleware hook, not a pipeline
    method, so this class belongs in DOWNLOADER_MIDDLEWARES.
    """

    def process_request(self, request, spider):
        # Implement more complex request dedup logic here (e.g. consulting
        # an external store); return None to let the request continue.
        return None
```
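The custom dupefilter is wired in through Scrapy's `DUPEFILTER_CLASS` setting (the module path below is a placeholder for your project):

```python
# settings.py
DUPEFILTER_CLASS = 'myproject.dupefilters.CustomRFPDupeFilter'
```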
#Data Fingerprint Generation Algorithms
#Multi-Field Fingerprints
```python
import hashlib
import json

from itemadapter import ItemAdapter


class MultiFieldFingerprintGenerator:
    """Fingerprint generator that combines several fields."""

    def __init__(self, weight_config=None):
        """
        :param weight_config: field -> weight; higher-weight fields are
            considered more significant and come first in the fingerprint input
        """
        self.weight_config = weight_config or {
            'url': 10,
            'title': 8,
            'content': 5,
            'description': 3,
            'category': 2,
            'tags': 1,
        }

    def generate_fingerprint(self, item, spider=None):
        """Generate a fingerprint from the configured fields."""
        adapter = ItemAdapter(item)
        parts = []
        # Include fields in descending weight order. For an exact hash the
        # weights only control which fields participate and their order:
        # any change to any included field changes the whole digest.
        for field_name, _weight in sorted(self.weight_config.items(),
                                          key=lambda kv: kv[1], reverse=True):
            if field_name in adapter:
                value = adapter[field_name]
                if value:
                    parts.append(f"{field_name}={value}")
        if not parts:
            # None of the configured fields are present: use all fields
            item_dict = adapter.asdict()
            parts.append(json.dumps(item_dict, sort_keys=True, ensure_ascii=False))
        combined = '|'.join(parts)
        return hashlib.sha256(combined.encode('utf-8')).hexdigest()

    def generate_content_signature(self, item):
        """Generate a short content signature for change detection."""
        adapter = ItemAdapter(item)
        content_fields = ['title', 'content', 'description', 'text']
        content_parts = []
        for field in content_fields:
            if field in adapter:
                value = adapter[field]
                if value:
                    # Only the first 1000 characters contribute to the signature
                    content_parts.append(str(value)[:1000])
        if content_parts:
            content_combined = ''.join(content_parts)
            # MD5 keeps the signature short (collision resistance is not critical here)
            return hashlib.md5(content_combined.encode('utf-8')).hexdigest()
        return None


class ContentChangeDetectionPipeline:
    """Pipeline that attaches a fingerprint for change detection."""

    def __init__(self):
        self.fingerprint_generator = MultiFieldFingerprintGenerator()

    def process_item(self, item, spider):
        """Detect whether the content has changed."""
        current_fingerprint = self.fingerprint_generator.generate_fingerprint(item)
        # Compare against a stored historical fingerprint, e.g. in Redis:
        # historical = self.redis_conn.get(f"fingerprint:{item.get('url', '')}")
        # if current_fingerprint != historical:
        #     item['content_updated'] = True   # content changed since last crawl
        # self.redis_conn.setex(f"fingerprint:{item.get('url', '')}",
        #                       86400 * 30, current_fingerprint)
        item['fingerprint'] = current_fingerprint
        return item
```
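A quick usage sketch with a plain dict item (the field values are invented; `ItemAdapter` accepts dicts as well as Scrapy items):

```python
generator = MultiFieldFingerprintGenerator()
item = {'url': 'https://example.com/a', 'title': 'Hello', 'tags': ['x']}
print(generator.generate_fingerprint(item))        # deterministic SHA-256 hex digest
print(generator.generate_content_signature(item))  # short MD5 content signature
```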
#Semantic Deduplication
```python
import numpy as np
from itemadapter import ItemAdapter
from scrapy.exceptions import DropItem
from sklearn.feature_extraction.text import HashingVectorizer
from sklearn.metrics.pairwise import cosine_similarity


class SemanticDuplicatesPipeline:
    """Deduplication pipeline based on textual similarity."""

    def __init__(self, similarity_threshold=0.8):
        self.similarity_threshold = similarity_threshold
        self.content_vectors = []  # vectors of content seen so far
        # HashingVectorizer is stateless, so every document lands in the same
        # fixed-size vector space. (Calling TfidfVectorizer.fit_transform per
        # item would produce incomparable vectors of varying dimensions.)
        self.vectorizer = HashingVectorizer(
            n_features=2 ** 12,
            ngram_range=(1, 2),
        )

    def process_item(self, item, spider):
        """Drop items that are semantically close to something already seen."""
        content = self.extract_main_content(item)
        if not content:
            return item
        current_vector = self.vectorizer.transform([content]).toarray()[0]
        if self.content_vectors:
            similarities = cosine_similarity([current_vector],
                                             np.vstack(self.content_vectors))
            max_similarity = float(np.max(similarities))
            if max_similarity > self.similarity_threshold:
                raise DropItem(
                    f"Semantic duplicate detected with similarity: {max_similarity:.3f}")
        # Memory grows with every unique item; cap or evict in production
        self.content_vectors.append(current_vector)
        return item

    def extract_main_content(self, item):
        """Extract the main textual content from the item."""
        adapter = ItemAdapter(item)
        content_fields = ['title', 'content', 'description', 'text']
        for field in content_fields:
            if field in adapter:
                content = adapter[field]
                if content and str(content).strip():
                    return str(content)[:2000]  # cap the length
        return ""
```
#Incremental Crawling Strategies
#Timestamp-Based Incremental Crawling
```python
import time
from datetime import datetime

import redis
import scrapy


class TimestampIncrementalSpider(scrapy.Spider):
    """Incremental spider driven by publish timestamps."""

    name = 'timestamp_incremental'  # example name

    @classmethod
    def from_crawler(cls, crawler, *args, **kwargs):
        # self.settings is not yet available in __init__, so the Redis
        # connection is created here, after the crawler is attached
        spider = super().from_crawler(crawler, *args, **kwargs)
        spider.redis_conn = redis.Redis(
            host=crawler.settings.get('REDIS_HOST', 'localhost'),
            port=crawler.settings.getint('REDIS_PORT', 6379),
            db=crawler.settings.getint('REDIS_DB', 0),
        )
        return spider

    def get_last_crawl_time(self, source):
        """Fetch the timestamp of the previous crawl."""
        last_time = self.redis_conn.get(f"last_crawl_time:{source}")
        if last_time:
            return float(last_time)
        return 0  # no record yet: crawl everything

    def set_last_crawl_time(self, source):
        """Record the time of the current crawl."""
        self.redis_conn.set(f"last_crawl_time:{source}", time.time())

    def parse_list_page(self, response):
        """Parse the list page, following only newly published entries."""
        last_crawl_time = self.get_last_crawl_time(self.name)
        for item_selector in response.css('div.item'):
            # Extract the publish time
            pub_time_str = item_selector.css('.pub-time::text').get()
            if pub_time_str:
                try:
                    pub_time = datetime.strptime(
                        pub_time_str, '%Y-%m-%d %H:%M:%S').timestamp()
                    # Only follow entries published after the previous crawl
                    if pub_time > last_crawl_time:
                        detail_url = item_selector.css('a::attr(href)').get()
                        yield response.follow(detail_url, callback=self.parse_detail)
                except ValueError:
                    # Unparseable timestamp: crawl it anyway to be safe
                    detail_url = item_selector.css('a::attr(href)').get()
                    yield response.follow(detail_url, callback=self.parse_detail)
        # Record the time of this crawl
        self.set_last_crawl_time(self.name)
```
#Version-Based Incremental Crawling
```python
import redis
import scrapy


class VersionIncrementalSpider(scrapy.Spider):
    """Incremental spider driven by version numbers."""

    name = 'version_incremental'  # example name

    @classmethod
    def from_crawler(cls, crawler, *args, **kwargs):
        spider = super().from_crawler(crawler, *args, **kwargs)
        spider.redis_conn = redis.Redis(
            host=crawler.settings.get('REDIS_HOST', 'localhost'),
            port=crawler.settings.getint('REDIS_PORT', 6379),
            db=crawler.settings.getint('REDIS_DB', 0),
        )
        return spider

    def parse_list_page(self, response):
        """Parse the list page, re-crawling only entries whose version changed."""
        for item_selector in response.css('div.item'):
            item_id = item_selector.css('.item-id::text').get()
            current_version = item_selector.css('.version::text').get()
            if item_id and current_version:
                stored_version = self.redis_conn.get(f"item_version:{item_id}")
                if not stored_version or int(current_version) > int(stored_version):
                    # Version bumped: re-crawl the detail page
                    detail_url = item_selector.css('a::attr(href)').get()
                    yield response.follow(
                        detail_url,
                        callback=self.parse_detail,
                        meta={'item_id': item_id, 'version': current_version},
                    )
                    # Store the new version number
                    self.redis_conn.set(f"item_version:{item_id}", current_version)
                else:
                    # Version unchanged: skip
                    self.logger.info(f"Item {item_id} version unchanged, skipping")
```
#Smart Incremental Crawling
```python
import time

import redis
import scrapy


class SmartIncrementalSpider(scrapy.Spider):
    """
    Smart incremental spider: adapts each URL's crawl frequency
    to how often its content actually changes.
    """

    name = 'smart_incremental'  # example name

    @classmethod
    def from_crawler(cls, crawler, *args, **kwargs):
        spider = super().from_crawler(crawler, *args, **kwargs)
        spider.redis_conn = redis.Redis(
            host=crawler.settings.get('REDIS_HOST', 'localhost'),
            port=crawler.settings.getint('REDIS_PORT', 6379),
            db=crawler.settings.getint('REDIS_DB', 0),
        )
        return spider

    def should_crawl_url(self, url, content_hash=None):
        """Decide whether the URL is due for a crawl."""
        record_key = f"crawl_record:{url}"
        record = self.redis_conn.hgetall(record_key)
        if not record:
            return True  # never crawled before
        last_crawl_time = float(record.get(b'last_crawl_time', 0))
        crawl_frequency = int(record.get(b'crawl_frequency', 3600))  # default: 1 hour
        content_changed = record.get(b'content_changed', b'0') == b'1'
        # Recently changed content is re-checked at half the usual interval
        effective_frequency = crawl_frequency / 2 if content_changed else crawl_frequency
        return time.time() - last_crawl_time >= effective_frequency

    def update_crawl_record(self, url, content_hash=None, content_changed=False):
        """Update the crawl record after fetching a URL."""
        record_key = f"crawl_record:{url}"
        # Update the last crawl time
        self.redis_conn.hset(record_key, 'last_crawl_time', time.time())
        # Compare and store the content hash
        if content_hash:
            old_hash = self.redis_conn.hget(record_key, 'content_hash')
            if old_hash and old_hash.decode('utf-8') != content_hash:
                content_changed = True
            self.redis_conn.hset(record_key, 'content_hash', content_hash)
        # Update the change flag
        self.redis_conn.hset(record_key, 'content_changed', int(content_changed))
        # Adapt the crawl frequency to how often the content changes
        current_freq = int(self.redis_conn.hget(record_key, 'crawl_frequency') or 3600)
        if content_changed:
            # Content changes often: crawl more frequently (floor: 5 minutes)
            new_freq = max(300, current_freq // 2)
        else:
            # Content is stable: crawl less frequently (ceiling: 1 day)
            new_freq = min(86400, current_freq * 2)
        self.redis_conn.hset(record_key, 'crawl_frequency', new_freq)

    def parse_list_page(self, response):
        """Parse the list page, following only URLs that are due."""
        for item_selector in response.css('div.item'):
            detail_url = item_selector.css('a::attr(href)').get()
            if detail_url and self.should_crawl_url(detail_url):
                yield response.follow(detail_url, callback=self.parse_detail)
            else:
                self.logger.info(f"Skipping URL based on incremental logic: {detail_url}")
```
#Distributed Deduplication Schemes
#Distributed Redis Deduplication
```python
import hashlib
import time

import redis
from redis.sentinel import Sentinel
from scrapy.exceptions import DropItem


class DistributedRedisDuplicatesPipeline:
    """
    Distributed Redis deduplication pipeline.
    Supports single-node and Sentinel deployments.
    """

    def __init__(self, redis_hosts, redis_port, redis_db,
                 use_sentinel=False, master_name='mymaster'):
        self.redis_hosts = redis_hosts
        self.redis_port = redis_port
        self.redis_db = redis_db
        self.use_sentinel = use_sentinel
        self.master_name = master_name
        if use_sentinel:
            sentinel = Sentinel([(host, redis_port) for host in redis_hosts])
            self.redis_conn = sentinel.master_for(master_name, db=redis_db)
        else:
            # For a real multi-node setup use redis.cluster.RedisCluster;
            # this sketch simply connects to the first host
            self.redis_conn = redis.Redis(
                host=redis_hosts[0],
                port=redis_port,
                db=redis_db,
            )

    @classmethod
    def from_crawler(cls, crawler):
        settings = crawler.settings
        redis_hosts = settings.getlist('REDIS_HOSTS', ['localhost'])
        return cls(
            redis_hosts=redis_hosts,
            redis_port=settings.getint('REDIS_PORT', 6379),
            redis_db=settings.getint('REDIS_DB', 0),
            use_sentinel=settings.getbool('REDIS_USE_SENTINEL', False),
            master_name=settings.get('REDIS_MASTER_NAME', 'mymaster'),
        )

    def process_item(self, item, spider):
        """Deduplicate across crawler instances."""
        fingerprint = self.get_fingerprint(item)
        # Use a distributed lock so check-and-set is atomic across nodes
        lock_key = f"lock:{fingerprint}"
        lock_value = f"{spider.name}:{time.time()}"
        lock_timeout = 10  # seconds
        if self.acquire_lock(lock_key, lock_value, lock_timeout):
            try:
                if self.redis_conn.exists(f"duplicate:{fingerprint}"):
                    raise DropItem(f"Distributed duplicate: {fingerprint[:16]}...")
                expire_time = spider.crawler.settings.getint(
                    'DUPLICATE_EXPIRE_TIME', 86400 * 7)
                self.redis_conn.setex(f"duplicate:{fingerprint}", expire_time, b'1')
                return item
            finally:
                self.release_lock(lock_key, lock_value)
        else:
            # Another instance is handling the same fingerprint right now
            spider.logger.warning(f"Could not acquire lock for: {fingerprint[:16]}...")
            raise DropItem(f"Lock acquisition failed: {fingerprint[:16]}...")

    def acquire_lock(self, lock_key, lock_value, timeout):
        """Acquire the distributed lock (re-entrant for the same holder)."""
        lua_acquire = """
        if redis.call("get", KEYS[1]) == ARGV[1] then
            redis.call("expire", KEYS[1], ARGV[2])
            return 1
        elseif redis.call("set", KEYS[1], ARGV[1], "NX", "EX", ARGV[2]) then
            return 1
        else
            return 0
        end
        """
        return bool(self.redis_conn.eval(lua_acquire, 1, lock_key, lock_value, timeout))

    def release_lock(self, lock_key, lock_value):
        """Release the lock only if we still hold it."""
        lua_release = """
        if redis.call("get", KEYS[1]) == ARGV[1] then
            return redis.call("del", KEYS[1])
        else
            return 0
        end
        """
        self.redis_conn.eval(lua_release, 1, lock_key, lock_value)

    def get_fingerprint(self, item):
        """Generate the fingerprint."""
        url = item.get('url', '')
        if url:
            return hashlib.sha256(url.encode('utf-8')).hexdigest()
        item_str = str(sorted(item.items()))
        return hashlib.sha256(item_str.encode('utf-8')).hexdigest()
```
#Consistent-Hash Deduplication
```python
import bisect
import hashlib
from collections import OrderedDict

from scrapy.exceptions import DropItem


class ConsistentHashDuplicatesPipeline:
    """Distributed deduplication pipeline based on consistent hashing."""

    def __init__(self, nodes=None, replicas=150):
        self.nodes = nodes or ['node1', 'node2', 'node3']
        self.replicas = replicas  # virtual replicas per node
        self.ring = OrderedDict()  # the consistent-hash ring
        self._sorted_keys = []
        self.setup_ring()

    def setup_ring(self):
        """Populate the hash ring with virtual nodes."""
        for node in self.nodes:
            for i in range(self.replicas):
                virtual_key = f"{node}:{i}"
                key = self.gen_key(virtual_key.encode('utf-8'))
                self.ring[key] = node
        self._sorted_keys = sorted(self.ring.keys())

    def gen_key(self, key):
        """Hash a key onto the ring."""
        return hashlib.sha256(key).hexdigest()

    def get_node(self, key):
        """Return the node responsible for this key."""
        if not self.ring:
            return None
        hash_key = self.gen_key(key.encode('utf-8'))
        idx = bisect.bisect(self._sorted_keys, hash_key)
        if idx == len(self._sorted_keys):
            idx = 0  # wrap around the ring
        return self.ring[self._sorted_keys[idx]]

    def process_item(self, item, spider):
        """Deduplicate, routing each fingerprint to its responsible node."""
        fingerprint = self.get_fingerprint(item)
        target_node = self.get_node(fingerprint)
        # In a real deployment you would hold one connection per node and
        # pick the connection for target_node here
        redis_key = f"duplicate:{target_node}:{fingerprint}"
        # Simplified: a single shared Redis connection injected via settings
        global_redis = spider.crawler.settings.get('GLOBAL_REDIS_CONN')
        if global_redis is None:
            spider.logger.warning("GLOBAL_REDIS_CONN not configured, skipping dedup")
            return item
        if global_redis.exists(redis_key):
            raise DropItem(f"Consistent hash duplicate: {fingerprint[:16]}...")
        expire_time = spider.crawler.settings.getint('DUPLICATE_EXPIRE_TIME', 86400 * 7)
        global_redis.setex(redis_key, expire_time, b'1')
        return item

    def get_fingerprint(self, item):
        """Generate the fingerprint."""
        url = item.get('url', '')
        if url:
            return hashlib.sha256(url.encode('utf-8')).hexdigest()
        item_str = str(sorted(item.items()))
        return hashlib.sha256(item_str.encode('utf-8')).hexdigest()
```
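A quick look at how keys distribute over the ring (node names are placeholders):

```python
pipeline = ConsistentHashDuplicatesPipeline(nodes=['node1', 'node2', 'node3'])
for key in ['alpha', 'beta', 'gamma']:
    print(key, '->', pipeline.get_node(key))
# Each key maps deterministically to one node; thanks to the virtual
# replicas, adding or removing a node only remaps roughly 1/N of the keys
```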
#Performance Optimization Strategies
#Batched Operations
```python
import hashlib
import time
from collections import deque

import redis
from scrapy.exceptions import DropItem


class BatchOptimizedDuplicatesPipeline:
    """
    Deduplication pipeline optimized with batched Redis operations.
    Trade-off: fingerprints only reach Redis at flush time, so the in-memory
    cache below is what catches duplicates arriving inside a flush window.
    """

    def __init__(self, redis_host, redis_port, redis_db, batch_size=1000, flush_interval=5):
        self.redis_host = redis_host
        self.redis_port = redis_port
        self.redis_db = redis_db
        self.redis_conn = redis.Redis(
            host=self.redis_host,
            port=self.redis_port,
            db=self.redis_db,
            decode_responses=False,
        )
        self.batch_size = batch_size
        self.flush_interval = flush_interval
        self.pending_items = deque()
        self.memory_cache = set()  # fingerprints seen this run (fast in-process check)
        self.last_flush_time = time.time()
        # Statistics
        self.processed_count = 0
        self.duplicate_count = 0

    @classmethod
    def from_crawler(cls, crawler):
        settings = crawler.settings
        return cls(
            redis_host=settings.get('REDIS_HOST', 'localhost'),
            redis_port=settings.getint('REDIS_PORT', 6379),
            redis_db=settings.getint('REDIS_DB', 0),
            batch_size=settings.getint('DEDUP_BATCH_SIZE', 1000),
            flush_interval=settings.getint('DEDUP_FLUSH_INTERVAL', 5),
        )

    def process_item(self, item, spider):
        """Process items, deferring Redis writes to batched flushes."""
        fingerprint = self.get_fingerprint(item)
        # Fast in-memory check first
        if fingerprint in self.memory_cache:
            self.duplicate_count += 1
            raise DropItem(f"In-memory duplicate: {fingerprint[:16]}...")
        self.memory_cache.add(fingerprint)
        self.pending_items.append((fingerprint, item))
        # Flush when the batch is full or the interval has elapsed
        current_time = time.time()
        if (len(self.pending_items) >= self.batch_size or
                current_time - self.last_flush_time >= self.flush_interval):
            self.flush_batch(spider)
        self.processed_count += 1
        return item

    def flush_batch(self, spider):
        """Check and store all pending fingerprints in two pipelined round trips."""
        if not self.pending_items:
            return
        fingerprints = [entry[0] for entry in self.pending_items]
        # Batched existence check
        pipe = self.redis_conn.pipeline()
        for fp in fingerprints:
            pipe.exists(f"duplicate:{fp}")
        results = pipe.execute()
        # Partition into already-seen and new fingerprints
        items_to_store = []
        for (fingerprint, item), exists in zip(self.pending_items, results):
            if exists:
                # Cross-run duplicate; the item has already passed downstream,
                # so it can only be counted and logged here
                self.duplicate_count += 1
                spider.logger.debug(f"Duplicate found: {fingerprint[:16]}...")
            else:
                items_to_store.append((fingerprint, item))
        # Batched store of the new fingerprints
        if items_to_store:
            pipe = self.redis_conn.pipeline()
            expire_time = spider.crawler.settings.getint('DUPLICATE_EXPIRE_TIME', 86400 * 7)
            for fingerprint, item in items_to_store:
                pipe.setex(f"duplicate:{fingerprint}", expire_time, b'1')
            pipe.execute()
        self.pending_items.clear()
        self.last_flush_time = time.time()
        spider.logger.info(f"Batch processed: {len(fingerprints)} items, "
                           f"{len(items_to_store)} new, "
                           f"{len(fingerprints) - len(items_to_store)} duplicates")

    def close_spider(self, spider):
        """Flush what is left and report statistics."""
        self.flush_batch(spider)
        spider.logger.info(f"Pipeline stats - Processed: {self.processed_count}, "
                           f"Duplicates: {self.duplicate_count}, "
                           f"Uniques: {self.processed_count - self.duplicate_count}")
        self.redis_conn.close()

    def get_fingerprint(self, item):
        """Generate the fingerprint."""
        url = item.get('url', '')
        if url:
            return hashlib.sha256(url.encode('utf-8')).hexdigest()
        item_str = str(sorted(item.items()))
        return hashlib.sha256(item_str.encode('utf-8')).hexdigest()
```
#In-Memory Cache Optimization
```python
import hashlib
import time
from collections import OrderedDict

from scrapy.exceptions import DropItem


class MemoryCachedDuplicatesPipeline:
    """Deduplication pipeline with a bounded in-memory cache."""

    def __init__(self, cache_size=10000, cache_expire=3600):
        self.cache_size = cache_size
        self.cache_expire = cache_expire
        self.cache = OrderedDict()  # insertion-ordered; oldest entries evicted first
        # Statistics
        self.cache_hits = 0
        self.cache_misses = 0

    @classmethod
    def from_crawler(cls, crawler):
        settings = crawler.settings
        return cls(
            cache_size=settings.getint('DEDUP_CACHE_SIZE', 10000),
            cache_expire=settings.getint('DEDUP_CACHE_EXPIRE', 3600),
        )

    def process_item(self, item, spider):
        """Fast duplicate check against the in-memory cache."""
        fingerprint = self.get_fingerprint(item)
        current_time = time.time()
        if fingerprint in self.cache:
            cached_time = self.cache[fingerprint]
            if current_time - cached_time < self.cache_expire:
                self.cache_hits += 1
                raise DropItem(f"Cached duplicate: {fingerprint[:16]}...")
            # Entry expired: drop it and treat the item as new
            del self.cache[fingerprint]
        # Cache miss: remember this fingerprint
        self.cache[fingerprint] = current_time
        # Evict the oldest entry once the cache is full
        if len(self.cache) > self.cache_size:
            self.cache.popitem(last=False)
        self.cache_misses += 1
        return item

    def get_fingerprint(self, item):
        """Generate the fingerprint."""
        url = item.get('url', '')
        if url:
            return hashlib.sha256(url.encode('utf-8')).hexdigest()
        item_str = str(sorted(item.items()))
        return hashlib.sha256(item_str.encode('utf-8')).hexdigest()

    def close_spider(self, spider):
        """Log cache statistics on shutdown."""
        total_ops = self.cache_hits + self.cache_misses
        hit_rate = self.cache_hits / total_ops if total_ops > 0 else 0
        spider.logger.info(f"Cache stats - Hits: {self.cache_hits}, "
                           f"Misses: {self.cache_misses}, "
                           f"Hit rate: {hit_rate:.2%}, "
                           f"Final cache size: {len(self.cache)}")
```
#Common Problems and Solutions
#Problem 1: Memory Exhaustion
Symptom: memory usage keeps climbing during long runs. Solution:
```python
import hashlib

from scrapy.exceptions import DropItem


class MemoryEfficientDuplicatesPipeline:
    def __init__(self):
        # Bounded set with periodic wholesale cleanup
        self.seen_items = set()
        self.max_items = 100000
        self.cleanup_threshold = 90000

    def process_item(self, item, spider):
        fingerprint = self.get_fingerprint(item)
        if len(self.seen_items) > self.cleanup_threshold:
            # Periodic cleanup bounds memory, at the cost of forgetting
            # everything seen so far (some duplicates will slip through)
            self.seen_items.clear()
        if fingerprint in self.seen_items:
            raise DropItem("Duplicate item")
        if len(self.seen_items) < self.max_items:
            self.seen_items.add(fingerprint)
        return item

    def get_fingerprint(self, item):
        return hashlib.sha256(str(sorted(item.items())).encode('utf-8')).hexdigest()
```
#Problem 2: Redis Connection Issues
Symptom: Redis connections time out or drop. Solution:
```python
import hashlib

import redis
from redis.backoff import ExponentialBackoff
from redis.retry import Retry
from scrapy.exceptions import DropItem


class RobustRedisDuplicatesPipeline:
    def __init__(self):
        self.redis_conn = redis.Redis(
            host='localhost',
            port=6379,
            db=0,
            socket_connect_timeout=5,
            socket_timeout=5,
            retry_on_timeout=True,
            retry=Retry(ExponentialBackoff(cap=10), 3),
            health_check_interval=30,
        )

    def process_item(self, item, spider):
        try:
            fingerprint = self.get_fingerprint(item)
            if self.redis_conn.exists(fingerprint):
                raise DropItem("Duplicate item")
            self.redis_conn.setex(fingerprint, 86400 * 7, b'1')
        except redis.ConnectionError:
            # Degrade gracefully: skip dedup rather than stopping the crawl
            spider.logger.error("Redis connection error, skipping deduplication")
        except redis.TimeoutError:
            spider.logger.warning("Redis timeout, continuing without deduplication")
        return item

    def get_fingerprint(self, item):
        return hashlib.sha256(str(sorted(item.items())).encode('utf-8')).hexdigest()
```
#Problem 3: Poor Deduplication Coverage
Symptom: large numbers of duplicates still get through. Solution:
```python
from scrapy.exceptions import DropItem


class ComprehensiveDuplicatesPipeline:
    def __init__(self):
        self.strategies = [
            self.url_strategy,
            self.content_strategy,
            self.field_strategy,
        ]

    def process_item(self, item, spider):
        # Validate against multiple strategies in turn
        for strategy in self.strategies:
            if strategy(item, spider):
                raise DropItem(f"Duplicate detected by {strategy.__name__}")
        # No strategy flagged the item: let it through
        return item

    def url_strategy(self, item, spider):
        # URL-based dedup: look the URL up in your store here
        url = item.get('url')
        if url:
            pass  # lookup elided in this sketch
        return False

    def content_strategy(self, item, spider):
        # Content-similarity dedup strategy (elided)
        return False

    def field_strategy(self, item, spider):
        # Field-combination dedup strategy (elided)
        return False
```
#Best Practices
#Design Principles
- Layered deduplication: an in-memory cache in front of Redis storage (see the sketch below)
- Performance balance: trade accuracy against throughput deliberately
- Fault tolerance: degrade gracefully when Redis is unavailable
- Resource management: budget memory and storage explicitly
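A minimal sketch of the layered check, assuming a bounded local set in front of a Redis connection:

```python
def is_duplicate(fingerprint, local_cache, redis_conn, ttl=86400 * 7):
    """Layer 1: in-process set; layer 2: Redis with atomic SET NX."""
    if fingerprint in local_cache:
        return True  # cheapest check first
    local_cache.add(fingerprint)
    # SET NX returns True only for the first writer across all workers
    return not redis_conn.set(fingerprint, b'1', nx=True, ex=ttl)
```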
#Deployment Recommendations
- Monitoring: track deduplication effectiveness (hit rates, drop counts)
- Tuning: adjust parameters to the shape of your data
- Backups: back up the deduplication store regularly
💡 Key takeaway: deduplication and incremental updating are core infrastructure for any large-scale crawler system; with sound architecture and the optimization strategies above, they substantially improve both crawl efficiency and data quality.
🔗 Related tutorials
- Item Pipelines in Practice - data processing fundamentals
- Data Cleaning and Validation - ensuring data quality
- Redis Fingerprint Checking - deduplication and incremental strategies
- Downloader Middleware - request/response processing
- Advanced Spider Middleware - pre- and post-processing of data

