#Scrapy数据去重与增量更新完全指南
📂 所属阶段:第二阶段 — 数据流转(数据处理篇)
🔗 相关章节:Pipeline管道实战 · 数据清洗与校验
#目录
#快速入门场景
做爬虫时最常遇到的两个痛点:
- 重复爬取同一页面/内容,浪费带宽、时间、服务器资源
- 全量重爬,无法第一时间捕获新增或变更的数据
本文会覆盖从简单单机去重到带智能调度的增量爬取的完整方案,所有代码均可直接在项目中复用。
#Redis指纹去重实现
#基础可复用Pipeline
这是最通用的Redis指纹去重方案,支持自定义指纹字段、过期时间:
import hashlib
import redis
from scrapy.exceptions import DropItem
from itemadapter import ItemAdapter
class RedisFingerprintPipeline:
"""基于Redis SHA256指纹的通用去重Pipeline"""
def __init__(self, redis_conf):
self.redis_conf = redis_conf
self.redis_conn = None
self.expire_days = redis_conf.get("expire_days", 7) # 默认指纹保留7天
self.dedup_fields = redis_conf.get("dedup_fields", ["url"]) # 默认按URL去重
@classmethod
def from_crawler(cls, crawler):
"""从Scrapy配置中读取Redis参数"""
redis_conf = {
"host": crawler.settings.get("REDIS_HOST", "localhost"),
"port": crawler.settings.getint("REDIS_PORT", 6379),
"db": crawler.settings.getint("REDIS_DB", 0),
"password": crawler.settings.get("REDIS_PASSWORD"),
"expire_days": crawler.settings.getint("DUPLICATE_EXPIRE_DAYS", 7),
"dedup_fields": crawler.settings.getlist("DUPLICATE_DEDUP_FIELDS", ["url"])
}
return cls(redis_conf)
def open_spider(self, spider):
"""爬虫启动时建立Redis连接"""
self.redis_conn = redis.Redis(
host=self.redis_conf["host"],
port=self.redis_conf["port"],
db=self.redis_conf["db"],
password=self.redis_conf["password"],
decode_responses=False # 哈希值存字节更节省空间
)
def process_item(self, item, spider):
"""核心去重逻辑"""
# 1. 生成数据指纹
fp = self._gen_fingerprint(item)
# 2. 检查Redis中是否已存在
if self.redis_conn.exists(fp):
spider.logger.warning(f"检测到重复数据,指纹前缀: {fp.decode()[:16]}")
raise DropItem(f"Duplicate item: {fp.decode()[:16]}")
# 3. 存入新指纹并设置过期
self.redis_conn.setex(fp, 86400 * self.expire_days, b"1")
return item
def close_spider(self, spider):
"""爬虫关闭时断开连接"""
if self.redis_conn:
self.redis_conn.close()
def _gen_fingerprint(self, item):
"""生成SHA256数据指纹"""
adapter = ItemAdapter(item)
# 提取配置的去重字段并排序(避免字段顺序不同导致指纹不同)
parts = []
for field in sorted(self.dedup_fields):
if field in adapter:
val = adapter[field]
if val is not None:
parts.append(f"{field}={str(val)}")
# 如果没有配置字段,用整个item的排序后的字典生成
if not parts:
item_sorted = str(sorted(adapter.asdict().items()))
parts.append(item_sorted)
# 拼接并加密
return hashlib.sha256("|".join(parts).encode("utf-8")).hexdigest().encode("utf-8")#配置文件示例
在 settings.py 中添加以下配置:
# Pipeline启用
ITEM_PIPELINES = {
"myproject.pipelines.RedisFingerprintPipeline": 300, # 300代表优先级,数字越小越先执行
}
# Redis去重配置
REDIS_HOST = "127.0.0.1"
REDIS_PORT = 6379
REDIS_DB = 0
REDIS_PASSWORD = None
DUPLICATE_EXPIRE_DAYS = 7 # 7天后指纹自动清理,防止Redis内存溢出
DUPLICATE_DEDUP_FIELDS = ["url", "title"] # 按URL+标题组合去重,提高准确性#URL标准化与请求级去重
#URL标准化Pipeline
同一页面可能有多个URL形式(例如有无www、有无末尾斜杠、追踪参数不同),需要先标准化再去重:
from urllib.parse import urlparse, parse_qs, urlencode
class URLNormalizationPipeline:
"""URL标准化前置Pipeline,放在去重Pipeline之前"""
def process_item(self, item, spider):
if "url" not in item:
return item
item["url"] = self._normalize(item["url"])
return item
def _normalize(self, url):
"""标准化URL"""
parsed = urlparse(url)
# 1. 协议/域名转小写
scheme = parsed.scheme.lower()
netloc = parsed.netloc.lower()
# 2. 移除末尾斜杠(首页除外)
path = parsed.path.rstrip("/") if parsed.path != "/" else "/"
# 3. 按字母排序查询参数
qs = parse_qs(parsed.query, keep_blank_values=True)
# 4. 移除常见追踪参数
tracking = ["utm_*", "gclid", "fbclid", "ref", "from", "via"]
filtered_qs = {k: v for k, v in qs.items() if not any(k.startswith(t.replace("*", "")) for t in tracking)}
sorted_qs = urlencode(sorted(filtered_qs.items()), doseq=True)
# 5. 重建URL
normalized = f"{scheme}://{netloc}{path}"
if sorted_qs:
normalized += f"?{sorted_qs}"
return normalized注意优先级调整:在 settings.py 中把标准化Pipeline的优先级设为 200,比去重Pipeline的300高:
ITEM_PIPELINES = {
"myproject.pipelines.URLNormalizationPipeline": 200,
"myproject.pipelines.RedisFingerprintPipeline": 300,
}#本地/Redis布隆过滤器优化
布隆过滤器是一种高效的概率型数据结构,用于快速判断元素是否存在,适合超大规模数据去重(百万/千万级),缺点是有极低的误判率(不存在的元素可能被误判为存在,但存在的绝不会误判为不存在)。
#本地布隆过滤器(适合单机千万级以下)
依赖 mmh3 和 bitarray 库:
pip install mmh3 bitarrayimport mmh3
from bitarray import bitarray
import time
from scrapy.exceptions import DropItem
class LocalBloomFilterPipeline:
"""本地内存布隆过滤器Pipeline"""
def __init__(self, capacity=10_000_000, error_rate=0.001):
self.capacity = capacity # 预期存储的指纹数量
self.error_rate = error_rate # 预期误判率(0.1%)
self._init_bloom()
def _init_bloom(self):
"""计算位数组大小和哈希函数数量并初始化"""
# 简化公式(无数学公式版说明:根据容量和误判率算出合适的内存大小和哈希次数)
m = int(-self.capacity * (0.0014426950408889634 if self.error_rate == 0.001 else 0.007213475204444817)) # 预填0.1%和1%的常用系数
k = int(0.6931471805599453 * m / self.capacity) # ln2≈0.693的简化系数
self.m = m
self.k = k
self.bitarray = bitarray(m)
self.bitarray.setall(0)
@classmethod
def from_crawler(cls, crawler):
return cls(
capacity=crawler.settings.getint("BLOOM_CAPACITY", 10_000_000),
error_rate=crawler.settings.getfloat("BLOOM_ERROR_RATE", 0.001)
)
def process_item(self, item, spider):
fp = item.get("url", str(sorted(ItemAdapter(item).asdict().items())))
# 快速判断是否可能存在
if all(self.bitarray[mmh3.hash(fp, i) % self.m] for i in range(self.k)):
spider.logger.warning(f"布隆过滤器可能检测到重复: {fp[:50]}")
raise DropItem(f"Possible duplicate: {fp[:50]}")
# 添加到布隆过滤器
for i in range(self.k):
self.bitarray[mmh3.hash(fp, i) % self.m] = 1
return item#时间戳+智能增量抓取
增量抓取只抓取上次爬取后新增或变更的数据,大幅提升效率。这里介绍最通用的时间戳方案,以及简单的智能频率调整。
#时间戳增量Spider
import scrapy
import redis
from datetime import datetime
from myproject.items import MyItem
class IncrementalSpider(scrapy.Spider):
name = "incremental_spider"
allowed_domains = ["example.com"]
start_urls = ["https://example.com/news"]
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
# 建立Redis连接,存上次爬取时间
self.redis_conn = redis.Redis(
host="127.0.0.1",
port=6379,
db=0,
decode_responses=True
)
self.last_crawl_key = f"last_crawl:{self.name}"
def parse(self, response):
"""解析列表页,只抓新增内容"""
# 1. 获取上次爬取时间(如果是首次爬取,设为30天前)
last_crawl_ts = float(self.redis_conn.get(self.last_crawl_key) or (datetime.now().timestamp() - 86400*30))
# 2. 解析列表中的每一条新闻
for news in response.css("div.news-item"):
# 提取发布时间(根据目标网站格式调整)
pub_time_str = news.css(".pub-time::text").get().strip()
pub_time_ts = datetime.strptime(pub_time_str, "%Y-%m-%d %H:%M").timestamp()
# 只抓发布时间晚于上次爬取的
if pub_time_ts > last_crawl_ts:
detail_url = news.css("a.title::attr(href)").get()
yield response.follow(detail_url, callback=self.parse_detail)
# 3. 更新本次爬取时间
self.redis_conn.set(self.last_crawl_key, datetime.now().timestamp())
def parse_detail(self, response):
"""解析详情页"""
item = MyItem()
item["url"] = response.url
item["title"] = response.css("h1::text").get().strip()
item["content"] = "".join(response.css("div.content p::text").getall()).strip()
yield item#核心性能优化
#内存LRU缓存 + Redis批量管道
这是最立竿见影的性能优化方案:
- 内存LRU缓存:快速过滤最近爬过的热门数据,减少Redis访问
- Redis批量管道:一次性发送多个Redis命令,减少网络往返
# 只列出优化后的核心部分,完整代码可参考基础Pipeline
def __init__(self, redis_conf):
# ... 原有代码
self.lru_cache = dict()
self.lru_max = 10000 # LRU缓存最多存10000个指纹
self.pending_fps = [] # 待批量存入的指纹
self.batch_size = 100 # 每100个指纹批量存一次
def process_item(self, item, spider):
fp = self._gen_fingerprint(item)
# 1. 先查LRU缓存
if fp in self.lru_cache:
spider.logger.debug(f"LRU缓存命中,跳过重复")
raise DropItem("LRU duplicate")
# 2. LRU未命中,查Redis
if self.redis_conn.exists(fp):
spider.logger.debug(f"Redis命中,跳过重复")
raise DropItem("Redis duplicate")
# 3. 加入LRU缓存和待批量存入队列
self.lru_cache[fp] = time.time()
# LRU淘汰最久未使用的
if len(self.lru_cache) > self.lru_max:
self.lru_cache.pop(next(iter(self.lru_cache)))
self.pending_fps.append(fp)
# 4. 达到批量大小或爬虫关闭时批量存入
if len(self.pending_fps) >= self.batch_size:
self._flush_pending(spider)
return item
def _flush_pending(self, spider):
if not self.pending_fps:
return
pipe = self.redis_conn.pipeline()
expire = 86400 * self.expire_days
for fp in self.pending_fps:
pipe.setex(fp, expire, b"1")
pipe.execute()
spider.logger.debug(f"批量存入 {len(self.pending_fps)} 个指纹到Redis")
self.pending_fps.clear()
def close_spider(self, spider):
self._flush_pending(spider)
# ... 原有代码#2个高频问题解决
#问题1:长时间运行内存溢出
原因:LRU缓存或Redis指纹未及时清理 解决:
- 调整LRU缓存大小
- 缩短Redis指纹过期时间
- 定期(如每周)手动清空Redis去重DB
#问题2:Redis连接超时/断开
原因:网络波动或Redis负载过高
解决:
在基础Pipeline的 open_spider 中配置Redis的重试机制和健康检查:
from redis.backoff import ExponentialBackoff
from redis.retry import Retry
def open_spider(self, spider):
self.redis_conn = redis.Redis(
host=self.redis_conf["host"],
port=self.redis_conf["port"],
db=self.redis_conf["db"],
password=self.redis_conf["password"],
decode_responses=False,
socket_connect_timeout=5,
socket_timeout=5,
retry_on_timeout=True,
retry=Retry(ExponentialBackoff(cap=10), 3), # 指数退避重试,最多3次
health_check_interval=30 # 每30秒检查一次连接健康
)💡 核心总结:
- 简单场景用 URL标准化 + SHA256 Redis指纹去重
- 超大规模场景用 本地布隆过滤器 + Redis批量管道
- 增量爬取优先用 时间戳方案,有条件再加智能频率调整
- 一定要做好 监控和容错处理,防止Redis故障导致爬虫停止

