Pipeline管道实战完全指南 - 数据清洗、验证、存储与处理流程详解

📂 所属阶段:第二阶段 — 数据流转(数据处理篇)
🔗 相关章节:Item 与 Item Loader · Downloader Middleware

爬取到的零散网页数据,如何变成干净、规范、可复用的商业级数据?Scrapy 的 Pipeline 组件就是你的专属「数据加工流水线」——本文教你从0到1搭出稳定高效的处理链路!


目录


Pipeline基础入门

作用与核心价值

Pipeline 是 Item 被 Spider 提取后,对其进行“加工→质检→入库/输出”的链式组件,核心解决:

  1. 脏数据清洗、格式统一
  2. 数据质量验证、不合格项丢弃
  3. 多种介质存储(本地/数据库/缓存)
  4. 重复数据过滤

工作流程简化版

flowchart LR
    A[Spider提取Item] --> B[按优先级进入Pipeline链]
    B --> C[各组件依次处理/通过/丢弃]
    C --> D{最后组件}
    D -->|存储| E[完成]
    D -->|丢弃| F[记录日志]

配置优先级与生命周期

1. 优先级配置(settings.py)

这是 Pipeline 最容易踩坑的地方——数字越小优先级越高,越先执行!

# ✅ 正确的顺序:质检→清洗→去重→存储
ITEM_PIPELINES = {
    # 先验证必填字段(缺了直接扔)
    'myproject.pipelines.RequiredFieldsPipeline': 300, 
    # 再统一格式、清洗脏数据
    'myproject.pipelines.CleaningPipeline': 400,    
    # 然后去掉重复项
    'myproject.pipelines.DuplicateFilterPipeline': 500,  
    # 最后存库,避免浪费资源
    'myproject.pipelines.MysqlPipeline': 600,    
}

2. 核心生命周期方法

每个 Pipeline 类都要包含 process_item,其他3个可选:

方法名触发时机用途示例
open_spider爬虫启动时连接数据库、创建输出文件
close_spider爬虫关闭时断开数据库、关闭文件、写统计
process_item每个Item必进核心处理逻辑
from_crawlerPipeline被settings加载前调用从settings读取自定义参数

核心实战场景

以下是最常用的 Pipeline 实现,直接复制即可复用

场景1:数据清洗Pipeline

处理文本、价格、URL 三类高频脏数据:

import re
from itemadapter import ItemAdapter
from urllib.parse import urljoin

class CleaningPipeline:
    def __init__(self, base_url=None):
        self.base_url = base_url

    @classmethod
    def from_crawler(cls, crawler):
        # 从settings读取自定义基准URL(补全相对URL用)
        return cls(base_url=crawler.settings.get('DEFAULT_BASE_URL'))

    def process_item(self, item, spider):
        adapter = ItemAdapter(item)

        # 1. 文本清理:去多余空白、保留中文/英文标点/数字
        for field in ['title', 'content', 'author']:
            if adapter.get(field):
                text = re.sub(r'\s+', ' ', str(adapter[field]).strip())
                text = re.sub(r'[^\w\s\u4e00-\u9fff.,!?;:()""''-]', '', text)
                adapter[field] = text

        # 2. 价格清洗:提取纯数字(处理¥199.99 / 原价299.00元这类)
        for field in ['price', 'original_price']:
            if adapter.get(field):
                price_str = str(adapter[field]).replace(',', '')
                numbers = re.findall(r'\d+(?:\.\d+)?', price_str)
                adapter[field] = float(numbers[0]) if numbers else None

        # 3. URL补全与标准化
        if adapter.get('url'):
            url = adapter['url'].strip()
            if not url.startswith(('http://', 'https://')) and self.base_url:
                url = urljoin(self.base_url, url)
            adapter['url'] = url

        return item

场景2:必填字段验证+丢弃Pipeline

from scrapy.exceptions import DropItem
from itemadapter import ItemAdapter

class RequiredFieldsPipeline:
    def __init__(self):
        # 可在此处修改必填字段
        self.required = ['title', 'url']

    def process_item(self, item, spider):
        adapter = ItemAdapter(item)
        for field in self.required:
            if not adapter.get(field):
                raise DropItem(f"丢弃缺少必填字段「{field}」的Item")
        return item

场景3:MD5去重Pipeline

用Item的核心字段做MD5哈希判断重复:

import hashlib
from itemadapter import ItemAdapter
from scrapy.exceptions import DropItem

class DuplicateFilterPipeline:
    def __init__(self):
        self.seen = set()

    def process_item(self, item, spider):
        adapter = ItemAdapter(item)
        # 选择最能唯一标识一条数据的字段组合
        key = f"{adapter.get('title', '')}|{adapter.get('url', '')}"
        item_hash = hashlib.md5(key.encode()).hexdigest()
        if item_hash in self.seen:
            raise DropItem(f"丢弃重复Item:{adapter.get('title', '')[:20]}...")
        self.seen.add(item_hash)
        return item

