Scrapy E-commerce Full-Site Crawling Project: Building a Multi-Level E-commerce Scraping System from Scratch

📂 Stage: Phase 4 — Hands-On Practice (Project Development)
🔗 Related chapters: Spiders in Practice · Selectors · Pipelines in Practice · Anti-Crawling Countermeasures

Project Background and Goals

Crawling e-commerce sites is a classic web-scraping application, combining multi-level category navigation, deep pagination, and large-scale product data extraction. This project builds a complete full-site e-commerce scraping system with the following core capabilities:

  • Multi-level categories: recursively crawl the site's nested category structure
  • Deep pagination: handle multi-page product listings robustly
  • Product data extraction: precisely extract titles, prices, descriptions, specifications, and other key fields
  • Anti-crawling countermeasures: IP rotation, request-header spoofing, and rate control
  • Data quality: deduplication, cleaning, and validation
  • Performance: a distributed architecture for higher throughput

Tech Stack

  • Crawling framework: Scrapy 2.x
  • Data processing: Pydantic, Pandas
  • Storage: MongoDB, MySQL
  • Proxy management: Scrapy-Proxy-Pool
  • Distributed crawling: Scrapy-Redis

E-commerce Site Architecture Analysis

Typical E-commerce Site Structure

"""
电商网站典型架构:

电商平台
├── 首页 (Homepage)
│   └── 导航菜单 (Navigation Menu)
├── 分类页面 (Category Pages)
│   ├── 一级分类 (Primary Categories)
│   │   ├── 二级分类 (Secondary Categories)
│   │   │   └── 三级分类 (Tertiary Categories)
│   │   └── 商品列表 (Product Listings)
│   └── 筛选条件 (Filter Options)
├── 商品列表页 (Product List Pages)
│   ├── 分页导航 (Pagination Navigation)
│   ├── 商品卡片 (Product Cards)
│   │   ├── 商品图片 (Product Images)
│   │   ├── 商品标题 (Product Title)
│   │   ├── 商品价格 (Product Price)
│   │   └── 商品链接 (Product Link)
│   └── 排序选项 (Sorting Options)
├── 商品详情页 (Product Detail Pages)
│   ├── 商品基本信息 (Basic Product Info)
│   ├── 商品图片 (Product Images)
│   ├── 价格信息 (Price Information)
│   ├── 规格参数 (Specifications)
│   ├── 用户评价 (User Reviews)
│   └── 相关推荐 (Recommendations)
└── 用户中心 (User Center)
"""

Crawling Challenges

"""
电商爬取主要难点:

1. 反爬虫机制:
   - 频率限制 (Rate Limiting)
   - IP封禁 (IP Blocking)
   - 验证码挑战 (CAPTCHA Challenges)
   - 浏览器指纹检测 (Browser Fingerprinting)

2. 数据结构复杂:
   - 多层级分类体系
   - 动态加载的商品数据
   - JavaScript渲染的内容
   - AJAX请求获取的数据

3. 网站结构变化:
   - 定期的UI改版
   - CSS选择器变化
   - API接口调整
   - 反爬策略升级
"""

Project Architecture Design

Project Directory Structure

ecommerce_spider/
├── ecommerce_spider/
│   ├── spiders/
│   │   ├── __init__.py
│   │   ├── ecommerce_spider.py       # Main spider
│   │   └── category_spider.py        # Category spider
│   ├── items/
│   │   ├── __init__.py
│   │   ├── product_item.py           # Product item model
│   │   └── category_item.py          # Category item model
│   ├── pipelines/
│   │   ├── __init__.py
│   │   ├── validation_pipeline.py    # Data validation pipeline
│   │   ├── deduplication_pipeline.py # Deduplication pipeline
│   │   ├── storage_pipeline.py       # Storage pipeline
│   │   └── monitoring_pipeline.py    # Monitoring pipeline
│   ├── middlewares/
│   │   ├── __init__.py
│   │   ├── anti_crawler_middleware.py # Anti-crawling middleware
│   │   ├── proxy_middleware.py       # Proxy middleware
│   │   └── retry_middleware.py       # Retry middleware
│   ├── utils/
│   │   ├── __init__.py
│   │   ├── category_parser.py        # Category parsing helpers
│   │   ├── price_parser.py           # Price parsing helpers
│   │   └── data_cleaner.py           # Data cleaning helpers
│   ├── settings.py                   # Project settings
│   └── __init__.py
├── scrapy.cfg                        # Scrapy config file
└── requirements.txt                  # Dependency list

Core Component Diagram

graph TD
    A[Start URLs] --> B[Category Parser]
    B --> C[Product List Parser]
    C --> D[Product Detail Parser]
    D --> E[Data Validation]
    E --> F[Data Storage]
    F --> G[Monitoring]
    
    H[Anti-Crawler Middleware] -.-> B
    I[Proxy Middleware] -.-> C
    J[Retry Middleware] -.-> D

Data Model Definitions

Product Item Model

# ecommerce_spider/items/product_item.py
import scrapy
from itemloaders.processors import TakeFirst, MapCompose, Join
from w3lib.html import remove_tags
import re

def clean_price(value):
    """Clean a raw price string into a float."""
    if value:
        # Drop currency symbols and whitespace, then thousands separators
        # (note: assumes '.' is the decimal separator)
        cleaned = re.sub(r'[^\d.,]', '', value.strip()).replace(',', '')
        try:
            return float(cleaned)
        except ValueError:
            return None
    return None

def clean_title(value):
    """清理标题数据"""
    if value:
        return value.strip()
    return None

def clean_description(value):
    """清理描述数据"""
    if value:
        # Strip HTML tags and trim whitespace
        cleaned = remove_tags(value)
        return cleaned.strip()
    return None

def extract_rating(value):
    """提取评分数据"""
    if value:
        match = re.search(r'(\d+(?:\.\d+)?)', str(value))
        if match:
            try:
                return float(match.group(1))
            except ValueError:
                return None
    return None

class ProductItem(scrapy.Item):
    """
    商品数据模型
    """
    # Basic information
    product_id = scrapy.Field(
        input_processor=MapCompose(str.strip),
        output_processor=TakeFirst()
    )
    title = scrapy.Field(
        input_processor=MapCompose(clean_title),
        output_processor=TakeFirst()
    )
    brand = scrapy.Field(
        input_processor=MapCompose(str.strip),
        output_processor=TakeFirst()
    )
    category_path = scrapy.Field(
        input_processor=MapCompose(str.strip),
        output_processor=Join(' > ')
    )
    
    # Pricing
    current_price = scrapy.Field(
        input_processor=MapCompose(clean_price),
        output_processor=TakeFirst()
    )
    original_price = scrapy.Field(
        input_processor=MapCompose(clean_price),
        output_processor=TakeFirst()
    )
    discount_rate = scrapy.Field(
        input_processor=MapCompose(str.strip),
        output_processor=TakeFirst()
    )
    
    # Specifications
    specifications = scrapy.Field()
    color_options = scrapy.Field()
    size_options = scrapy.Field()
    
    # Images
    main_image = scrapy.Field(
        input_processor=MapCompose(str.strip),
        output_processor=TakeFirst()
    )
    gallery_images = scrapy.Field()
    
    # Ratings and reviews
    rating = scrapy.Field(
        input_processor=MapCompose(extract_rating),
        output_processor=TakeFirst()
    )
    review_count = scrapy.Field(
        input_processor=MapCompose(str.strip),
        output_processor=TakeFirst()
    )
    
    # Stock and sales
    stock_status = scrapy.Field(
        input_processor=MapCompose(str.strip),
        output_processor=TakeFirst()
    )
    sales_volume = scrapy.Field(
        input_processor=MapCompose(str.strip),
        output_processor=TakeFirst()
    )
    
    # Metadata
    url = scrapy.Field(
        input_processor=MapCompose(str.strip),
        output_processor=TakeFirst()
    )
    crawled_at = scrapy.Field()
    source_website = scrapy.Field(
        input_processor=MapCompose(str.strip),
        output_processor=TakeFirst()
    )

class CategoryItem(scrapy.Item):
    """
    分类数据模型
    """
    category_id = scrapy.Field()
    category_name = scrapy.Field(
        input_processor=MapCompose(str.strip),
        output_processor=TakeFirst()
    )
    parent_category = scrapy.Field()
    level = scrapy.Field()
    url = scrapy.Field(
        input_processor=MapCompose(str.strip),
        output_processor=TakeFirst()
    )
    product_count = scrapy.Field()
    subcategories = scrapy.Field()
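The input processors above are plain functions, so they can be unit-tested without Scrapy at all. A quick standalone check of the price and rating cleaners, redeclared here so the snippet runs on its own (the price variant also drops thousands separators, which commonly appear in scraped prices):

```python
import re

def clean_price(value):
    """Keep digits, dots and commas, drop thousands separators, parse as float."""
    if value:
        cleaned = re.sub(r'[^\d.,]', '', value.strip()).replace(',', '')
        try:
            return float(cleaned)
        except ValueError:
            return None
    return None

def extract_rating(value):
    """Pull the first decimal number out of a rating string."""
    if value:
        match = re.search(r'(\d+(?:\.\d+)?)', str(value))
        if match:
            return float(match.group(1))
    return None

assert clean_price('¥1,299.00') == 1299.0   # currency symbol and comma removed
assert clean_price('Call for price') is None  # no digits -> None
assert extract_rating('4.5 out of 5 stars') == 4.5
assert extract_rating(None) is None
```

Keeping the processors this dumb (string in, primitive out, `None` on failure) is what makes them composable inside `MapCompose`.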

Configuration Model

# ecommerce_spider/items/config_item.py
from pydantic import BaseModel, Field
from typing import Optional, Dict, Any

class EcommerceConfig(BaseModel):
    """
    Configuration model for an e-commerce crawl.
    """
    website_name: str = Field(..., description="Site name")
    base_url: str = Field(..., description="Base URL")
    max_concurrent_requests: int = Field(16, description="Max concurrent requests")
    download_delay: float = Field(1.0, description="Download delay (seconds)")
    auto_throttle_enabled: bool = Field(True, description="Enable AutoThrottle")
    auto_throttle_start_delay: float = Field(1.0, description="AutoThrottle start delay")
    auto_throttle_max_delay: float = Field(10.0, description="AutoThrottle max delay")
    retry_times: int = Field(3, description="Retry count")
    user_agent: str = Field("Mozilla/5.0 (compatible; EcommerceBot/1.0)", description="User agent")
    proxy_enabled: bool = Field(False, description="Enable proxies")
    proxy_list: Optional[list] = Field(None, description="Proxy list")
    pipelines: Dict[str, int] = Field(default_factory=dict, description="Pipeline configuration")
    custom_settings: Dict[str, Any] = Field(default_factory=dict, description="Custom settings")
    
    class Config:
        arbitrary_types_allowed = True
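Pydantic validates the configuration at construction time, so a missing or mistyped field fails fast rather than halfway through a crawl. A minimal usage sketch with a trimmed-down stand-in model (fields reduced for brevity; `CrawlConfig` is illustrative, not part of the project code):

```python
from pydantic import BaseModel, ValidationError

class CrawlConfig(BaseModel):
    """Trimmed-down stand-in for EcommerceConfig, for illustration only."""
    website_name: str
    base_url: str
    max_concurrent_requests: int = 16
    download_delay: float = 1.0

# Defaults fill in anything not supplied
cfg = CrawlConfig(website_name='demo-shop', base_url='https://example.com')
assert cfg.max_concurrent_requests == 16

# Missing required fields raise ValidationError immediately
raised = False
try:
    CrawlConfig(website_name='demo-shop')  # base_url is missing
except ValidationError as e:
    raised = 'base_url' in str(e)
assert raised
```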

Spider Implementation in Detail

Main Spider

# ecommerce_spider/spiders/ecommerce_spider.py
import scrapy
import json
import re
from urllib.parse import urljoin, urlparse
from ecommerce_spider.items import ProductItem, CategoryItem
from ecommerce_spider.utils.category_parser import CategoryParser
from ecommerce_spider.utils.price_parser import PriceParser

class EcommerceSpider(scrapy.Spider):
    """
    电商网站全站爬虫
    """
    name = 'ecommerce'
    
    custom_settings = {
        'DOWNLOAD_DELAY': 2,
        'CONCURRENT_REQUESTS': 8,
        'CONCURRENT_REQUESTS_PER_DOMAIN': 2,
        'RANDOMIZE_DOWNLOAD_DELAY': 0.5,
        'AUTOTHROTTLE_ENABLED': True,
        'AUTOTHROTTLE_START_DELAY': 1,
        'AUTOTHROTTLE_MAX_DELAY': 10,
        'RETRY_TIMES': 3,
        'RETRY_HTTP_CODES': [500, 502, 503, 504, 408, 429],
    }
    
    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        
        # Configuration (start_urls may arrive as a comma-separated string via -a)
        start_urls = kwargs.get('start_urls', ['https://example.com/categories'])
        if isinstance(start_urls, str):
            start_urls = start_urls.split(',')
        self.start_urls = start_urls
        self.max_pages = int(kwargs.get('max_pages', 100))
        self.category_parser = CategoryParser()
        self.price_parser = PriceParser()
        
        # Counters
        self.stats = {
            'categories_crawled': 0,
            'products_crawled': 0,
            'errors': 0
        }
    
    def start_requests(self):
        """
        开始请求
        """
        for url in self.start_urls:
            yield scrapy.Request(
                url=url,
                callback=self.parse_categories,
                errback=self.handle_error,
                meta={
                    'category_level': 0,
                    'parent_category': None
                }
            )
    
    def parse_categories(self, response):
        """
        解析分类页面
        """
        self.logger.info(f"Parsing categories from: {response.url}")
        
        # 提取分类链接
        category_links = response.css('a.category-link, .category-item a, .nav-category a')
        
        for link in category_links:
            category_url = link.css('::attr(href)').get()
            if category_url:
                category_url = urljoin(response.url, category_url)
                
                # Extract the category name
                category_name = link.css('::text').get() or link.css('span::text').get()
                
                # Build the category item
                category_item = CategoryItem()
                category_item['category_name'] = category_name.strip() if category_name else 'Unknown'
                category_item['url'] = category_url
                category_item['level'] = response.meta.get('category_level', 0) + 1
                category_item['parent_category'] = response.meta.get('parent_category')
                
                self.stats['categories_crawled'] += 1
                yield category_item
                
                # Request the product list for this category
                yield scrapy.Request(
                    url=category_url,
                    callback=self.parse_product_list,
                    errback=self.handle_error,
                    meta={
                        'category_name': category_name.strip() if category_name else 'Unknown',
                        'category_level': response.meta.get('category_level', 0) + 1,
                        'parent_category': category_name.strip() if category_name else 'Unknown'
                    }
                )
        
        # Recurse into subcategories
        subcategory_links = response.css('.subcategory a, .sub-category a')
        for link in subcategory_links:
            subcategory_url = link.css('::attr(href)').get()
            if subcategory_url:
                subcategory_url = urljoin(response.url, subcategory_url)
                
                yield scrapy.Request(
                    url=subcategory_url,
                    callback=self.parse_categories,
                    errback=self.handle_error,
                    meta={
                        'category_level': response.meta.get('category_level', 0) + 1,
                        'parent_category': response.meta.get('category_name', 'Unknown')
                    }
                )
    
    def parse_product_list(self, response):
        """
        解析商品列表页面
        """
        self.logger.info(f"Parsing products from: {response.url}")
        
        # 提取商品链接
        product_links = response.css(
            'a.product-link, .product-item a, .product-card a, '
            '.product-container a, .goods-item a'
        )
        
        for link in product_links:
            product_url = link.css('::attr(href)').get()
            if product_url:
                product_url = urljoin(response.url, product_url)
                
                yield scrapy.Request(
                    url=product_url,
                    callback=self.parse_product_detail,
                    errback=self.handle_error,
                    meta={
                        'category_name': response.meta.get('category_name'),
                        'parent_category': response.meta.get('parent_category')
                    }
                )
        
        # Handle pagination (bounded by max_pages)
        next_page = self._find_next_page(response)
        if next_page and response.meta.get('page_number', 1) < self.max_pages:
            next_page_url = urljoin(response.url, next_page)
            
            yield scrapy.Request(
                url=next_page_url,
                callback=self.parse_product_list,
                errback=self.handle_error,
                meta={
                    'category_name': response.meta.get('category_name'),
                    'parent_category': response.meta.get('parent_category'),
                    'page_number': response.meta.get('page_number', 1) + 1
                }
            )
    
    def parse_product_detail(self, response):
        """
        解析商品详情页面
        """
        self.logger.info(f"Parsing product detail: {response.url}")
        
        try:
            product_item = ProductItem()
            
            # Extract the product ID
            product_id = self._extract_product_id(response)
            product_item['product_id'] = product_id
            
            # Extract the title
            title_selectors = [
                'h1::text',
                '.product-title::text',
                '.product-name::text',
                '.title::text',
                'title::text'
            ]
            title = None
            for selector in title_selectors:
                title = response.css(selector).get()
                if title:
                    break
            product_item['title'] = title.strip() if title else 'Unknown Product'
            
            # Extract the brand
            brand_selectors = [
                '[itemprop="brand"]::text',
                '.brand::text',
                '.product-brand::text',
                '[data-brand]::text'
            ]
            brand = None
            for selector in brand_selectors:
                brand = response.css(selector).get()
                if brand:
                    break
            product_item['brand'] = brand.strip() if brand else 'Unknown Brand'
            
            # Extract the current price
            price_selectors = [
                '[itemprop="price"]::text',
                '.price::text',
                '.product-price::text',
                '.current-price::text',
                '.sale-price::text'
            ]
            current_price = None
            for selector in price_selectors:
                current_price = response.css(selector).get()
                if current_price:
                    break
            product_item['current_price'] = self.price_parser.parse_price(current_price) if current_price else None
            
            # Extract the original (list) price
            original_price_selectors = [
                '.original-price::text',
                '.list-price::text',
                '.was-price::text'
            ]
            original_price = None
            for selector in original_price_selectors:
                original_price = response.css(selector).get()
                if original_price:
                    break
            product_item['original_price'] = self.price_parser.parse_price(original_price) if original_price else None
            
            # Compute the discount rate
            if product_item.get('original_price') and product_item.get('current_price'):
                discount = (1 - product_item['current_price'] / product_item['original_price']) * 100
                product_item['discount_rate'] = f"{discount:.1f}%"
            
            # Extract the main image
            image_selectors = [
                '[itemprop="image"]::attr(src)',
                '.main-image img::attr(src)',
                '.product-image img::attr(src)',
                '.product-img img::attr(src)'
            ]
            main_image = None
            for selector in image_selectors:
                main_image = response.css(selector).get()
                if main_image:
                    break
            product_item['main_image'] = urljoin(response.url, main_image) if main_image else None
            
            # Extract gallery images
            gallery_images = response.css('.gallery img::attr(src)').getall()
            product_item['gallery_images'] = [urljoin(response.url, img) for img in gallery_images if img]
            
            # Extract the rating
            rating_selectors = [
                '[itemprop="ratingValue"]::text',
                '.rating::text',
                '.score::text',
                '.stars::text'
            ]
            rating = None
            for selector in rating_selectors:
                rating = response.css(selector).get()
                if rating:
                    break
            product_item['rating'] = float(rating) if rating and rating.replace('.', '').isdigit() else None
            
            # Extract the review count
            review_selectors = [
                '[itemprop="reviewCount"]::text',
                '.review-count::text',
                '.comment-count::text',
                '.reviews::text'
            ]
            review_count = None
            for selector in review_selectors:
                review_count = response.css(selector).get()
                if review_count:
                    break
            product_item['review_count'] = review_count.strip() if review_count else None
            
            # Extract stock status
            stock_selectors = [
                '.stock-status::text',
                '.availability::text',
                '.in-stock::text',
                '.out-of-stock::text'
            ]
            stock_status = None
            for selector in stock_selectors:
                stock_status = response.css(selector).get()
                if stock_status:
                    break
            product_item['stock_status'] = stock_status.strip() if stock_status else 'Unknown'
            
            # Extract sales volume
            sales_selectors = [
                '.sales-volume::text',
                '.sold-count::text',
                '.sales-count::text'
            ]
            sales_volume = None
            for selector in sales_selectors:
                sales_volume = response.css(selector).get()
                if sales_volume:
                    break
            product_item['sales_volume'] = sales_volume.strip() if sales_volume else 'Unknown'
            
            # Extract specifications
            specs = self._extract_specifications(response)
            product_item['specifications'] = specs
            
            # Extract color options
            colors = response.css('.color-option::text, .color-item::text').getall()
            product_item['color_options'] = [color.strip() for color in colors if color.strip()]
            
            # Extract size options
            sizes = response.css('.size-option::text, .size-item::text').getall()
            product_item['size_options'] = [size.strip() for size in sizes if size.strip()]
            
            # Metadata
            from datetime import datetime, timezone  # stdlib; could also sit at module top
            product_item['url'] = response.url
            product_item['category_path'] = f"{response.meta.get('parent_category', '')} > {response.meta.get('category_name', '')}"
            product_item['crawled_at'] = datetime.now(timezone.utc).isoformat()
            product_item['source_website'] = urlparse(response.url).netloc
            
            self.stats['products_crawled'] += 1
            yield product_item
            
        except Exception as e:
            self.logger.error(f"Error parsing product detail: {response.url}, Error: {str(e)}")
            self.stats['errors'] += 1
    
    def _extract_product_id(self, response):
        """
        提取商品ID
        """
        # 从URL中提取
        url = response.url
        product_id_match = re.search(r'/(\d+)/?$', url) or re.search(r'id=(\d+)', url)
        if product_id_match:
            return product_id_match.group(1)
        
        # Fall back to page elements
        id_selectors = [
            '[data-product-id]',
            '[data-id]',
            '.product-id',
            '#product-id'
        ]
        for selector in id_selectors:
            element = response.css(selector)
            if element:
                data_id = element.attrib.get('data-product-id') or element.attrib.get('data-id')
                if data_id:
                    return data_id
        
        # Fall back to JSON-LD structured data
        json_ld_scripts = response.css('script[type="application/ld+json"]::text').getall()
        for script in json_ld_scripts:
            try:
                data = json.loads(script)
                if isinstance(data, dict) and data.get('@type') == 'Product':
                    return data.get('sku') or data.get('productID')
            except (json.JSONDecodeError, TypeError):
                continue
        
        return 'unknown'
    
    def _extract_specifications(self, response):
        """
        提取规格参数
        """
        specs = {}
        
        # 尝试从表格中提取规格
        spec_tables = response.css('.specs-table, .product-specs, .specifications')
        for table in spec_tables:
            rows = table.css('tr')
            for row in rows:
                key = row.css('td:first-child::text, th::text').get()
                value = row.css('td:last-child::text, td:nth-child(2)::text').get()
                if key and value:
                    specs[key.strip()] = value.strip()
        
        # Then spec lists
        spec_lists = response.css('.specs-list, .spec-list')
        for spec_list in spec_lists:
            items = spec_list.css('li')
            for item in items:
                text = item.css('::text').get()
                if text and ':' in text:
                    key, value = text.split(':', 1)
                    specs[key.strip()] = value.strip()
        
        # Finally, try inline JSON
        script_tags = response.css('script:not([src])::text').getall()
        for script in script_tags:
            if 'specs' in script.lower() or 'specifications' in script.lower():
                try:
                    # Naive non-greedy brace match; adjust to the target site
                    json_match = re.search(r'({.*?})', script, re.DOTALL)
                    if json_match:
                        data = json.loads(json_match.group(1))
                        if isinstance(data, dict):
                            specs.update(data)
                except (json.JSONDecodeError, TypeError):
                    continue
        
        return specs
    
    def _find_next_page(self, response):
        """
        Locate the next-page link.
        """
        # CSS attribute/class selectors first. Note: Scrapy's CSS engine
        # (cssselect) does not support :contains(), so link-text matching
        # is done with XPath below.
        next_selectors = [
            'a.next::attr(href)',
            'a[rel="next"]::attr(href)',
            '.pagination .next a::attr(href)',
            '.pager .next a::attr(href)'
        ]
        for selector in next_selectors:
            next_url = response.css(selector).get()
            if next_url:
                return next_url
        
        # Match by link text via XPath
        for text in ('下一页', 'Next', '»'):
            next_url = response.xpath(
                f'//a[contains(normalize-space(.), "{text}")]/@href'
            ).get()
            if next_url:
                return next_url
        
        # Last resort: look for a nextPageUrl field in inline JavaScript
        script_content = ' '.join(response.css('script::text').getall())
        next_match = re.search(r'"nextPageUrl"\s*:\s*"([^"]+)"', script_content)
        if next_match:
            return next_match.group(1)
        
        return None
    
    def handle_error(self, failure):
        """
        处理请求错误
        """
        self.logger.error(f"Request failed: {failure.request.url}")
        self.logger.error(f"Failure: {failure}")
        self.stats['errors'] += 1
    
    def closed(self, reason):
        """
        爬虫关闭时的清理工作
        """
        self.logger.info(f"Spider closed. Reason: {reason}")
        self.logger.info(f"Statistics: {self.stats}")

Multi-Level Category Handling

Category Parser Implementation

# ecommerce_spider/utils/category_parser.py
import re
from urllib.parse import urljoin, urlparse
from typing import List, Dict, Optional
import scrapy

class CategoryParser:
    """
    分类解析器
    """
    
    def __init__(self):
        self.category_patterns = [
            r'/category/([^/]+)',
            r'/cat/([^/]+)',
            r'/c/([^/]+)',
            r'category=([^&]+)',
            r'cat=([^&]+)'
        ]
        
        self.level_indicators = [
            'primary', 'secondary', 'tertiary', 'sub', 'child'
        ]
    
    def parse_category_hierarchy(self, response):
        """
        Parse the category hierarchy from several signals.
        """
        hierarchy = []
        
        # Breadcrumb navigation
        breadcrumbs = self._parse_breadcrumbs(response)
        if breadcrumbs:
            hierarchy.extend(breadcrumbs)
        
        # URL structure
        url_categories = self._parse_url_categories(response.url)
        if url_categories:
            hierarchy.extend(url_categories)
        
        # Page elements
        page_categories = self._parse_page_categories(response)
        if page_categories:
            hierarchy.extend(page_categories)
        
        return self._deduplicate_hierarchy(hierarchy)
    
    def _parse_breadcrumbs(self, response):
        """
        解析面包屑导航
        """
        breadcrumb_selectors = [
            '.breadcrumb a',
            '.breadcrumbs a',
            '.crumbs a',
            'nav[aria-label*="breadcrumb"] a',
            '[role="navigation"] a'
        ]
        
        for selector in breadcrumb_selectors:
            breadcrumbs = response.css(selector)
            if breadcrumbs:
                breadcrumb_data = []
                for i, breadcrumb in enumerate(breadcrumbs):
                    name = breadcrumb.css('::text').get()
                    url = breadcrumb.css('::attr(href)').get()
                    
                    if name and url:
                        breadcrumb_data.append({
                            'name': name.strip(),
                            'url': urljoin(response.url, url),
                            'level': i
                        })
                return breadcrumb_data
        
        return []
    
    def _parse_url_categories(self, url):
        """
        Extract categories from the URL itself by matching the known
        category patterns and collecting their captured names.
        """
        categories = []
        for pattern in self.category_patterns:
            for i, match in enumerate(re.finditer(pattern, url)):
                categories.append({
                    'name': match.group(1),
                    'url': url,
                    'level': i
                })
        
        return categories
    
    def _parse_page_categories(self, response):
        """
        从页面元素解析分类
        """
        category_selectors = [
            '.category-nav a',
            '.category-menu a',
            '.nav-category a',
            '.sidebar-category a'
        ]
        
        categories = []
        for selector in category_selectors:
            elements = response.css(selector)
            for i, element in enumerate(elements):
                name = element.css('::text').get()
                url = element.css('::attr(href)').get()
                
                if name and url:
                    categories.append({
                        'name': name.strip(),
                        'url': urljoin(response.url, url),
                        'level': self._determine_level(element)
                    })
        
        return categories
    
    def _determine_level(self, element):
        """
        确定分类层级
        """
        classes = element.css('::attr(class)').get() or ''
        text = element.css('::text').get() or ''
        
        level = 0
        if any(indicator in classes.lower() for indicator in self.level_indicators):
            level += 1
        if any(indicator in text.lower() for indicator in self.level_indicators):
            level += 1
        
        return level
    
    def _deduplicate_hierarchy(self, hierarchy):
        """
        去重层级结构
        """
        seen = set()
        unique_hierarchy = []
        
        for item in hierarchy:
            key = (item['name'], item['url'])
            if key not in seen:
                seen.add(key)
                unique_hierarchy.append(item)
        
        return unique_hierarchy
    
    def build_category_tree(self, categories):
        """
        构建分类树
        """
        tree = {}
        
        for category in categories:
            path = category['name'].split(' > ')
            current = tree
            
            for level_name in path:
                if level_name not in current:
                    current[level_name] = {}
                current = current[level_name]
        
        return tree

class AdvancedCategorySpider(scrapy.Spider):
    """
    Depth-bounded category spider.
    """
    
    name = 'advanced_category'
    
    def parse_category_with_depth(self, response, max_depth=3):
        """
        Parse categories recursively, stopping once max_depth levels
        have been visited (depth is tracked in request meta).
        """
        depth = response.meta.get('depth', 0)
        if depth >= max_depth:
            return
        
        parser = CategoryParser()
        hierarchy = parser.parse_category_hierarchy(response)
        
        # Category links at the current level
        current_categories = response.css(
            'a.category-link, .category-item a, .nav-category a'
        )
        
        for category_link in current_categories:
            category_url = category_link.css('::attr(href)').get()
            if category_url:
                category_url = urljoin(response.url, category_url)
                
                yield scrapy.Request(
                    url=category_url,
                    callback=self.parse_category_with_depth,
                    meta={
                        'depth': depth + 1,
                        'max_depth': max_depth,
                        'hierarchy': hierarchy
                    }
                )
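`build_category_tree` folds `' > '`-joined paths into a nested dict of dicts; a standalone re-implementation of the same logic with made-up category names, runnable without Scrapy:

```python
def build_category_tree(categories):
    """Fold 'A > B > C' style paths into a nested dict of dicts."""
    tree = {}
    for category in categories:
        current = tree
        for level_name in category['name'].split(' > '):
            # setdefault walks/creates one level per path segment
            current = current.setdefault(level_name, {})
    return tree

# Hypothetical category paths for illustration
cats = [
    {'name': 'Electronics > Phones > Android'},
    {'name': 'Electronics > Phones > iOS'},
    {'name': 'Electronics > Laptops'},
]
tree = build_category_tree(cats)
assert set(tree['Electronics']['Phones']) == {'Android', 'iOS'}
assert tree['Electronics']['Laptops'] == {}   # leaf node
```

Empty dicts as leaves keep the structure uniform: every node can gain children later without a type change.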

Deep Pagination

Smart Pagination Handler

# ecommerce_spider/utils/pagination_handler.py
import re
from urllib.parse import urljoin, urlparse, parse_qs
from typing import Optional, Dict, List
import scrapy

class PaginationHandler:
    """
    翻页处理器
    """
    
    def __init__(self):
        self.pagination_patterns = [
            r'page=(\d+)',
            r'pagenum=(\d+)',
            r'p=(\d+)',
            r'/page/(\d+)',
            r'/p(\d+)',
            r'\?p=(\d+)'
        ]
        
        self.next_page_indicators = [
            'next', 'next-page', 'forward', '>', '»',
            '下一页', '后页', '下翻'
        ]
        
        self.prev_page_indicators = [
            'prev', 'previous', '上一页', '<', '«', 'back'
        ]
    
    def extract_current_page(self, url):
        """
        Extract the current page number from a URL.
        """
        parsed = urlparse(url)
        
        # Query parameters first
        query_params = parse_qs(parsed.query)
        for param in ['page', 'pagenum', 'p', 'pg']:
            if param in query_params:
                try:
                    return int(query_params[param][0])
                except (ValueError, IndexError):
                    continue
        
        # Then path-based patterns
        for pattern in self.pagination_patterns:
            match = re.search(pattern, url)
            if match:
                try:
                    return int(match.group(1))
                except ValueError:
                    continue
        
        return 1  # default to the first page
    
    def generate_next_page_url(self, current_url, current_page=None):
        """
        Build the URL of the next page.
        """
        if current_page is None:
            current_page = self.extract_current_page(current_url)
        
        next_page = current_page + 1
        
        # Replace only the page number, preserving the surrounding URL
        # shape (?page=N, /page/N, /pN, ...)
        for pattern in self.pagination_patterns:
            match = re.search(pattern, current_url)
            if match:
                replacement = match.group(0).replace(match.group(1), str(next_page))
                return current_url.replace(match.group(0), replacement, 1)
        
        # No page parameter yet: append one
        separator = '&' if '?' in current_url else '?'
        return f"{current_url}{separator}page={next_page}"
    
    def detect_pagination_method(self, response):
        """
        检测翻页方式
        """
        pagination_info = {
            'method': None,
            'has_next': False,
            'has_prev': False,
            'total_pages': None
        }
        
        # 检测AJAX翻页
        if self._has_ajax_pagination(response):
            pagination_info['method'] = 'ajax'
        
        # 检测普通翻页
        elif self._has_standard_pagination(response):
            pagination_info['method'] = 'standard'
        
        # 检测无限滚动
        elif self._has_infinite_scroll(response):
            pagination_info['method'] = 'infinite'
        
        # 检测是否有下一页
        pagination_info['has_next'] = bool(self._find_next_page_link(response))
        
        return pagination_info
    
    def _has_ajax_pagination(self, response):
        """
        检测是否为AJAX翻页
        """
        ajax_indicators = [
            '[onclick*="ajax"]',
            '[data-ajax]',
            '[data-load="ajax"]',
            '.load-more-btn',
            '.infinite-scroll'
        ]
        
        for indicator in ajax_indicators:
            if response.css(indicator):
                return True
        
        return False
    
    def _has_standard_pagination(self, response):
        """
        检测是否为标准翻页
        """
        standard_selectors = [
            '.pagination',
            '.pager',
            '.page-nav',
            '.paging',
            '.pages'
        ]
        
        for selector in standard_selectors:
            if response.css(selector):
                return True
        
        return False
    
    def _has_infinite_scroll(self, response):
        """
        检测是否为无限滚动
        """
        infinite_indicators = [
            '.infinite-scroll',
            '.load-more',
            '.show-more',
            '[data-infinite="true"]',
            '.lazy-load'
        ]
        
        for indicator in infinite_indicators:
            if response.css(indicator):
                return True
        
        return False
    
    def _find_next_page_link(self, response):
        """
        查找下一页链接
        """
        # 标准翻页选择器
        selectors = [
            'a.next::attr(href)',
            'a[rel="next"]::attr(href)',
            '.next a::attr(href)',
            '.pagination .next a::attr(href)',
            '.pager .next a::attr(href)'
        ]
        
        for selector in selectors:
            link = response.css(selector).get()
            if link:
                return link
        
        # 文字匹配(parsel的CSS选择器不支持:contains伪类,需改用XPath)
        for indicator in self.next_page_indicators:
            link = response.xpath(
                f'//a[contains(normalize-space(.), "{indicator}")]/@href'
            ).get()
            if link:
                return link
        
        # 属性匹配
        attr_selectors = [
            'a[title*="next"]::attr(href)',
            'a[aria-label*="next"]::attr(href)'
        ]
        
        for selector in attr_selectors:
            link = response.css(selector).get()
            if link:
                return link
        
        return None
    
    def handle_pagination(self, response, max_pages=100):
        """
        处理翻页逻辑
        """
        current_page = self.extract_current_page(response.url)
        
        if current_page >= max_pages:
            return None  # 达到最大页数,停止翻页
        
        next_page_url = self._find_next_page_link(response)
        if next_page_url:
            return urljoin(response.url, next_page_url)
        
        # 如果找不到下一页链接,尝试生成
        generated_url = self.generate_next_page_url(response.url, current_page)
        if generated_url != response.url:
            return generated_url
        
        return None

class SmartPaginationMiddleware:
    """
    智能翻页中间件(Spider中间件)
    
    注意:下载器中间件的process_response只能返回单个Response或Request,
    无法在返回响应的同时追加新请求,因此翻页检测应实现为Spider中间件的
    process_spider_output,并在settings的SPIDER_MIDDLEWARES中注册。
    """
    
    def __init__(self):
        self.pagination_handler = PaginationHandler()
    
    def process_spider_output(self, response, result, spider):
        """
        先原样产出Spider的结果,再按需追加下一页请求
        """
        for item_or_request in result:
            yield item_or_request
        
        callback = response.request.callback
        if callback and 'product_list' in callback.__name__:
            pagination_info = self.pagination_handler.detect_pagination_method(response)
            
            if pagination_info['has_next']:
                # 提取下一页URL
                next_url = self.pagination_handler.handle_pagination(
                    response,
                    max_pages=spider.settings.getint('MAX_PAGES', 100)
                )
                
                if next_url:
                    # 生成下一页请求
                    yield scrapy.Request(
                        url=next_url,
                        callback=callback,
                        meta=response.meta,
                        dont_filter=True
                    )
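PaginationHandler 的页码提取与下一页URL生成不依赖Scrapy,可以先用一个独立的最小示例验证思路(以下为简化实现,URL均为虚构示例):

```python
import re
from urllib.parse import urlparse, parse_qs

def extract_current_page(url):
    """从查询参数或路径中提取页码,取不到时默认第1页。"""
    query = parse_qs(urlparse(url).query)
    for param in ('page', 'pagenum', 'p', 'pg'):
        if param in query:
            try:
                return int(query[param][0])
            except (ValueError, IndexError):
                continue
    match = re.search(r'/page/(\d+)', url)
    if match:
        return int(match.group(1))
    return 1

def next_page_url(url):
    """保留原URL格式,仅把page参数加1;无页码参数时追加 ?page=N。"""
    page = extract_current_page(url)
    if re.search(r'page=\d+', url):
        return re.sub(r'page=\d+', f'page={page + 1}', url)
    sep = '&' if '?' in url else '?'
    return f'{url}{sep}page={page + 1}'

print(extract_current_page('https://example.com/list?page=3'))  # 3
print(next_page_url('https://example.com/list?page=3'))  # https://example.com/list?page=4
print(next_page_url('https://example.com/list'))         # https://example.com/list?page=2
```

对查询参数式翻页这已经够用;路径式页码(如 /page/7)的URL改写则需要像正文中那样按捕获的页码数字做原位替换。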

商品数据提取

数据提取策略

# ecommerce_spider/utils/data_extractor.py
import re
import json
from typing import Dict, List, Optional, Union
from bs4 import BeautifulSoup
import scrapy

class DataExtractor:
    """
    数据提取器
    """
    
    def __init__(self):
        self.price_patterns = [
            r'¥([\d,]+\.?\d*)',   # 匹配人民币
            r'\$([\d,]+\.?\d*)',  # 匹配美元
            r'€([\d,]+\.?\d*)',   # 匹配欧元
            r'[\d,]+\.?\d*',      # 兜底:匹配纯数字
        ]
        
        self.rating_patterns = [
            r'(\d+(?:\.\d+)?)\s*/\s*5',   # 匹配5分制评分
            r'(\d+(?:\.\d+)?)\s*/\s*10',  # 匹配10分制评分
            r'(\d+(?:\.\d+)?)',           # 兜底:匹配任意数字(需放最后,否则前两条永远不会命中)
        ]
    
    def extract_price(self, text: str) -> Optional[float]:
        """
        提取价格
        """
        if not text:
            return None
        
        # 直接在原文本上依次匹配价格模式
        # (若先剥离货币符号,带符号的模式将永远无法命中)
        for pattern in self.price_patterns:
            match = re.search(pattern, text)
            if match:
                try:
                    price_str = match.group(1) if match.groups() else match.group(0)
                    return float(price_str.replace(',', ''))
                except ValueError:
                    continue
        
        return None
    
    def extract_rating(self, text: str) -> Optional[float]:
        """
        提取评分
        """
        if not text:
            return None
        
        for pattern in self.rating_patterns:
            match = re.search(pattern, text)
            if match:
                try:
                    rating = float(match.group(1))
                    return min(rating, 5.0)  # 限制在5分以内
                except ValueError:
                    continue
        
        return None
    
    def extract_reviews_count(self, text: str) -> Optional[int]:
        """
        提取评论数
        """
        if not text:
            return None
        
        # 匹配数字,可能包含千位分隔符
        number_match = re.search(r'(\d+(?:[,,]\d{3})*(?:\.\d+)?)', text)
        if number_match:
            try:
                count_str = number_match.group(1).replace(',', '').replace(',', '')
                return int(float(count_str))
            except ValueError:
                pass
        
        return None
    
    def extract_specifications(self, response) -> Dict[str, str]:
        """
        提取规格参数
        """
        specs = {}
        
        # 从表格提取
        tables = response.css('.specs-table, .product-specs, .specifications')
        for table in tables:
            rows = table.css('tr')
            for row in rows:
                key_elem = row.css('td:first-child::text, th::text').get()
                value_elem = row.css('td:last-child::text, td:nth-child(2)::text').get()
                
                if key_elem and value_elem:
                    key = self._clean_text(key_elem)
                    value = self._clean_text(value_elem)
                    specs[key] = value
        
        # 从列表提取
        lists = response.css('.specs-list, .spec-list')
        for spec_list in lists:
            items = spec_list.css('li')
            for item in items:
                text = self._clean_text(item.css('::text').get())
                if ':' in text:
                    key, value = text.split(':', 1)
                    specs[self._clean_text(key)] = self._clean_text(value)
        
        # 从JSON-LD提取
        json_ld_scripts = response.css('script[type="application/ld+json"]::text').getall()
        for script in json_ld_scripts:
            try:
                data = json.loads(script)
                if isinstance(data, dict) and data.get('@type') == 'Product':
                    # 提取产品规格
                    if 'additionalProperty' in data:
                        for prop in data['additionalProperty']:
                            name = prop.get('name', '')
                            value = prop.get('value', '')
                            specs[name] = str(value)
            except (json.JSONDecodeError, TypeError):
                continue
        
        return specs
    
    def extract_images(self, response) -> List[str]:
        """
        提取图片链接
        """
        images = []
        
        # 主图
        main_image_selectors = [
            '[itemprop="image"]::attr(src)',
            '.main-image img::attr(src)',
            '.product-image img::attr(src)',
            '.product-img img::attr(src)',
            '.large-image::attr(src)'
        ]
        
        for selector in main_image_selectors:
            main_img = response.css(selector).get()
            if main_img:
                images.append(response.urljoin(main_img))
                break
        
        # 画廊图片
        gallery_selectors = [
            '.gallery img::attr(src)',
            '.thumbnails img::attr(src)',
            '.product-gallery img::attr(src)',
            '.image-thumb::attr(src)'
        ]
        
        for selector in gallery_selectors:
            gallery_imgs = response.css(selector).getall()
            for img in gallery_imgs:
                if img:
                    images.append(response.urljoin(img))
        
        # 去重并返回
        return list(dict.fromkeys(images))  # 保持顺序的去重
    
    def _clean_text(self, text: str) -> str:
        """
        清理文本
        """
        if not text:
            return ''
        return text.strip().replace('\n', '').replace('\t', '').replace('\r', '')
    
    def extract_stock_status(self, response) -> str:
        """
        提取库存状态
        """
        status_selectors = [
            '.stock-status::text',
            '.availability::text',
            '.in-stock::text',
            '.out-of-stock::text',
            '.stock-info::text',
            '[data-stock]::text'
        ]
        
        for selector in status_selectors:
            status = response.css(selector).get()
            if status:
                cleaned = self._clean_text(status)
                if cleaned:
                    return cleaned
        
        return 'Unknown'
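价格解析是最容易出错的环节之一。下面用一个不依赖Scrapy的最小示例演示与上文相同的匹配思路(货币符号模式优先、纯数字兜底;正则为简化版):

```python
import re
from typing import Optional

PRICE_PATTERNS = [
    r'¥([\d,]+\.?\d*)',   # 人民币
    r'\$([\d,]+\.?\d*)',  # 美元
    r'[\d,]+\.?\d*',      # 兜底:纯数字
]

def extract_price(text: str) -> Optional[float]:
    """在原始文本上依次尝试各模式,命中后去掉千位分隔符转为float。"""
    if not text:
        return None
    for pattern in PRICE_PATTERNS:
        match = re.search(pattern, text)
        if match:
            price_str = match.group(1) if match.groups() else match.group(0)
            try:
                return float(price_str.replace(',', ''))
            except ValueError:
                continue
    return None

print(extract_price('¥1,299.00'))  # 1299.0
print(extract_price('$59.99'))     # 59.99
print(extract_price('暂无报价'))    # None
```

关键点在于带货币符号的模式必须排在纯数字模式之前,并且在剥离符号之前匹配,否则这些模式永远不会命中。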

class AdvancedDataExtractor(DataExtractor):
    """
    高级数据提取器
    """
    
    def extract_from_dynamic_content(self, response) -> Dict:
        """
        从动态内容提取数据
        """
        extracted_data = {}
        
        # 从JavaScript变量中提取
        script_content = ' '.join(response.css('script:not([src])::text').getall())
        
        # 提取JSON数据(非贪婪的{.*?}无法处理嵌套大括号,仅适用于扁平JSON;
        # re.DOTALL使'.'可以跨越换行匹配多行JSON)
        json_matches = re.findall(r'({.*?})', script_content, re.DOTALL)
        for json_str in json_matches:
            try:
                data = json.loads(json_str)
                if isinstance(data, dict):
                    extracted_data.update(self._process_dynamic_data(data))
            except json.JSONDecodeError:
                continue
        
        # 提取JavaScript变量
        var_patterns = [
            r'var\s+(\w+)\s*=\s*({.*?});',
            r'const\s+(\w+)\s*=\s*({.*?});',
            r'let\s+(\w+)\s*=\s*({.*?});'
        ]
        
        for pattern in var_patterns:
            matches = re.findall(pattern, script_content, re.DOTALL)
            for var_name, var_value in matches:
                try:
                    data = json.loads(var_value)
                    if isinstance(data, dict):
                        extracted_data.update(self._process_dynamic_data(data))
                except json.JSONDecodeError:
                    continue
        
        return extracted_data
    
    def _process_dynamic_data(self, data: Dict) -> Dict:
        """
        处理动态数据
        """
        processed = {}
        
        # 提取价格信息
        if 'price' in data:
            processed['current_price'] = self.extract_price(str(data['price']))
        
        if 'originalPrice' in data:
            processed['original_price'] = self.extract_price(str(data['originalPrice']))
        
        # 提取评分信息
        if 'rating' in data:
            processed['rating'] = self.extract_rating(str(data['rating']))
        
        if 'reviewCount' in data:
            processed['review_count'] = self.extract_reviews_count(str(data['reviewCount']))
        
        # 提取规格信息
        if 'specifications' in data or 'specs' in data:
            specs = data.get('specifications') or data.get('specs', {})
            if isinstance(specs, dict):
                processed['specifications'] = specs
        
        # 提取库存信息
        if 'stock' in data or 'inventory' in data:
            processed['stock_status'] = str(data.get('stock') or data.get('inventory', 'Unknown'))
        
        return processed
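从内联`<script>`中提取JavaScript变量的做法可以这样独立验证(HTML内容为虚构示例;注意非贪婪的`{.*?}`无法匹配嵌套大括号,只适用于扁平JSON):

```python
import json
import re

# 模拟页面内联脚本的内容(虚构示例数据)
script = '''
var productData = {"price": 199.0, "rating": 4.6, "reviewCount": 1024};
var pageConfig = {"theme": "dark"};
'''

# 捕获 "var 变量名 = {…};" 形式的赋值,re.DOTALL允许JSON跨行
pattern = r'var\s+(\w+)\s*=\s*({.*?});'
extracted = {}
for name, value in re.findall(pattern, script, re.DOTALL):
    try:
        extracted[name] = json.loads(value)
    except json.JSONDecodeError:
        continue  # 非合法JSON的赋值(如函数、表达式)直接跳过

print(extracted['productData']['price'])  # 199.0
print(extracted['pageConfig']['theme'])   # dark
```

真实页面中的JS对象常包含单引号键、函数或嵌套结构,`json.loads`会解析失败,这正是上面用try/except逐个跳过的原因;更复杂的场景需要JS解析器(如chompjs)而非正则。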

class ProductDataExtractor:
    """
    专门的产品数据提取器
    """
    
    def __init__(self):
        self.basic_extractor = DataExtractor()
        self.advanced_extractor = AdvancedDataExtractor()
    
    def extract_complete_product_data(self, response) -> Dict:
        """
        提取完整产品数据
        """
        product_data = {}
        
        # 基本信息
        product_data['title'] = self._extract_title(response)
        product_data['brand'] = self._extract_brand(response)
        
        # 价格信息
        product_data['current_price'] = self._extract_price(response)
        product_data['original_price'] = self._extract_original_price(response)
        product_data['discount_rate'] = self._calculate_discount_rate(product_data)
        
        # 图片信息
        product_data['main_image'] = self._extract_main_image(response)
        product_data['gallery_images'] = self._extract_gallery_images(response)
        
        # 评价信息
        product_data['rating'] = self._extract_rating(response)
        product_data['review_count'] = self._extract_review_count(response)
        
        # 规格参数
        product_data['specifications'] = self._extract_specifications(response)
        
        # 库存状态
        product_data['stock_status'] = self._extract_stock_status(response)
        
        # 颜色和尺寸选项
        product_data['color_options'] = self._extract_color_options(response)
        product_data['size_options'] = self._extract_size_options(response)
        
        # 动态数据
        dynamic_data = self.advanced_extractor.extract_from_dynamic_content(response)
        product_data.update(dynamic_data)
        
        return product_data
    
    def _extract_title(self, response) -> str:
        """
        提取标题
        """
        title_selectors = [
            'h1::text',
            '.product-title::text',
            '.product-name::text',
            '.title::text'
        ]
        
        for selector in title_selectors:
            title = response.css(selector).get()
            if title:
                return self.basic_extractor._clean_text(title)
        
        return 'Unknown Product'
    
    def _extract_brand(self, response) -> str:
        """
        提取品牌
        """
        brand_selectors = [
            '[itemprop="brand"]::text',
            '.brand::text',
            '.product-brand::text',
            '[data-brand]::text'
        ]
        
        for selector in brand_selectors:
            brand = response.css(selector).get()
            if brand:
                return self.basic_extractor._clean_text(brand)
        
        return 'Unknown Brand'
    
    def _extract_price(self, response) -> Optional[float]:
        """
        提取价格
        """
        price_selectors = [
            '[itemprop="price"]::text',
            '.price::text',
            '.product-price::text',
            '.current-price::text',
            '.sale-price::text'
        ]
        
        for selector in price_selectors:
            price_text = response.css(selector).get()
            if price_text:
                return self.basic_extractor.extract_price(price_text)
        
        return None
    
    def _extract_original_price(self, response) -> Optional[float]:
        """
        提取原价
        """
        original_price_selectors = [
            '.original-price::text',
            '.list-price::text',
            '.was-price::text'
        ]
        
        for selector in original_price_selectors:
            original_price_text = response.css(selector).get()
            if original_price_text:
                return self.basic_extractor.extract_price(original_price_text)
        
        return None
    
    def _calculate_discount_rate(self, product_data: Dict) -> Optional[str]:
        """
        计算折扣率
        """
        current_price = product_data.get('current_price')
        original_price = product_data.get('original_price')
        
        if current_price and original_price and original_price > 0:
            discount = (1 - current_price / original_price) * 100
            return f"{discount:.1f}%"
        
        return None
    
    def _extract_main_image(self, response) -> Optional[str]:
        """
        提取主图
        """
        image_selectors = [
            '[itemprop="image"]::attr(src)',
            '.main-image img::attr(src)',
            '.product-image img::attr(src)',
            '.product-img img::attr(src)'
        ]
        
        for selector in image_selectors:
            main_image = response.css(selector).get()
            if main_image:
                return response.urljoin(main_image)
        
        return None
    
    def _extract_gallery_images(self, response) -> List[str]:
        """
        提取画廊图片
        """
        gallery_images = response.css('.gallery img::attr(src)').getall()
        return [response.urljoin(img) for img in gallery_images if img]
    
    def _extract_rating(self, response) -> Optional[float]:
        """
        提取评分
        """
        rating_selectors = [
            '[itemprop="ratingValue"]::text',
            '.rating::text',
            '.score::text',
            '.stars::text'
        ]
        
        for selector in rating_selectors:
            rating = response.css(selector).get()
            if rating:
                return self.basic_extractor.extract_rating(rating)
        
        return None
    
    def _extract_review_count(self, response) -> Optional[int]:
        """
        提取评论数
        """
        review_selectors = [
            '[itemprop="reviewCount"]::text',
            '.review-count::text',
            '.comment-count::text',
            '.reviews::text'
        ]
        
        for selector in review_selectors:
            review_count = response.css(selector).get()
            if review_count:
                return self.basic_extractor.extract_reviews_count(review_count)
        
        return None
    
    def _extract_specifications(self, response) -> Dict[str, str]:
        """
        提取规格参数
        """
        return self.basic_extractor.extract_specifications(response)
    
    def _extract_stock_status(self, response) -> str:
        """
        提取库存状态
        """
        return self.basic_extractor.extract_stock_status(response)
    
    def _extract_color_options(self, response) -> List[str]:
        """
        提取颜色选项
        """
        colors = response.css('.color-option::text, .color-item::text, [data-color]::text').getall()
        return [self.basic_extractor._clean_text(color) for color in colors if color.strip()]
    
    def _extract_size_options(self, response) -> List[str]:
        """
        提取尺寸选项
        """
        sizes = response.css('.size-option::text, .size-item::text, [data-size]::text').getall()
        return [self.basic_extractor._clean_text(size) for size in sizes if size.strip()]
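其中折扣率的计算口径(1 - 现价/原价)可以用几行代码直观验证:

```python
def discount_rate(current_price, original_price):
    """返回形如 '25.0%' 的折扣率;任一价格缺失或原价为0时返回 None。"""
    if current_price and original_price and original_price > 0:
        return f"{(1 - current_price / original_price) * 100:.1f}%"
    return None

print(discount_rate(74.25, 99.0))  # 25.0%
print(discount_rate(None, 99.0))   # None
```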

反爬虫对抗策略

反爬虫中间件

# ecommerce_spider/middlewares/anti_crawler_middleware.py
import random
import time
from fake_useragent import UserAgent

class AntiCrawlerMiddleware:
    """
    反爬虫中间件
    """
    
    def __init__(self):
        self.ua = UserAgent()
        self.banned_ips = set()
        self.request_counts = {}
        self.last_request_time = {}
        
        # 请求头模板
        self.header_templates = [
            {
                'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,image/webp,*/*;q=0.8',
                'Accept-Language': 'zh-CN,zh;q=0.9,en;q=0.8',
                'Accept-Encoding': 'gzip, deflate, br',
                'Connection': 'keep-alive',
                'Upgrade-Insecure-Requests': '1',
                'Sec-Fetch-Dest': 'document',
                'Sec-Fetch-Mode': 'navigate',
                'Sec-Fetch-Site': 'none',
                'Cache-Control': 'max-age=0'
            },
            {
                'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8',
                'Accept-Language': 'en-US,en;q=0.9,zh-CN;q=0.8',
                'Accept-Encoding': 'gzip, deflate',
                'Connection': 'keep-alive',
                'Upgrade-Insecure-Requests': '1',
                'Cache-Control': 'no-cache'
            }
        ]
    
    def process_request(self, request, spider):
        """
        处理请求,应用反爬虫策略
        """
        # 随机User-Agent
        request.headers['User-Agent'] = self.ua.random
        
        # 随机请求头
        template = random.choice(self.header_templates)
        for key, value in template.items():
            if key not in request.headers:
                request.headers[key] = value
        
        # 随机延时(注意:中间件内的time.sleep会阻塞Twisted反应器,
        # 生产环境建议改用DOWNLOAD_DELAY/AUTOTHROTTLE等内置限速设置)
        delay = random.uniform(1, 3)
        time.sleep(delay)
        
        # 记录请求信息
        domain = request.url.split('/')[2]
        if domain not in self.request_counts:
            self.request_counts[domain] = 0
        self.request_counts[domain] += 1
        
        return None
    
    def process_response(self, request, response, spider):
        """
        处理响应,检测反爬虫信号
        """
        # 检测是否被封禁
        if self._is_banned_response(response):
            domain = request.url.split('/')[2]
            spider.logger.warning(f"Domain {domain} might be banned")
            
            # 标记IP为被封禁
            if hasattr(response, 'ip_address'):
                self.banned_ips.add(response.ip_address)
        
        # 检测验证码页面
        if self._has_captcha(response):
            spider.logger.warning("Captcha detected, consider implementing captcha solving")
        
        return response
    
    def _is_banned_response(self, response):
        """
        检测是否为封禁响应
        """
        banned_statuses = [403, 429, 503]
        banned_texts = [
            'blocked', 'forbidden', 'access denied', 'rate limit', 
            'too many requests', 'captcha', 'verification', 'unauthorized'
        ]
        
        if response.status in banned_statuses:
            return True
        
        response_text = response.text.lower()
        for text in banned_texts:
            if text in response_text:
                return True
        
        return False
    
    def _has_captcha(self, response):
        """
        检测是否包含验证码
        """
        captcha_indicators = [
            'captcha', 'verification', 'validate', 'auth', 'security',
            'are you a human', 'robot check', 'prove you are human',
            'recaptcha', 'hcaptcha'
        ]
        
        response_text = response.text.lower()
        for indicator in captcha_indicators:
            if indicator in response_text:
                return True
        
        return False
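封禁检测的启发式规则(状态码命中或正文含关键词)可以脱离中间件单独演示:

```python
BANNED_STATUSES = {403, 429, 503}
BANNED_TEXTS = ['blocked', 'forbidden', 'access denied', 'rate limit',
                'too many requests', 'captcha']

def is_banned(status: int, body: str) -> bool:
    """状态码命中封禁集合,或响应正文包含封禁关键词,即视为疑似被封。"""
    if status in BANNED_STATUSES:
        return True
    body = body.lower()
    return any(text in body for text in BANNED_TEXTS)

print(is_banned(429, ''))                           # True
print(is_banned(200, 'Access Denied: rate limit'))  # True
print(is_banned(200, '<html>正常商品页</html>'))      # False
```

这类关键词匹配会有误报(比如商品描述里恰好含有"captcha"一词),实际使用时应结合状态码、响应长度等多个信号综合判断。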

class ProxyMiddleware:
    """
    代理中间件
    """
    
    def __init__(self):
        self.proxies = []
        self.current_proxy_index = 0
        self.proxy_usage_count = {}
        self.failed_proxies = set()
    
    def load_proxies(self, proxy_list):
        """
        加载代理列表
        """
        self.proxies = proxy_list
    
    def get_next_proxy(self):
        """
        获取下一个代理
        """
        if not self.proxies:
            return None
        
        # 跳过失败的代理
        for _ in range(len(self.proxies)):
            proxy = self.proxies[self.current_proxy_index % len(self.proxies)]
            self.current_proxy_index += 1
            
            if proxy not in self.failed_proxies:
                return proxy
        
        return None
    
    def process_request(self, request, spider):
        """
        为请求分配代理
        """
        proxy = self.get_next_proxy()
        if proxy:
            request.meta['proxy'] = proxy
            spider.logger.debug(f"Using proxy: {proxy}")
        
        return None
    
    def process_response(self, request, response, spider):
        """
        处理响应,检测代理是否失效
        """
        if response.status in [403, 429, 503]:
            proxy = request.meta.get('proxy')
            if proxy:
                spider.logger.warning(f"Proxy {proxy} might be blocked")
                self.failed_proxies.add(proxy)
        
        return response
    
    def process_exception(self, request, exception, spider):
        """
        处理异常,标记代理为失效
        """
        proxy = request.meta.get('proxy')
        if proxy:
            spider.logger.error(f"Proxy {proxy} failed: {exception}")
            self.failed_proxies.add(proxy)

class RateLimitMiddleware:
    """
    速率限制中间件
    """
    
    def __init__(self):
        self.request_times = {}
        self.min_interval = 1  # 最小请求间隔(秒)
        self.domain_limits = {}  # 域名特定限制
    
    def process_request(self, request, spider):
        """
        处理请求,实施速率限制
        """
        domain = request.url.split('/')[2]
        
        # 获取域名特定的限制
        limit = self.domain_limits.get(domain, self.min_interval)
        
        current_time = time.time()
        
        # 检查上次请求时间
        last_request = self.request_times.get(domain, 0)
        time_since_last = current_time - last_request
        
        if time_since_last < limit:
            sleep_time = limit - time_since_last
            spider.logger.debug(f"Rate limiting: sleeping {sleep_time:.2f}s for {domain}")
            time.sleep(sleep_time)
        
        # 更新最后请求时间
        self.request_times[domain] = time.time()
        
        return None
    
    def set_domain_limit(self, domain, limit_seconds):
        """
        设置域名特定的请求限制
        """
        self.domain_limits[domain] = limit_seconds

class SessionMiddleware:
    """
    会话管理中间件
    """
    
    def __init__(self):
        self.sessions = {}
        self.session_cookies = {}
        self.session_rotation_interval = 3600  # 1小时轮换
    
    def process_request(self, request, spider):
        """
        处理请求,管理会话
        """
        domain = request.url.split('/')[2]
        
        # 检查是否需要轮换会话
        current_time = time.time()
        last_rotation = self.sessions.get(domain, 0)
        
        if current_time - last_rotation > self.session_rotation_interval:
            # 轮换会话(清除旧Cookie)
            if domain in self.session_cookies:
                del self.session_cookies[domain]
            self.sessions[domain] = current_time
        
        # 附加已保存的会话Cookie(Request.cookies是普通dict,没有set方法)
        if domain in self.session_cookies:
            request.cookies.update(self.session_cookies[domain])
        
        return None
    
    def process_response(self, request, response, spider):
        """
        处理响应,更新会话Cookie
        """
        domain = request.url.split('/')[2]
        
        # 提取新的Cookie
        new_cookies = {}
        for cookie_header in response.headers.getlist('Set-Cookie'):
            cookie_str = cookie_header.decode('utf-8')
            if '=' in cookie_str:
                name, value = cookie_str.split('=', 1)
                new_cookies[name.strip()] = value.split(';')[0].strip()
        
        # 更新会话Cookie
        if domain not in self.session_cookies:
            self.session_cookies[domain] = {}
        self.session_cookies[domain].update(new_cookies)
        
        return response
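以上几个下载器中间件需要在 settings.py 中注册后才会生效。下面是一个假设的配置片段(类路径按本文的项目目录结构推测,优先级数值仅供参考):

```python
# settings.py 片段(示例配置,类路径为按目录结构推测的假设值)
DOWNLOADER_MIDDLEWARES = {
    'ecommerce_spider.middlewares.anti_crawler_middleware.AntiCrawlerMiddleware': 543,
    'ecommerce_spider.middlewares.anti_crawler_middleware.ProxyMiddleware': 544,
    'ecommerce_spider.middlewares.anti_crawler_middleware.RateLimitMiddleware': 545,
    'ecommerce_spider.middlewares.anti_crawler_middleware.SessionMiddleware': 546,
}

# 与中间件内手写sleep相比,更推荐Scrapy内置的下载延时与自动限速
DOWNLOAD_DELAY = 1
RANDOMIZE_DOWNLOAD_DELAY = True
AUTOTHROTTLE_ENABLED = True

# 自定义设置:单分类最大翻页数(供翻页逻辑读取)
MAX_PAGES = 100
```

数值越小越靠近引擎、越早处理请求;若同时使用Scrapy内置的同名中间件(如默认的Cookie中间件),需注意优先级顺序避免互相覆盖。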

数据去重与清洗

数据清洗工具

# ecommerce_spider/utils/data_cleaner.py
import re
import pandas as pd
from typing import Dict, List, Any, Optional
from datetime import datetime

class DataCleaner:
    """
    数据清洗工具
    """
    
    def __init__(self):
        self.price_patterns = [
            r'¥([\d,]+\.?\d*)',   # 匹配人民币
            r'\$([\d,]+\.?\d*)',  # 匹配美元
            r'€([\d,]+\.?\d*)',   # 匹配欧元
            r'[\d,]+\.?\d*',      # 兜底:匹配纯数字
        ]
        
        self.cleaning_rules = {
            'title': self._clean_title,
            'description': self._clean_description,
            'price': self._clean_price,
            'rating': self._clean_rating,
            'review_count': self._clean_review_count,
            'specifications': self._clean_specifications
        }
    
    def clean_product_data(self, product_data: Dict[str, Any]) -> Dict[str, Any]:
        """
        清洗产品数据
        """
        cleaned_data = {}
        
        for field, value in product_data.items():
            if field in self.cleaning_rules:
                cleaned_data[field] = self.cleaning_rules[field](value)
            else:
                cleaned_data[field] = self._clean_generic_field(value)
        
        # 添加清洗时间戳
        cleaned_data['cleaned_at'] = datetime.now()
        
        return cleaned_data
    
    def _clean_title(self, title: str) -> str:
        """
        清洗标题
        """
        if not title:
            return 'Unknown Product'
        
        # 移除多余空格和换行
        cleaned = re.sub(r'\s+', ' ', title.strip())
        
        # 移除特殊字符(保留中文、英文、数字、基本标点)
        cleaned = re.sub(r'[^\u4e00-\u9fa5a-zA-Z0-9\s\-_.,!?;:()()【】\[\]]', '', cleaned)
        
        return cleaned
    
    def _clean_description(self, description: str) -> str:
        """
        清洗描述
        """
        if not description:
            return ''
        
        # 移除HTML标签
        cleaned = re.sub(r'<[^>]+>', '', description)
        
        # 移除多余空格和换行
        cleaned = re.sub(r'\n+', '\n', cleaned)
        cleaned = re.sub(r'\s+', ' ', cleaned)
        
        # 移除特殊字符
        cleaned = re.sub(r'[^\u4e00-\u9fa5a-zA-Z0-9\s\-_.,!?;:()()【】\[\]/]', '', cleaned)
        
        return cleaned.strip()
    
    def _clean_price(self, price: Any) -> Optional[float]:
        """
        清洗价格
        """
        if price is None:
            return None
        
        if isinstance(price, (int, float)):
            return float(price)
        
        if isinstance(price, str):
            # 移除货币符号和空格
            cleaned = re.sub(r'[^\d.,]', '', price.strip())
            try:
                return float(cleaned.replace(',', ''))
            except ValueError:
                return None
        
        return None
    
    def _clean_rating(self, rating: Any) -> Optional[float]:
        """
        清洗评分
        """
        if rating is None:
            return None
        
        if isinstance(rating, (int, float)):
            return min(float(rating), 5.0)  # 限制在5分以内
        
        if isinstance(rating, str):
            try:
                num_rating = float(re.search(r'(\d+(?:\.\d+)?)', rating).group(1))
                return min(num_rating, 5.0)
            except (ValueError, AttributeError):
                return None
        
        return None
    
    def _clean_review_count(self, review_count: Any) -> Optional[int]:
        """
        清洗评论数
        """
        if review_count is None:
            return None
        
        if isinstance(review_count, int):
            return review_count
        
        if isinstance(review_count, str):
            # 提取数字
            numbers = re.findall(r'\d+', review_count.replace(',', ''))
            if numbers:
                return int(numbers[0])
        
        return None
    
    def _clean_specifications(self, specs: Any) -> Dict[str, str]:
        """
        清洗规格参数
        """
        if not specs:
            return {}
        
        if isinstance(specs, dict):
            cleaned_specs = {}
            for key, value in specs.items():
                cleaned_key = self._clean_generic_field(key) if key else ''
                cleaned_value = self._clean_generic_field(value) if value else ''
                if cleaned_key and cleaned_value:
                    cleaned_specs[cleaned_key] = cleaned_value
            return cleaned_specs
        
        return {}
    
    def _clean_generic_field(self, field: Any) -> Any:
        """
        清洗通用字段
        """
        if isinstance(field, str):
            # 移除多余空格
            return re.sub(r'\s+', ' ', field.strip())
        return field
    
    def remove_duplicates(self, products: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
        """
        去除重复产品
        """
        seen_products = set()
        unique_products = []
        
        for product in products:
            # 创建产品的唯一标识
            product_id = product.get('product_id', 'unknown')
            title = product.get('title', 'Unknown Product')
            url = product.get('url', '')
            
            # 生成唯一标识符
            identifier = f"{product_id}_{hash(title)}_{hash(url)}"
            
            if identifier not in seen_products:
                seen_products.add(identifier)
                unique_products.append(product)
        
        return unique_products
    
    def validate_data_quality(self, products: List[Dict[str, Any]]) -> Dict[str, Any]:
        """
        验证数据质量
        """
        quality_report = {
            'total_products': len(products),
            'complete_products': 0,
            'missing_fields': {},
            'invalid_values': {},
            'quality_score': 0.0
        }
        
        required_fields = ['title', 'current_price', 'url']
        
        for product in products:
            # 检查完整度
            complete = True
            for field in required_fields:
                if not product.get(field):
                    complete = False
                    if field not in quality_report['missing_fields']:
                        quality_report['missing_fields'][field] = 0
                    quality_report['missing_fields'][field] += 1
            
            if complete:
                quality_report['complete_products'] += 1
            
            # 检查无效值
            for field, value in product.items():
                if self._is_invalid_value(field, value):
                    if field not in quality_report['invalid_values']:
                        quality_report['invalid_values'][field] = 0
                    quality_report['invalid_values'][field] += 1
        
        # 计算质量得分
        if quality_report['total_products'] > 0:
            quality_report['quality_score'] = (
                quality_report['complete_products'] / quality_report['total_products']
            ) * 100
        
        return quality_report
    
    def _is_invalid_value(self, field: str, value: Any) -> bool:
        """
        检查值是否无效
        """
        if value is None:
            return True
        
        if isinstance(value, str):
            # 检查是否为空或只包含空白字符
            if not value.strip():
                return True
            
            # 检查是否为默认值
            if value.lower() in ['unknown', 'unknown product', '暂无', 'null', 'none']:
                return True
        
        if field == 'current_price':
            if isinstance(value, (int, float)) and value <= 0:
                return True
        
        return False
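
# --- 质量得分计算的独立演示(假设性示例数据,仅作说明) ---
# validate_data_quality 的核心公式是:完整商品数 / 总商品数 * 100
_sample_products = [
    {'title': 'Phone', 'current_price': 199.0, 'url': 'https://example.com/p/1'},
    {'title': '', 'current_price': 99.0, 'url': 'https://example.com/p/2'},  # 标题缺失,不完整
]
_required = ['title', 'current_price', 'url']
_complete = sum(1 for p in _sample_products if all(p.get(f) for f in _required))
assert _complete / len(_sample_products) * 100 == 50.0  # 两条数据中一条完整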

class DeduplicationPipeline:
    """
    去重管道
    """
    
    def __init__(self):
        self.data_cleaner = DataCleaner()
        self.seen_products = set()
        self.duplicates_count = 0
    
    def process_item(self, item, spider):
        """
        处理项目,执行去重
        """
        # 清洗数据
        cleaned_item = self.data_cleaner.clean_product_data(dict(item))
        
        # 生成唯一标识符
        product_id = cleaned_item.get('product_id', 'unknown')
        title_hash = hash(cleaned_item.get('title', ''))
        url_hash = hash(cleaned_item.get('url', ''))
        
        identifier = f"{product_id}_{title_hash}_{url_hash}"
        
        if identifier in self.seen_products:
            self.duplicates_count += 1
            spider.logger.info(f"Duplicate product found: {cleaned_item.get('title', 'Unknown')}")
            # 按Scrapy惯例抛出DropItem丢弃重复项目(返回None会把None传给后续管道)
            from scrapy.exceptions import DropItem
            raise DropItem(f"Duplicate product: {cleaned_item.get('title', 'Unknown')}")
        
        self.seen_products.add(identifier)
        
        # 更新项目为清洗后的数据
        for key, value in cleaned_item.items():
            item[key] = value
        
        return item
    
    def close_spider(self, spider):
        """
        爬虫关闭时的清理工作
        """
        spider.logger.info(f"Removed {self.duplicates_count} duplicate products")
        spider.logger.info(f"Unique products processed: {len(self.seen_products)}")
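
# 补充示意:内置hash()受PYTHONHASHSEED影响,跨进程/重启后结果不稳定,
# 仅适合单次运行内去重;若要把去重标识持久化(例如写入Redis),
# 可改用hashlib生成稳定指纹(以下为示意实现):
import hashlib

def stable_fingerprint(product_id: str, title: str, url: str) -> str:
    """生成跨运行稳定的产品指纹(示意)"""
    raw = f"{product_id}|{title}|{url}".encode('utf-8')
    return hashlib.md5(raw).hexdigest()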

class DataValidationPipeline:
    """
    数据验证管道
    """
    
    def __init__(self):
        self.data_cleaner = DataCleaner()
        self.validation_errors = []
        self.valid_items = 0
        self.invalid_items = 0
    
    def process_item(self, item, spider):
        """
        处理项目,执行验证
        """
        # 验证必需字段
        required_fields = ['title', 'current_price', 'url']
        
        from scrapy.exceptions import DropItem  # Scrapy惯例:用DropItem丢弃项目,而非返回None
        
        for field in required_fields:
            if not item.get(field):
                error_msg = f"Missing required field '{field}' in item: {item.get('title', 'Unknown')}"
                self.validation_errors.append(error_msg)
                spider.logger.warning(error_msg)
                self.invalid_items += 1
                raise DropItem(error_msg)  # 丢弃无效项目
        
        # 验证数据格式
        price = item.get('current_price')
        if isinstance(price, (int, float)) and price <= 0:
            error_msg = f"Invalid price value '{price}' for item: {item.get('title', 'Unknown')}"
            self.validation_errors.append(error_msg)
            spider.logger.warning(error_msg)
            self.invalid_items += 1
            raise DropItem(error_msg)
        
        self.valid_items += 1
        return item
    
    def close_spider(self, spider):
        """
        爬虫关闭时的清理工作
        """
        spider.logger.info(f"Validation completed:")
        spider.logger.info(f"Valid items: {self.valid_items}")
        spider.logger.info(f"Invalid items: {self.invalid_items}")
        spider.logger.info(f"Total errors: {len(self.validation_errors)}")
        
        if self.validation_errors:
            spider.logger.info("Sample validation errors:")
            for error in self.validation_errors[:5]:  # 只显示前5个错误
                spider.logger.info(f"  - {error}")

class StoragePipeline:
    """
    存储管道
    """
    
    def __init__(self):
        self.items_stored = 0
        self.storage_method = 'json'  # 默认存储方式
    
    def open_spider(self, spider):
        """
        爬虫开启时的初始化
        """
        self.items_stored = 0
        
        # 根据配置选择存储方式
        storage_config = getattr(spider, 'storage_config', {})
        self.storage_method = storage_config.get('method', 'json')
        
        if self.storage_method == 'mongodb':
            self._setup_mongodb(storage_config)
        elif self.storage_method == 'mysql':
            self._setup_mysql(storage_config)
        # csv/json按文件追加写入,在首次写入时自行初始化,无需额外设置
    
    def process_item(self, item, spider):
        """
        处理项目,执行存储
        """
        if self.storage_method == 'mongodb':
            self._store_mongodb(item)
        elif self.storage_method == 'mysql':
            self._store_mysql(item)
        elif self.storage_method == 'csv':
            self._store_csv(item)
        else:
            self._store_json(item)
        
        self.items_stored += 1
        return item
    
    def _store_json(self, item):
        """
        JSON存储(JSON Lines格式,每行一个对象)
        """
        import json
        import os
        from datetime import datetime
        
        # 文件名只在首次写入时生成一次,避免每秒产生一个新文件、数据被打散
        if not hasattr(self, 'json_filename'):
            output_dir = 'output'
            os.makedirs(output_dir, exist_ok=True)
            timestamp = datetime.now().strftime('%Y%m%d_%H%M%S')
            self.json_filename = f'{output_dir}/ecommerce_data_{timestamp}.json'
        
        # 追加写入数据
        with open(self.json_filename, 'a', encoding='utf-8') as f:
            f.write(json.dumps(dict(item), ensure_ascii=False, default=str) + '\n')
    
    def _store_csv(self, item):
        """
        CSV存储
        """
        import csv
        import os
        from datetime import datetime
        
        # 文件名只生成一次,否则每个item都会落到新的时间戳文件,表头也会重复写入
        if not hasattr(self, 'csv_filename'):
            output_dir = 'output'
            os.makedirs(output_dir, exist_ok=True)
            timestamp = datetime.now().strftime('%Y%m%d_%H%M%S')
            self.csv_filename = f'{output_dir}/ecommerce_data_{timestamp}.csv'
        
        # 首次写入时输出表头,之后追加数据行
        write_header = not os.path.exists(self.csv_filename)
        with open(self.csv_filename, 'a', newline='', encoding='utf-8') as f:
            writer = csv.DictWriter(f, fieldnames=item.keys())
            if write_header:
                writer.writeheader()
            writer.writerow(dict(item))
    
    def _setup_mongodb(self, config):
        """
        设置MongoDB存储
        """
        try:
            from pymongo import MongoClient
            
            connection_string = config.get('connection_string', 'mongodb://localhost:27017/')
            database_name = config.get('database', 'ecommerce_db')
            collection_name = config.get('collection', 'products')
            
            self.mongo_client = MongoClient(connection_string)
            self.database = self.mongo_client[database_name]
            self.collection = self.database[collection_name]
            
        except ImportError:
            raise Exception("pymongo not installed. Install with: pip install pymongo")
    
    def _store_mongodb(self, item):
        """
        MongoDB存储
        """
        if hasattr(self, 'collection'):
            # 添加时间戳
            item['stored_at'] = datetime.now()
            self.collection.insert_one(dict(item))
    
    def _setup_mysql(self, config):
        """
        设置MySQL存储
        """
        try:
            import pymysql
            
            host = config.get('host', 'localhost')
            user = config.get('user', 'root')
            password = config.get('password', '')
            database = config.get('database', 'ecommerce_db')
            
            self.mysql_connection = pymysql.connect(
                host=host,
                user=user,
                password=password,
                database=database,
                charset='utf8mb4'
            )
            
            # 创建表
            self._create_mysql_table()
        
        except ImportError:
            raise Exception("pymysql not installed. Install with: pip install pymysql")
    
    def _create_mysql_table(self):
        """
        创建MySQL表
        """
        cursor = self.mysql_connection.cursor()
        
        create_table_sql = """
        CREATE TABLE IF NOT EXISTS products (
            id INT AUTO_INCREMENT PRIMARY KEY,
            product_id VARCHAR(100),
            title TEXT,
            brand VARCHAR(200),
            category_path TEXT,
            current_price DECIMAL(10, 2),
            original_price DECIMAL(10, 2),
            discount_rate VARCHAR(20),
            specifications JSON,
            color_options JSON,
            size_options JSON,
            main_image TEXT,
            gallery_images JSON,
            rating DECIMAL(3, 2),
            review_count INT,
            stock_status VARCHAR(50),
            sales_volume VARCHAR(50),
            url TEXT,
            crawled_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
            source_website VARCHAR(200)
        ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;
        """
        
        cursor.execute(create_table_sql)
        self.mysql_connection.commit()
    
    def _store_mysql(self, item):
        """
        MySQL存储
        """
        if hasattr(self, 'mysql_connection'):
            cursor = self.mysql_connection.cursor()
            
            # 准备插入数据
            insert_sql = """
            INSERT INTO products (
                product_id, title, brand, category_path, current_price, 
                original_price, discount_rate, specifications, color_options, 
                size_options, main_image, gallery_images, rating, 
                review_count, stock_status, sales_volume, url, source_website
            ) VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s)
            """
            
            # 处理JSON字段(序列化为字符串写入JSON列)
            import json
            specifications = item.get('specifications')
            if isinstance(specifications, dict):
                specifications = json.dumps(specifications, ensure_ascii=False)
            
            color_options = item.get('color_options')
            if isinstance(color_options, list):
                color_options = json.dumps(color_options, ensure_ascii=False)
            
            size_options = item.get('size_options')
            if isinstance(size_options, list):
                size_options = json.dumps(size_options, ensure_ascii=False)
            
            gallery_images = item.get('gallery_images')
            if isinstance(gallery_images, list):
                gallery_images = json.dumps(gallery_images, ensure_ascii=False)
            
            values = (
                item.get('product_id'), item.get('title'), item.get('brand'), 
                item.get('category_path'), item.get('current_price'), 
                item.get('original_price'), item.get('discount_rate'), 
                specifications, color_options, size_options, 
                item.get('main_image'), gallery_images, 
                item.get('rating'), item.get('review_count'), 
                item.get('stock_status'), item.get('sales_volume'), 
                item.get('url'), item.get('source_website')
            )
            
            cursor.execute(insert_sql, values)
            self.mysql_connection.commit()

class MonitoringPipeline:
    """
    监控管道
    """
    
    def __init__(self):
        self.items_processed = 0
        self.start_time = None
        self.errors = []
    
    def open_spider(self, spider):
        """
        爬虫开启时的初始化
        """
        self.start_time = datetime.now()
        self.items_processed = 0
        spider.logger.info("E-commerce spider started at: %s", self.start_time)
    
    def process_item(self, item, spider):
        """
        处理项目,执行监控
        """
        self.items_processed += 1
        
        # 每处理100个项目记录一次进度
        if self.items_processed % 100 == 0:
            elapsed_time = datetime.now() - self.start_time
            spider.logger.info(f"Processed {self.items_processed} items in {elapsed_time}")
        
        return item
    
    def close_spider(self, spider):
        """
        爬虫关闭时的监控报告
        """
        end_time = datetime.now()
        total_time = end_time - self.start_time
        
        spider.logger.info("=" * 50)
        spider.logger.info("E-COMMERCE SPIDER MONITORING REPORT")
        spider.logger.info("=" * 50)
        spider.logger.info(f"Start Time: {self.start_time}")
        spider.logger.info(f"End Time: {end_time}")
        spider.logger.info(f"Total Runtime: {total_time}")
        spider.logger.info(f"Items Processed: {self.items_processed}")
        spider.logger.info(f"Average Speed: {self.items_processed / max(total_time.total_seconds(), 1e-6):.2f} items/sec")
        spider.logger.info("=" * 50)
```
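上述各管道需要在settings中按优先级注册后才会生效。下面是一个假设性的注册示例(模块路径按本项目结构假定,实际以你的包布局为准;数字越小优先级越高、越先处理item):

```python
# 假设的管道注册配置:验证 → 去重 → 存储 → 监控
ITEM_PIPELINES = {
    'ecommerce_spider.pipelines.DataValidationPipeline': 100,   # 先验证
    'ecommerce_spider.pipelines.DeduplicationPipeline': 200,    # 再去重
    'ecommerce_spider.pipelines.StoragePipeline': 300,          # 然后存储
    'ecommerce_spider.pipelines.MonitoringPipeline': 400,       # 最后统计
}

# 按优先级数值排序即为实际执行顺序
order = [name.rsplit('.', 1)[-1]
         for name, _ in sorted(ITEM_PIPELINES.items(), key=lambda kv: kv[1])]
```

这样,被验证或去重管道丢弃(DropItem)的条目不会进入后续的存储与监控环节。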

## 性能优化技巧 {#性能优化技巧}

### 并发与延迟优化

```python
# 优化Scrapy设置以提高性能
CUSTOM_SETTINGS = {
    # 并发设置
    'CONCURRENT_REQUESTS': 32,
    'CONCURRENT_REQUESTS_PER_DOMAIN': 8,
    'CONCURRENT_REQUESTS_PER_IP': 0,
    
    # 下载延迟
    'DOWNLOAD_DELAY': 1,
    'RANDOMIZE_DOWNLOAD_DELAY': True,  # 布尔开关:开启后实际延迟在0.5~1.5倍DOWNLOAD_DELAY间随机
    
    # 自动限速
    'AUTOTHROTTLE_ENABLED': True,
    'AUTOTHROTTLE_START_DELAY': 1,
    'AUTOTHROTTLE_MAX_DELAY': 10,
    'AUTOTHROTTLE_TARGET_CONCURRENCY': 4.0,
    'AUTOTHROTTLE_DEBUG': False,
    
    # DNS缓存
    'DNSCACHE_ENABLED': True,
    
    # 连接池
    'REACTOR_THREADPOOL_MAXSIZE': 20,
    
    # 重试设置
    'RETRY_TIMES': 3,
    'RETRY_HTTP_CODES': [500, 502, 503, 504, 408, 429],
    
    # 缓存
    'HTTPCACHE_ENABLED': False,  # 电商爬虫通常不需要缓存
}
```
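作为补充,下面用一小段独立代码示意随机化延迟的实际取值范围(Scrapy在开启 RANDOMIZE_DOWNLOAD_DELAY 时,会在0.5~1.5倍 DOWNLOAD_DELAY 之间随机取延迟):

```python
import random

DOWNLOAD_DELAY = 1  # 与上面配置一致的基础延迟(秒)

def effective_delay(base: float = DOWNLOAD_DELAY) -> float:
    """模拟RANDOMIZE_DOWNLOAD_DELAY开启时的实际延迟取值(示意)"""
    return random.uniform(0.5 * base, 1.5 * base)
```

随机化延迟能打散请求的时间规律,降低被频率检测识别的概率。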

### 内存优化

```python

# 内存优化配置
MEMORY_OPTIMIZATION = {
    # 下载器设置
    'DOWNLOAD_MAXSIZE': 1073741824,  # 1GB
    'DOWNLOAD_WARNSIZE': 33554432,   # 32MB
    
    # Item处理器
    'ITEM_PIPELINES': {
        'ecommerce_spider.pipelines.MemoryOptimizedPipeline': 300,
    },
    
    # 日志级别
    'LOG_LEVEL': 'INFO',
}

class MemoryOptimizedPipeline:
    """
    内存优化管道
    """
    
    def __init__(self):
        self.buffer = []
        self.buffer_size = 1000  # 缓冲区大小
    
    def process_item(self, item, spider):
        """
        处理项目,批量存储以减少内存占用
        """
        self.buffer.append(dict(item))
        
        if len(self.buffer) >= self.buffer_size:
            self._flush_buffer()
        
        return item
    
    def _flush_buffer(self):
        """
        刷新缓冲区
        """
        if self.buffer:
            # 批量存储到数据库或其他存储系统
            self._batch_store(self.buffer)
            self.buffer.clear()
    
    def close_spider(self, spider):
        """
        关闭爬虫时处理剩余数据
        """
        self._flush_buffer()
    
    def _batch_store(self, items):
        """
        批量存储
        """
        # 实现批量存储逻辑
        pass
```
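MemoryOptimizedPipeline 的缓冲思路可以用一小段独立代码演示(示意,buffer_size 取3便于观察效果):

```python
# 缓冲批量写入的最小演示:攒满一批再"落库",减少频繁IO与内存碎片
buffer, flushed = [], []

def add(item, buffer_size=3):
    buffer.append(item)
    if len(buffer) >= buffer_size:
        flushed.append(list(buffer))  # 模拟批量存储
        buffer.clear()

for i in range(7):
    add(i)
# 7个item按每3个一批刷出两批,剩1个留在缓冲区等待close_spider时兜底刷新
```

这也说明了为什么 close_spider 中必须再调用一次 _flush_buffer:否则不足一批的尾部数据会丢失。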

### 数据库优化

```python

# 数据库连接优化
DATABASE_OPTIMIZATION = {
    'mysql': {
        'pool_size': 20,
        'pool_recycle': 3600,
        'pool_timeout': 20,
        'max_overflow': 0,
        'pool_pre_ping': True,
    },
    'mongodb': {
        'max_pool_size': 50,
        'min_pool_size': 10,
        'max_idle_time_ms': 30000,
        'server_selection_timeout_ms': 5000,
    }
}
```
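上面的mongodb连接池参数可以映射为 pymongo `MongoClient` 实际支持的关键字参数(maxPoolSize、minPoolSize、maxIdleTimeMS、serverSelectionTimeoutMS)。以下转换函数为示意写法:

```python
def mongo_client_kwargs(cfg: dict) -> dict:
    """把上面的mongodb优化配置转换为pymongo MongoClient的关键字参数(示意)"""
    return {
        'maxPoolSize': cfg['max_pool_size'],
        'minPoolSize': cfg['min_pool_size'],
        'maxIdleTimeMS': cfg['max_idle_time_ms'],
        'serverSelectionTimeoutMS': cfg['server_selection_timeout_ms'],
    }

# 用法示意:MongoClient('mongodb://localhost:27017/', **mongo_client_kwargs(cfg))
```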

## 项目部署与监控 {#项目部署与监控}

### Docker部署

```dockerfile

# Dockerfile
FROM python:3.9-slim

WORKDIR /app

COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt

COPY . .

CMD ["scrapy", "crawl", "ecommerce"]
```

```yaml
# docker-compose.yml
version: '3.8'

services:
  ecommerce-spider:
    build: .
    volumes:
      - ./output:/app/output
    environment:
      - SCRAPY_SETTINGS_MODULE=ecommerce_spider.settings
    networks:
      - spider-network

  mongodb:
    image: mongo:4.4
    ports:
      - "27017:27017"
    volumes:
      - mongodb_data:/data/db
    networks:
      - spider-network

  redis:
    image: redis:6.2
    ports:
      - "6379:6379"
    networks:
      - spider-network

networks:
  spider-network:
    driver: bridge

volumes:
  mongodb_data:

```

### 部署脚本

```bash

#!/bin/bash
# deploy.sh - 电商爬虫部署脚本

set -e

echo "开始部署电商爬虫..."

# 检查依赖
echo "检查依赖..."
pip install -r requirements.txt

# 创建输出目录
mkdir -p output logs

# 设置环境变量
export SCRAPY_SETTINGS_MODULE=ecommerce_spider.settings

# 运行爬虫
echo "启动电商爬虫..."
scrapy crawl ecommerce \
    -s LOG_FILE=logs/ecommerce_$(date +%Y%m%d_%H%M%S).log \
    -s JOBDIR=crawls/ecommerce_job

echo "部署完成!"

```

### 监控脚本

```python

# monitoring.py - 爬虫监控脚本
import psutil
import time
import logging
from datetime import datetime

class SpiderMonitor:
    """
    爬虫监控器
    """
    
    def __init__(self):
        self.logger = self._setup_logger()
        self.process = psutil.Process()
    
    def _setup_logger(self):
        """
        设置日志记录器
        """
        logger = logging.getLogger('SpiderMonitor')
        logger.setLevel(logging.INFO)
        
        handler = logging.FileHandler(f'monitoring_{datetime.now().strftime("%Y%m%d_%H%M%S")}.log')
        formatter = logging.Formatter('%(asctime)s - %(levelname)s - %(message)s')
        handler.setFormatter(formatter)
        
        logger.addHandler(handler)
        return logger
    
    def collect_metrics(self):
        """
        收集监控指标
        """
        # io_counters/num_fds 在部分平台(如macOS)不可用,需做兼容判断
        metrics = {
            'timestamp': datetime.now().isoformat(),
            'cpu_percent': self.process.cpu_percent(),
            'memory_info': self.process.memory_info()._asdict(),
            'disk_io_counters': self.process.io_counters()._asdict() if hasattr(self.process, 'io_counters') else {},
            'num_threads': self.process.num_threads(),
            'num_fds': self.process.num_fds() if hasattr(self.process, 'num_fds') else 0,
        }
        
        return metrics
    
    def log_metrics(self, metrics):
        """
        记录监控指标
        """
        self.logger.info(f"CPU: {metrics['cpu_percent']}%")
        self.logger.info(f"Memory: {metrics['memory_info']['rss'] / 1024 / 1024:.2f} MB")
        self.logger.info(f"Threads: {metrics['num_threads']}")
    
    def start_monitoring(self, interval=60):
        """
        开始监控
        """
        self.logger.info("开始监控爬虫...")
        
        try:
            while True:
                metrics = self.collect_metrics()
                self.log_metrics(metrics)
                time.sleep(interval)
        except KeyboardInterrupt:
            self.logger.info("监控已停止")

if __name__ == "__main__":
    monitor = SpiderMonitor()
    monitor.start_monitoring()

```

## SEO优化建议 {#SEO优化建议}

### 网站SEO分析

```python

# seo_analyzer.py - 电商网站SEO分析工具
import requests
from bs4 import BeautifulSoup
import re
from urllib.parse import urljoin, urlparse

class SEOAnalyzer:
    """
    SEO分析器
    """
    
    def __init__(self, base_url):
        self.base_url = base_url
        self.session = requests.Session()
        self.session.headers.update({
            'User-Agent': 'Mozilla/5.0 (compatible; SEOAnalyzer/1.0)'
        })
    
    def analyze_page(self, url):
        """
        分析单个页面的SEO
        """
        try:
            response = self.session.get(url, timeout=10)  # 设置超时,避免请求无限挂起
            soup = BeautifulSoup(response.content, 'html.parser')
            
            analysis = {
                'url': url,
                'status_code': response.status_code,
                'title': self._get_title(soup),
                'meta_description': self._get_meta_description(soup),
                'meta_keywords': self._get_meta_keywords(soup),
                'h1_count': len(soup.find_all('h1')),
                'h2_h6_count': len(soup.find_all(['h2', 'h3', 'h4', 'h5', 'h6'])),
                'image_alt_count': self._count_images_with_alt(soup),
                'internal_links': self._count_internal_links(soup, url),
                'external_links': self._count_external_links(soup, url),
                'page_size': len(response.content),
                'load_time': response.elapsed.total_seconds(),
            }
            
            return analysis
        except Exception as e:
            return {'url': url, 'error': str(e)}
    
    def _get_title(self, soup):
        """
        获取页面标题
        """
        title_tag = soup.find('title')
        return title_tag.get_text().strip() if title_tag else None
    
    def _get_meta_description(self, soup):
        """
        获取meta描述
        """
        desc_tag = soup.find('meta', attrs={'name': 'description'})
        return desc_tag.get('content', '').strip() if desc_tag else None
    
    def _get_meta_keywords(self, soup):
        """
        获取meta关键词
        """
        keywords_tag = soup.find('meta', attrs={'name': 'keywords'})
        return keywords_tag.get('content', '').strip() if keywords_tag else None
    
    def _count_images_with_alt(self, soup):
        """
        计算有alt属性的图片数量
        """
        images = soup.find_all('img')
        return sum(1 for img in images if img.get('alt'))
    
    def _count_internal_links(self, soup, base_url):
        """
        计算内部链接数量
        """
        base_domain = urlparse(base_url).netloc
        links = soup.find_all('a', href=True)
        internal_count = 0
        
        for link in links:
            href = link['href']
            link_domain = urlparse(urljoin(base_url, href)).netloc
            if link_domain == base_domain:
                internal_count += 1
        
        return internal_count
    
    def _count_external_links(self, soup, base_url):
        """
        计算外部链接数量
        """
        base_domain = urlparse(base_url).netloc
        links = soup.find_all('a', href=True)
        external_count = 0
        
        for link in links:
            href = link['href']
            link_domain = urlparse(urljoin(base_url, href)).netloc
            if link_domain != base_domain:
                external_count += 1
        
        return external_count
    
    def analyze_sitemap(self, sitemap_url):
        """
        分析站点地图
        """
        try:
            response = self.session.get(sitemap_url, timeout=10)
            soup = BeautifulSoup(response.content, 'xml')
            
            urls = []
            for url_elem in soup.find_all('url'):
                loc = url_elem.find('loc')
                if loc:
                    urls.append(loc.text)
            
            return urls
        except Exception as e:
            return {'error': str(e)}

# 使用示例
analyzer = SEOAnalyzer('https://example-ecommerce.com')
analysis_result = analyzer.analyze_page('https://example-ecommerce.com/products/123')
print(analysis_result)

```

### 爬虫SEO最佳实践

```python

# 爬虫SEO最佳实践配置
CRAWLER_SEO_BEST_PRACTICES = {
    # 遵守robots.txt
    'ROBOTSTXT_OBEY': True,
    
    # 合理的爬取频率
    'DOWNLOAD_DELAY': 2,
    'RANDOMIZE_DOWNLOAD_DELAY': True,  # 布尔开关:实际延迟在0.5~1.5倍DOWNLOAD_DELAY间随机
    
    # 合适的User-Agent
    'USER_AGENT': 'EcommerceBot/1.0 (+http://yourdomain.com/bot-info)',
    
    # 设置合理的并发数
    'CONCURRENT_REQUESTS': 8,
    'CONCURRENT_REQUESTS_PER_DOMAIN': 2,
    
    # 设置爬虫友好头部
    'DEFAULT_REQUEST_HEADERS': {
        'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8',
        'Accept-Language': 'zh-CN,zh;q=0.9,en;q=0.8',
        'Accept-Encoding': 'gzip, deflate',
        'Connection': 'keep-alive',
        'Referer': 'https://www.google.com/',
    },
    
    # 监控和日志
    'LOG_LEVEL': 'INFO',
    'LOGSTATS_INTERVAL': 60,
    
    # 错误处理
    'RETRY_TIMES': 3,
    'RETRY_HTTP_CODES': [500, 502, 503, 504, 408, 429],
}
```

通过本项目的实施,我们构建了一个功能完整的电商全站爬取系统,涵盖了从基础架构到高级功能的各个方面。这个系统不仅能够高效地抓取电商网站数据,还具备良好的可扩展性、稳定性和可维护性,为电商数据分析提供了强有力的技术支撑。