The Complete Guide to Scrapy Data Deduplication and Incremental Updates - Redis Fingerprint Checking and Smart Incremental Crawling

📂 Stage: Stage 2 - Data Flow (Data Processing)
🔗 Related chapters: Item Pipelines in Practice · Data Cleaning and Validation

Table of Contents

  • Data Deduplication Fundamentals
  • Incremental Update Fundamentals
  • Redis Fingerprint Deduplication
  • Bloom Filter Deduplication
  • URL Deduplication Strategies
  • Data Fingerprint Generation
  • Incremental Crawling Strategies
  • Distributed Deduplication
  • Performance Optimization
  • Common Problems and Solutions
  • Best Practices

Data Deduplication Fundamentals

Data deduplication is a core technique in crawler systems: it identifies and filters duplicate data so that content is not crawled or stored twice, which improves both crawl efficiency and data quality.

Why Deduplication Matters

"""
Why deduplication matters:

1. Resource savings: avoid re-crawling identical content
2. Storage optimization: avoid storing duplicate data
3. Efficiency: focus crawling effort on new data
4. Data quality: keep the dataset free of duplicates
5. Cost control: reduce bandwidth and compute costs
"""

Challenges of Deduplication

"""
Main challenges of deduplication:

1. Scale: deduplicating huge numbers of URLs and records
2. Latency: duplicate checks must be fast
3. Storage overhead: the cost of storing dedup fingerprints
4. Distributed coordination: keeping multiple crawl nodes in sync
5. False positives: balancing accuracy against performance
"""

Incremental Update Fundamentals

Incremental updating means crawling only data that is new or has changed rather than re-crawling everything; it is an essential capability for large-scale crawler systems.

Advantages of Incremental Updates

"""
Main advantages of incremental updates:

1. Efficiency: only changed data is processed
2. Resource savings: less network and compute consumption
3. Freshness: the latest data is picked up promptly
4. Cost control: lower operating costs
5. Politeness: less load on the target servers
"""

Incremental Update Strategies

"""
Common incremental update strategies:

1. Timestamps: decide based on publish or last-modified times
2. Version numbers: decide based on version changes
3. Content hashes: decide by hashing the content (see the sketch below)
4. Status flags: decide based on state changes
5. Hybrid: combine several of the above
"""
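
As a small illustration of the content-hash strategy, the sketch below hashes a few fields and compares the result against the hash recorded on the previous crawl; the field names and the previous_hashes store are assumptions made for the example.

import hashlib

def content_hash(item: dict) -> str:
    # Hash the fields that define "the content has changed"
    payload = '|'.join(str(item.get(k, '')) for k in ('title', 'price', 'description'))
    return hashlib.sha256(payload.encode('utf-8')).hexdigest()

previous_hashes = {}   # in a real crawler this would live in Redis or a database

def needs_update(url: str, item: dict) -> bool:
    new_hash = content_hash(item)
    changed = previous_hashes.get(url) != new_hash
    previous_hashes[url] = new_hash
    return changed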

Redis Fingerprint Deduplication

A Basic Redis Deduplication Pipeline

import hashlib
import redis
from scrapy.exceptions import DropItem

class RedisDuplicatesPipeline:
    """
    Redis-based deduplication pipeline
    """
    
    def __init__(self, redis_host, redis_port, redis_db, redis_password=None):
        self.redis_host = redis_host
        self.redis_port = redis_port
        self.redis_db = redis_db
        self.redis_password = redis_password
        
        # Connect to Redis
        self.redis_conn = redis.Redis(
            host=self.redis_host,
            port=self.redis_port,
            db=self.redis_db,
            password=self.redis_password,
            decode_responses=False  # keep bytes mode for hash values
        )
    
    @classmethod
    def from_crawler(cls, crawler):
        """
        Create the pipeline from the crawler instance
        """
        settings = crawler.settings
        return cls(
            redis_host=settings.get('REDIS_HOST', 'localhost'),
            redis_port=settings.getint('REDIS_PORT', 6379),
            redis_db=settings.getint('REDIS_DB', 0),
            redis_password=settings.get('REDIS_PASSWORD')
        )
    
    def process_item(self, item, spider):
        """
        Deduplicate an item
        """
        # Generate the data fingerprint
        fingerprint = self.get_fingerprint(item)
        
        # Check whether the fingerprint already exists
        if self.redis_conn.exists(fingerprint):
            spider.logger.info(f"Duplicate item detected: {fingerprint}")
            raise DropItem(f"Duplicate item with fingerprint: {fingerprint}")
        
        # Store the fingerprint (with an expiry)
        expire_time = spider.crawler.settings.getint('DUPLICATE_EXPIRE_TIME', 86400 * 7)  # default: 7 days
        self.redis_conn.setex(fingerprint, expire_time, b'1')
        
        return item
    
    def get_fingerprint(self, item):
        """
        生成数据指纹
        """
        # 基于URL生成指纹(可扩展为基于多个字段)
        url = item.get('url', '')
        if url:
            return hashlib.sha1(url.encode('utf-8')).hexdigest().encode('utf-8')
        
        # 如果没有URL,基于整个item生成指纹
        item_str = str(dict(sorted(item.items()))).encode('utf-8')
        return hashlib.sha1(item_str).hexdigest().encode('utf-8')
    
    def close_spider(self, spider):
        """
        Clean up when the spider closes
        """
        self.redis_conn.close()
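
To wire this pipeline into a project, something along the following lines in settings.py should be enough; the module path myproject.pipelines is an assumption, while the remaining setting names are exactly the ones read by from_crawler and process_item above.

# settings.py
ITEM_PIPELINES = {
    'myproject.pipelines.RedisDuplicatesPipeline': 300,
}

REDIS_HOST = 'localhost'
REDIS_PORT = 6379
REDIS_DB = 0
REDIS_PASSWORD = None
DUPLICATE_EXPIRE_TIME = 86400 * 7   # keep fingerprints for 7 days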

An Advanced Redis Deduplication Pipeline

import hashlib
import redis
import json
import time
from scrapy.exceptions import DropItem
from itemadapter import ItemAdapter

class AdvancedRedisDuplicatesPipeline:
    """
    Advanced Redis deduplication pipeline
    """
    
    def __init__(self, redis_host, redis_port, redis_db, redis_password=None):
        self.redis_host = redis_host
        self.redis_port = redis_port
        self.redis_db = redis_db
        self.redis_password = redis_password
        
        self.redis_conn = redis.Redis(
            host=self.redis_host,
            port=self.redis_port,
            db=self.redis_db,
            password=self.redis_password,
            decode_responses=False
        )
    
    @classmethod
    def from_crawler(cls, crawler):
        settings = crawler.settings
        return cls(
            redis_host=settings.get('REDIS_HOST', 'localhost'),
            redis_port=settings.getint('REDIS_PORT', 6379),
            redis_db=settings.getint('REDIS_DB', 0),
            redis_password=settings.get('REDIS_PASSWORD')
        )
    
    def process_item(self, item, spider):
        """
        Advanced deduplication with a configurable strategy
        """
        # Pick the deduplication strategy
        dedup_strategy = spider.crawler.settings.get('DEDUP_STRATEGY', 'url')
        
        if dedup_strategy == 'url':
            fingerprint = self.get_url_fingerprint(item)
        elif dedup_strategy == 'content':
            fingerprint = self.get_content_fingerprint(item)
        elif dedup_strategy == 'custom':
            fingerprint = self.get_custom_fingerprint(item, spider)
        else:
            fingerprint = self.get_default_fingerprint(item)
        
        # Check for duplicates
        if self.is_duplicate(fingerprint):
            raise DropItem(f"Duplicate item detected: {fingerprint.decode('utf-8')[:16]}...")
        
        # Store the fingerprint
        self.store_fingerprint(fingerprint, item, spider)
        
        return item
    
    def get_url_fingerprint(self, item):
        """
        Fingerprint based on the URL
        """
        url = item.get('url', item.get('link', ''))
        if url:
            return hashlib.sha256(url.encode('utf-8')).hexdigest().encode('utf-8')
        # No URL field: fall back to the default fingerprint instead of returning None
        return self.get_default_fingerprint(item)
    
    def get_content_fingerprint(self, item):
        """
        Fingerprint based on content fields
        """
        adapter = ItemAdapter(item)
        content_fields = ['title', 'description', 'content', 'text']
        
        content_parts = []
        for field in content_fields:
            if field in adapter:
                value = adapter[field]
                if value:
                    content_parts.append(str(value))
        
        if content_parts:
            content = '|'.join(content_parts)
            return hashlib.sha256(content.encode('utf-8')).hexdigest().encode('utf-8')
        
        return self.get_default_fingerprint(item)
    
    def get_custom_fingerprint(self, item, spider):
        """
        Fingerprint based on spider-defined fields
        """
        custom_fields = getattr(spider, 'dedup_fields', ['url'])
        adapter = ItemAdapter(item)
        
        parts = []
        for field in custom_fields:
            if field in adapter:
                value = adapter[field]
                if value is not None:
                    parts.append(str(value))
        
        if parts:
            content = '|'.join(parts)
            return hashlib.sha256(content.encode('utf-8')).hexdigest().encode('utf-8')
        
        return self.get_default_fingerprint(item)
    
    def get_default_fingerprint(self, item):
        """
        Default fingerprint over the whole item
        """
        item_dict = ItemAdapter(item).asdict()
        item_sorted = json.dumps(item_dict, sort_keys=True, ensure_ascii=False)
        return hashlib.sha256(item_sorted.encode('utf-8')).hexdigest().encode('utf-8')
    
    def is_duplicate(self, fingerprint):
        """
        Check whether the fingerprint already exists
        """
        return self.redis_conn.exists(fingerprint) == 1
    
    def store_fingerprint(self, fingerprint, item, spider):
        """
        Store the fingerprint
        """
        # Expiry for the fingerprint key
        expire_time = spider.crawler.settings.getint('DUPLICATE_EXPIRE_TIME', 86400 * 7)  # 7 days
        
        # Optional: store extra metadata about the item
        store_extra = spider.crawler.settings.getbool('STORE_DEDUP_EXTRA_INFO', False)
        
        if store_extra:
            extra_info = {
                'timestamp': time.time(),
                'spider': spider.name,
                'item_keys': list(ItemAdapter(item).asdict().keys())
            }
            self.redis_conn.hset('dedup_info', fingerprint, json.dumps(extra_info))
        
        # Always store the fingerprint key itself so is_duplicate() can see it
        self.redis_conn.setex(fingerprint, expire_time, b'1')

    def close_spider(self, spider):
        """
        Close the Redis connection
        """
        self.redis_conn.close()
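
The strategy is selected through the DEDUP_STRATEGY setting, and the 'custom' strategy reads a dedup_fields attribute from the spider; a brief sketch (the spider name and field names are hypothetical):

# settings.py
DEDUP_STRATEGY = 'custom'   # 'url', 'content' or 'custom'

# myspider.py
import scrapy

class ProductSpider(scrapy.Spider):
    name = 'products'
    dedup_fields = ['sku', 'shop_id']   # fields combined by get_custom_fingerprint()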

Redis Deduplication Optimizations

import hashlib
import redis
from scrapy.exceptions import DropItem
import time

class OptimizedRedisDuplicatesPipeline:
    """
    Redis deduplication pipeline with batching optimizations
    """
    
    def __init__(self, redis_host, redis_port, redis_db, redis_password=None):
        self.redis_host = redis_host
        self.redis_port = redis_port
        self.redis_db = redis_db
        self.redis_password = redis_password
        
        self.redis_conn = redis.Redis(
            host=self.redis_host,
            port=self.redis_port,
            db=self.redis_db,
            password=self.redis_password,
            decode_responses=False,
            socket_keepalive=True,
            socket_keepalive_options={},
            health_check_interval=30
        )
        
        # Batched write optimization
        self.batch_size = 100
        self.pending_fingerprints = []
        
        # Statistics
        self.duplicate_count = 0
        self.processed_count = 0
    
    @classmethod
    def from_crawler(cls, crawler):
        settings = crawler.settings
        return cls(
            redis_host=settings.get('REDIS_HOST', 'localhost'),
            redis_port=settings.getint('REDIS_PORT', 6379),
            redis_db=settings.getint('REDIS_DB', 0),
            redis_password=settings.get('REDIS_PASSWORD')
        )
    
    def process_item(self, item, spider):
        """
        Process an item; fingerprints are written to Redis in batches
        """
        fingerprint = self.get_fingerprint(item)
        
        # Check for duplicates
        if self.redis_conn.exists(fingerprint):
            self.duplicate_count += 1
            raise DropItem(f"Duplicate item: {fingerprint.decode('utf-8')[:16]}...")
        
        # Queue the fingerprint for the next batched write
        self.pending_fingerprints.append(fingerprint)
        
        # Flush once the batch is full
        if len(self.pending_fingerprints) >= self.batch_size:
            self.flush_pending_fingerprints(spider)
        
        self.processed_count += 1
        return item
    
    def flush_pending_fingerprints(self, spider):
        """
        Flush pending fingerprints in a single pipelined call
        """
        if not self.pending_fingerprints:
            return
        
        pipe = self.redis_conn.pipeline()
        expire_time = spider.crawler.settings.getint('DUPLICATE_EXPIRE_TIME', 86400 * 7)
        
        for fingerprint in self.pending_fingerprints:
            pipe.setex(fingerprint, expire_time, b'1')
        
        pipe.execute()
        self.pending_fingerprints.clear()
    
    def get_fingerprint(self, item):
        """
        Generate the fingerprint
        """
        # Other fingerprint strategies can be plugged in here as needed
        url = item.get('url', '')
        if url:
            return hashlib.sha256(url.encode('utf-8')).hexdigest().encode('utf-8')
        
        # Default strategy: hash the whole item
        item_str = str(sorted(item.items())).encode('utf-8')
        return hashlib.sha256(item_str).hexdigest().encode('utf-8')
    
    def close_spider(self, spider):
        """
        Flush any remaining fingerprints when the spider closes
        """
        self.flush_pending_fingerprints(spider)
        
        spider.logger.info(f"Duplicates pipeline stats: "
                          f"Processed: {self.processed_count}, "
                          f"Duplicates: {self.duplicate_count}")
        
        self.redis_conn.close()

Bloom Filter Deduplication

A Local Bloom Filter

import mmh3
from bitarray import bitarray
import math
from scrapy.exceptions import DropItem

class BloomFilter:
    """
    Local (in-process) Bloom filter implementation
    """
    
    def __init__(self, capacity, error_rate=0.001):
        """
        Initialize the Bloom filter
        :param capacity: expected number of elements
        :param error_rate: desired false-positive rate
        """
        self.capacity = capacity
        self.error_rate = error_rate
        
        # Compute the bit-array size and the number of hash functions
        self.bit_array_size = self.get_size(capacity, error_rate)
        self.hash_count = self.get_hash_count(self.bit_array_size, capacity)
        
        # Initialize the bit array
        self.bit_array = bitarray(self.bit_array_size)
        self.bit_array.setall(0)
        
        self.count = 0
    
    def add(self, item):
        """
        Add an element to the Bloom filter
        """
        for i in range(self.hash_count):
            digest = mmh3.hash(item, i) % self.bit_array_size
            self.bit_array[digest] = True
        
        self.count += 1
    
    def check(self, item):
        """
        Check whether an element may be present (false positives are possible)
        """
        for i in range(self.hash_count):
            digest = mmh3.hash(item, i) % self.bit_array_size
            if not self.bit_array[digest]:
                return False
        
        return True
    
    def get_size(self, n, p):
        """
        Compute the bit-array size
        :param n: expected number of inserted elements
        :param p: false-positive rate
        """
        m = -(n * math.log(p)) / (math.log(2) ** 2)
        return int(m)
    
    def get_hash_count(self, m, n):
        """
        Compute the number of hash functions
        :param m: bit-array size
        :param n: expected number of inserted elements
        """
        k = (m / n) * math.log(2)
        return int(k)

class BloomFilterDuplicatesPipeline:
    """
    Deduplication pipeline backed by the local Bloom filter
    """
    
    def __init__(self, capacity=1000000, error_rate=0.001):
        self.bloom_filter = BloomFilter(capacity, error_rate)
        self.count = 0
    
    @classmethod
    def from_crawler(cls, crawler):
        settings = crawler.settings
        return cls(
            capacity=settings.getint('BLOOM_FILTER_CAPACITY', 1000000),
            error_rate=settings.getfloat('BLOOM_FILTER_ERROR_RATE', 0.001)
        )
    
    def process_item(self, item, spider):
        """
        Check duplicates with the Bloom filter
        """
        fingerprint = self.get_fingerprint(item)
        
        # Possibly a duplicate?
        if self.bloom_filter.check(fingerprint):
            # Bloom filters can give false positives; a second check could be added here
            spider.logger.info(f"Possible duplicate detected: {fingerprint[:16]}...")
            # Either drop the item or just log it
            raise DropItem(f"Possible duplicate: {fingerprint[:16]}...")
        
        # Add to the Bloom filter
        self.bloom_filter.add(fingerprint)
        self.count += 1
        
        return item
    
    def get_fingerprint(self, item):
        """
        Generate the fingerprint string
        """
        url = item.get('url', '')
        if url:
            return url
        
        # No URL: fingerprint the whole item
        import hashlib
        item_str = str(sorted(item.items()))
        return hashlib.sha256(item_str.encode('utf-8')).hexdigest()
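
A quick sanity check of the filter above; the memory estimate at the end depends only on the chosen capacity and error rate:

bf = BloomFilter(capacity=1_000_000, error_rate=0.001)
bf.add('https://example.com/item/1')

print(bf.check('https://example.com/item/1'))   # True
print(bf.check('https://example.com/item/2'))   # almost certainly False

# Roughly 1.7 MB of bits for one million items at a 0.1% error rate
print(f"{bf.bit_array_size / 8 / 1024 / 1024:.2f} MB")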

A Redis Bloom Filter

import redis
from scrapy.exceptions import DropItem

class RedisBloomFilterDuplicatesPipeline:
    """
    Deduplication pipeline based on a Redis-side Bloom filter.
    Requires the RedisBloom module on the Redis server.
    """
    
    def __init__(self, redis_host, redis_port, redis_db, redis_password=None):
        self.redis_host = redis_host
        self.redis_port = redis_port
        self.redis_db = redis_db
        self.redis_password = redis_password
        
        self.redis_conn = redis.Redis(
            host=self.redis_host,
            port=self.redis_port,
            db=self.redis_db,
            password=self.redis_password
        )
    
    @classmethod
    def from_crawler(cls, crawler):
        settings = crawler.settings
        return cls(
            redis_host=settings.get('REDIS_HOST', 'localhost'),
            redis_port=settings.getint('REDIS_PORT', 6379),
            redis_db=settings.getint('REDIS_DB', 0),
            redis_password=settings.get('REDIS_PASSWORD')
        )
    
    def process_item(self, item, spider):
        """
        Deduplicate through the Redis Bloom filter
        """
        fingerprint = self.get_fingerprint(item)
        
        # Check with RedisBloom's BF.EXISTS command
        try:
            exists = self.redis_conn.execute_command('BF.EXISTS', 'dedup_bloom', fingerprint)
            if exists:
                raise DropItem(f"Duplicate item: {fingerprint[:16]}...")
            
            # Add to the Bloom filter
            self.redis_conn.execute_command('BF.ADD', 'dedup_bloom', fingerprint)
            
        except redis.exceptions.ResponseError as e:
            # RedisBloom is not installed: fall back to a plain Redis set
            spider.logger.warning(f"Redis Bloom Filter not available: {e}")
            # Fallback logic
            if self.redis_conn.sismember('dedup_set', fingerprint):
                raise DropItem(f"Duplicate item: {fingerprint[:16]}...")
            self.redis_conn.sadd('dedup_set', fingerprint)
        
        return item
    
    def get_fingerprint(self, item):
        """
        Generate the fingerprint
        """
        import hashlib
        url = item.get('url', '')
        if url:
            return hashlib.sha256(url.encode('utf-8')).hexdigest()
        
        item_str = str(sorted(item.items()))
        return hashlib.sha256(item_str.encode('utf-8')).hexdigest()
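
When RedisBloom is available, it is worth creating the filter explicitly with BF.RESERVE so that its capacity and error rate are fixed up front instead of relying on server defaults; a sketch using the same dedup_bloom key as the pipeline above (the capacity figure is an assumption):

import redis

r = redis.Redis(host='localhost', port=6379, db=0)

try:
    # BF.RESERVE key error_rate capacity
    r.execute_command('BF.RESERVE', 'dedup_bloom', 0.001, 10_000_000)
except redis.exceptions.ResponseError as e:
    # Ignore "item exists"-style errors; surface anything else
    if 'exists' not in str(e).lower():
        raise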

URL Deduplication Strategies

URL Normalization Deduplication

from urllib.parse import urlparse, parse_qs, urlencode
import hashlib
from scrapy.exceptions import DropItem

class URLNormalizationDuplicatesPipeline:
    """
    URL normalization deduplication pipeline
    """
    
    def __init__(self):
        self.seen_urls = set()
    
    def process_item(self, item, spider):
        """
        Normalize the item URL and deduplicate on the normalized form
        """
        url = item.get('url', '')
        if not url:
            return item
        
        normalized_url = self.normalize_url(url)
        url_hash = hashlib.sha256(normalized_url.encode('utf-8')).hexdigest()
        
        if url_hash in self.seen_urls:
            raise DropItem(f"Duplicate URL: {url}")
        
        self.seen_urls.add(url_hash)
        return item
    
    def normalize_url(self, url):
        """
        Normalize a URL
        """
        parsed = urlparse(url)
        
        # Lowercase the scheme and host
        scheme = parsed.scheme.lower()
        netloc = parsed.netloc.lower()
        
        # Normalize the path (strip the trailing slash)
        path = parsed.path.rstrip('/')
        
        # Normalize query parameters (sorted alphabetically)
        query_params = parse_qs(parsed.query, keep_blank_values=True)
        sorted_query = sorted(query_params.items())
        query = urlencode(sorted_query, doseq=True)
        
        # Rebuild the URL
        normalized = f"{scheme}://{netloc}{path}"
        if query:
            normalized += f"?{query}"
        
        # Strip common tracking parameters
        normalized = self.remove_common_tracking_params(normalized)
        
        return normalized
    
    def remove_common_tracking_params(self, url):
        """
        Remove common tracking parameters
        """
        tracking_params = [
            'utm_source', 'utm_medium', 'utm_campaign', 'utm_term', 'utm_content',
            'gclid', 'fbclid', 'ref', 'source', 'from', 'via', 'tracking',
            'session_id', 'token', 'timestamp'
        ]
        
        parsed = urlparse(url)
        query_params = parse_qs(parsed.query, keep_blank_values=True)
        
        # Drop the tracking parameters
        filtered_params = {k: v for k, v in query_params.items() 
                          if k.lower() not in tracking_params}
        
        query = urlencode(filtered_params, doseq=True)
        base = f"{parsed.scheme}://{parsed.netloc}{parsed.path}"
        # Return the stripped URL even when no query parameters remain
        return f"{base}?{query}" if query else base

Request-Level Deduplication

import hashlib
from scrapy.dupefilters import RFPDupeFilter

class CustomRFPDupeFilter(RFPDupeFilter):
    """
    Custom request deduplication filter
    """
    
    def request_seen(self, request):
        """
        Check whether this request has been seen before
        """
        fp = self.request_fingerprint(request)
        
        if fp in self.fingerprints:
            return True
        
        self.fingerprints.add(fp)
        if self.file:
            self.file.write(fp + '\n')
        return False
    
    def request_fingerprint(self, request):
        """
        Build the request fingerprint; the strategy can be customized here
        """
        # Default strategy: method, URL and body
        fingerprint_data = (
            request.method.encode('utf-8'),
            request.url.encode('utf-8'),
            request.body or b'',
        )
        
        return hashlib.sha1(b''.join(fingerprint_data)).hexdigest()
    
    def close(self, reason):
        """
        Close the dedup filter
        """
        if self.file:
            self.file.close()

class AdvancedRequestDuplicatesPipeline:
    """
    Advanced request-level deduplication (sketch)
    """
    
    def process_request(self, request, spider):
        """
        Handle request-level deduplication
        """
        # More elaborate request dedup logic can go here; note that this hook
        # belongs in a downloader middleware, not an item pipeline
        pass
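
Request-level deduplication is switched on through Scrapy's DUPEFILTER_CLASS setting rather than ITEM_PIPELINES; the module path below is an assumption:

# settings.py
DUPEFILTER_CLASS = 'myproject.dupefilters.CustomRFPDupeFilter'

# Optional: log every filtered request instead of only the first one
DUPEFILTER_DEBUG = True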

Data Fingerprint Generation

Multi-Field Fingerprints

import hashlib
import json
from itemadapter import ItemAdapter

class MultiFieldFingerprintGenerator:
    """
    Multi-field fingerprint generator
    """
    
    def __init__(self, weight_config=None):
        """
        Initialize the weight configuration
        :param weight_config: mapping of field name to weight
        """
        self.weight_config = weight_config or {
            'url': 10,
            'title': 8,
            'content': 5,
            'description': 3,
            'category': 2,
            'tags': 1
        }
    
    def generate_fingerprint(self, item, spider=None):
        """
        Generate a weighted multi-field fingerprint
        """
        adapter = ItemAdapter(item)
        
        weighted_parts = []
        
        for field_name, weight in self.weight_config.items():
            if field_name in adapter:
                value = adapter[field_name]
                if value:
                    # Repeat the field value according to its weight
                    field_str = str(value)
                    weighted_part = field_str * weight
                    weighted_parts.append(weighted_part)
        
        if not weighted_parts:
            # None of the configured fields are present: fall back to all fields
            item_dict = adapter.asdict()
            item_str = json.dumps(item_dict, sort_keys=True, ensure_ascii=False)
            weighted_parts.append(item_str)
        
        combined = '|'.join(weighted_parts)
        return hashlib.sha256(combined.encode('utf-8')).hexdigest()
    
    def generate_content_signature(self, item):
        """
        Generate a content signature used to detect content changes
        """
        adapter = ItemAdapter(item)
        
        content_fields = ['title', 'content', 'description', 'text']
        content_parts = []
        
        for field in content_fields:
            if field in adapter:
                value = adapter[field]
                if value:
                    # Use only the first 1000 characters of each field
                    content_str = str(value)[:1000]
                    content_parts.append(content_str)
        
        if content_parts:
            content_combined = ''.join(content_parts)
            # MD5 keeps the signature short
            signature = hashlib.md5(content_combined.encode('utf-8')).hexdigest()
            return signature
        
        return None

class ContentChangeDetectionPipeline:
    """
    Content change detection pipeline
    """
    
    def __init__(self):
        self.fingerprint_generator = MultiFieldFingerprintGenerator()
    
    def process_item(self, item, spider):
        """
        Detect whether the content has changed
        """
        # Fingerprint of the current content
        current_fingerprint = self.fingerprint_generator.generate_fingerprint(item)
        
        # The fingerprint could be compared against a stored historical one,
        # for example kept in Redis:
        # historical_fingerprint = self.redis_conn.get(f"fingerprint:{item.get('url', '')}")
        
        # A different fingerprint means the content has changed
        # if current_fingerprint != historical_fingerprint:
        #     # Mark the item as updated
        #     item['content_updated'] = True
        
        # Store the current fingerprint
        # self.redis_conn.setex(f"fingerprint:{item.get('url', '')}", 86400*30, current_fingerprint)
        
        item['fingerprint'] = current_fingerprint
        return item
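
A minimal sketch of what the commented-out Redis comparison could look like; the fingerprint: key prefix, the 30-day TTL and the content_updated field are assumptions:

import redis
from itemadapter import ItemAdapter

class RedisContentChangeDetectionPipeline(ContentChangeDetectionPipeline):
    def __init__(self):
        super().__init__()
        self.redis_conn = redis.Redis(host='localhost', port=6379, db=0)

    def process_item(self, item, spider):
        adapter = ItemAdapter(item)
        url = adapter.get('url', '')
        current = self.fingerprint_generator.generate_fingerprint(item)
        key = f"fingerprint:{url}"

        previous = self.redis_conn.get(key)
        # Flag the item when the stored fingerprint differs from the new one
        item['content_updated'] = previous is not None and previous.decode('utf-8') != current

        # Remember the latest fingerprint for 30 days
        self.redis_conn.setex(key, 86400 * 30, current)

        item['fingerprint'] = current
        return item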

Semantic Deduplication

from sklearn.feature_extraction.text import TfidfVectorizer
from sklearn.metrics.pairwise import cosine_similarity
import numpy as np
from itemadapter import ItemAdapter
from scrapy.exceptions import DropItem

class SemanticDuplicatesPipeline:
    """
    Semantic deduplication pipeline
    """
    
    def __init__(self, similarity_threshold=0.8):
        self.similarity_threshold = similarity_threshold
        self.content_texts = []    # raw texts of previously accepted items
        self.vectorizer = TfidfVectorizer(
            max_features=1000,
            stop_words='english',
            ngram_range=(1, 2)
        )
    
    def process_item(self, item, spider):
        """
        Semantic deduplication
        """
        # Extract the main content
        content = self.extract_main_content(item)
        
        if not content:
            return item
        
        # Vectorize the new text together with everything seen so far so that
        # all vectors share one vocabulary (refitting per item would make the
        # stored vectors incomparable)
        corpus = self.content_texts + [content]
        matrix = self.vectorizer.fit_transform(corpus)
        
        # Compare the new document (last row) against the historical ones
        if self.content_texts:
            similarities = cosine_similarity(matrix[-1], matrix[:-1])
            max_similarity = float(np.max(similarities))
            
            if max_similarity > self.similarity_threshold:
                raise DropItem(f"Semantic duplicate detected with similarity: {max_similarity:.3f}")
        
        # Keep the raw text; vectors are recomputed for the next item
        self.content_texts.append(content)
        
        return item
    
    def extract_main_content(self, item):
        """
        Extract the main textual content from the item
        """
        adapter = ItemAdapter(item)
        
        content_fields = ['title', 'content', 'description', 'text']
        
        for field in content_fields:
            if field in adapter:
                content = adapter[field]
                if content and str(content).strip():
                    return str(content)[:2000]  # limit the length
        
        return ""

Incremental Crawling Strategies

Timestamp-Based Incremental Crawling

import time
from datetime import datetime
import redis
import scrapy

class TimestampIncrementalSpider(scrapy.Spider):
    """
    Incremental spider based on timestamps
    """
    name = 'timestamp_incremental'
    
    @classmethod
    def from_crawler(cls, crawler, *args, **kwargs):
        spider = super().from_crawler(crawler, *args, **kwargs)
        # Settings are only available once the crawler has been attached,
        # so the Redis connection is created here rather than in __init__
        spider.redis_conn = redis.Redis(
            host=crawler.settings.get('REDIS_HOST', 'localhost'),
            port=crawler.settings.getint('REDIS_PORT', 6379),
            db=crawler.settings.getint('REDIS_DB', 0)
        )
        return spider
    
    def get_last_crawl_time(self, source):
        """
        Get the time of the previous crawl
        """
        last_time = self.redis_conn.get(f"last_crawl_time:{source}")
        if last_time:
            return float(last_time)
        return 0  # no record yet: start from the beginning
    
    def set_last_crawl_time(self, source):
        """
        Record the time of this crawl
        """
        self.redis_conn.set(f"last_crawl_time:{source}", time.time())
    
    def parse_list_page(self, response):
        """
        Parse the list page and only follow entries that are new
        """
        last_crawl_time = self.get_last_crawl_time(self.name)
        
        for item_selector in response.css('div.item'):
            # Extract the publication time
            pub_time_str = item_selector.css('.pub-time::text').get()
            if pub_time_str:
                try:
                    pub_time = datetime.strptime(pub_time_str, '%Y-%m-%d %H:%M:%S').timestamp()
                    
                    # Only follow entries published after the previous crawl
                    if pub_time > last_crawl_time:
                        detail_url = item_selector.css('a::attr(href)').get()
                        yield response.follow(detail_url, callback=self.parse_detail)
                except ValueError:
                    # Unparseable timestamp: crawl it anyway
                    detail_url = item_selector.css('a::attr(href)').get()
                    yield response.follow(detail_url, callback=self.parse_detail)
        
        # Record the time of this crawl
        self.set_last_crawl_time(self.name)

Version-Based Incremental Crawling

import redis
import scrapy

class VersionIncrementalSpider(scrapy.Spider):
    """
    Incremental spider based on version numbers
    """
    name = 'version_incremental'
    
    @classmethod
    def from_crawler(cls, crawler, *args, **kwargs):
        spider = super().from_crawler(crawler, *args, **kwargs)
        spider.redis_conn = redis.Redis(
            host=crawler.settings.get('REDIS_HOST', 'localhost'),
            port=crawler.settings.getint('REDIS_PORT', 6379),
            db=crawler.settings.getint('REDIS_DB', 0)
        )
        return spider
    
    def parse_list_page(self, response):
        """
        Parse the list page and decide from the version number whether to crawl
        """
        for item_selector in response.css('div.item'):
            item_id = item_selector.css('.item-id::text').get()
            current_version = item_selector.css('.version::text').get()
            
            if item_id and current_version:
                stored_version = self.redis_conn.get(f"item_version:{item_id}")
                
                if not stored_version or int(current_version) > int(stored_version):
                    # Version changed: re-crawl the detail page
                    detail_url = item_selector.css('a::attr(href)').get()
                    yield response.follow(
                        detail_url, 
                        callback=self.parse_detail,
                        meta={'item_id': item_id, 'version': current_version}
                    )
                    
                    # Store the new version
                    self.redis_conn.set(f"item_version:{item_id}", current_version)
                else:
                    # Version unchanged: skip
                    self.logger.info(f"Item {item_id} version unchanged, skipping")

Smart Incremental Crawling

import redis
import time
import scrapy

class SmartIncrementalSpider(scrapy.Spider):
    """
    Smart incremental spider.
    Adapts the crawl interval to how often the content actually changes.
    """
    name = 'smart_incremental'
    
    @classmethod
    def from_crawler(cls, crawler, *args, **kwargs):
        spider = super().from_crawler(crawler, *args, **kwargs)
        spider.redis_conn = redis.Redis(
            host=crawler.settings.get('REDIS_HOST', 'localhost'),
            port=crawler.settings.getint('REDIS_PORT', 6379),
            db=crawler.settings.getint('REDIS_DB', 0)
        )
        return spider
    
    def should_crawl_url(self, url, content_hash=None):
        """
        Decide whether a URL should be crawled now
        """
        # Load the crawl record for this URL
        record_key = f"crawl_record:{url}"
        record = self.redis_conn.hgetall(record_key)
        
        if not record:
            # Never crawled before
            return True
        
        last_crawl_time = float(record.get(b'last_crawl_time', 0))
        crawl_frequency = int(record.get(b'crawl_frequency', 3600))  # default: 1 hour
        content_changed = record.get(b'content_changed', b'0') == b'1'
        
        current_time = time.time()
        
        # If the content changed recently, crawl at half the usual interval
        if content_changed:
            return current_time - last_crawl_time >= crawl_frequency / 2
        
        # Otherwise crawl once the full interval has elapsed
        return current_time - last_crawl_time >= crawl_frequency
    
    def update_crawl_record(self, url, content_hash=None, content_changed=False):
        """
        Update the crawl record
        """
        record_key = f"crawl_record:{url}"
        
        # Update the last-crawl time
        self.redis_conn.hset(record_key, 'last_crawl_time', time.time())
        
        # Update the content hash
        if content_hash:
            old_hash = self.redis_conn.hget(record_key, 'content_hash')
            if old_hash and old_hash.decode('utf-8') != content_hash:
                content_changed = True
            self.redis_conn.hset(record_key, 'content_hash', content_hash)
        
        # Update the content-changed flag
        self.redis_conn.hset(record_key, 'content_changed', int(content_changed))
        
        # Adapt the crawl frequency to how often the content changes
        if content_changed:
            # Content changes often: crawl more frequently
            current_freq = int(self.redis_conn.hget(record_key, 'crawl_frequency') or 3600)
            new_freq = max(300, current_freq // 2)  # floor: 5 minutes
            self.redis_conn.hset(record_key, 'crawl_frequency', new_freq)
        else:
            # Content is stable: crawl less often
            current_freq = int(self.redis_conn.hget(record_key, 'crawl_frequency') or 3600)
            new_freq = min(86400, current_freq * 2)  # ceiling: 1 day
            self.redis_conn.hset(record_key, 'crawl_frequency', new_freq)
    
    def parse_list_page(self, response):
        """
        Parse the list page using the adaptive schedule
        """
        for item_selector in response.css('div.item'):
            detail_url = item_selector.css('a::attr(href)').get()
            
            if detail_url and self.should_crawl_url(detail_url):
                yield response.follow(detail_url, callback=self.parse_detail)
            else:
                self.logger.info(f"Skipping URL based on incremental logic: {detail_url}")

Distributed Deduplication

Distributed Redis Deduplication

import redis
from redis.sentinel import Sentinel
import hashlib
import time
from scrapy.exceptions import DropItem

class DistributedRedisDuplicatesPipeline:
    """
    Distributed Redis deduplication pipeline.
    Supports Redis Sentinel; for Redis Cluster a cluster client would be needed.
    """
    
    def __init__(self, redis_hosts, redis_port, redis_db, use_sentinel=False, master_name='mymaster'):
        self.redis_hosts = redis_hosts
        self.redis_port = redis_port
        self.redis_db = redis_db
        self.use_sentinel = use_sentinel
        self.master_name = master_name
        
        if use_sentinel:
            sentinel = Sentinel([(host, redis_port) for host in redis_hosts])
            self.redis_conn = sentinel.master_for(master_name, db=redis_db)
        else:
            # Single-node mode. With several hosts, a production setup should use a
            # Redis Cluster client (e.g. redis.cluster.RedisCluster) rather than
            # simply connecting to the first host, as is done here for brevity.
            self.redis_conn = redis.Redis(
                host=redis_hosts[0],
                port=redis_port,
                db=redis_db
            )
    
    @classmethod
    def from_crawler(cls, crawler):
        settings = crawler.settings
        redis_hosts = settings.getlist('REDIS_HOSTS', ['localhost'])
        return cls(
            redis_hosts=redis_hosts,
            redis_port=settings.getint('REDIS_PORT', 6379),
            redis_db=settings.getint('REDIS_DB', 0),
            use_sentinel=settings.getbool('REDIS_USE_SENTINEL', False),
            master_name=settings.get('REDIS_MASTER_NAME', 'mymaster')
        )
    
    def process_item(self, item, spider):
        """
        Distributed deduplication
        """
        fingerprint = self.get_fingerprint(item)
        
        # Use a distributed lock to keep check-and-set atomic across nodes
        lock_key = f"lock:{fingerprint}"
        lock_value = f"{spider.name}:{time.time()}"
        lock_timeout = 10  # lock expires after 10 seconds
        
        # Acquire the distributed lock
        if self.acquire_lock(lock_key, lock_value, lock_timeout):
            try:
                # Check whether the fingerprint already exists
                if self.redis_conn.exists(f"duplicate:{fingerprint}"):
                    raise DropItem(f"Distributed duplicate: {fingerprint[:16]}...")
                
                # Store the fingerprint
                expire_time = spider.crawler.settings.getint('DUPLICATE_EXPIRE_TIME', 86400 * 7)
                self.redis_conn.setex(f"duplicate:{fingerprint}", expire_time, b'1')
                
                return item
            finally:
                # Release the lock
                self.release_lock(lock_key, lock_value)
        else:
            # Failed to get the lock; another instance is probably handling it
            spider.logger.warning(f"Could not acquire lock for: {fingerprint[:16]}...")
            raise DropItem(f"Lock acquisition failed: {fingerprint[:16]}...")
    
    def acquire_lock(self, lock_key, lock_value, timeout):
        """
        Acquire the distributed lock (Lua keeps check-and-set atomic)
        """
        lua_acquire = """
        if redis.call("get", KEYS[1]) == ARGV[1] then
            redis.call("expire", KEYS[1], ARGV[2])
            return 1
        elseif redis.call("set", KEYS[1], ARGV[1], "NX", "EX", ARGV[2]) then
            return 1
        else
            return 0
        end
        """
        
        return bool(self.redis_conn.eval(lua_acquire, 1, lock_key, lock_value, timeout))
    
    def release_lock(self, lock_key, lock_value):
        """
        Release the distributed lock
        """
        lua_release = """
        if redis.call("get", KEYS[1]) == ARGV[1] then
            return redis.call("del", KEYS[1])
        else
            return 0
        end
        """
        
        self.redis_conn.eval(lua_release, 1, lock_key, lock_value)
    
    def get_fingerprint(self, item):
        """
        Generate the fingerprint
        """
        import hashlib
        url = item.get('url', '')
        if url:
            return hashlib.sha256(url.encode('utf-8')).hexdigest()
        
        item_str = str(sorted(item.items()))
        return hashlib.sha256(item_str.encode('utf-8')).hexdigest()

Consistent-Hashing Deduplication

import hashlib
import bisect
from collections import OrderedDict

class ConsistentHashDuplicatesPipeline:
    """
    Distributed deduplication pipeline based on consistent hashing
    """
    
    def __init__(self, nodes=None, replicas=150):
        self.nodes = nodes or ['node1', 'node2', 'node3']
        self.replicas = replicas  # virtual replicas per node
        self.ring = OrderedDict()  # the consistent-hash ring
        self._sorted_keys = []
        
        self.setup_ring()
    
    def setup_ring(self):
        """
        Build the consistent-hash ring
        """
        for node in self.nodes:
            for i in range(self.replicas):
                virtual_key = f"{node}:{i}"
                key = self.gen_key(virtual_key.encode('utf-8'))
                self.ring[key] = node
        
        self._sorted_keys = sorted(self.ring.keys())
    
    def gen_key(self, key):
        """
        Hash a key onto the ring
        """
        return hashlib.sha256(key).hexdigest()
    
    def get_node(self, key):
        """
        Find the node responsible for a key
        """
        if not self.ring:
            return None
        
        hash_key = self.gen_key(key.encode('utf-8'))
        idx = bisect.bisect(self._sorted_keys, hash_key)
        
        if idx == len(self._sorted_keys):
            idx = 0
        
        return self.ring[self._sorted_keys[idx]]
    
    def process_item(self, item, spider):
        """
        Deduplicate via consistent hashing
        """
        fingerprint = self.get_fingerprint(item)
        
        # Determine which node is responsible for this fingerprint
        target_node = self.get_node(fingerprint)
        
        # A real deployment would keep a connection per node and talk to the
        # Redis instance (or other store) selected by the ring
        redis_key = f"duplicate:{target_node}:{fingerprint}"
        
        # Simplified: a single shared Redis connection; a real implementation
        # would pick the Redis instance that corresponds to target_node
        global_redis = spider.crawler.settings.get('GLOBAL_REDIS_CONN')
        
        if global_redis.exists(redis_key):
            raise DropItem(f"Consistent hash duplicate: {fingerprint[:16]}...")
        
        expire_time = spider.crawler.settings.getint('DUPLICATE_EXPIRE_TIME', 86400 * 7)
        global_redis.setex(redis_key, expire_time, b'1')
        
        return item
    
    def get_fingerprint(self, item):
        """
        Generate the fingerprint
        """
        import hashlib
        url = item.get('url', '')
        if url:
            return hashlib.sha256(url.encode('utf-8')).hexdigest()
        
        item_str = str(sorted(item.items()))
        return hashlib.sha256(item_str.encode('utf-8')).hexdigest()

Performance Optimization

Batched Operations

import redis
import time
from collections import deque
import hashlib

class BatchOptimizedDuplicatesPipeline:
    """
    Deduplication pipeline optimized with batched Redis operations
    """
    
    def __init__(self, redis_host, redis_port, redis_db, batch_size=1000, flush_interval=5):
        self.redis_host = redis_host
        self.redis_port = redis_port
        self.redis_db = redis_db
        
        self.redis_conn = redis.Redis(
            host=self.redis_host,
            port=self.redis_port,
            db=self.redis_db,
            decode_responses=False
        )
        
        self.batch_size = batch_size
        self.flush_interval = flush_interval
        
        self.pending_items = deque()
        self.last_flush_time = time.time()
        
        # Fast in-memory set of seen fingerprints; it also catches duplicates
        # that arrive before the next Redis flush (cap it for very long runs)
        self.memory_cache = set()
        
        # Statistics
        self.processed_count = 0
        self.duplicate_count = 0
    
    @classmethod
    def from_crawler(cls, crawler):
        settings = crawler.settings
        return cls(
            redis_host=settings.get('REDIS_HOST', 'localhost'),
            redis_port=settings.getint('REDIS_PORT', 6379),
            redis_db=settings.getint('REDIS_DB', 0),
            batch_size=settings.getint('DEDUP_BATCH_SIZE', 1000),
            flush_interval=settings.getint('DEDUP_FLUSH_INTERVAL', 5)
        )
    
    def process_item(self, item, spider):
        """
        Process items with batched Redis writes
        """
        fingerprint = self.get_fingerprint(item)
        
        # Fast in-memory check (also covers fingerprints still waiting to be flushed)
        if fingerprint in self.memory_cache:
            self.duplicate_count += 1
            raise DropItem(f"In-memory duplicate: {fingerprint[:16]}...")
        
        # Remember it and queue it for the batched Redis write
        self.memory_cache.add(fingerprint)
        self.pending_items.append((fingerprint, item))
        
        # Flush when the batch is full or the interval has elapsed
        current_time = time.time()
        if (len(self.pending_items) >= self.batch_size or 
            current_time - self.last_flush_time >= self.flush_interval):
            self.flush_batch(spider)
        
        self.processed_count += 1
        return item
    
    def flush_batch(self, spider):
        """
        批量处理待处理项目
        """
        if not self.pending_items:
            return
        
        # 批量检查Redis中是否已存在
        fingerprints = [item[0] for item in self.pending_items]
        
        # 使用管道批量操作
        pipe = self.redis_conn.pipeline()
        for fp in fingerprints:
            pipe.exists(f"duplicate:{fp}")
        results = pipe.execute()
        
        # Split the results into duplicates and new fingerprints
        items_to_store = []
        for i, exists in enumerate(results):
            fingerprint, item = self.pending_items[i]
            
            if exists:
                # Already in Redis: only update the statistics
                self.duplicate_count += 1
                spider.logger.debug(f"Duplicate found: {fingerprint[:16]}...")
            else:
                # New: store its fingerprint
                items_to_store.append((fingerprint, item))
        
        # Batch-store the new fingerprints
        if items_to_store:
            pipe = self.redis_conn.pipeline()
            expire_time = spider.crawler.settings.getint('DUPLICATE_EXPIRE_TIME', 86400 * 7)
            
            for fingerprint, item in items_to_store:
                pipe.setex(f"duplicate:{fingerprint}", expire_time, b'1')
            
            pipe.execute()
        
        # Reset the batch
        self.pending_items.clear()
        self.last_flush_time = time.time()
        
        spider.logger.info(f"Batch processed: {len(fingerprints)} items, "
                          f"{len(items_to_store)} new, {len(fingerprints) - len(items_to_store)} duplicates")
    
    def close_spider(self, spider):
        """
        Flush whatever is left when the spider closes
        """
        self.flush_batch(spider)
        
        spider.logger.info(f"Pipeline stats - Processed: {self.processed_count}, "
                          f"Duplicates: {self.duplicate_count}, "
                          f"Uniques: {self.processed_count - self.duplicate_count}")
        
        self.redis_conn.close()
    
    def get_fingerprint(self, item):
        """
        Generate the fingerprint
        """
        import hashlib
        url = item.get('url', '')
        if url:
            return hashlib.sha256(url.encode('utf-8')).hexdigest()
        
        item_str = str(sorted(item.items()))
        return hashlib.sha256(item_str.encode('utf-8')).hexdigest()

In-Memory Cache Optimization

import hashlib
import time
from collections import OrderedDict
from scrapy.exceptions import DropItem

class MemoryCachedDuplicatesPipeline:
    """
    Deduplication pipeline with an in-memory LRU cache
    """
    
    def __init__(self, cache_size=10000, cache_expire=3600):
        self.cache_size = cache_size
        self.cache_expire = cache_expire
        self.cache = OrderedDict()  # LRU cache
        
        # Statistics
        self.cache_hits = 0
        self.cache_misses = 0
    
    @classmethod
    def from_crawler(cls, crawler):
        settings = crawler.settings
        return cls(
            cache_size=settings.getint('DEDUP_CACHE_SIZE', 10000),
            cache_expire=settings.getint('DEDUP_CACHE_EXPIRE', 3600)
        )
    
    def process_item(self, item, spider):
        """
        Fast duplicate check against the in-memory cache
        """
        fingerprint = self.get_fingerprint(item)
        current_time = time.time()
        
        # Check the in-memory cache
        if fingerprint in self.cache:
            cached_time = self.cache[fingerprint]
            
            # Is the cached entry still fresh?
            if current_time - cached_time < self.cache_expire:
                self.cache_hits += 1
                raise DropItem(f"Cached duplicate: {fingerprint[:16]}...")
            else:
                # Entry expired: drop it from the cache
                del self.cache[fingerprint]
        
        # Cache miss: remember this fingerprint
        self.cache[fingerprint] = current_time
        
        # Evict the oldest entry once the cache exceeds its size limit
        if len(self.cache) > self.cache_size:
            self.cache.popitem(last=False)
        
        self.cache_misses += 1
        return item
    
    def get_fingerprint(self, item):
        """
        生成指纹
        """
        import hashlib
        url = item.get('url', '')
        if url:
            return hashlib.sha256(url.encode('utf-8')).hexdigest()
        
        item_str = str(sorted(item.items()))
        return hashlib.sha256(item_str.encode('utf-8')).hexdigest()
    
    def close_spider(self, spider):
        """
        Log cache statistics when the spider closes
        """
        total_ops = self.cache_hits + self.cache_misses
        hit_rate = self.cache_hits / total_ops if total_ops > 0 else 0
        
        spider.logger.info(f"Cache stats - Hits: {self.cache_hits}, Misses: {self.cache_misses}, "
                          f"Hit rate: {hit_rate:.2%}, Final cache size: {len(self.cache)}")

Common Problems and Solutions

Problem 1: Memory Exhaustion

Symptom: memory usage keeps growing during long runs. Solution:

import hashlib
from scrapy.exceptions import DropItem

class MemoryEfficientDuplicatesPipeline:
    def __init__(self):
        # A bounded in-memory set that is cleared periodically
        self.seen_items = set()
        self.max_items = 100000
        self.cleanup_threshold = 90000
    
    def get_fingerprint(self, item):
        return hashlib.sha256(str(sorted(item.items())).encode('utf-8')).hexdigest()
    
    def process_item(self, item, spider):
        fingerprint = self.get_fingerprint(item)
        
        if len(self.seen_items) > self.cleanup_threshold:
            # Periodic reset to bound memory use (some duplicates may slip through afterwards)
            self.seen_items.clear()
        
        if fingerprint in self.seen_items:
            raise DropItem("Duplicate item")
        
        if len(self.seen_items) < self.max_items:
            self.seen_items.add(fingerprint)
        
        return item

Problem 2: Redis Connection Issues

Symptom: Redis connections time out or drop. Solution:

import hashlib
import redis
from redis.backoff import ExponentialBackoff
from redis.retry import Retry
from scrapy.exceptions import DropItem

class RobustRedisDuplicatesPipeline:
    def __init__(self):
        self.redis_conn = redis.Redis(
            host='localhost',
            port=6379,
            db=0,
            socket_connect_timeout=5,
            socket_timeout=5,
            retry_on_timeout=True,
            retry=Retry(ExponentialBackoff(cap=10), 3),
            health_check_interval=30
        )
    
    def get_fingerprint(self, item):
        return hashlib.sha256(str(sorted(item.items())).encode('utf-8')).hexdigest()
    
    def process_item(self, item, spider):
        try:
            fingerprint = self.get_fingerprint(item)
            
            if self.redis_conn.exists(fingerprint):
                raise DropItem("Duplicate item")
            
            self.redis_conn.setex(fingerprint, 86400*7, b'1')
            
        except redis.ConnectionError:
            spider.logger.error("Redis connection error, skipping deduplication")
            # On connection errors, skip dedup rather than stop the spider
            pass
        except redis.TimeoutError:
            spider.logger.warning("Redis timeout, continuing without deduplication")
            pass
        
        return item

Problem 3: Duplicates Still Getting Through

Symptom: many duplicates still make it into the output. Solution:

from scrapy.exceptions import DropItem

class ComprehensiveDuplicatesPipeline:
    def __init__(self):
        self.strategies = [
            self.url_strategy,
            self.content_strategy,
            self.field_strategy
        ]
    
    def process_item(self, item, spider):
        # Run every strategy; any hit drops the item
        for strategy in self.strategies:
            if strategy(item, spider):
                raise DropItem(f"Duplicate detected by {strategy.__name__}")
        
        # No strategy flagged the item: pass it through
        return item
    
    def url_strategy(self, item, spider):
        # URL-based dedup strategy
        url = item.get('url')
        if url:
            # e.g. look the URL up in Redis or a local set
            pass
        return False
    
    def content_strategy(self, item, spider):
        # Content-similarity dedup strategy
        return False
    
    def field_strategy(self, item, spider):
        # Dedup on a specific combination of fields
        return False

Best Practices

Design Principles

  1. Layered deduplication: an in-memory cache in front of Redis storage (see the sketch after this list)
  2. Performance trade-offs: balance accuracy against throughput
  3. Fault tolerance: degrade gracefully when Redis is unavailable
  4. Resource management: keep memory and storage use under control
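
A minimal sketch of the layered setup, assuming the in-memory and Redis pipelines from earlier sections live in myproject.pipelines; the cache runs first (lower number) and Redis acts as the shared, authoritative store:

# settings.py (module paths are assumptions)
ITEM_PIPELINES = {
    'myproject.pipelines.MemoryCachedDuplicatesPipeline': 200,  # fast local filter
    'myproject.pipelines.RedisDuplicatesPipeline': 300,         # shared, authoritative
}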

Deployment Tips

  1. Monitoring: track how well deduplication is working (a stats sketch follows below)
  2. Tuning: adjust parameters to the characteristics of your data
  3. Backups: back up deduplication data regularly
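
For monitoring, Scrapy's built-in stats collector is a convenient place to record dedup counters; a sketch of how a pipeline might report them (the stat names and the is_duplicate helper are hypothetical):

from scrapy.exceptions import DropItem

class StatsAwareDuplicatesPipeline:
    def process_item(self, item, spider):
        stats = spider.crawler.stats
        stats.inc_value('dedup/items_seen')

        if self.is_duplicate(item):            # hypothetical duplicate check
            stats.inc_value('dedup/items_dropped')
            raise DropItem("Duplicate item")

        return item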

💡 Key takeaway: deduplication and incremental updates are core infrastructure for large-scale crawler systems; with a sound architecture and the optimization strategies above they markedly improve crawl efficiency and data quality.


