Pipeline管道实战完全指南 - 数据清洗、验证、存储与处理流程详解

📂 所属阶段:第二阶段 — 数据流转(数据处理篇)
🔗 相关章节:Item 与 Item Loader · Downloader Middleware

Pipeline基础概念

Pipeline是Scrapy框架中用于处理爬取数据的组件,它在Item被抓取后对其进行处理。Pipeline可以执行各种任务,如数据清洗、验证、存储等。

Pipeline的作用与优势

"""
Pipeline的主要作用:
1. 数据清洗:清理和格式化数据
2. 数据验证:确保数据质量
3. 数据存储:将数据保存到不同位置
4. 数据去重:移除重复数据
5. 数据转换:转换数据格式
"""

Pipeline处理流程

"""
Pipeline处理流程:
1. Spider生成Item
2. Item进入Pipeline链
3. 每个Pipeline组件处理Item
4. 最终Item被存储或丢弃
"""

Pipeline生命周期

Pipeline具有完整的生命周期方法,可以在不同阶段执行相应的操作。

基础Pipeline结构

from itemadapter import ItemAdapter
from scrapy.exceptions import DropItem

class BasePipeline:
    """
    基础Pipeline示例
    """
    
    def open_spider(self, spider):
        """
        爬虫开启时调用
        """
        spider.logger.info("Pipeline opened")
    
    def close_spider(self, spider):
        """
        爬虫关闭时调用
        """
        spider.logger.info("Pipeline closed")
    
    def process_item(self, item, spider):
        """
        处理每个Item的核心方法
        """
        adapter = ItemAdapter(item)
        
        # 处理逻辑
        title = adapter.get('title')
        if title:
            adapter['title'] = title.strip().title()
        
        return item
    
    @classmethod
    def from_crawler(cls, crawler):
        """
        由Crawler创建Pipeline实例时调用,可在这里读取settings或连接信号
        """
        return cls()

Pipeline处理顺序

"""
Pipeline处理顺序:
在settings.py中定义的顺序决定了Pipeline的执行顺序:

ITEM_PIPELINES = {
    'myproject.pipelines.ValidationPipeline': 300,  # 先执行
    'myproject.pipelines.CleaningPipeline': 400,    # 后执行
    'myproject.pipelines.StoragePipeline': 500,     # 最后执行
}

数字越小,优先级越高,越先执行。
"""

数据清洗Pipeline

数据清洗是Pipeline的重要功能,确保数据质量和一致性。

基础数据清洗Pipeline

import re
from itemadapter import ItemAdapter

class CleaningPipeline:
    """
    数据清洗Pipeline
    """
    
    def process_item(self, item, spider):
        adapter = ItemAdapter(item)
        
        # 清理文本字段
        for field_name, value in adapter.items():
            if isinstance(value, str):
                # 去除多余空白
                cleaned_value = re.sub(r'\s+', ' ', value.strip())
                # 去除特殊字符
                cleaned_value = re.sub(r'[^\w\s\u4e00-\u9fff.,!?;:()""''-]', '', cleaned_value)
                adapter[field_name] = cleaned_value
            elif isinstance(value, list):
                # 处理列表中的字符串
                cleaned_list = []
                for item_value in value:
                    if isinstance(item_value, str):
                        cleaned_value = re.sub(r'\s+', ' ', item_value.strip())
                        cleaned_list.append(cleaned_value)
                    else:
                        cleaned_list.append(item_value)
                adapter[field_name] = cleaned_list
        
        return item
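
这个清洗逻辑可以脱离Scrapy快速验证一下效果,例如下面的独立演示脚本(字段与取值仅为示例):

pipeline = CleaningPipeline()
item = {'title': '  Hello    World  ', 'tags': ['  python ', ' scrapy  ']}
print(pipeline.process_item(item, spider=None))
# 输出: {'title': 'Hello World', 'tags': ['python', 'scrapy']}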

价格数据清洗Pipeline

import re
from itemadapter import ItemAdapter

class PriceCleaningPipeline:
    """
    价格数据清洗Pipeline
    """
    
    def process_item(self, item, spider):
        adapter = ItemAdapter(item)
        
        price_fields = ['price', 'original_price', 'discount_price']
        
        for field in price_fields:
            if adapter.get(field):
                price_str = str(adapter[field])
                
                # 提取数字部分
                numbers = re.findall(r'\d+(?:\.\d+)?', price_str.replace(',', ''))
                if numbers:
                    try:
                        price = float(numbers[0])
                        adapter[field] = price
                    except ValueError:
                        spider.logger.warning(f"Invalid price format: {price_str}")
                        adapter[field] = None
                else:
                    adapter[field] = None
        
        return item

URL数据清洗Pipeline

from urllib.parse import urljoin, urlparse
from itemadapter import ItemAdapter

class UrlCleaningPipeline:
    """
    URL数据清洗Pipeline
    """
    
    def __init__(self, base_url=None):
        self.base_url = base_url
    
    @classmethod
    def from_crawler(cls, crawler):
        settings = crawler.settings
        base_url = settings.get('DEFAULT_BASE_URL')
        return cls(base_url)
    
    def process_item(self, item, spider):
        adapter = ItemAdapter(item)
        
        url_fields = ['url', 'image_urls', 'source_url', 'canonical_url']
        
        for field in url_fields:
            value = adapter.get(field)
            if value:
                if isinstance(value, str):
                    # 标准化单个URL
                    adapter[field] = self.normalize_url(value, spider)
                elif isinstance(value, list):
                    # 标准化URL列表
                    normalized_urls = [self.normalize_url(url, spider) for url in value]
                    adapter[field] = normalized_urls
        
        return item
    
    def normalize_url(self, url, spider):
        """
        标准化URL
        """
        if not url:
            return url
        
        url = url.strip()
        
        # 处理相对URL
        if self.base_url and not url.startswith(('http://', 'https://')):
            url = urljoin(self.base_url, url)
        
        # 验证URL格式
        try:
            parsed = urlparse(url)
            if parsed.scheme and parsed.netloc:
                return url
        except Exception:
            spider.logger.warning(f"Invalid URL format: {url}")
        
        return None
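
该Pipeline通过from_crawler读取DEFAULT_BASE_URL(这是本文自定义的设置名,不是Scrapy内置设置),使用时在settings.py中配置并注册即可:

# settings.py
DEFAULT_BASE_URL = 'https://example.com'

ITEM_PIPELINES = {
    'myproject.pipelines.UrlCleaningPipeline': 350,
}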

数据验证Pipeline

数据验证确保只有符合要求的数据才会被处理和存储。

基础验证Pipeline

from itemadapter import ItemAdapter
from scrapy.exceptions import DropItem

class ValidationPipeline:
    """
    数据验证Pipeline
    """
    
    def __init__(self):
        self.required_fields = ['title', 'url']
        self.min_title_length = 5
    
    def process_item(self, item, spider):
        adapter = ItemAdapter(item)
        
        # 检查必需字段
        for field in self.required_fields:
            if not adapter.get(field):
                raise DropItem(f"Missing required field: {field}")
        
        # 验证标题长度
        title = adapter.get('title')
        if title and len(str(title)) < self.min_title_length:
            raise DropItem(f"Title too short: {title}")
        
        # 验证URL格式
        url = adapter.get('url')
        if url and not self.is_valid_url(url):
            raise DropItem(f"Invalid URL: {url}")
        
        return item
    
    def is_valid_url(self, url):
        """
        验证URL格式
        """
        import re
        url_pattern = re.compile(
            r'^https?://'  # http:// or https://
            r'(?:[a-zA-Z0-9-]+\.)+[a-zA-Z]{2,}'  # domain
            r'(?:/[^\s]*)?$'  # optional path
        )
        return bool(url_pattern.match(url))

高级验证Pipeline

import re
from datetime import datetime
from itemadapter import ItemAdapter
from scrapy.exceptions import DropItem

class AdvancedValidationPipeline:
    """
    高级数据验证Pipeline
    """
    
    def __init__(self):
        self.field_validations = {
            'email': self.validate_email,
            'phone': self.validate_phone,
            'date': self.validate_date,
            'url': self.validate_url,
            'price': self.validate_price
        }
    
    def process_item(self, item, spider):
        adapter = ItemAdapter(item)
        
        for field_name, validator in self.field_validations.items():
            value = adapter.get(field_name)
            if value and not validator(value):
                raise DropItem(f"Invalid {field_name}: {value}")
        
        return item
    
    def validate_email(self, email):
        """
        验证邮箱格式
        """
        pattern = r'^[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}$'
        return bool(re.match(pattern, str(email)))
    
    def validate_phone(self, phone):
        """
        验证手机号格式(中国)
        """
        pattern = r'^1[3-9]\d{9}$'
        return bool(re.match(pattern, str(phone)))
    
    def validate_date(self, date_str):
        """
        验证日期格式
        """
        try:
            datetime.strptime(str(date_str), '%Y-%m-%d')
            return True
        except ValueError:
            return False
    
    def validate_url(self, url):
        """
        验证URL格式
        """
        pattern = r'^https?://(?:[-\w.])+(?:\:[0-9]+)?(?:/(?:[\w/_.])*(?:\?(?:[\w&=%.])*)?(?:\#(?:[\w.])*)?)?$'
        return bool(re.match(pattern, str(url)))
    
    def validate_price(self, price):
        """
        验证价格格式
        """
        try:
            price_float = float(price)
            return price_float >= 0
        except (ValueError, TypeError):
            return False
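
这些校验函数不依赖Scrapy,可以单独写个小脚本验证行为(以下输入均为虚构示例):

pipeline = AdvancedValidationPipeline()
print(pipeline.validate_email('user@example.com'))  # True
print(pipeline.validate_phone('13812345678'))       # True
print(pipeline.validate_date('2024-01-31'))         # True
print(pipeline.validate_price('-10'))               # False,价格不能为负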

本地存储Pipeline

JSON存储Pipeline

import json
import os
from itemadapter import ItemAdapter
from datetime import datetime

class JsonPipeline:
    """
    JSON存储Pipeline
    """
    
    def __init__(self, output_dir='data'):
        self.output_dir = output_dir
    
    @classmethod
    def from_crawler(cls, crawler):
        output_dir = crawler.settings.get('JSON_OUTPUT_DIR', 'data')
        return cls(output_dir)
    
    def open_spider(self, spider):
        # 创建输出目录
        os.makedirs(self.output_dir, exist_ok=True)
        
        # 为每个爬虫创建单独的文件
        timestamp = datetime.now().strftime('%Y%m%d_%H%M%S')
        filename = os.path.join(self.output_dir, f'{spider.name}_{timestamp}.json')
        
        self.file = open(filename, 'w', encoding='utf-8')
        self.file.write('[\n')  # JSON数组开始
        self.first_item = True
    
    def close_spider(self, spider):
        if hasattr(self, 'file'):
            self.file.write('\n]')  # JSON数组结束
            self.file.close()
    
    def process_item(self, item, spider):
        adapter = ItemAdapter(item)
        
        # 添加时间戳
        adapter['processed_at'] = datetime.now().isoformat()
        
        # 格式化JSON并写入
        line = json.dumps(adapter.asdict(), ensure_ascii=False, indent=2)
        
        if not self.first_item:
            self.file.write(',\n')  # 添加逗号分隔
        else:
            self.first_item = False
        
        self.file.write('  ' + line)  # 添加缩进
        
        return item
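
补充一点:如果只是想把抓取结果导出成JSON/CSV等文件,也可以直接使用Scrapy内置的Feed Exports,无需自己编写Pipeline(下面的settings片段仅作示意):

# settings.py —— 使用内置Feed Exports导出JSON
FEEDS = {
    'data/%(name)s_%(time)s.json': {
        'format': 'json',
        'encoding': 'utf8',
    },
}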

CSV存储Pipeline

import csv
import os
from itemadapter import ItemAdapter
from datetime import datetime

class CsvPipeline:
    """
    CSV存储Pipeline
    """
    
    def __init__(self, output_dir='data'):
        self.output_dir = output_dir
    
    @classmethod
    def from_crawler(cls, crawler):
        output_dir = crawler.settings.get('CSV_OUTPUT_DIR', 'data')
        return cls(output_dir)
    
    def open_spider(self, spider):
        os.makedirs(self.output_dir, exist_ok=True)
        
        timestamp = datetime.now().strftime('%Y%m%d_%H%M%S')
        filename = os.path.join(self.output_dir, f'{spider.name}_{timestamp}.csv')
        
        self.file = open(filename, 'w', newline='', encoding='utf-8')
        self.writer = None
    
    def close_spider(self, spider):
        if hasattr(self, 'file'):
            self.file.close()
    
    def process_item(self, item, spider):
        adapter = ItemAdapter(item)
        
        # 添加时间戳
        adapter['processed_at'] = datetime.now().isoformat()
        
        if self.writer is None:
            # 首次写入时根据第一个Item的字段确定表头;
            # 后续Item若出现新增字段,DictWriter会报错,需保证各Item字段一致
            self.writer = csv.DictWriter(self.file, fieldnames=list(adapter.keys()))
            self.writer.writeheader()
        
        self.writer.writerow(adapter.asdict())
        
        return item

去重Pipeline

import hashlib
from itemadapter import ItemAdapter
from scrapy.exceptions import DropItem

class DuplicateFilterPipeline:
    """
    基于内存哈希集合的去重Pipeline
    """
    
    def __init__(self):
        self.seen_hashes = set()
    
    def process_item(self, item, spider):
        adapter = ItemAdapter(item)
        
        # 创建基于关键字段的哈希
        key_fields = ['title', 'url', 'price']  # 可配置的关键字段
        key_values = []
        
        for field in key_fields:
            value = adapter.get(field)
            if value is not None:
                key_values.append(str(value))
        
        if not key_values:
            return item  # 没有关键字段,跳过去重
        
        # 创建哈希
        key_string = '|'.join(key_values)
        item_hash = hashlib.md5(key_string.encode()).hexdigest()
        
        if item_hash in self.seen_hashes:
            raise DropItem(f"Duplicate item found: {adapter.get('title', 'Unknown')}")
        
        self.seen_hashes.add(item_hash)
        return item
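
上面的去重集合只存在于单次运行的内存中,爬虫重启后记录会丢失。如果需要跨运行或多机共享的去重,可以把指纹放进Redis集合。下面是一个示意写法(键名 dupefilter:<spider.name> 以及只按url取指纹都是本文自拟的假设):

import hashlib
import redis
from itemadapter import ItemAdapter
from scrapy.exceptions import DropItem

class RedisDuplicateFilterPipeline:
    """
    基于Redis集合的去重Pipeline(示意)
    """

    def open_spider(self, spider):
        self.redis_client = redis.Redis(host='localhost', port=6379, db=0)
        self.key = f"dupefilter:{spider.name}"

    def process_item(self, item, spider):
        adapter = ItemAdapter(item)
        fingerprint = hashlib.md5(str(adapter.get('url', '')).encode()).hexdigest()
        # sadd返回1表示指纹首次出现,返回0表示已存在
        if self.redis_client.sadd(self.key, fingerprint) == 0:
            raise DropItem(f"Duplicate item: {adapter.get('url')}")
        return item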

数据库存储Pipeline

MySQL存储Pipeline

import pymysql
from itemadapter import ItemAdapter
from scrapy.exceptions import DropItem
import logging

class MysqlPipeline:
    """
    MySQL存储Pipeline
    """
    
    def __init__(self, host, database, user, password, port):
        self.host = host
        self.database = database
        self.user = user
        self.password = password
        self.port = port
    
    @classmethod
    def from_crawler(cls, crawler):
        settings = crawler.settings
        return cls(
            host=settings.get('MYSQL_HOST', 'localhost'),
            database=settings.get('MYSQL_DATABASE', 'scrapy_db'),
            user=settings.get('MYSQL_USER', 'root'),
            password=settings.get('MYSQL_PASSWORD', ''),
            port=settings.getint('MYSQL_PORT', 3306)
        )
    
    def open_spider(self, spider):
        try:
            self.connection = pymysql.connect(
                host=self.host,
                port=self.port,
                user=self.user,
                password=self.password,
                database=self.database,
                charset='utf8mb4'
            )
            self.cursor = self.connection.cursor()
            spider.logger.info("Connected to MySQL database")
        except Exception as e:
            spider.logger.error(f"MySQL connection failed: {e}")
            raise  # 连接失败直接抛出异常即可,DropItem只适用于process_item
    
    def close_spider(self, spider):
        if hasattr(self, 'connection'):
            self.connection.close()
            spider.logger.info("MySQL connection closed")
    
    def process_item(self, item, spider):
        adapter = ItemAdapter(item)
        
        # 构建插入语句
        columns = list(adapter.keys())
        placeholders = ', '.join(['%s'] * len(columns))
        column_names = ', '.join(columns)
        
        sql = f"INSERT INTO scraped_data ({column_names}) VALUES ({placeholders})"
        values = [adapter.get(col) for col in columns]
        
        try:
            self.cursor.execute(sql, tuple(values))
            self.connection.commit()
            spider.logger.debug(f"Item inserted: {adapter.get('title', 'Unknown')}")
        except Exception as e:
            self.connection.rollback()
            spider.logger.error(f"Insert failed: {e}")
            raise DropItem(f"Database insert failed: {e}")
        
        return item
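
启用该Pipeline前,需要在settings.py中提供连接参数并注册Pipeline,同时确保目标表已建好(这里沿用上面代码假定的表名scraped_data,列需与Item字段一致):

# settings.py(示例值,按实际环境修改)
MYSQL_HOST = 'localhost'
MYSQL_PORT = 3306
MYSQL_USER = 'root'
MYSQL_PASSWORD = 'your_password'
MYSQL_DATABASE = 'scrapy_db'

ITEM_PIPELINES = {
    'myproject.pipelines.MysqlPipeline': 500,
}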

MongoDB存储Pipeline

import pymongo
from itemadapter import ItemAdapter
from scrapy.exceptions import DropItem
from datetime import datetime

class MongoPipeline:
    """
    MongoDB存储Pipeline
    """
    
    def __init__(self, mongo_uri, mongo_db):
        self.mongo_uri = mongo_uri
        self.mongo_db = mongo_db
    
    @classmethod
    def from_crawler(cls, crawler):
        return cls(
            mongo_uri=crawler.settings.get('MONGO_URI', 'mongodb://localhost:27017'),
            mongo_db=crawler.settings.get('MONGO_DATABASE', 'scrapy_db')
        )
    
    def open_spider(self, spider):
        self.client = pymongo.MongoClient(self.mongo_uri)
        self.db = self.client[self.mongo_db]
        spider.logger.info("Connected to MongoDB")
    
    def close_spider(self, spider):
        self.client.close()
        spider.logger.info("MongoDB connection closed")
    
    def process_item(self, item, spider):
        adapter = ItemAdapter(item)
        
        # 添加时间戳
        document = dict(adapter.asdict())
        document['scraped_at'] = datetime.now()
        
        # 插入到集合中(集合名为爬虫名称)
        collection = self.db[spider.name]
        
        try:
            result = collection.insert_one(document)
            spider.logger.debug(f"Item inserted with ID: {result.inserted_id}")
        except Exception as e:
            spider.logger.error(f"MongoDB insert failed: {e}")
            raise DropItem(f"MongoDB insert failed: {e}")
        
        return item
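
如果同一个URL可能被重复抓取,可以把process_item中的insert_one换成按url做幂等写入的upsert(示意片段,假定Item中含有url字段):

# 以url为唯一键,存在则更新、不存在则插入
collection.update_one(
    {'url': document.get('url')},
    {'$set': document},
    upsert=True
)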

Redis存储Pipeline

import redis
import json
from itemadapter import ItemAdapter
from scrapy.exceptions import DropItem
from datetime import datetime

class RedisPipeline:
    """
    Redis存储Pipeline
    """
    
    def __init__(self, redis_host, redis_port, redis_db):
        self.redis_host = redis_host
        self.redis_port = redis_port
        self.redis_db = redis_db
    
    @classmethod
    def from_crawler(cls, crawler):
        return cls(
            redis_host=crawler.settings.get('REDIS_HOST', 'localhost'),
            redis_port=crawler.settings.getint('REDIS_PORT', 6379),
            redis_db=crawler.settings.getint('REDIS_DB', 0)
        )
    
    def open_spider(self, spider):
        try:
            self.redis_client = redis.Redis(
                host=self.redis_host,
                port=self.redis_port,
                db=self.redis_db,
                decode_responses=True
            )
            self.redis_client.ping()
            spider.logger.info("Connected to Redis")
        except Exception as e:
            spider.logger.error(f"Redis connection failed: {e}")
            raise  # 同样不要在open_spider中使用DropItem
    
    def close_spider(self, spider):
        self.redis_client.close()
        spider.logger.info("Redis connection closed")
    
    def process_item(self, item, spider):
        adapter = ItemAdapter(item)
        
        # 添加时间戳
        data = dict(adapter.asdict())
        data['scraped_at'] = datetime.now().isoformat()
        
        # 序列化为JSON并存储
        item_json = json.dumps(data, ensure_ascii=False)
        
        # 使用爬虫名称作为键的前缀
        key = f"{spider.name}:{data.get('url', datetime.now().timestamp())}"
        
        try:
            self.redis_client.set(key, item_json)
            spider.logger.debug(f"Item stored in Redis: {key}")
        except Exception as e:
            spider.logger.error(f"Redis storage failed: {e}")
            raise DropItem(f"Redis storage failed: {e}")
        
        return item

高级Pipeline技巧

异步Pipeline

import json
import aiofiles
from itemadapter import ItemAdapter
from datetime import datetime

class AsyncPipeline:
    """
    异步Pipeline示例:Scrapy 2.x 中 process_item 可以直接定义为协程,
    配合 asyncio reactor 即可在其中使用 aiofiles 等 asyncio 库
    """

    async def process_item(self, item, spider):
        adapter = ItemAdapter(item)
        adapter['processed_at'] = datetime.now().isoformat()

        # 异步地把Item追加写入文件(每行一条JSON)
        item_json = json.dumps(adapter.asdict(), ensure_ascii=False)
        async with aiofiles.open(f'{spider.name}_async.json', 'a', encoding='utf-8') as f:
            await f.write(item_json + '\n')

        return item
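
要让上面的协程版process_item以asyncio方式运行,需要在settings.py中启用asyncio reactor(Scrapy 2.x支持):

# settings.py
TWISTED_REACTOR = "twisted.internet.asyncioreactor.AsyncioSelectorReactor"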

批量处理Pipeline

from itemadapter import ItemAdapter
from collections import deque
import threading
import time

class BatchProcessingPipeline:
    """
    批量处理Pipeline
    """
    
    def __init__(self, batch_size=100, flush_interval=30):
        self.batch_size = batch_size
        self.flush_interval = flush_interval
        self.batches = {}
        self.last_flush = {}
        self.lock = threading.Lock()
    
    def open_spider(self, spider):
        self.batches[spider.name] = deque()
        self.last_flush[spider.name] = time.time()
    
    def close_spider(self, spider):
        # 关闭时处理剩余数据
        self._flush_batch(spider)
    
    def process_item(self, item, spider):
        with self.lock:
            self.batches[spider.name].append(item)
            
            # 检查是否需要刷新批次
            if len(self.batches[spider.name]) >= self.batch_size:
                self._flush_batch(spider)
            elif time.time() - self.last_flush[spider.name] >= self.flush_interval:
                self._flush_batch(spider)
        
        return item
    
    def _flush_batch(self, spider):
        """
        处理批次数据
        """
        batch = list(self.batches[spider.name])
        if batch:
            # 处理批次数据(这里以打印为例)
            spider.logger.info(f"Processing batch of {len(batch)} items")
            
            # 实际应用中,这里可能是批量插入数据库等操作
            for item in batch:
                adapter = ItemAdapter(item)
                spider.logger.debug(f"Batch processing: {adapter.get('title', 'Unknown')}")
            
            # 清空批次
            self.batches[spider.name].clear()
            self.last_flush[spider.name] = time.time()
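
_flush_batch中的"批量处理"可以换成真正的批量写库。下面是一个替换版本的示意,假定该Pipeline在open_spider中已建立pymysql连接self.connection,且表名scraped_data、字段title/url均为示例:

    def _flush_batch(self, spider):
        """
        用executemany把一个批次一次性写入MySQL(示意)
        """
        batch = list(self.batches[spider.name])
        if not batch:
            return
        # 仅以title和url两个字段为例,实际字段按Item定义调整
        rows = [
            (ItemAdapter(it).get('title'), ItemAdapter(it).get('url'))
            for it in batch
        ]
        with self.connection.cursor() as cursor:
            cursor.executemany(
                "INSERT INTO scraped_data (title, url) VALUES (%s, %s)",
                rows
            )
        self.connection.commit()
        self.batches[spider.name].clear()
        self.last_flush[spider.name] = time.time()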

条件Pipeline

class ConditionalPipeline:
    """
    条件处理Pipeline
    """
    
    def __init__(self):
        self.conditions = {
            'high_priority': self.is_high_priority,
            'needs_validation': self.needs_extra_validation,
            'requires_storage': self.should_store
        }
    
    def process_item(self, item, spider):
        adapter = ItemAdapter(item)
        
        # 根据条件应用不同的处理逻辑
        if self.conditions['high_priority'](item):
            self.handle_high_priority(adapter, spider)
        
        if self.conditions['needs_validation'](item):
            self.extra_validation(adapter, spider)
        
        if self.conditions['requires_storage'](item):
            self.store_item(adapter, spider)
        
        return item
    
    def is_high_priority(self, item):
        """
        判断是否为高优先级
        """
        adapter = ItemAdapter(item)
        title = adapter.get('title', '')
        return 'urgent' in str(title).lower() or 'important' in str(title).lower()
    
    def needs_extra_validation(self, item):
        """
        判断是否需要额外验证
        """
        adapter = ItemAdapter(item)
        price = adapter.get('price')
        return price is not None and float(price) > 10000  # 价格超过1万需要额外验证
    
    def should_store(self, item):
        """
        判断是否应该存储
        """
        adapter = ItemAdapter(item)
        return adapter.get('title') is not None
    
    def handle_high_priority(self, adapter, spider):
        """
        处理高优先级项目
        """
        spider.logger.info(f"High priority item: {adapter.get('title')}")
        # 可以发送通知、加急处理等
    
    def extra_validation(self, adapter, spider):
        """
        额外验证
        """
        spider.logger.info(f"Extra validation for: {adapter.get('title')}")
        # 更严格的验证逻辑
    
    def store_item(self, adapter, spider):
        """
        存储项目
        """
        spider.logger.info(f"Storing item: {adapter.get('title')}")
        # 存储逻辑

性能优化策略

连接池优化

import pymysql
from itemadapter import ItemAdapter
from scrapy.exceptions import DropItem
# 注意:DBUtils 2.x 的导入路径为 from dbutils.pooled_db import PooledDB
from DBUtils.PooledDB import PooledDB

class OptimizedMysqlPipeline:
    """
    使用连接池优化的MySQL Pipeline
    """
    
    def __init__(self, mysql_settings):
        self.mysql_settings = mysql_settings
        self.pool = None
    
    @classmethod
    def from_crawler(cls, crawler):
        settings = crawler.settings
        mysql_settings = {
            'host': settings.get('MYSQL_HOST', 'localhost'),
            'port': settings.get('MYSQL_PORT', 3306),
            'user': settings.get('MYSQL_USER', 'root'),
            'password': settings.get('MYSQL_PASSWORD', ''),
            'database': settings.get('MYSQL_DATABASE', 'scrapy_db'),
            'charset': 'utf8mb4'
        }
        return cls(mysql_settings)
    
    def open_spider(self, spider):
        # 创建连接池
        self.pool = PooledDB(
            creator=pymysql,
            maxconnections=20,  # 最大连接数
            mincached=2,        # 最小缓存连接数
            maxcached=5,        # 最大缓存连接数
            maxshared=3,        # 最大共享连接数
            blocking=True,      # 连接池满时是否阻塞
            **self.mysql_settings
        )
        spider.logger.info("MySQL connection pool created")
    
    def close_spider(self, spider):
        if self.pool:
            self.pool.close()
            spider.logger.info("MySQL connection pool closed")
    
    def process_item(self, item, spider):
        conn = None
        try:
            # 从连接池获取连接
            conn = self.pool.connection()
            cursor = conn.cursor()
            
            # 处理数据
            adapter = ItemAdapter(item)
            columns = list(adapter.keys())
            placeholders = ', '.join(['%s'] * len(columns))
            column_names = ', '.join(columns)
            
            sql = f"INSERT INTO scraped_data ({column_names}) VALUES ({placeholders})"
            values = [adapter.get(col) for col in columns]
            
            cursor.execute(sql, tuple(values))
            conn.commit()
            
            spider.logger.debug(f"Item processed: {adapter.get('title', 'Unknown')}")
            
        except Exception as e:
            if conn:
                conn.rollback()
            spider.logger.error(f"Database operation failed: {e}")
            raise DropItem(f"Database operation failed: {e}")
        
        finally:
            if conn:
                conn.close()  # 返回连接到池中
        
        return item

内存优化

import gc
from itemadapter import ItemAdapter

class MemoryOptimizedPipeline:
    """
    内存优化Pipeline
    """
    
    def __init__(self, max_items_before_gc=1000):
        self.max_items_before_gc = max_items_before_gc
        self.processed_count = 0
    
    def process_item(self, item, spider):
        # 处理项目
        adapter = ItemAdapter(item)
        
        # 检查是否需要垃圾回收
        self.processed_count += 1
        if self.processed_count >= self.max_items_before_gc:
            gc.collect()  # 执行垃圾回收
            self.processed_count = 0
            spider.logger.debug("Garbage collection performed")
        
        return item

错误处理与监控

错误处理Pipeline

import traceback
from datetime import datetime
from itemadapter import ItemAdapter

class ErrorHandlingPipeline:
    """
    错误处理和日志记录Pipeline
    """
    
    def __init__(self):
        self.error_count = 0
        self.success_count = 0
    
    def open_spider(self, spider):
        spider.logger.info("Error handling pipeline started")
    
    def close_spider(self, spider):
        spider.logger.info(f"Pipeline statistics - Success: {self.success_count}, Errors: {self.error_count}")
    
    def process_item(self, item, spider):
        try:
            # 处理项目
            adapter = ItemAdapter(item)
            
            # 添加处理时间戳
            adapter['processed_at'] = datetime.now().isoformat()
            
            self.success_count += 1
            return item
            
        except Exception as e:
            self.error_count += 1
            error_info = {
                'error': str(e),
                'traceback': traceback.format_exc(),
                'item_keys': list(ItemAdapter(item).keys()) if item else [],
                'timestamp': datetime.now().isoformat()
            }
            
            spider.logger.error(f"Processing error: {error_info}")
            
            # 可以选择抛出异常或返回原始项目
            # raise DropItem(f"Processing failed: {e}")
            return item

监控Pipeline

import time
from collections import Counter
from itemadapter import ItemAdapter

class MonitoringPipeline:
    """
    性能监控Pipeline
    """
    
    def __init__(self):
        self.stats = {
            'processed_count': 0,
            'processing_times': [],
            'field_counts': Counter(),
            'error_counts': Counter()
        }
        self.start_time = time.time()
    
    def open_spider(self, spider):
        spider.logger.info("Monitoring pipeline started")
    
    def close_spider(self, spider):
        total_time = time.time() - self.start_time
        avg_processing_time = sum(self.stats['processing_times']) / len(self.stats['processing_times']) if self.stats['processing_times'] else 0
        
        spider.logger.info(f"Processing Summary:")
        spider.logger.info(f"  Total items: {self.stats['processed_count']}")
        spider.logger.info(f"  Total time: {total_time:.2f}s")
        spider.logger.info(f"  Avg processing time: {avg_processing_time:.4f}s")
        if total_time > 0:
            spider.logger.info(f"  Items/sec: {self.stats['processed_count'] / total_time:.2f}")
        spider.logger.info(f"  Field distribution: {dict(self.stats['field_counts'])}")
    
    def process_item(self, item, spider):
        start_time = time.time()
        
        try:
            # 处理项目
            adapter = ItemAdapter(item)
            
            # 统计字段出现次数
            for field_name in adapter.keys():
                self.stats['field_counts'][field_name] += 1
            
            self.stats['processed_count'] += 1
            
        except Exception as e:
            self.stats['error_counts'][str(e)] += 1
            raise
        
        finally:
            processing_time = time.time() - start_time
            self.stats['processing_times'].append(processing_time)
        
        return item
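
除了自己维护统计字典,也可以直接使用Scrapy内置的Stats Collector,计数结果会在爬虫结束时与引擎统计一起输出(以下为示意写法,统计键名为自拟):

from itemadapter import ItemAdapter

class StatsMonitoringPipeline:
    """
    借助crawler.stats记录Pipeline级别统计(示意)
    """

    @classmethod
    def from_crawler(cls, crawler):
        pipeline = cls()
        pipeline.stats = crawler.stats
        return pipeline

    def process_item(self, item, spider):
        # 计数器会出现在爬虫结束时的统计日志中
        self.stats.inc_value('pipeline/items_processed')
        if not ItemAdapter(item).get('title'):
            self.stats.inc_value('pipeline/items_missing_title')
        return item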

常见问题与解决方案

问题1: Pipeline执行顺序错误

现象: Pipeline没有按照预期顺序执行
解决方案:

# 确保在settings.py中正确设置优先级
ITEM_PIPELINES = {
    'myproject.pipelines.ValidationPipeline': 300,  # 先执行验证
    'myproject.pipelines.CleaningPipeline': 400,    # 再执行清洗
    'myproject.pipelines.StoragePipeline': 500,     # 最后存储
}

问题2: 数据库连接超时

现象: 数据库操作时出现连接超时
解决方案:

# 使用连接池与合理的超时设置,并为数据库操作加上重试和指数退避
import time

class RobustDatabasePipeline:
    def __init__(self):
        self.retry_attempts = 3
    
    def handle_database_operation(self, operation, *args, **kwargs):
        for attempt in range(self.retry_attempts):
            try:
                return operation(*args, **kwargs)
            except Exception as e:
                if attempt == self.retry_attempts - 1:
                    raise e
                time.sleep(2 ** attempt)  # 指数退避

问题3: 内存泄漏

现象: 长时间运行后内存占用过高
解决方案:

class MemorySafePipeline:
    def __init__(self):
        self.item_cache = {}  # 简单的有界缓存,超过上限即整体清空
        self.max_cache_size = 1000
    
    def process_item(self, item, spider):
        # 控制缓存大小
        if len(self.item_cache) > self.max_cache_size:
            self.item_cache.clear()
        
        # 处理项目
        return item

最佳实践建议

设计原则

  1. 单一职责: 每个Pipeline只处理一种类型的逻辑
  2. 可配置性: 通过settings.py配置Pipeline参数
  3. 错误处理: 妥善处理异常,避免影响整个处理链
  4. 性能考虑: 考虑大数据量处理的性能影响

部署建议

  1. 监控: 实施适当的监控和日志记录
  2. 备份: 定期备份存储的数据
  3. 扩展性: 设计可扩展的Pipeline架构
  4. 测试: 充分测试各种边界情况

💡 核心要点: Pipeline是Scrapy数据处理的核心组件,合理的Pipeline设计能够确保数据质量和处理效率。通过组合不同的Pipeline,可以构建强大的数据处理流水线。



🔗 相关教程推荐:Item 与 Item Loader · Downloader Middleware

🏷️ 标签云: Scrapy Pipeline 数据处理 数据存储 数据清洗 数据验证 爬虫框架 网络爬虫 Python爬虫