Pipeline: A Complete Practical Guide - Data Cleaning, Validation, Storage, and Processing Explained

📂 Stage: Stage 2 - Data Flow (Data Processing) 🔗 Related chapters: Item and Item Loader · Downloader Middleware

The data crawlers capture from web pages is often raw: inconsistently formatted, full of gaps, and riddled with duplicates. Scrapy's Item Pipeline is your dedicated finishing crew. It turns messy raw Items into clean, standardized, production-grade data that can go straight into a database.

This article starts from the basic concepts, walks you step by step through building a reliable data processing pipeline, and provides practical code you can reuse directly. By the end, you will see that standardized data processing can be surprisingly simple.




Introduction to Pipeline Basics

Function and core value

In Scrapy, the Item Pipeline is the data processing chain that runs after the Spider. Every Item the Spider yields flows through the enabled Pipeline components one after another, in the order you configure. Within each component you can:

  • Clean dirty data: remove extra whitespace, garbled characters, and HTML tags, and unify the format of dates, prices, and similar fields
  • Validate quality: check for missing required fields and drop any data that fails the check
  • Deduplicate: prevent the same posts, products, or news articles from being collected more than once
  • Store: write to JSON files, MySQL, MongoDB, Redis, or other backends as needed

In one sentence: **Pipeline turns your scattered data into clean, usable, and traceable assets.**

Simplified workflow

The entire process is like a factory assembly line. Items are raw materials, and each Pipeline is a machine at a different station:

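flowchart LR
    A[Spider yields Item] --> B[Item enters the Pipeline chain by priority]
    B --> C[Each component processes / passes / drops it in turn]
    C --> D{Last component}
    D -->|Store| E[Done]
    D -->|Drop| F[Write log entry]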

After leaving the Spider, an Item first enters the Pipeline with the smallest order value, which runs first. Each Pipeline hands the processed Item on to the next one, until the Item is stored or dropped along the way.
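The flow can be modeled in a few lines of plain Python. This is a simplified sketch, not how Scrapy's engine implements it internally; DropItem here is a local stand-in for scrapy.exceptions.DropItem so the snippet runs without Scrapy, and run_chain, require_title, and strip_title are hypothetical names:

```python
class DropItem(Exception):
    """Local stand-in for scrapy.exceptions.DropItem (illustration only)."""


def run_chain(item, stages):
    """Pass item through stages in order; return (final item or None, log lines)."""
    log = []
    for stage in stages:
        try:
            item = stage(item)  # each stage receives the previous stage's output
        except DropItem as exc:
            log.append(f"dropped by {stage.__name__}: {exc}")
            return None, log  # later stages never see a dropped item
        log.append(f"{stage.__name__} ok")
    return item, log


def require_title(item):
    # Validation stage: reject items without a title
    if not item.get('title'):
        raise DropItem("missing title")
    return item


def strip_title(item):
    # Cleaning stage: trim surrounding whitespace
    item['title'] = item['title'].strip()
    return item
```

Calling run_chain({'title': ''}, [require_title, strip_title]) stops at the first stage, which mirrors how a dropped Item never reaches the later Pipelines.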


Configuring priority and lifecycle

1. Priority configuration (settings.py)

Ordering is where Pipelines most often go wrong. Scrapy runs Pipelines from the lowest value to the highest, so the smaller the number, the earlier the Pipeline runs. A common beginner mistake is to put storage first: dirty data then reaches the database before it has been cleaned, which wastes resources and pollutes the data.

A sensible order is: Validation → Cleaning → Deduplication → Storage.

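# ✅ Correct order: validate → clean → deduplicate → store
ITEM_PIPELINES = {
    # Validate required fields first (drop the Item if any are missing)
    'myproject.pipelines.RequiredFieldsPipeline': 300,
    # Then unify formats and clean dirty data
    'myproject.pipelines.CleaningPipeline': 400,
    # Then remove duplicates
    'myproject.pipelines.DuplicateFilterPipeline': 500,
    # Store last, so no resources are wasted on rejected data
    'myproject.pipelines.MysqlPipeline': 600,
}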

💡 Tip: By convention, order values are kept in the 0-1000 range and spaced 100 apart, so you can later slot a new Pipeline in between without renumbering all the others.
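Scrapy runs the entries in ascending order of their values, regardless of the order in which the dict is written. The sketch below reproduces just that sorting step in plain Python; PriceCheckPipeline is a hypothetical new step slotted into the gap between 400 and 500, and execution_order is an illustrative helper, not a Scrapy API:

```python
ITEM_PIPELINES = {
    'myproject.pipelines.MysqlPipeline': 600,
    'myproject.pipelines.RequiredFieldsPipeline': 300,
    'myproject.pipelines.DuplicateFilterPipeline': 500,
    'myproject.pipelines.CleaningPipeline': 400,
    # Hypothetical new step: fits between 400 and 500, nothing else renumbered
    'myproject.pipelines.PriceCheckPipeline': 450,
}


def execution_order(pipelines):
    """Return class paths sorted by order value, lowest (runs first) to highest."""
    return [path for path, _ in sorted(pipelines.items(), key=lambda kv: kv[1])]
```

The result starts with RequiredFieldsPipeline (300) and ends with MysqlPipeline (600), with PriceCheckPipeline third.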

2. Core lifecycle methods

Every Pipeline class must define a process_item method; the other three methods are optional. They are called at the following points:

| Method name | When it is called | Common uses |
|---|---|---|
| open_spider | When the spider opens | Connect to the database, create output files |
| close_spider | When the spider closes | Close the database connection and files, write statistics |
| process_item | For every Item | Core processing, validation, and filtering logic |
| from_crawler | When Scrapy creates the Pipeline instance (a classmethod, before the other hooks) | Read custom parameters from settings.py |

Together, these four methods let you control exactly when resources are acquired and released over the crawler's whole lifetime, avoiding problems such as opening a new connection for every Item or leaving files unclosed.
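A minimal skeleton showing where each hook fits. The events list only records the call order for illustration, and PIPELINE_LABEL is a hypothetical custom setting; in a real project open_spider and close_spider would open and close a connection or file:

```python
class LifecyclePipeline:
    """Skeleton Pipeline that records when each hook runs (illustrative)."""

    def __init__(self, label):
        self.label = label
        self.events = []  # demonstration only: remembers the hook call order

    @classmethod
    def from_crawler(cls, crawler):
        # Called once when the Pipeline is built; read settings here.
        # PIPELINE_LABEL is a hypothetical project-specific setting.
        return cls(label=crawler.settings.get('PIPELINE_LABEL', 'default'))

    def open_spider(self, spider):
        # Acquire resources once per crawl (DB connection, file handle, ...)
        self.events.append('open')

    def process_item(self, item, spider):
        # Runs for every Item; return it to pass it on (or raise DropItem)
        self.events.append('item')
        return item

    def close_spider(self, spider):
        # Release whatever open_spider acquired
        self.events.append('close')
```

Scrapy calls from_crawler first, then open_spider, then process_item for each Item, and close_spider at the end; you never call these hooks yourself in a real crawl.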


Core Practical Scenarios

The following four Pipelines are the most common in crawler projects. Copy whichever ones you need into your project and adapt them with minor changes.

Scenario 1: Data Cleaning Pipeline

Text, prices, and URLs on the web are often remarkably dirty: titles full of line breaks, prices mixed with currency symbols and Chinese characters, URLs given as relative paths. This Pipeline is designed to clean them up:

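import re
from urllib.parse import urljoin

from itemadapter import ItemAdapter


class CleaningPipeline:
    # Leftover HTML tags such as <br> or <span class="x">
    HTML_TAG = re.compile(r'<[^>]+>')
    # Remove anything that is NOT a word character (\w already covers letters,
    # including Chinese characters, and digits), whitespace, or common
    # English/Chinese punctuation
    DISALLOWED = re.compile(r"[^\w\s.,!?;:()'\"\-，。！？；：（）、“”‘’]")

    def __init__(self, base_url=None):
        self.base_url = base_url

    @classmethod
    def from_crawler(cls, crawler):
        # Read a custom base URL from settings (used to resolve relative URLs);
        # DEFAULT_BASE_URL is a project-specific setting, not a Scrapy built-in
        return cls(base_url=crawler.settings.get('DEFAULT_BASE_URL'))

    def process_item(self, item, spider):
        adapter = ItemAdapter(item)

        # 1. Text cleanup: strip tags and stray symbols, then collapse whitespace
        for field in ['title', 'content', 'author']:
            if adapter.get(field):
                text = self.HTML_TAG.sub('', str(adapter[field]))
                text = self.DISALLOWED.sub('', text)
                adapter[field] = re.sub(r'\s+', ' ', text).strip()

        # 2. Price cleanup: keep the first number
        #    (handles inputs like "¥199.99" or "原价299.00元", i.e. "original price 299.00 yuan")
        for field in ['price', 'original_price']:
            if adapter.get(field):
                price_str = str(adapter[field]).replace(',', '')
                numbers = re.findall(r'\d+(?:\.\d+)?', price_str)
                adapter[field] = float(numbers[0]) if numbers else None

        # 3. URL completion and normalization
        if adapter.get('url'):
            url = adapter['url'].strip()
            if not url.startswith(('http://', 'https://')) and self.base_url:
                url = urljoin(self.base_url, url)
            adapter['url'] = url

        return item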

We use ItemAdapter here because it exposes one dict-like interface over plain dicts, scrapy.Item objects, dataclasses, and attrs classes, so the Pipeline code is not tied to a particular item type.
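To check cleaning rules like these without running a crawl, they can be pulled out into plain functions. This is a sketch under the assumption that you want the logic testable on its own; clean_text, parse_price, and absolutize are hypothetical helper names, not part of Scrapy:

```python
import re
from urllib.parse import urljoin

HTML_TAG = re.compile(r'<[^>]+>')
# Keep word characters (incl. CJK), whitespace, and common EN/ZH punctuation
DISALLOWED = re.compile(r"[^\w\s.,!?;:()'\"\-，。！？；：（）、“”‘’]")


def clean_text(raw):
    """Strip HTML tags and stray symbols, then collapse whitespace."""
    text = HTML_TAG.sub('', str(raw))
    text = DISALLOWED.sub('', text)
    return re.sub(r'\s+', ' ', text).strip()


def parse_price(raw):
    """Return the first number in the string as a float, or None."""
    numbers = re.findall(r'\d+(?:\.\d+)?', str(raw).replace(',', ''))
    return float(numbers[0]) if numbers else None


def absolutize(url, base_url):
    """Resolve a relative URL against base_url; leave absolute URLs alone."""
    url = url.strip()
    if not url.startswith(('http://', 'https://')) and base_url:
        url = urljoin(base_url, url)
    return url
```

For example, parse_price("¥1,299.50") yields 1299.5, and absolutize("/item/42", "https://example.com/shop/") yields "https://example.com/item/42". The Pipeline's process_item can then simply call these helpers for each field.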

Scenario 2: Required field verification + discarding unqualified items

News items without a title and products without a URL only waste storage space. This Pipeline rejects them as soon as they enter the chain:

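from scrapy.exceptions import DropItem
from itemadapter import ItemAdapter

class RequiredFieldsPipeline:
    def __init__(self):
        # Edit this list to change the required fields
        self.required = ['title', 'url']

    def process_item(self, item, spider):
        adapter = ItemAdapter(item)
        for field in self.required:
            value = adapter.get(field)
            # Treat None, empty values, and whitespace-only strings as missing,
            # because this Pipeline runs before cleaning
            if not value or (isinstance(value, str) and not value.strip()):
                raise DropItem(f"Dropped Item missing required field '{field}'")
        return item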

⚠️ Once a Pipeline raises DropItem, the current Item does not reach any later Pipeline. Scrapy logs the drop (at WARNING level by default) and counts it in the item_dropped_count stat, which makes it easy to monitor your discard rate.

Scenario 3: MD5 deduplication Pipeline

Is the same page being scraped more than once? Build an MD5 fingerprint from the core fields and drop any Item whose fingerprint has already been seen:

import hashlib
from itemadapter import ItemAdapter
from scrapy.exceptions import DropItem

class DuplicateFilterPipeline:
    def __init__(self):
        self.seen = set()  # fingerprints of Items that already passed through

    def process_item(self, item, spider):
        adapter = ItemAdapter(item)
        # Choose the field combination that best identifies a unique record
        key = f"{adapter.get('title', '')}|{adapter.get('url', '')}"
        item_hash = hashlib.md5(key.encode('utf-8')).hexdigest()
        if item_hash in self.seen:
            title = str(adapter.get('title') or '')
            raise DropItem(f"Dropped duplicate Item: {title[:20]}...")
        self.seen.add(item_hash)
        return item

This approach is simple and effective, but the in-memory set grows with every new Item. For very large crawls (millions of Items), consider replacing self.seen with a Redis Set to avoid running out of memory.
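A sketch of that swap, assuming a redis-py client is passed in (for example redis.Redis(...)). Redis's SADD returns the number of members actually added, so 0 means the fingerprint was already present. The fingerprint here is built from a JSON list rather than "title|url" so that a '|' inside a title cannot make two different Items collide. MemorySeen, RedisSeen, item_fingerprint, and the key name myproject:seen are all hypothetical:

```python
import hashlib
import json


def item_fingerprint(item, fields=('title', 'url')):
    """MD5 over a JSON list of the chosen fields (unambiguous field boundaries)."""
    payload = json.dumps([item.get(f) for f in fields], ensure_ascii=False)
    return hashlib.md5(payload.encode('utf-8')).hexdigest()


class MemorySeen:
    """In-process store: fine for small crawls, grows with every new Item."""

    def __init__(self):
        self._seen = set()

    def add(self, fingerprint):
        """Return True if the fingerprint is new, False if already seen."""
        if fingerprint in self._seen:
            return False
        self._seen.add(fingerprint)
        return True


class RedisSeen:
    """Shared store backed by a Redis Set.

    `client` is assumed to be a redis-py client; SADD returns how many
    members were newly added, so 1 means new and 0 means duplicate.
    """

    def __init__(self, client, key='myproject:seen'):  # hypothetical key name
        self.client = client
        self.key = key

    def add(self, fingerprint):
        return self.client.sadd(self.key, fingerprint) == 1
```

Inside DuplicateFilterPipeline, self.seen would hold one of these stores, and process_item would raise DropItem whenever self.seen.add(item_fingerprint(adapter)) returns False. Because both stores expose the same add method, switching from memory to Redis does not touch the Pipeline logic.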

Scenario 4: JSON storage Pipeline

Finally, save the processed data to a standardized JSON file that is easy to hand over to the data analysis team:

import json
import os
from datetime import datetime

from itemadapter import ItemAdapter


class JsonPipeline:
    def __init__(self, output_dir='data'):
        self.output_dir = output_dir

    @classmethod
    def from_crawler(cls, crawler):
        # JSON_OUTPUT_DIR is a project-specific setting
        return cls(output_dir=crawler.settings.get('JSON_OUTPUT_DIR', 'data'))

    def open_spider(self, spider):
        os.makedirs(self.output_dir, exist_ok=True)
        timestamp = datetime.now().strftime('%Y%m%d_%H%M%S')
        self.file = open(
            os.path.join(self.output_dir, f'{spider.name}_{timestamp}.json'),
            'w',
            encoding='utf-8'
        )
        self.file.write('[\n')
        self.first = True  # no comma goes before the first Item

    def close_spider(self, spider):
        self.file.write('\n]')
        self.file.close()

    def process_item(self, item, spider):
        adapter = ItemAdapter(item)
        # With scrapy.Item classes, 'scraped_at' must be a declared Field,
        # otherwise this assignment raises KeyError
        adapter['scraped_at'] = datetime.now().isoformat()
        # default=str keeps non-JSON types (e.g. Decimal) from breaking the dump
        line = json.dumps(adapter.asdict(), ensure_ascii=False, indent=2, default=str)
        if not self.first:
            self.file.write(',\n')  # separator only between Items
        else:
            self.first = False
        self.file.write(line)
        return item

This Pipeline writes each crawl's results to its own timestamped JSON file and adds a scraped_at field to every Item, so you can track when each record was collected.


Performance and error handling

When a crawler processes a large volume of data, inserting rows into the database one at a time can slow it down considerably, and unhandled exceptions can cause Items to be lost silently. The following two techniques make your Pipelines faster and more robust.

1. Batch storage optimization (applicable to MySQL/MongoDB)

Buffering Items and writing each batch with a single executemany call and one commit significantly reduces per-row round trips and commit overhead:

import time

import pymysql
from itemadapter import ItemAdapter


class BatchMysqlPipeline:
    def __init__(self, host, db, user, pwd, port, batch=100, flush=30):
        self.host, self.db, self.user, self.pwd, self.port = host, db, user, pwd, port
        self.batch_size = batch
        self.flush_sec = flush
        self.buffer = []
        self.last_flush = time.monotonic()

    @classmethod
    def from_crawler(cls, crawler):
        settings = crawler.settings
        # MYSQL_* are project-specific settings; getint()/getfloat() also
        # convert values passed as strings (e.g. via -s on the command line)
        return cls(
            host=settings.get('MYSQL_HOST', 'localhost'),
            db=settings.get('MYSQL_DATABASE', 'scrapy'),
            user=settings.get('MYSQL_USER', 'root'),
            pwd=settings.get('MYSQL_PASSWORD', ''),
            port=settings.getint('MYSQL_PORT', 3306),
            batch=settings.getint('MYSQL_BATCH_SIZE', 100),
            flush=settings.getfloat('MYSQL_FLUSH_SEC', 30),
        )

    def open_spider(self, spider):
        self.conn = pymysql.connect(
            host=self.host, port=self.port, user=self.user,
            password=self.pwd, database=self.db, charset='utf8mb4'
        )
        self.cursor = self.conn.cursor()

    def close_spider(self, spider):
        self._flush_buffer(spider)  # write any remaining rows before shutdown
        self.cursor.close()
        self.conn.close()

    def process_item(self, item, spider):
        self.buffer.append(ItemAdapter(item).asdict())
        # The time-based check only runs when a new Item arrives
        if (len(self.buffer) >= self.batch_size
                or time.monotonic() - self.last_flush >= self.flush_sec):
            self._flush_buffer(spider)
        return item

    def _flush_buffer(self, spider):
        if not self.buffer:
            return
        # Batch insert example: adapt the table and columns to your schema.
        # Columns come from the first row; keys missing in later rows become NULL.
        keys = list(self.buffer[0].keys())
        values = [tuple(row.get(k) for k in keys) for row in self.buffer]
        columns = ', '.join(f'`{k}`' for k in keys)
        placeholders = ', '.join(['%s'] * len(keys))
        sql = f"INSERT INTO scraped_data ({columns}) VALUES ({placeholders})"
        try:
            self.cursor.executemany(sql, values)
            self.conn.commit()
            spider.logger.info(f"Batch insert succeeded: {len(values)} rows")
        except Exception as e:
            self.conn.rollback()
            # The failed batch is discarded below, so log how many rows were lost
            spider.logger.error(f"Batch insert failed, {len(values)} rows lost: {e}")
        self.buffer.clear()
        self.last_flush = time.monotonic()

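The core idea: hold Items in an in-memory buffer and write them in one batch once the buffer reaches the configured size or the configured time has passed. This makes far better use of the database's throughput. Keep the trade-offs in mind, though: the time check only runs when a new Item arrives, Items still in the buffer are lost if the process crashes, and a batch that fails is rolled back and discarded, so keep an eye on the error log.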
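The buffering rule can be isolated from MySQL so it can be tested without a database. This is a sketch, not part of Scrapy or pymysql: BatchBuffer and its parameters are hypothetical, flush_fn stands in for the code that writes a batch, and the clock is injectable so the time threshold can be checked without sleeping. It uses time.monotonic, which, unlike time.time, is not affected by system clock changes:

```python
import time


class BatchBuffer:
    """Collect rows and hand them to flush_fn in batches.

    Flushes when batch_size rows are buffered, or when a row arrives and at
    least flush_sec seconds have passed since the last flush.
    """

    def __init__(self, flush_fn, batch_size=100, flush_sec=30.0, clock=time.monotonic):
        self.flush_fn = flush_fn
        self.batch_size = batch_size
        self.flush_sec = flush_sec
        self.clock = clock
        self.rows = []
        self.last_flush = clock()

    def add(self, row):
        self.rows.append(row)
        if (len(self.rows) >= self.batch_size
                or self.clock() - self.last_flush >= self.flush_sec):
            self.flush()

    def flush(self):
        # Called from add() and, in a Pipeline, from close_spider()
        if self.rows:
            self.flush_fn(self.rows)
            self.rows = []  # start a fresh list; flush_fn keeps the old batch
        self.last_flush = self.clock()
```

With batch_size=3, adding four rows triggers one flush of the first three; the fourth row waits until the size or time threshold is reached again, or until flush() is called explicitly at shutdown.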

2. Basic error handling

Wrapping risky logic in try/except lets you log the failure, keep a copy of the offending Item, and drop it cleanly, so one bad record does not disrupt the rest of the data flow:

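import json

from scrapy.exceptions import DropItem
from itemadapter import ItemAdapter

class ErrorHandlingPipeline:
    def process_item(self, item, spider):
        try:
            # Wrap the logic that may fail; transform() is a placeholder
            # for your own processing steps
            return self.transform(item, spider)
        except DropItem:
            # Deliberate drops pass through without being treated as errors
            raise
        except Exception as e:
            # logger.exception logs at ERROR level and includes the traceback
            spider.logger.exception(f"Error while processing Item: {e}")
            # Optional: record the failed Item for later inspection
            with open('failed_items.txt', 'a', encoding='utf-8') as f:
                f.write(json.dumps(ItemAdapter(item).asdict(), ensure_ascii=False, default=str) + '\n')
            raise DropItem(f"Item dropped due to error: {e}")

    def transform(self, item, spider):
        # Placeholder: replace with the real processing logic
        return item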

This way, a single bad record is logged, saved for later inspection, and dropped, while every other Item keeps flowing normally.


FAQ Quick Reference

Q1: Why isn't my Pipeline running?

It usually comes down to one of these two causes:

  1. Mismatched class name or path: make sure each key of ITEM_PIPELINES in settings.py exactly matches the full dotted path of your Pipeline class (for example myproject.pipelines.CleaningPipeline).
  2. An uncaught exception in an earlier Pipeline: if a Pipeline's process_item raises an exception other than DropItem, Scrapy logs the error and that Item never reaches the later Pipelines. Add exception handling to your critical Pipelines.
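For the first cause, a quick stand-alone check can flag paths that do not import. This is a sketch with a hypothetical helper name; Scrapy resolves the paths with its own loader when the crawler starts, and this check simply uses the standard importlib module. Run it from the project root so that your project package is importable:

```python
import importlib


def unresolved_paths(pipelines):
    """Return the ITEM_PIPELINES keys whose module or class cannot be found."""
    broken = []
    for path in pipelines:
        module_name, _, attr = path.rpartition('.')
        try:
            module = importlib.import_module(module_name)
            getattr(module, attr)  # raises AttributeError on a misspelled class
        except (ImportError, AttributeError, ValueError):
            # ValueError covers an empty module name (a path with no dot)
            broken.append(path)
    return broken
```

An empty list means every path resolves; any entry returned points to a typo in the module or class name.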

Q2: Why does my JSON file have an extra comma at the end?

Follow the implementation in Scenario 4 above: the self.first flag controls when a comma is written, so commas appear only between Items and the resulting JSON array is valid.
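The same first-item flag can be isolated and verified against an in-memory stream. This is a sketch; JsonArrayWriter is a hypothetical class that mirrors the comma handling of JsonPipeline, not a Scrapy API:

```python
import json


class JsonArrayWriter:
    """Write objects as a single JSON array, emitting ',' only between items."""

    def __init__(self, stream):
        self.stream = stream
        self.first = True
        self.stream.write('[\n')

    def write(self, obj):
        if not self.first:
            self.stream.write(',\n')  # separator before every item except the first
        self.first = False
        self.stream.write(json.dumps(obj, ensure_ascii=False))

    def close(self):
        self.stream.write('\n]')
```

Passing an io.StringIO as the stream and then calling json.loads on its contents confirms that the output parses, including the case where no Items were written. If you do not need custom formatting, Scrapy's built-in feed exports (the FEEDS setting) can also write JSON output without a custom Pipeline.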

Q3: What should I do if the deduplication Pipeline runs out of memory?

With large data volumes, storing fingerprints in a plain Python set takes up a lot of memory. In that case, switch to Redis-based deduplication by replacing self.seen with a Redis Set. A shared Redis Set also gives you global deduplication across distributed crawler instances.

Q4: How do I pass data between multiple Pipelines?

Each Pipeline receives whatever the previous Pipeline's process_item returned, which is normally the same Item object. Changes made upstream (for example, cleaned fields) are therefore visible to every downstream Pipeline. This is exactly what makes the validate → clean → deduplicate → store chain work: each stage builds on the previous stage's output.


💡 Key takeaway: design Pipelines around the single-responsibility principle. Each class does exactly one job, and priorities chain them into a complete processing flow. This keeps the code readable and makes later maintenance and extension easy. Now you can copy this code into your Scrapy project and have your data processing pipeline running in no time!