# Pipeline管道实战完全指南 - 数据清洗、验证、存储与处理流程详解
📂 所属阶段:第二阶段 — 数据流转(数据处理篇)
🔗 相关章节:Item 与 Item Loader · Downloader Middleware
爬取到的零散网页数据,如何变成干净、规范、可复用的商业级数据?Scrapy 的 Pipeline 组件就是你的专属「数据加工流水线」——本文教你从0到1搭出稳定高效的处理链路!
## 目录
- Pipeline基础入门
- 配置优先级与生命周期
- 核心实战场景
- 性能与错误优化
- 常见问题速查

## Pipeline基础入门
### 作用与核心价值
Pipeline 是 Item 被 Spider 提取后,对其进行“加工→质检→入库/输出”的链式组件,核心解决:
- 脏数据清洗、格式统一
- 数据质量验证、不合格项丢弃
- 多种介质存储(本地/数据库/缓存)
- 重复数据过滤
### 工作流程简化版
```mermaid
flowchart LR
    A[Spider提取Item] --> B[按优先级进入Pipeline链]
    B --> C[各组件依次处理/通过/丢弃]
    C --> D{最后组件}
    D -->|存储| E[完成]
    D -->|丢弃| F[记录日志]
```

## 配置优先级与生命周期
### 1. 优先级配置(settings.py)
这是 Pipeline 最容易踩坑的地方——数字越小优先级越高,越先执行!(数值习惯上取 0-1000。)
# ✅ Recommended order: clean -> validate -> de-duplicate -> store.
# Scrapy runs the enabled pipelines in ascending order of these values
# (lower value = runs earlier; values are customarily kept within 0-1000).
ITEM_PIPELINES = {
    # 1) Normalize formats and strip dirty data first, so later stages see the
    #    final values (e.g. a whitespace-only title is reduced to '' here and
    #    is then rejected by the required-field check instead of slipping through).
    'myproject.pipelines.CleaningPipeline': 300,
    # 2) Validate required fields on the cleaned values; drop the item if one is missing.
    'myproject.pipelines.RequiredFieldsPipeline': 400,
    # 3) Drop duplicates.
    'myproject.pipelines.DuplicateFilterPipeline': 500,
    # 4) Store last, so no database work is spent on dropped items
    #    (BatchMysqlPipeline is the MySQL pipeline implemented in this guide).
    'myproject.pipelines.BatchMysqlPipeline': 600,
}
### 2. 核心生命周期方法
每个 Pipeline 类都必须实现 process_item(返回 Item 交给下一个组件,或抛出 DropItem 将其丢弃),其余 3 个方法可选:

| 方法名 | 触发时机 | 用途示例 |
|---|---|---|
| open_spider | 爬虫启动时 | 连接数据库、创建输出文件 |
| close_spider | 爬虫关闭时 | 断开数据库、关闭文件、写统计 |
| process_item | 每个到达该组件的 Item 都会调用一次 | 核心处理逻辑 |
| from_crawler | 类方法;Scrapy 创建 Pipeline 实例时调用(定义了它就用它代替直接实例化) | 从 settings 读取自定义参数 |
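## 核心实战场景
以下是最常用的 Pipeline 实现,直接复制即可复用!(示例中的 `ItemAdapter` 来自随 Scrapy 一同安装的 itemadapter 库,可以用统一的方式读写 dict、scrapy.Item、dataclass 等各类 Item。)
### 场景1:数据清洗Pipeline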
处理文本、价格、URL 三类高频脏数据:
import re
from itemadapter import ItemAdapter
from urllib.parse import urljoin
class CleaningPipeline:
    """Normalize the text, price and URL fields of each item in place.

    * text fields: remove unwanted symbols, collapse whitespace, trim;
    * price fields: keep the first number as a float (``None`` if none found);
    * ``url``: trim and, when a base URL is configured, resolve relative URLs.

    Fields that are absent or falsy are left untouched.
    """

    TEXT_FIELDS = ('title', 'content', 'author')
    PRICE_FIELDS = ('price', 'original_price')
    # Every character NOT in this class is removed from text fields. Kept:
    # word characters (Unicode letters incl. CJK, digits, '_'), whitespace,
    # ASCII . , ! ? ; : ( ) " ' -, curly quotes (U+2018-U+201D), CJK symbols
    # and punctuation (U+3000-U+303F, e.g. 、。《》「」) and full-width forms
    # (U+FF00-U+FFEF, e.g. ,!?:;()).
    _DISALLOWED_CHARS = re.compile(
        r'[^\w\s.,!?;:()"\'\-\u2018-\u201d\u3000-\u303f\uff00-\uffef]')
    _WHITESPACE = re.compile(r'\s+')
    _NUMBER = re.compile(r'\d+(?:\.\d+)?')

    def __init__(self, base_url=None):
        # Base used to resolve relative URLs; None disables resolution.
        self.base_url = base_url

    @classmethod
    def from_crawler(cls, crawler):
        # DEFAULT_BASE_URL is a project-specific setting used to complete relative URLs.
        return cls(base_url=crawler.settings.get('DEFAULT_BASE_URL'))

    @classmethod
    def _clean_text(cls, value):
        """Return ``str(value)`` without disallowed symbols, whitespace collapsed and trimmed."""
        # Remove symbols first, then normalize whitespace, so a symbol removed
        # from between two spaces leaves neither a double nor a leading space.
        text = cls._DISALLOWED_CHARS.sub('', str(value))
        return cls._WHITESPACE.sub(' ', text).strip()

    @classmethod
    def _clean_price(cls, value):
        """Return the first number found in *value* as a float, or None.

        Thousands separators (ASCII ',' and full-width ',') are removed first,
        e.g. '¥1,299.00' -> 1299.0 and '原价299.00元' -> 299.0.
        """
        text = str(value).replace(',', '').replace(',', '')
        match = cls._NUMBER.search(text)
        return float(match.group()) if match else None

    @staticmethod
    def _clean_url(url, base_url=None):
        """Trim *url* and resolve it against *base_url* unless it is already http(s)."""
        url = str(url).strip()
        if base_url and not url.startswith(('http://', 'https://')):
            url = urljoin(base_url, url)
        return url

    def process_item(self, item, spider):
        adapter = ItemAdapter(item)
        for field in self.TEXT_FIELDS:
            if adapter.get(field):
                adapter[field] = self._clean_text(adapter[field])
        for field in self.PRICE_FIELDS:
            if adapter.get(field):
                adapter[field] = self._clean_price(adapter[field])
        if adapter.get('url'):
            adapter['url'] = self._clean_url(adapter['url'], self.base_url)
        return item
### 场景2:必填字段验证+丢弃Pipeline
from scrapy.exceptions import DropItem
from itemadapter import ItemAdapter
class RequiredFieldsPipeline:
    """Drop items that lack any of the required fields.

    A field counts as missing when it is absent or falsy (None, '', 0, an
    empty container), or when it is a string containing only whitespace.
    """

    # Fields checked when no explicit list is passed to the constructor.
    DEFAULT_REQUIRED = ('title', 'url')

    def __init__(self, required=None):
        # Edit DEFAULT_REQUIRED, or pass ``required``, to change the checked fields.
        self.required = list(self.DEFAULT_REQUIRED if required is None else required)

    @staticmethod
    def _is_missing(value):
        """Return True if *value* should be treated as an absent field."""
        if isinstance(value, str):
            # '   ' is as useless as '' for a required field.
            return not value.strip()
        return not value

    def process_item(self, item, spider):
        adapter = ItemAdapter(item)
        for field in self.required:
            if self._is_missing(adapter.get(field)):
                raise DropItem(f"丢弃缺少必填字段「{field}」的Item")
        return item
### 场景3:MD5去重Pipeline
用Item的核心字段做MD5哈希判断重复:
import hashlib
from itemadapter import ItemAdapter
from scrapy.exceptions import DropItem
class DuplicateFilterPipeline:
    """Drop items whose identifying fields were already seen during this crawl.

    Fingerprints live in an in-memory set, so memory grows with the number of
    unique items; for very large or distributed crawls replace ``self.seen``
    with an external store (e.g. a Redis set).
    """

    # Joins field values before hashing. Unlike '|', this control character is
    # very unlikely to occur in scraped text, so ('a|b', 'c') and ('a', 'b|c')
    # produce different keys.
    _SEPARATOR = '\x1f'

    def __init__(self, key_fields=('title', 'url')):
        # Field combination that best identifies a single record.
        self.key_fields = tuple(key_fields)
        # Raw 16-byte MD5 digests: smaller per entry than 32-char hex strings.
        self.seen = set()

    def _fingerprint(self, adapter):
        """Return the MD5 digest of the key fields; absent or None values count as ''."""
        values = (adapter.get(name) for name in self.key_fields)
        key = self._SEPARATOR.join('' if v is None else str(v) for v in values)
        return hashlib.md5(key.encode('utf-8')).digest()

    def process_item(self, item, spider):
        adapter = ItemAdapter(item)
        fingerprint = self._fingerprint(adapter)
        if fingerprint in self.seen:
            # `or ''` guards against a title that is present but None.
            title = str(adapter.get('title') or '')
            raise DropItem(f"丢弃重复Item:{title[:20]}...")
        self.seen.add(fingerprint)
        return item
### 场景4:JSON存储Pipeline
import json
import os
from itemadapter import ItemAdapter
from datetime import datetime
class JsonPipeline:
    """Stream every item into one JSON array file per crawl.

    The file is ``<output_dir>/<spider.name>_<YYYYmmdd_HHMMSS>.json``. Items are
    appended as they arrive, and the array is closed in ``close_spider``.
    """

    def __init__(self, output_dir='data'):
        self.output_dir = output_dir
        self.file = None
        # True until the first record is written: controls the ',' separator.
        self.first = True

    @classmethod
    def from_crawler(cls, crawler):
        # JSON_OUTPUT_DIR is a project-specific setting (default 'data').
        return cls(output_dir=crawler.settings.get('JSON_OUTPUT_DIR', 'data'))

    def open_spider(self, spider):
        os.makedirs(self.output_dir, exist_ok=True)
        timestamp = datetime.now().strftime('%Y%m%d_%H%M%S')
        path = os.path.join(self.output_dir, f'{spider.name}_{timestamp}.json')
        self.file = open(path, 'w', encoding='utf-8')
        self.file.write('[\n')
        self.first = True

    def close_spider(self, spider):
        if self.file is None:  # never opened, or already closed
            return
        self.file.write('\n]')
        self.file.close()
        self.file = None

    def process_item(self, item, spider):
        record = ItemAdapter(item).asdict()
        # Add the timestamp to the exported record only: assigning an
        # undeclared key on a scrapy.Item would raise KeyError.
        record['scraped_at'] = datetime.now().isoformat()
        self._write_record(record)
        return item

    def _write_record(self, record):
        """Append *record* as the next array element, preceded by ',' if needed."""
        if self.first:
            self.first = False
        else:
            self.file.write(',\n')
        self.file.write('  ' + json.dumps(record, ensure_ascii=False, indent=2))
## 性能与错误优化
### 1. 批量存储优化(适用于MySQL/MongoDB)
逐条插入意味着每条数据都要一次网络往返和一次事务提交;攒够一批(或到达时间阈值)再一次性写入,可以大幅降低数据库开销:
from collections import deque
from itemadapter import ItemAdapter
import time
import pymysql
class BatchMysqlPipeline:
    """Buffer items and insert them into MySQL in batches.

    The buffer is flushed when it holds ``batch`` rows, or when ``flush``
    seconds have passed since the last flush. The time check only runs when a
    new item arrives; whatever is left is flushed in ``close_spider``. A failed
    batch is rolled back, logged and discarded (best effort).
    """

    def __init__(self, host, db, user, pwd, port, batch=100, flush=30,
                 table='scraped_data'):
        self.host, self.db, self.user, self.pwd, self.port = host, db, user, pwd, port
        self.batch_size = batch
        self.flush_sec = flush
        # Target table; column names are taken from the item field names.
        self.table = table
        self.buffer = deque()
        # monotonic() is unaffected by wall-clock adjustments.
        self.last_flush = time.monotonic()

    @classmethod
    def from_crawler(cls, crawler):
        settings = crawler.settings
        # getint/getfloat also convert string values (e.g. `-s MYSQL_PORT=3307`
        # on the command line); pymysql and the comparisons need numbers.
        return cls(
            host=settings.get('MYSQL_HOST', 'localhost'),
            db=settings.get('MYSQL_DATABASE', 'scrapy'),
            user=settings.get('MYSQL_USER', 'root'),
            pwd=settings.get('MYSQL_PASSWORD', ''),
            port=settings.getint('MYSQL_PORT', 3306),
            batch=settings.getint('MYSQL_BATCH_SIZE', 100),
            flush=settings.getfloat('MYSQL_FLUSH_SEC', 30),
            table=settings.get('MYSQL_TABLE', 'scraped_data'),
        )

    def open_spider(self, spider):
        self.conn = pymysql.connect(host=self.host, port=self.port, user=self.user,
                                    password=self.pwd, database=self.db, charset='utf8mb4')
        self.cursor = self.conn.cursor()
        self.last_flush = time.monotonic()

    def close_spider(self, spider):
        try:
            self._flush_buffer(spider)
        finally:
            self.cursor.close()
            self.conn.close()

    def process_item(self, item, spider):
        self.buffer.append(ItemAdapter(item).asdict())
        batch_full = len(self.buffer) >= self.batch_size
        interval_elapsed = time.monotonic() - self.last_flush >= self.flush_sec
        if batch_full or interval_elapsed:
            self._flush_buffer(spider)
        return item

    @staticmethod
    def _quote_identifier(name):
        """Quote *name* as a MySQL identifier (backticks, inner backticks doubled)."""
        return '`' + str(name).replace('`', '``') + '`'

    @classmethod
    def _build_batches(cls, rows, table):
        """Group *rows* by column set; return ``[(sql, [value_tuple, ...]), ...]``.

        Items may carry different fields, so each group uses its own column
        list instead of assuming every row has the first row's keys.
        """
        grouped = {}
        for row in rows:
            grouped.setdefault(tuple(row), []).append(tuple(row.values()))
        batches = []
        for columns, values in grouped.items():
            column_sql = ', '.join(cls._quote_identifier(c) for c in columns)
            placeholders = ', '.join(['%s'] * len(columns))
            sql = (f"INSERT INTO {cls._quote_identifier(table)} "
                   f"({column_sql}) VALUES ({placeholders})")
            batches.append((sql, values))
        return batches

    def _flush_buffer(self, spider):
        if not self.buffer:
            return
        # Detach the rows first, so the buffer is emptied even if building or
        # executing the SQL fails (otherwise every later item retries a bad batch).
        rows = list(self.buffer)
        self.buffer.clear()
        self.last_flush = time.monotonic()
        try:
            for sql, values in self._build_batches(rows, self.table):
                self.cursor.executemany(sql, values)
            self.conn.commit()
            spider.logger.info(f"批量插入 {len(rows)} 条数据成功")
        except Exception as e:
            self.conn.rollback()
            spider.logger.error(f"批量插入失败:{e}")
### 2. 基础错误处理
from scrapy.exceptions import DropItem
import traceback
from itemadapter import ItemAdapter
class ErrorHandlingPipeline:
    """Template that wraps item processing with error logging and failure capture.

    Put the risky logic in ``handle_item`` (override it in a subclass or edit
    it in place). An unexpected exception is logged with its traceback, the
    item is appended to ``failed_items_path`` and the item is dropped.
    A ``DropItem`` raised on purpose passes through untouched.
    """

    # File that receives one dict-repr line per failed item.
    failed_items_path = 'failed_items.txt'

    def handle_item(self, item, spider):
        """Processing logic that may raise; the default passes the item through."""
        return item

    def process_item(self, item, spider):
        try:
            return self.handle_item(item, spider)
        except DropItem:
            # An intentional drop is not an error: let Scrapy handle it as usual.
            raise
        except Exception as e:
            spider.logger.error(f"处理Item出错:{e},详情:{traceback.format_exc()}")
            # Optional: keep a record of the failed item for later inspection.
            with open(self.failed_items_path, 'a', encoding='utf-8') as f:
                f.write(f"{ItemAdapter(item).asdict()}\n")
            raise DropItem(f"因异常丢弃Item:{e}") from e
## 常见问题速查
### Q1:Pipeline没有生效?
检查两点:
- 类名和模块路径是否与 settings.py 里 ITEM_PIPELINES 的键完全一致(注意 Spider 的 custom_settings 也可能覆盖该配置)
- 上游组件的 process_item 是否忘了 `return item`,或者抛出了 DropItem / 未捕获的异常,导致 Item 根本没有流到这个组件
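### Q2:JSON文件最后有多余的逗号?
参考场景4的实现,用 `self.first` 标记控制逗号:只在写第二条及以后的 Item 之前补逗号。如果不需要自定义格式,也可以直接使用 Scrapy 内置的 Feed 导出(如 `scrapy crawl myspider -O items.json`,或配置 FEEDS)。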
### Q3:去重Pipeline内存溢出?
改成 Redis 去重(适合分布式或大量数据):把 self.seen 换成 Redis 的 Set,并根据 SADD 的返回值判断是否重复(返回 0 表示该指纹已存在)。
💡 核心要点:Pipeline 要遵循「单一职责」原则——每个类只做一件事,通过优先级串联起来,既清晰又好维护!

