Scrapy数据清洗与校验完全指南

📂 所属阶段:第二阶段 — 数据流转(数据处理篇)
🔗 相关章节:Pipeline管道实战 · 数据去重与增量更新

在Scrapy爬虫流程中,原始响应数据往往混杂空白、乱码、格式错乱甚至重复项,直接入库会大幅降低分析价值。本文聚焦Pipeline内的轻量高效方案,覆盖文本/数值/日期的基础清洗、必填/格式/范围的核心校验,附带性能优化和踩坑指南,轻松解决90%的数据质量问题。


一、Pipeline数据处理的核心架构

Scrapy的Pipeline设计天然适合串联处理数据,我们可以按「清洗→校验→去重(可选)」的顺序拆分Pipeline类,避免逻辑混乱:

# settings.py 配置Pipeline优先级(数字越小优先级越高)
ITEM_PIPELINES = {
    'myproject.pipelines.TextCleaningPipeline': 100,  # 1. 文本清洗
    'myproject.pipelines.NumericDateTimePipeline': 150, # 2. 数值/日期处理
    'myproject.pipelines.CoreValidationPipeline': 200, # 3. 核心校验
    # 'myproject.pipelines.DuplicatesPipeline': 250,  # 4. (可选)去重
}

二、数据清洗基础实战

2.1 文本数据清洗(高频场景)

重点处理:首尾空白、多余换行/制表符、HTML标签/实体、编码问题

import re
import unicodedata
from html import unescape

class TextCleaningPipeline:
    def process_item(self, item, spider):
        # 只处理Item定义的文本字段,避免误改
        text_fields = getattr(item, 'text_fields', ['title', 'content', 'desc'])
        
        for field in text_fields:
            if field not in item or not isinstance(item[field], str):
                continue
            
            raw = item[field]
            # 1. Unicode标准化(解决乱码/全角半角混合)
            raw = unicodedata.normalize('NFKC', raw)
            # 2. 解码HTML实体(如 &nbsp; → 空格、&lt; → <)
            raw = unescape(raw)
            # 3. 去除HTML标签(可替换为更健壮的lxml,re适合轻量场景)
            raw = re.sub(r'<[^>]+>', '', raw)
            # 4. 去除多余空白(换行、制表符→单个空格)
            raw = re.sub(r'\s+', ' ', raw).strip()
            
            # 5. 可选:保留中文/英文/数字/常用标点
            # raw = re.sub(r'[^\w\s\u4e00-\u9fff.,!?;:()""''\[\]{}\-—]', '', raw)
            
            item[field] = raw
        return item

2.2 数值/日期数据清洗

将字符串转换为标准数据类型,便于后续入库和分析:

from datetime import datetime, timedelta
import re

class NumericDateTimePipeline:
    def __init__(self):
        # 常见日期格式(可根据目标网站调整)
        self.date_fmts = ['%Y-%m-%d', '%Y/%m/%d', '%d-%m-%Y', '%b %d, %Y']
        # 相对时间正则(如 "3 days ago"、"1小时前")
        self.relative_re = [
            (r'(\d+)\s*天前', lambda m: datetime.now() - timedelta(days=int(m.group(1)))),
            (r'(\d+)\s*小时前', lambda m: datetime.now() - timedelta(hours=int(m.group(1)))),
            (r'(\d+)\s*days?\s*ago', lambda m: datetime.now() - timedelta(days=int(m.group(1)))),
        ]
    
    def process_item(self, item, spider):
        # 同样通过Item属性标注字段类型
        num_fields = getattr(item, 'num_fields', ['price', 'rating', 'stock'])
        date_fields = getattr(item, 'date_fields', ['pub_time', 'update_time'])
        
        # 清洗数值
        for field in num_fields:
            if field not in item:
                continue
            item[field] = self._clean_num(item[field])
        
        # 清洗日期
        for field in date_fields:
            if field not in item:
                continue
            item[field] = self._clean_date(item[field])
        
        return item
    
    def _clean_num(self, raw):
        if isinstance(raw, (int, float)):
            return raw
        if not isinstance(raw, str):
            return None
        
        # 去除货币符号、单位、空格
        raw = re.sub(r'[¥$,€£₹%\s元个评分]', '', raw)
        try:
            return float(raw) if '.' in raw else int(raw)
        except ValueError:
            return None
    
    def _clean_date(self, raw):
        if isinstance(raw, datetime):
            return raw
        if not isinstance(raw, str):
            return None
        
        # 先试相对时间
        for pattern, func in self.relative_re:
            match = re.search(pattern, raw, re.IGNORECASE)
            if match:
                return func(match)
        # 再试标准格式
        for fmt in self.date_fmts:
            try:
                return datetime.strptime(raw.strip(), fmt)
            except ValueError:
                continue
        return None

三、数据校验核心实战

3.1 必填字段+范围+一致性校验

通过一个Pipeline类实现核心校验,避免分散:

from scrapy.exceptions import DropItem

class CoreValidationPipeline:
    def process_item(self, item, spider):
        # 1. 必填字段校验(通过Item属性)
        required = getattr(item, 'required_fields', ['title', 'price', 'pub_time'])
        missing = [f for f in required if f not in item or item[f] is None]
        if missing:
            raise DropItem(f"Missing required fields: {', '.join(missing)}")
        
        # 2. 数值范围校验(通过Item属性的dict)
        range_map = getattr(item, 'range_map', {'price': (0, 1000000), 'rating': (0, 5)})
        for f, (min_v, max_v) in range_map.items():
            if f not in item or not isinstance(item[f], (int, float)):
                continue
            if not (min_v <= item[f] <= max_v):
                spider.logger.warning(f"Field {f} out of range: {item[f]}")
                # 可选:DropItem 或 修正值
                # item[f] = max(min_v, min(item[f], max_v))
        
        # 3. 简单一致性校验(如折扣价≤原价)
        if 'original_price' in item and 'discount_price' in item:
            if item['original_price'] and item['discount_price'] > item['original_price']:
                spider.logger.warning(f"Discount > original: {item['discount_price']} > {item['original_price']}")
                # 可选:DropItem 或 交换
        
        return item

四、性能优化小技巧

  1. 避免在Pipeline中做I/O密集型操作(如查数据库、发HTTP请求),可单独开异步线程池/使用Redis/MQ
  2. 优先用原生Python字符串操作,re次之,避免引入过重的库(如lxml除非必须解析复杂HTML)
  3. 定期垃圾回收(在close_spider或每1000条item后调用gc.collect())
  4. 使用字段标注(Item属性),而非遍历所有字段清洗/校验

五、常见踩坑指南

  1. 清洗过度:不要在没有验证的情况下删除字符,先通过spider.logger记录异常
  2. 日期解析时区:如果目标网站有时区,可使用pytz/zoneinfo(Python3.9+内置)
  3. DropItem的使用时机:只有确定数据完全无效时才Drop,否则可标注后继续处理

相关推荐