场景4:JSON存储Pipeline

import json
import os
from itemadapter import ItemAdapter
from datetime import datetime

class JsonPipeline:
    def __init__(self, output_dir='data'):
        self.output_dir = output_dir

    @classmethod
    def from_crawler(cls, crawler):
        return cls(output_dir=crawler.settings.get('JSON_OUTPUT_DIR', 'data'))

    def open_spider(self, spider):
        os.makedirs(self.output_dir, exist_ok=True)
        timestamp = datetime.now().strftime('%Y%m%d_%H%M%S')
        self.file = open(os.path.join(self.output_dir, f'{spider.name}_{timestamp}.json'), 'w', encoding='utf-8')
        self.file.write('[\n')
        self.first = True

    def close_spider(self, spider):
        self.file.write('\n]')
        self.file.close()

    def process_item(self, item, spider):
        adapter = ItemAdapter(item)
        adapter['scraped_at'] = datetime.now().isoformat()
        line = json.dumps(adapter.asdict(), ensure_ascii=False, indent=2)
        if not self.first:
            self.file.write(',\n')
        else:
            self.first = False
        self.file.write('  ' + line)
        return item

性能与错误优化

1. 批量存储优化(适用于MySQL/MongoDB)

避免单条插入浪费数据库连接资源:

from collections import deque
from itemadapter import ItemAdapter
import time
import pymysql

class BatchMysqlPipeline:
    def __init__(self, host, db, user, pwd, port, batch=100, flush=30):
        self.host, self.db, self.user, self.pwd, self.port = host, db, user, pwd, port
        self.batch_size = batch
        self.flush_sec = flush
        self.buffer = deque()
        self.last_flush = time.time()

    @classmethod
    def from_crawler(cls, crawler):
        settings = crawler.settings
        return cls(
            host=settings.get('MYSQL_HOST', 'localhost'),
            db=settings.get('MYSQL_DATABASE', 'scrapy'),
            user=settings.get('MYSQL_USER', 'root'),
            pwd=settings.get('MYSQL_PASSWORD', ''),
            port=settings.get('MYSQL_PORT', 3306),
            batch=settings.get('MYSQL_BATCH_SIZE', 100),
            flush=settings.get('MYSQL_FLUSH_SEC', 30)
        )

    def open_spider(self, spider):
        self.conn = pymysql.connect(host=self.host, port=self.port, user=self.user,
                                     password=self.pwd, database=self.db, charset='utf8mb4')
        self.cursor = self.conn.cursor()

    def close_spider(self, spider):
        self._flush_buffer(spider)
        self.conn.close()

    def process_item(self, item, spider):
        self.buffer.append(ItemAdapter(item).asdict())
        if len(self.buffer) >= self.batch_size or time.time() - self.last_flush >= self.flush_sec:
            self._flush_buffer(spider)
        return item

    def _flush_buffer(self, spider):
        if not self.buffer:
            return
        # 批量插入示例(根据自己的表结构修改SQL)
        keys = list(self.buffer[0].keys())
        values = [tuple(item[k] for k in keys) for item in self.buffer]
        placeholders = ', '.join(['%s'] * len(keys))
        sql = f"INSERT INTO scraped_data ({', '.join(keys)}) VALUES ({placeholders})"
        try:
            self.cursor.executemany(sql, values)
            self.conn.commit()
            spider.logger.info(f"批量插入 {len(values)} 条数据成功")
        except Exception as e:
            self.conn.rollback()
            spider.logger.error(f"批量插入失败:{e}")
        self.buffer.clear()
        self.last_flush = time.time()

2. 基础错误处理

from scrapy.exceptions import DropItem
import traceback
from itemadapter import ItemAdapter

class ErrorHandlingPipeline:
    def process_item(self, item, spider):
        try:
            # 在这里包裹可能出错的处理逻辑
            return item
        except Exception as e:
            spider.logger.error(f"处理Item出错:{e},详情:{traceback.format_exc()}")
            # 可选:记录失败Item
            with open('failed_items.txt', 'a', encoding='utf-8') as f:
                f.write(f"{ItemAdapter(item).asdict()}\n")
            raise DropItem(f"因异常丢弃Item:{e}")

常见问题速查

Q1:Pipeline没有生效?

检查两点:

  1. 类名和路径是否和 settings.py 里的 ITEM_PIPELINES 完全一致
  2. 是否在代码中抛出了未捕获的异常(但没在 close_spider 前处理)

Q2:JSON文件最后有多余的逗号?

参考场景4的实现,用 first_item 标记控制逗号。

Q3:去重Pipeline内存溢出?

改成Redis去重(适合分布式或大量数据),把 self.seen 换成 Redis 的 Set。


💡 核心要点:Pipeline 要遵循「单一职责」原则——每个类只做一件事,通过优先级串联起来,既清晰又好维护!