#Scrapy电商全站爬取实战项目 - 从零构建多层级电商数据抓取系统
📂 所属阶段:第四阶段 — 实战演练(项目开发篇)
🔗 相关章节:Spider实战 · Selector选择器 · Pipeline管道实战 · 反爬对抗实战
#目录
#项目背景与目标
电商网站爬取是爬虫领域的经典应用场景,涉及多级分类导航、深度翻页、大量商品数据提取等复杂技术挑战。本项目将构建一个完整的电商全站抓取系统,具备以下核心能力:
- 多级分类处理:递归爬取电商网站的多层级分类结构
- 深度翻页:智能处理商品列表页的多页翻页逻辑
- 商品数据提取:精准提取商品标题、价格、描述、规格等关键信息
- 反爬虫对抗:集成IP轮换、请求头伪装、频率控制等反爬策略
- 数据质量保证:实现数据去重、清洗、验证机制
- 性能优化:采用分布式架构提升抓取效率
#项目技术栈
- 爬虫框架:Scrapy 2.x
- 数据处理:Pydantic、Pandas
- 存储方案:MongoDB、MySQL
- 代理管理:Scrapy-Proxy-Pool
- 分布式:Scrapy-Redis
#电商网站架构分析
#典型电商网站结构
"""
电商网站典型架构:
电商平台
├── 首页 (Homepage)
│ └── 导航菜单 (Navigation Menu)
├── 分类页面 (Category Pages)
│ ├── 一级分类 (Primary Categories)
│ │ ├── 二级分类 (Secondary Categories)
│ │ │ └── 三级分类 (Tertiary Categories)
│ │ └── 商品列表 (Product Listings)
│ └── 筛选条件 (Filter Options)
├── 商品列表页 (Product List Pages)
│ ├── 分页导航 (Pagination Navigation)
│ ├── 商品卡片 (Product Cards)
│ │ ├── 商品图片 (Product Images)
│ │ ├── 商品标题 (Product Title)
│ │ ├── 商品价格 (Product Price)
│ │ └── 商品链接 (Product Link)
│ └── 排序选项 (Sorting Options)
├── 商品详情页 (Product Detail Pages)
│ ├── 商品基本信息 (Basic Product Info)
│ ├── 商品图片 (Product Images)
│ ├── 价格信息 (Price Information)
│ ├── 规格参数 (Specifications)
│ ├── 用户评价 (User Reviews)
│ └── 相关推荐 (Recommendations)
└── 用户中心 (User Center)
"""
#爬取难点分析
"""
电商爬取主要难点:
1. 反爬虫机制:
- 频率限制 (Rate Limiting)
- IP封禁 (IP Blocking)
- 验证码挑战 (CAPTCHA Challenges)
- 浏览器指纹检测 (Browser Fingerprinting)
2. 数据结构复杂:
- 多层级分类体系
- 动态加载的商品数据
- JavaScript渲染的内容
- AJAX请求获取的数据
3. 网站结构变化:
- 定期的UI改版
- CSS选择器变化
- API接口调整
- 反爬策略升级
"""
#项目架构设计
#项目目录结构
ecommerce_spider/
├── ecommerce_spider/
│ ├── spiders/
│ │ ├── __init__.py
│ │ ├── ecommerce_spider.py # 主爬虫文件
│ │ └── category_spider.py # 分类爬虫
│ ├── items/
│ │ ├── __init__.py
│ │ ├── product_item.py # 商品数据模型
│ │ └── category_item.py # 分类数据模型
│ ├── pipelines/
│ │ ├── __init__.py
│ │ ├── validation_pipeline.py # 数据验证管道
│ │ ├── deduplication_pipeline.py # 数据去重管道
│ │ ├── storage_pipeline.py # 数据存储管道
│ │ └── monitoring_pipeline.py # 监控管道
│ ├── middlewares/
│ │ ├── __init__.py
│ │ ├── anti_crawler_middleware.py # 反爬中间件
│ │ ├── proxy_middleware.py # 代理中间件
│ │ └── retry_middleware.py # 重试中间件
│ ├── utils/
│ │ ├── __init__.py
│ │ ├── category_parser.py # 分类解析工具
│ │ ├── price_parser.py # 价格解析工具
│ │ └── data_cleaner.py # 数据清洗工具
│ ├── settings.py # 项目配置
│ └── __init__.py
├── scrapy.cfg # Scrapy配置文件
└── requirements.txt # 依赖包列表
#核心组件关系图
graph TD
A[Start URLs] --> B[Category Parser]
B --> C[Product List Parser]
C --> D[Product Detail Parser]
D --> E[Data Validation]
E --> F[Data Storage]
F --> G[Monitoring]
H[Anti-Crawler Middleware] -.-> B
I[Proxy Middleware] -.-> C
J[Retry Middleware] -.-> D
#数据模型定义
#商品数据模型
# ecommerce_spider/items/product_item.py
import scrapy
from itemloaders.processors import TakeFirst, MapCompose, Join
from w3lib.html import remove_tags
import re
def clean_price(value):
    """Normalize a raw price string to a float.

    Strips currency symbols and surrounding whitespace, drops thousands
    separators, and returns the numeric value. Returns None for empty
    input or when no valid number remains.
    """
    if not value:
        return None
    # Keep only digits, dots and commas (drops currency symbols, spaces).
    cleaned = re.sub(r'[^\d.,]', '', value.strip())
    # BUG FIX: float("1,234.56") raises ValueError, so prices with
    # thousands separators were silently dropped. Remove commas first.
    cleaned = cleaned.replace(',', '')
    try:
        return float(cleaned)
    except ValueError:
        return None
def clean_title(value):
    """Strip surrounding whitespace from a title; None for empty input."""
    return value.strip() if value else None
def clean_description(value):
    """Strip HTML markup from a description and trim whitespace.

    Returns None for empty input.
    """
    if not value:
        return None
    # remove_tags (w3lib) drops markup, keeping only text content.
    return remove_tags(value).strip()
def extract_rating(value):
    """Extract the first numeric rating (integer or decimal) from *value*.

    Accepts anything coercible via str(); returns a float, or None when
    the input is empty or contains no number.
    """
    if not value:
        return None
    found = re.search(r'(\d+(?:\.\d+)?)', str(value))
    if found is None:
        return None
    try:
        return float(found.group(1))
    except ValueError:
        return None
class ProductItem(scrapy.Item):
    """
    Scraped product record.

    Each field pairs an ItemLoader input processor (per-value cleaning)
    with an output processor (collapse the collected list to one value,
    or join it). Fields declared without processors store raw Python
    objects (dicts / lists) as assigned by the spider.
    """
    # --- Basic info ---
    product_id = scrapy.Field(
        input_processor=MapCompose(str.strip),
        output_processor=TakeFirst()
    )
    title = scrapy.Field(
        input_processor=MapCompose(clean_title),
        output_processor=TakeFirst()
    )
    brand = scrapy.Field(
        input_processor=MapCompose(str.strip),
        output_processor=TakeFirst()
    )
    # Joined into a single "A > B > C" breadcrumb string on output.
    category_path = scrapy.Field(
        input_processor=MapCompose(str.strip),
        output_processor=Join(' > ')
    )
    # --- Price info (parsed to float by clean_price) ---
    current_price = scrapy.Field(
        input_processor=MapCompose(clean_price),
        output_processor=TakeFirst()
    )
    original_price = scrapy.Field(
        input_processor=MapCompose(clean_price),
        output_processor=TakeFirst()
    )
    discount_rate = scrapy.Field(
        input_processor=MapCompose(str.strip),
        output_processor=TakeFirst()
    )
    # --- Specifications / variants (stored as-is: dict / lists) ---
    specifications = scrapy.Field()
    color_options = scrapy.Field()
    size_options = scrapy.Field()
    # --- Images ---
    main_image = scrapy.Field(
        input_processor=MapCompose(str.strip),
        output_processor=TakeFirst()
    )
    gallery_images = scrapy.Field()
    # --- Reviews ---
    rating = scrapy.Field(
        input_processor=MapCompose(extract_rating),
        output_processor=TakeFirst()
    )
    review_count = scrapy.Field(
        input_processor=MapCompose(str.strip),
        output_processor=TakeFirst()
    )
    # --- Stock & sales ---
    stock_status = scrapy.Field(
        input_processor=MapCompose(str.strip),
        output_processor=TakeFirst()
    )
    sales_volume = scrapy.Field(
        input_processor=MapCompose(str.strip),
        output_processor=TakeFirst()
    )
    # --- Metadata ---
    url = scrapy.Field(
        input_processor=MapCompose(str.strip),
        output_processor=TakeFirst()
    )
    crawled_at = scrapy.Field()  # crawl timestamp, assigned by the spider
    source_website = scrapy.Field(
        input_processor=MapCompose(str.strip),
        output_processor=TakeFirst()
    )
class CategoryItem(scrapy.Item):
    """
    Scraped category record (one node of the site's category tree).
    """
    category_id = scrapy.Field()
    category_name = scrapy.Field(
        input_processor=MapCompose(str.strip),
        output_processor=TakeFirst()
    )
    parent_category = scrapy.Field()  # parent category name; None for roots
    level = scrapy.Field()            # depth in the tree (spider assigns 1-based)
    url = scrapy.Field(
        input_processor=MapCompose(str.strip),
        output_processor=TakeFirst()
    )
    product_count = scrapy.Field()
    subcategories = scrapy.Field()
#配置数据模型
# ecommerce_spider/items/config_item.py
from pydantic import BaseModel, Field
from typing import Optional, Dict, Any
from datetime import datetime
class EcommerceConfig(BaseModel):
    """
    Pydantic model describing one e-commerce crawl configuration.

    Values mirror the Scrapy settings they feed (throttling, retries,
    proxies, pipeline priorities). Field descriptions surface in the
    generated schema.
    """
    website_name: str = Field(..., description="网站名称")
    base_url: str = Field(..., description="基础URL")
    max_concurrent_requests: int = Field(16, description="最大并发请求数")
    download_delay: float = Field(1.0, description="下载延迟")
    auto_throttle_enabled: bool = Field(True, description="启用自动限速")
    auto_throttle_start_delay: float = Field(1.0, description="自动限速起始延迟")
    auto_throttle_max_delay: float = Field(10.0, description="自动限速最大延迟")
    retry_times: int = Field(3, description="重试次数")
    user_agent: str = Field("Mozilla/5.0 (compatible; EcommerceBot/1.0)", description="用户代理")
    proxy_enabled: bool = Field(False, description="启用代理")
    proxy_list: Optional[list] = Field(None, description="代理列表")
    pipelines: Dict[str, int] = Field(default_factory=dict, description="管道配置")
    custom_settings: Dict[str, Any] = Field(default_factory=dict, description="自定义设置")

    class Config:
        # Allow non-pydantic types (e.g. custom proxy-pool objects) in fields.
        arbitrary_types_allowed = True
#爬虫实现详解
#主爬虫实现
# ecommerce_spider/spiders/ecommerce_spider.py
import scrapy
import json
import re
from urllib.parse import urljoin, urlparse
from ecommerce_spider.items import ProductItem, CategoryItem
from ecommerce_spider.utils.category_parser import CategoryParser
from ecommerce_spider.utils.price_parser import PriceParser
class EcommerceSpider(scrapy.Spider):
    """
    Whole-site e-commerce spider.

    Crawl flow: category pages -> product list pages (with pagination,
    capped by ``max_pages``) -> product detail pages, yielding
    CategoryItem and ProductItem objects.
    """
    name = 'ecommerce'
    custom_settings = {
        'DOWNLOAD_DELAY': 2,
        'CONCURRENT_REQUESTS': 8,
        'CONCURRENT_REQUESTS_PER_DOMAIN': 2,
        'RANDOMIZE_DOWNLOAD_DELAY': 0.5,
        'AUTOTHROTTLE_ENABLED': True,
        'AUTOTHROTTLE_START_DELAY': 1,
        'AUTOTHROTTLE_MAX_DELAY': 10,
        'RETRY_TIMES': 3,
        'RETRY_HTTP_CODES': [500, 502, 503, 504, 408, 429],
    }

    def __init__(self, *args, **kwargs):
        """
        Runtime options (passed via ``-a``): ``start_urls`` (category
        index URLs) and ``max_pages`` (pagination cap per list).
        """
        super().__init__(*args, **kwargs)
        self.start_urls = kwargs.get('start_urls', ['https://example.com/categories'])
        self.max_pages = int(kwargs.get('max_pages', 100))
        self.category_parser = CategoryParser()
        self.price_parser = PriceParser()
        # In-spider counters, reported from closed().
        self.stats = {
            'categories_crawled': 0,
            'products_crawled': 0,
            'errors': 0
        }

    def start_requests(self):
        """Seed the crawl with the configured category index URLs."""
        for url in self.start_urls:
            yield scrapy.Request(
                url=url,
                callback=self.parse_categories,
                errback=self.handle_error,
                meta={
                    'category_level': 0,
                    'parent_category': None
                }
            )

    def parse_categories(self, response):
        """
        Parse a category page: yield a CategoryItem per category link,
        request its product list, and recurse into sub-categories.
        """
        self.logger.info(f"Parsing categories from: {response.url}")
        category_links = response.css('a.category-link, .category-item a, .nav-category a')
        for link in category_links:
            category_url = link.css('::attr(href)').get()
            if not category_url:
                continue
            category_url = urljoin(response.url, category_url)
            raw_name = link.css('::text').get() or link.css('span::text').get()
            category_name = raw_name.strip() if raw_name else 'Unknown'
            category_item = CategoryItem()
            category_item['category_name'] = category_name
            category_item['url'] = category_url
            category_item['level'] = response.meta.get('category_level', 0) + 1
            category_item['parent_category'] = response.meta.get('parent_category')
            # BUG FIX: the 'categories_crawled' counter existed but was
            # never incremented.
            self.stats['categories_crawled'] += 1
            yield category_item
            # Fetch the product listing for this category.
            yield scrapy.Request(
                url=category_url,
                callback=self.parse_product_list,
                errback=self.handle_error,
                meta={
                    'category_name': category_name,
                    'category_level': response.meta.get('category_level', 0) + 1,
                    'parent_category': category_name
                }
            )
        # Recurse into explicit sub-category navigation blocks.
        subcategory_links = response.css('.subcategory a, .sub-category a')
        for link in subcategory_links:
            subcategory_url = link.css('::attr(href)').get()
            if subcategory_url:
                yield scrapy.Request(
                    url=urljoin(response.url, subcategory_url),
                    callback=self.parse_categories,
                    errback=self.handle_error,
                    meta={
                        'category_level': response.meta.get('category_level', 0) + 1,
                        'parent_category': response.meta.get('category_name', 'Unknown')
                    }
                )

    def parse_product_list(self, response):
        """
        Parse a product list page: request each product detail page and
        follow pagination up to ``self.max_pages``.
        """
        self.logger.info(f"Parsing products from: {response.url}")
        product_links = response.css(
            'a.product-link, .product-item a, .product-card a, '
            '.product-container a, .goods-item a'
        )
        for link in product_links:
            product_url = link.css('::attr(href)').get()
            if product_url:
                yield scrapy.Request(
                    url=urljoin(response.url, product_url),
                    callback=self.parse_product_detail,
                    errback=self.handle_error,
                    meta={
                        'category_name': response.meta.get('category_name'),
                        'parent_category': response.meta.get('parent_category')
                    }
                )
        # Follow pagination until the per-category page cap is reached.
        next_page = self._find_next_page(response)
        if next_page and response.meta.get('page_number', 1) < self.max_pages:
            yield scrapy.Request(
                url=urljoin(response.url, next_page),
                callback=self.parse_product_list,
                errback=self.handle_error,
                meta={
                    'category_name': response.meta.get('category_name'),
                    'parent_category': response.meta.get('parent_category'),
                    'page_number': response.meta.get('page_number', 1) + 1
                }
            )

    def parse_product_detail(self, response):
        """
        Extract a complete ProductItem from a product detail page.

        Extraction failures are logged and counted; the item is dropped.
        """
        from datetime import datetime, timezone
        self.logger.info(f"Parsing product detail: {response.url}")
        try:
            product_item = ProductItem()
            product_item['product_id'] = self._extract_product_id(response)
            title = self._first_css(response, [
                'h1::text',
                '.product-title::text',
                '.product-name::text',
                '.title::text',
                'title::text'
            ])
            product_item['title'] = title.strip() if title else 'Unknown Product'
            brand = self._first_css(response, [
                '[itemprop="brand"]::text',
                '.brand::text',
                '.product-brand::text',
                '[data-brand]::text'
            ])
            product_item['brand'] = brand.strip() if brand else 'Unknown Brand'
            current_price = self._first_css(response, [
                '[itemprop="price"]::text',
                '.price::text',
                '.product-price::text',
                '.current-price::text',
                '.sale-price::text'
            ])
            product_item['current_price'] = self.price_parser.parse_price(current_price) if current_price else None
            original_price = self._first_css(response, [
                '.original-price::text',
                '.list-price::text',
                '.was-price::text'
            ])
            product_item['original_price'] = self.price_parser.parse_price(original_price) if original_price else None
            # Derive the discount only when both prices parsed; the truthy
            # check also guards the division against a zero original price.
            if product_item.get('original_price') and product_item.get('current_price'):
                discount = (1 - product_item['current_price'] / product_item['original_price']) * 100
                product_item['discount_rate'] = f"{discount:.1f}%"
            main_image = self._first_css(response, [
                '[itemprop="image"]::attr(src)',
                '.main-image img::attr(src)',
                '.product-image img::attr(src)',
                '.product-img img::attr(src)'
            ])
            product_item['main_image'] = urljoin(response.url, main_image) if main_image else None
            gallery_images = response.css('.gallery img::attr(src)').getall()
            product_item['gallery_images'] = [urljoin(response.url, img) for img in gallery_images if img]
            rating = self._first_css(response, [
                '[itemprop="ratingValue"]::text',
                '.rating::text',
                '.score::text',
                '.stars::text'
            ])
            # BUG FIX: the old isdigit() pre-check let strings like "4.5.6"
            # reach float(), which raised inside the outer try and silently
            # dropped the whole item. Parse defensively instead.
            try:
                product_item['rating'] = float(rating.strip()) if rating else None
            except ValueError:
                product_item['rating'] = None
            review_count = self._first_css(response, [
                '[itemprop="reviewCount"]::text',
                '.review-count::text',
                '.comment-count::text',
                '.reviews::text'
            ])
            product_item['review_count'] = review_count.strip() if review_count else None
            stock_status = self._first_css(response, [
                '.stock-status::text',
                '.availability::text',
                '.in-stock::text',
                '.out-of-stock::text'
            ])
            product_item['stock_status'] = stock_status.strip() if stock_status else 'Unknown'
            sales_volume = self._first_css(response, [
                '.sales-volume::text',
                '.sold-count::text',
                '.sales-count::text'
            ])
            product_item['sales_volume'] = sales_volume.strip() if sales_volume else 'Unknown'
            product_item['specifications'] = self._extract_specifications(response)
            colors = response.css('.color-option::text, .color-item::text').getall()
            product_item['color_options'] = [color.strip() for color in colors if color.strip()]
            sizes = response.css('.size-option::text, .size-item::text').getall()
            product_item['size_options'] = [size.strip() for size in sizes if size.strip()]
            # Metadata.
            product_item['url'] = response.url
            product_item['category_path'] = f"{response.meta.get('parent_category', '')} > {response.meta.get('category_name', '')}"
            # BUG FIX: the original used scrapy.utils.misc.get_func_args(...)
            # which returns argument *names* (strings); calling .now() on one
            # raised AttributeError, so EVERY item was discarded by the
            # except below. Record a real UTC timestamp instead.
            product_item['crawled_at'] = datetime.now(timezone.utc).isoformat()
            product_item['source_website'] = urlparse(response.url).netloc
            # BUG FIX: the 'products_crawled' counter was never incremented.
            self.stats['products_crawled'] += 1
            yield product_item
        except Exception as e:
            self.logger.error(f"Error parsing product detail: {response.url}, Error: {str(e)}")
            self.stats['errors'] += 1

    @staticmethod
    def _first_css(response, selectors):
        """Return the first non-empty match among CSS *selectors*, else None."""
        for selector in selectors:
            value = response.css(selector).get()
            if value:
                return value
        return None

    def _extract_product_id(self, response):
        """
        Resolve a product id from the URL, data-* attributes, or JSON-LD.
        Falls back to the string 'unknown'.
        """
        url = response.url
        product_id_match = re.search(r'/(\d+)/?$', url) or re.search(r'id=(\d+)', url)
        if product_id_match:
            return product_id_match.group(1)
        id_selectors = [
            '[data-product-id]',
            '[data-id]',
            '.product-id',
            '#product-id'
        ]
        for selector in id_selectors:
            element = response.css(selector)
            if element:
                data_id = element.attrib.get('data-product-id') or element.attrib.get('data-id')
                if data_id:
                    return data_id
        # Structured data: <script type="application/ld+json"> Product blobs.
        json_ld_scripts = response.css('script[type="application/ld+json"]::text').getall()
        for script in json_ld_scripts:
            try:
                data = json.loads(script)
            except ValueError:
                continue
            if isinstance(data, dict) and data.get('@type') == 'Product':
                # BUG FIX: could previously return None when both keys were
                # missing; keep falling through to 'unknown' instead.
                product_id = data.get('sku') or data.get('productID')
                if product_id:
                    return product_id
        return 'unknown'

    def _extract_specifications(self, response):
        """
        Collect spec key/value pairs from tables, <li> lists and embedded
        JSON blobs. Returns a dict (possibly empty).
        """
        specs = {}
        # BUG FIX: the original selectors mixed element and ::text targets
        # ('td:first-child, th::text'), so table cells came back as raw HTML.
        spec_tables = response.css('.specs-table, .product-specs, .specifications')
        for table in spec_tables:
            for row in table.css('tr'):
                key = row.css('td:first-child::text, th::text').get()
                value = row.css('td:last-child::text, td:nth-child(2)::text').get()
                if key and value:
                    specs[key.strip()] = value.strip()
        spec_lists = response.css('.specs-list, .spec-list')
        for spec_list in spec_lists:
            for item in spec_list.css('li'):
                text = item.css('::text').get()
                # BUG FIX: guard against items with no text node — the old
                # code evaluated `':' in None` and raised TypeError.
                if text and ':' in text:
                    key, value = text.split(':', 1)
                    specs[key.strip()] = value.strip()
        # Best effort: pull a JSON object out of inline scripts that
        # mention specifications (non-greedy, so nested objects may be cut).
        script_tags = response.css('script:not([src])::text').getall()
        for script in script_tags:
            if 'specs' in script.lower() or 'specifications' in script.lower():
                json_match = re.search(r'({.*?})', script, re.DOTALL)
                if not json_match:
                    continue
                try:
                    data = json.loads(json_match.group(1))
                except ValueError:
                    continue
                if isinstance(data, dict):
                    specs.update(data)
        return specs

    def _find_next_page(self, response):
        """
        Locate the next-page href via rel/class selectors, visible link
        text, or an embedded ``nextPageUrl`` JSON value. Returns None
        when no next page can be found.
        """
        css_selectors = [
            'a.next::attr(href)',
            'a[rel="next"]::attr(href)',
            '.next::attr(href)',
            '.pagination .next a::attr(href)',
            '.pager .next a::attr(href)'
        ]
        for selector in css_selectors:
            next_url = response.css(selector).get()
            if next_url:
                return next_url
        # BUG FIX: ':contains()' is a jQuery extension that parsel/cssselect
        # rejects with an error; match visible link text via XPath instead.
        for text in ('下一页', 'Next', '»'):
            next_url = response.xpath(
                '//a[contains(normalize-space(.), $text)]/@href', text=text
            ).get()
            if next_url:
                return next_url
        # Fallback: some sites expose the next page in inline JS config.
        script_content = ' '.join(response.css('script::text').getall())
        next_match = re.search(r'"nextPageUrl"\s*:\s*"([^"]+)"', script_content)
        if next_match:
            return next_match.group(1)
        return None

    def handle_error(self, failure):
        """Errback: log the failed request and count the error."""
        self.logger.error(f"Request failed: {failure.request.url}")
        self.logger.error(f"Failure: {failure}")
        self.stats['errors'] += 1

    def closed(self, reason):
        """Log final counters when the spider shuts down."""
        self.logger.info(f"Spider closed. Reason: {reason}")
        self.logger.info(f"Statistics: {self.stats}")
#多级分类处理策略
#分类解析器实现
# ecommerce_spider/utils/category_parser.py
import re
from urllib.parse import urljoin, urlparse
from typing import List, Dict, Optional
import scrapy
class CategoryParser:
    """
    Extracts category hierarchy information from breadcrumbs, URL
    structure and page navigation, and builds category trees.
    """

    def __init__(self):
        # Regexes capturing a category slug in common URL layouts.
        self.category_patterns = [
            r'/category/([^/]+)',
            r'/cat/([^/]+)',
            r'/c/([^/]+)',
            r'category=([^&]+)',
            r'cat=([^&]+)'
        ]
        # Class-name / text hints that mark deeper category levels.
        self.level_indicators = [
            'primary', 'secondary', 'tertiary', 'sub', 'child'
        ]

    def parse_category_hierarchy(self, response):
        """
        Merge breadcrumb, URL and page-element category hints into one
        de-duplicated list of ``{name, url, level}`` dicts.
        """
        hierarchy = []
        hierarchy.extend(self._parse_breadcrumbs(response))
        hierarchy.extend(self._parse_url_categories(response.url))
        hierarchy.extend(self._parse_page_categories(response))
        return self._deduplicate_hierarchy(hierarchy)

    def _parse_breadcrumbs(self, response):
        """
        Parse the first breadcrumb trail found on the page into
        ``{name, url, level}`` dicts (level = position in the trail).
        """
        breadcrumb_selectors = [
            '.breadcrumb a',
            '.breadcrumbs a',
            '.crumbs a',
            'nav[aria-label*="breadcrumb"] a',
            '[role="navigation"] a'
        ]
        for selector in breadcrumb_selectors:
            breadcrumbs = response.css(selector)
            if breadcrumbs:
                breadcrumb_data = []
                for i, breadcrumb in enumerate(breadcrumbs):
                    name = breadcrumb.css('::text').get()
                    url = breadcrumb.css('::attr(href)').get()
                    if name and url:
                        breadcrumb_data.append({
                            'name': name.strip(),
                            'url': urljoin(response.url, url),
                            'level': i
                        })
                # First selector that matches wins.
                return breadcrumb_data
        return []

    def _parse_url_categories(self, url):
        """
        Extract category slugs from *url* using the known URL patterns.

        BUG FIX: the original tested ``any(pattern.format(part) ...)``,
        which is always truthy (the regexes were never applied), so every
        path segment was reported as a category. The patterns are now
        actually matched against the URL.
        """
        categories = []
        level = 0
        for pattern in self.category_patterns:
            for match in re.finditer(pattern, url):
                categories.append({
                    'name': match.group(1),
                    'url': url,
                    'level': level
                })
                level += 1
        return categories

    def _parse_page_categories(self, response):
        """
        Collect category links from navigation/sidebar elements as
        ``{name, url, level}`` dicts.
        """
        category_selectors = [
            '.category-nav a',
            '.category-menu a',
            '.nav-category a',
            '.sidebar-category a'
        ]
        categories = []
        for selector in category_selectors:
            for element in response.css(selector):
                name = element.css('::text').get()
                url = element.css('::attr(href)').get()
                if name and url:
                    categories.append({
                        'name': name.strip(),
                        'url': urljoin(response.url, url),
                        'level': self._determine_level(element)
                    })
        return categories

    def _determine_level(self, element):
        """
        Heuristic depth score (0-2) based on level-indicator words in the
        element's class attribute and text.
        """
        classes = element.css('::attr(class)').get() or ''
        text = element.css('::text').get() or ''
        level = 0
        if any(indicator in classes.lower() for indicator in self.level_indicators):
            level += 1
        if any(indicator in text.lower() for indicator in self.level_indicators):
            level += 1
        return level

    def _deduplicate_hierarchy(self, hierarchy):
        """Drop duplicate (name, url) entries while preserving order."""
        seen = set()
        unique_hierarchy = []
        for item in hierarchy:
            key = (item['name'], item['url'])
            if key not in seen:
                seen.add(key)
                unique_hierarchy.append(item)
        return unique_hierarchy

    def build_category_tree(self, categories):
        """
        Build a nested dict tree from categories whose ``name`` is an
        'A > B > C' breadcrumb path.
        """
        tree = {}
        for category in categories:
            current = tree
            for level_name in category['name'].split(' > '):
                current = current.setdefault(level_name, {})
        return tree
class AdvancedCategorySpider(scrapy.Spider):
    """
    Category spider that recurses through category pages up to a fixed
    depth, carrying the parsed hierarchy along in request meta.
    """
    name = 'advanced_category'

    def parse_category_with_depth(self, response, max_depth=3):
        """
        Recursively follow category links, stopping once the request
        chain reaches *max_depth* levels.
        """
        parser = CategoryParser()
        hierarchy = parser.parse_category_hierarchy(response)
        depth = response.meta.get('depth', 0)
        # BUG FIX: the original compared the *link index* (i) against
        # max_depth, which capped links-per-page instead of recursion
        # depth; the 'depth' meta value was carried but never checked.
        if depth >= max_depth:
            return
        for category_link in response.css('a.category-link, .category-item a, .nav-category a'):
            category_url = category_link.css('::attr(href)').get()
            if category_url:
                yield scrapy.Request(
                    url=urljoin(response.url, category_url),
                    callback=self.parse_category_with_depth,
                    meta={
                        'depth': depth + 1,
                        'max_depth': max_depth,
                        'hierarchy': hierarchy
                    }
                )
#深度翻页实现
#智能翻页处理器
# ecommerce_spider/utils/pagination_handler.py
import re
from urllib.parse import urljoin, urlparse, parse_qs
from typing import Optional, Dict, List
import scrapy
class PaginationHandler:
    """
    Detects a page's pagination style and derives current/next page URLs.
    """

    def __init__(self):
        # Regexes that capture the page number in common URL layouts.
        self.pagination_patterns = [
            r'page=(\d+)',
            r'pagenum=(\d+)',
            r'p=(\d+)',
            r'/page/(\d+)',
            r'/p(\d+)',
            r'\?p=(\d+)'
        ]
        # Link texts that typically label a "next page" control.
        # (BUG FIX: duplicate '下一页' entry removed.)
        self.next_page_indicators = [
            'next', '下一页', '>', '»', 'forward',
            '后页', '下翻', 'next-page'
        ]
        self.prev_page_indicators = [
            'prev', 'previous', '上一页', '<', '«', 'back'
        ]

    def extract_current_page(self, url):
        """Return the page number found in *url*; defaults to 1."""
        parsed = urlparse(url)
        # Query parameters take precedence over path segments.
        query_params = parse_qs(parsed.query)
        for param in ('page', 'pagenum', 'p', 'pg'):
            if param in query_params:
                try:
                    return int(query_params[param][0])
                except (ValueError, IndexError):
                    continue
        for pattern in self.pagination_patterns:
            match = re.search(pattern, url)
            if match:
                try:
                    return int(match.group(1))
                except ValueError:
                    continue
        return 1  # no page marker: assume page 1

    def generate_next_page_url(self, current_url, current_page=None):
        """
        Build the URL of the page after *current_page* (detected from the
        URL when not given).

        BUG FIX: the original ``re.sub(pattern, 'page=N+1', url)`` replaced
        the whole match, turning '/page/2' into 'page=3' and corrupting
        path-style URLs. Only the captured page number is replaced now,
        preserving the URL layout.
        """
        if current_page is None:
            current_page = self.extract_current_page(current_url)
        next_page = current_page + 1
        for pattern in self.pagination_patterns:
            match = re.search(pattern, current_url)
            if match:
                start, end = match.span(1)
                return current_url[:start] + str(next_page) + current_url[end:]
        # No page marker present: append one.
        separator = '&' if '?' in current_url else '?'
        return f"{current_url}{separator}page={next_page}"

    def detect_pagination_method(self, response):
        """
        Classify the page's pagination as 'ajax', 'standard' or
        'infinite' and report whether a next page exists.
        """
        pagination_info = {
            'method': None,
            'has_next': False,
            'has_prev': False,
            'total_pages': None
        }
        if self._has_ajax_pagination(response):
            pagination_info['method'] = 'ajax'
        elif self._has_standard_pagination(response):
            pagination_info['method'] = 'standard'
        elif self._has_infinite_scroll(response):
            pagination_info['method'] = 'infinite'
        pagination_info['has_next'] = bool(self._find_next_page_link(response))
        return pagination_info

    def _has_ajax_pagination(self, response):
        """True when the page shows AJAX load-more markers."""
        # BUG FIX: 'onclick*="ajax"' and bare 'data-ajax' were invalid CSS
        # (attribute selectors need brackets) and raised in parsel.
        ajax_indicators = [
            '[onclick*="ajax"]',
            '[data-ajax]',
            '[data-load="ajax"]',
            '.load-more-btn',
            '.infinite-scroll'
        ]
        return any(response.css(indicator) for indicator in ajax_indicators)

    def _has_standard_pagination(self, response):
        """True when a conventional pagination container is present."""
        standard_selectors = [
            '.pagination',
            '.pager',
            '.page-nav',
            '.paging',
            '.pages'
        ]
        return any(response.css(selector) for selector in standard_selectors)

    def _has_infinite_scroll(self, response):
        """True when infinite-scroll / lazy-load markers are present."""
        infinite_indicators = [
            '.infinite-scroll',
            '.load-more',
            '.show-more',
            '[data-infinite="true"]',
            '.lazy-load'
        ]
        return any(response.css(indicator) for indicator in infinite_indicators)

    def _find_next_page_link(self, response):
        """
        Find a next-page href via rel/class selectors, visible link text,
        or title/aria-label hints; None when absent.
        """
        selectors = [
            'a.next::attr(href)',
            'a[rel="next"]::attr(href)',
            '.next a::attr(href)',
            '.pagination .next a::attr(href)',
            '.pager .next a::attr(href)'
        ]
        for selector in selectors:
            link = response.css(selector).get()
            if link:
                return link
        # BUG FIX: ':contains()' is jQuery-only and rejected by cssselect;
        # match visible link text with XPath instead.
        for indicator in self.next_page_indicators:
            link = response.xpath(
                '//a[contains(normalize-space(.), $text)]/@href', text=indicator
            ).get()
            if link:
                return link
        attr_selectors = [
            'a[title*="next"]::attr(href)',
            'a[aria-label*="next"]::attr(href)'
        ]
        for selector in attr_selectors:
            link = response.css(selector).get()
            if link:
                return link
        return None

    def handle_pagination(self, response, max_pages=100):
        """
        Return the absolute next-page URL, or None when *max_pages* is
        reached or no next page can be found/derived.
        """
        current_page = self.extract_current_page(response.url)
        if current_page >= max_pages:
            return None  # page cap reached
        next_page_url = self._find_next_page_link(response)
        if next_page_url:
            return urljoin(response.url, next_page_url)
        # No explicit link: try to synthesize one from the URL.
        generated_url = self.generate_next_page_url(response.url, current_page)
        if generated_url != response.url:
            return generated_url
        return None
class SmartPaginationMiddleware:
    """
    Downloader middleware that auto-follows pagination for product-list
    callbacks.

    NOTE(review): returning ``[response, next_request]`` from
    ``process_response`` violates Scrapy's downloader-middleware contract
    (it must return a Response, a Request, or raise IgnoreRequest). This
    logic belongs in a spider middleware's ``process_spider_output``;
    preserved as-is pending that refactor.
    """

    def __init__(self):
        self.pagination_handler = PaginationHandler()

    def process_response(self, request, response, spider):
        """Detect pagination on product-list responses and queue the next page."""
        # BUG FIX: request.callback is None for the default 'parse'
        # callback; the old attribute access crashed on such requests.
        callback_name = getattr(request.callback, '__name__', '')
        if 'product_list' in callback_name:
            pagination_info = self.pagination_handler.detect_pagination_method(response)
            if pagination_info['has_next']:
                next_url = self.pagination_handler.handle_pagination(
                    response,
                    max_pages=spider.settings.get('MAX_PAGES', 100)
                )
                if next_url:
                    # dont_filter: pagination URLs may repeat query params
                    # that the dupe filter would otherwise drop.
                    next_request = scrapy.Request(
                        url=next_url,
                        callback=request.callback,
                        meta=request.meta,
                        dont_filter=True
                    )
                    return [response, next_request]
        return response
#商品数据提取
#数据提取策略
# ecommerce_spider/utils/data_extractor.py
import re
import json
from typing import Dict, List, Optional, Union
from bs4 import BeautifulSoup
import scrapy
class DataExtractor:
    """
    Text/HTML extractor for product fields: prices, ratings, review
    counts, specifications, images and stock status.
    """

    def __init__(self):
        # Order matters: currency-specific patterns run against the raw
        # text first; the bare numeric pattern is the cleaned-text fallback.
        self.price_patterns = [
            r'[\d,]+\.?\d*',      # bare number (fallback)
            r'¥([\d,]+\.?\d*)',   # CNY
            r'\$([\d,]+\.?\d*)',  # USD
            r'€([\d,]+\.?\d*)',   # EUR
        ]
        self.rating_patterns = [
            r'(\d+(?:\.\d+)?)',     # any number (int or decimal)
            r'(\d+(?:\.\d+)?)/5',   # x/5 scale
            r'(\d+(?:\.\d+)?)/10',  # x/10 scale
        ]

    def extract_price(self, text: str) -> Optional[float]:
        """
        Parse a price out of *text*; None when nothing numeric is found.

        BUG FIX: the original stripped currency symbols *before* matching,
        which made the ¥/$/€ patterns unreachable dead code. They now run
        against the raw text first, with the bare-number pattern as the
        fallback on the cleaned text.
        """
        if not text:
            return None
        raw = text.strip()
        for pattern in self.price_patterns[1:]:
            match = re.search(pattern, raw)
            if match:
                try:
                    return float(match.group(1).replace(',', ''))
                except ValueError:
                    continue
        cleaned = re.sub(r'[^\d.,]', '', raw)
        match = re.search(self.price_patterns[0], cleaned)
        if match:
            try:
                return float(match.group(0).replace(',', ''))
            except ValueError:
                return None
        return None

    def extract_rating(self, text: str) -> Optional[float]:
        """
        Parse a rating out of *text*, clamped to at most 5.0.

        NOTE(review): a 10-scale value like '8/10' is clamped to 5.0
        rather than rescaled — confirm intended behavior with callers.
        """
        if not text:
            return None
        for pattern in self.rating_patterns:
            match = re.search(pattern, text)
            if match:
                try:
                    rating = float(match.group(1))
                    return min(rating, 5.0)  # clamp to a 5-point scale
                except ValueError:
                    continue
        return None

    def extract_reviews_count(self, text: str) -> Optional[int]:
        """
        Parse a review count out of *text*; handles ASCII and fullwidth
        thousands separators. None when no number is present.
        """
        if not text:
            return None
        number_match = re.search(r'(\d+(?:[,,]\d{3})*(?:\.\d+)?)', text)
        if number_match:
            try:
                count_str = number_match.group(1).replace(',', '').replace('，', '')
                return int(float(count_str))
            except ValueError:
                pass
        return None

    def extract_specifications(self, response) -> Dict[str, str]:
        """
        Collect spec key/value pairs from tables, <li> lists and JSON-LD
        ``additionalProperty`` entries. Returns a dict (possibly empty).
        """
        specs = {}
        # BUG FIX: the original selectors mixed element and ::text targets
        # ('td:first-child, th::text'), so cells came back as raw HTML.
        tables = response.css('.specs-table, .product-specs, .specifications')
        for table in tables:
            for row in table.css('tr'):
                key_elem = row.css('td:first-child::text, th::text').get()
                value_elem = row.css('td:last-child::text, td:nth-child(2)::text').get()
                if key_elem and value_elem:
                    specs[self._clean_text(key_elem)] = self._clean_text(value_elem)
        lists = response.css('.specs-list, .spec-list')
        for spec_list in lists:
            for item in spec_list.css('li'):
                # _clean_text(None) -> '', so the ':' check is None-safe.
                text = self._clean_text(item.css('::text').get())
                if ':' in text:
                    key, value = text.split(':', 1)
                    specs[self._clean_text(key)] = self._clean_text(value)
        json_ld_scripts = response.css('script[type="application/ld+json"]::text').getall()
        for script in json_ld_scripts:
            try:
                data = json.loads(script)
            except ValueError:
                continue
            if isinstance(data, dict) and data.get('@type') == 'Product':
                for prop in data.get('additionalProperty', []):
                    name = prop.get('name', '')
                    value = prop.get('value', '')
                    specs[name] = str(value)
        return specs

    def extract_images(self, response) -> List[str]:
        """
        Return absolute image URLs: the first main image found, followed
        by gallery/thumbnail images, order-preserving de-duplicated.
        """
        images = []
        main_image_selectors = [
            '[itemprop="image"]::attr(src)',
            '.main-image img::attr(src)',
            '.product-image img::attr(src)',
            '.product-img img::attr(src)',
            '.large-image::attr(src)'
        ]
        for selector in main_image_selectors:
            main_img = response.css(selector).get()
            if main_img:
                images.append(response.urljoin(main_img))
                break
        gallery_selectors = [
            '.gallery img::attr(src)',
            '.thumbnails img::attr(src)',
            '.product-gallery img::attr(src)',
            '.image-thumb::attr(src)'
        ]
        for selector in gallery_selectors:
            for img in response.css(selector).getall():
                if img:
                    images.append(response.urljoin(img))
        # dict.fromkeys keeps first-seen order while de-duplicating.
        return list(dict.fromkeys(images))

    def _clean_text(self, text: str) -> str:
        """Strip whitespace and remove embedded newlines/tabs; ''-safe on None."""
        if not text:
            return ''
        return text.strip().replace('\n', '').replace('\t', '').replace('\r', '')

    def extract_stock_status(self, response) -> str:
        """Return the first non-empty stock/availability text, else 'Unknown'."""
        status_selectors = [
            '.stock-status::text',
            '.availability::text',
            '.in-stock::text',
            '.out-of-stock::text',
            '.stock-info::text',
            '[data-stock]::text'
        ]
        for selector in status_selectors:
            status = response.css(selector).get()
            if status:
                cleaned = self._clean_text(status)
                if cleaned:
                    return cleaned
        return 'Unknown'
class AdvancedDataExtractor(DataExtractor):
    """
    Extends DataExtractor with extraction from inline JavaScript/JSON.

    NOTE(review): the non-greedy r'({.*?})' regex cannot capture nested
    JSON objects; a real JS parser would be needed for those. Matches
    that fail to parse are skipped.
    """

    def extract_from_dynamic_content(self, response) -> Dict:
        """
        Scan inline <script> blocks for JSON object literals and
        ``var/const/let`` assignments; return the merged, normalized
        product fields.
        """
        extracted_data = {}
        script_content = ' '.join(response.css('script:not([src])::text').getall())
        # Bare JSON object literals.
        for json_str in re.findall(r'({.*?})', script_content):
            try:
                data = json.loads(json_str)
            except ValueError:  # json.JSONDecodeError is a ValueError
                continue
            if isinstance(data, dict):
                extracted_data.update(self._process_dynamic_data(data))
        # JS variable assignments holding object literals.
        var_patterns = [
            r'var\s+(\w+)\s*=\s*({.*?});',
            r'const\s+(\w+)\s*=\s*({.*?});',
            r'let\s+(\w+)\s*=\s*({.*?});'
        ]
        for pattern in var_patterns:
            for _var_name, var_value in re.findall(pattern, script_content):
                try:
                    data = json.loads(var_value)
                except ValueError:
                    continue
                if isinstance(data, dict):
                    extracted_data.update(self._process_dynamic_data(data))
        return extracted_data

    def _process_dynamic_data(self, data: Dict) -> Dict:
        """Map raw keys from page JSON onto normalized product fields."""
        processed = {}
        if 'price' in data:
            processed['current_price'] = self.extract_price(str(data['price']))
        if 'originalPrice' in data:
            processed['original_price'] = self.extract_price(str(data['originalPrice']))
        if 'rating' in data:
            processed['rating'] = self.extract_rating(str(data['rating']))
        if 'reviewCount' in data:
            processed['review_count'] = self.extract_reviews_count(str(data['reviewCount']))
        if 'specifications' in data or 'specs' in data:
            specs = data.get('specifications') or data.get('specs', {})
            if isinstance(specs, dict):
                processed['specifications'] = specs
        if 'stock' in data or 'inventory' in data:
            processed['stock_status'] = str(data.get('stock') or data.get('inventory', 'Unknown'))
        return processed
class ProductDataExtractor:
    """
    Facade combining the basic and advanced extractors to assemble one
    complete product record from a detail-page response.
    """

    def __init__(self):
        self.basic_extractor = DataExtractor()
        self.advanced_extractor = AdvancedDataExtractor()

    @staticmethod
    def _first_css(response, selectors):
        """Return the first truthy CSS match among *selectors*, else None."""
        for css in selectors:
            hit = response.css(css).get()
            if hit:
                return hit
        return None

    def extract_complete_product_data(self, response) -> Dict:
        """
        Build the full product record. Values mined from inline scripts are
        merged last and therefore override the static-markup values.
        """
        record = {}
        # Basics
        record['title'] = self._extract_title(response)
        record['brand'] = self._extract_brand(response)
        # Pricing (discount derived from the two price fields)
        record['current_price'] = self._extract_price(response)
        record['original_price'] = self._extract_original_price(response)
        record['discount_rate'] = self._calculate_discount_rate(record)
        # Imagery
        record['main_image'] = self._extract_main_image(response)
        record['gallery_images'] = self._extract_gallery_images(response)
        # Social proof
        record['rating'] = self._extract_rating(response)
        record['review_count'] = self._extract_review_count(response)
        # Specs / availability
        record['specifications'] = self._extract_specifications(response)
        record['stock_status'] = self._extract_stock_status(response)
        # Variant options
        record['color_options'] = self._extract_color_options(response)
        record['size_options'] = self._extract_size_options(response)
        # Script-embedded data wins over static markup.
        record.update(self.advanced_extractor.extract_from_dynamic_content(response))
        return record

    def _extract_title(self, response) -> str:
        """First matching title selector, cleaned; placeholder otherwise."""
        raw = self._first_css(response, [
            'h1::text',
            '.product-title::text',
            '.product-name::text',
            '.title::text',
        ])
        return self.basic_extractor._clean_text(raw) if raw else 'Unknown Product'

    def _extract_brand(self, response) -> str:
        """Brand name via microdata or common class names."""
        raw = self._first_css(response, [
            '[itemprop="brand"]::text',
            '.brand::text',
            '.product-brand::text',
            '[data-brand]::text',
        ])
        return self.basic_extractor._clean_text(raw) if raw else 'Unknown Brand'

    def _extract_price(self, response) -> Optional[float]:
        """Current/sale price, parsed to a float when present."""
        raw = self._first_css(response, [
            '[itemprop="price"]::text',
            '.price::text',
            '.product-price::text',
            '.current-price::text',
            '.sale-price::text',
        ])
        return self.basic_extractor.extract_price(raw) if raw else None

    def _extract_original_price(self, response) -> Optional[float]:
        """Pre-discount list price, when the page shows one."""
        raw = self._first_css(response, [
            '.original-price::text',
            '.list-price::text',
            '.was-price::text',
        ])
        return self.basic_extractor.extract_price(raw) if raw else None

    def _calculate_discount_rate(self, product_data: Dict) -> Optional[str]:
        """Percentage discount string, only when both prices are known."""
        current = product_data.get('current_price')
        original = product_data.get('original_price')
        if not (current and original and original > 0):
            return None
        return f"{(1 - current / original) * 100:.1f}%"

    def _extract_main_image(self, response) -> Optional[str]:
        """Absolute URL of the primary product image."""
        src = self._first_css(response, [
            '[itemprop="image"]::attr(src)',
            '.main-image img::attr(src)',
            '.product-image img::attr(src)',
            '.product-img img::attr(src)',
        ])
        return response.urljoin(src) if src else None

    def _extract_gallery_images(self, response) -> List[str]:
        """All gallery image URLs, absolutized."""
        return [
            response.urljoin(src)
            for src in response.css('.gallery img::attr(src)').getall()
            if src
        ]

    def _extract_rating(self, response) -> Optional[float]:
        """Numeric rating value, when present."""
        raw = self._first_css(response, [
            '[itemprop="ratingValue"]::text',
            '.rating::text',
            '.score::text',
            '.stars::text',
        ])
        return self.basic_extractor.extract_rating(raw) if raw else None

    def _extract_review_count(self, response) -> Optional[int]:
        """Number of reviews, when present."""
        raw = self._first_css(response, [
            '[itemprop="reviewCount"]::text',
            '.review-count::text',
            '.comment-count::text',
            '.reviews::text',
        ])
        return self.basic_extractor.extract_reviews_count(raw) if raw else None

    def _extract_specifications(self, response) -> Dict[str, str]:
        """Delegate specification-table parsing to the basic extractor."""
        return self.basic_extractor.extract_specifications(response)

    def _extract_stock_status(self, response) -> str:
        """Delegate stock-status parsing to the basic extractor."""
        return self.basic_extractor.extract_stock_status(response)

    def _extract_color_options(self, response) -> List[str]:
        """Available colour variants, cleaned."""
        raw = response.css('.color-option::text, .color-item::text, [data-color]::text').getall()
        return [self.basic_extractor._clean_text(c) for c in raw if c.strip()]

    def _extract_size_options(self, response) -> List[str]:
        """Available size variants, cleaned."""
        raw = response.css('.size-option::text, .size-item::text, [data-size]::text').getall()
        return [self.basic_extractor._clean_text(s) for s in raw if s.strip()]
#反爬虫对抗策略
#反爬虫中间件
# ecommerce_spider/middlewares/anti_crawler_middleware.py
import random
import time
import requests
from scrapy.downloadermiddlewares.retry import RetryMiddleware
from scrapy.utils.response import response_status_message
from fake_useragent import UserAgent
class AntiCrawlerMiddleware:
    """
    Downloader middleware bundling anti-bot countermeasures: random
    User-Agent, randomized browser-like header templates, jittered delays,
    and ban/captcha detection on responses.
    """
    def __init__(self):
        self.ua = UserAgent()
        self.banned_ips = set()
        self.request_counts = {}      # domain -> number of requests sent
        self.last_request_time = {}
        # Browser-like header templates (values intentionally unchanged).
        self.header_templates = [
            {
                'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,image/webp,*/*;q=0.8',
                'Accept-Language': 'zh-CN,zh;q=0.9,en;q=0.8',
                'Accept-Encoding': 'gzip, deflate, br',
                'Connection': 'keep-alive',
                'Upgrade-Insecure-Requests': '1',
                'Sec-Fetch-Dest': 'document',
                'Sec-Fetch-Mode': 'navigate',
                'Sec-Fetch-Site': 'none',
                'Cache-Control': 'max-age=0'
            },
            {
                'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8',
                'Accept-Language': 'en-US,en;q=0.9,zh-CN;q=0.8',
                'Accept-Encoding': 'gzip, deflate',
                'Connection': 'keep-alive',
                'Upgrade-Insecure-Requests': '1',
                'Cache-Control': 'no-cache'
            }
        ]
    def process_request(self, request, spider):
        """
        Decorate the outgoing request with a random UA and header template,
        then wait a small random delay.

        NOTE(review): time.sleep() blocks Scrapy's reactor thread and stalls
        ALL in-flight requests — prefer DOWNLOAD_DELAY / AutoThrottle in
        production; kept here to preserve the existing behavior.
        """
        # Random User-Agent per request.
        request.headers['User-Agent'] = self.ua.random
        # Fill in any headers the request does not already define.
        template = random.choice(self.header_templates)
        for key, value in template.items():
            if key not in request.headers:
                request.headers[key] = value
        # Randomized politeness delay (1–3 s).
        time.sleep(random.uniform(1, 3))
        # Per-domain request accounting.
        domain = request.url.split('/')[2]
        self.request_counts[domain] = self.request_counts.get(domain, 0) + 1
        return None
    def process_response(self, request, response, spider):
        """
        Inspect the response for ban / captcha signals; the response is
        always passed through unchanged.
        """
        if self._is_banned_response(response):
            domain = request.url.split('/')[2]
            spider.logger.warning(f"Domain {domain} might be banned")
            # Remember the banned IP when Scrapy exposes it.
            if hasattr(response, 'ip_address'):
                self.banned_ips.add(response.ip_address)
        if self._has_captcha(response):
            spider.logger.warning("Captcha detected, consider implementing captcha solving")
        return response
    def _is_banned_response(self, response):
        """
        Return True when the response looks like a block/ban page, judged
        first by status code and then by tell-tale body text.
        """
        banned_statuses = [403, 429, 503]
        banned_texts = [
            'blocked', 'forbidden', 'access denied', 'rate limit',
            'too many requests', 'captcha', 'verification', 'unauthorized'
        ]
        if response.status in banned_statuses:
            return True
        # Fix: .text raises AttributeError for non-text (binary) responses;
        # treat undecodable bodies as not banned instead of crashing.
        try:
            response_text = response.text.lower()
        except AttributeError:
            return False
        return any(marker in response_text for marker in banned_texts)
    def _has_captcha(self, response):
        """
        Return True when the body contains common captcha-related markers.
        Heuristic only — generic words like 'auth' can false-positive.
        """
        captcha_indicators = [
            'captcha', 'verification', 'validate', 'auth', 'security',
            'are you a human', 'robot check', 'prove you are human',
            'recaptcha', 'hcaptcha'
        ]
        try:
            response_text = response.text.lower()
        except AttributeError:
            return False
        return any(indicator in response_text for indicator in captcha_indicators)
class ProxyMiddleware:
    """
    Rotating-proxy middleware with simple failure tracking: proxies that
    produce block statuses or exceptions are skipped on later rotations.
    """
    def __init__(self):
        self.proxies = []
        self.current_proxy_index = 0
        self.proxy_usage_count = {}
        self.failed_proxies = set()
    def load_proxies(self, proxy_list):
        """Replace the proxy pool with *proxy_list*."""
        self.proxies = proxy_list
    def get_next_proxy(self):
        """
        Round-robin over the pool, skipping proxies already marked failed.
        Returns None when the pool is empty or exhausted.
        """
        pool = self.proxies
        if not pool:
            return None
        for _ in pool:
            candidate = pool[self.current_proxy_index % len(pool)]
            self.current_proxy_index += 1
            if candidate not in self.failed_proxies:
                return candidate
        return None
    def process_request(self, request, spider):
        """Attach the next healthy proxy to the request, if any."""
        chosen = self.get_next_proxy()
        if chosen:
            request.meta['proxy'] = chosen
            spider.logger.debug(f"Using proxy: {chosen}")
        return None
    def process_response(self, request, response, spider):
        """Mark the request's proxy as failed on block-like status codes."""
        if response.status in [403, 429, 503]:
            used = request.meta.get('proxy')
            if used:
                spider.logger.warning(f"Proxy {used} might be blocked")
                self.failed_proxies.add(used)
        return response
    def process_exception(self, request, exception, spider):
        """Mark the request's proxy as failed on any download exception."""
        used = request.meta.get('proxy')
        if used:
            spider.logger.error(f"Proxy {used} failed: {exception}")
            self.failed_proxies.add(used)
class RateLimitMiddleware:
    """
    Enforces a minimum per-domain interval between outgoing requests,
    sleeping just long enough to honour it.
    """
    def __init__(self):
        self.request_times = {}   # domain -> timestamp of the last request
        self.min_interval = 1     # global default interval (seconds)
        self.domain_limits = {}   # per-domain overrides
    def process_request(self, request, spider):
        """Delay the request when the domain's interval has not yet elapsed."""
        domain = request.url.split('/')[2]
        interval = self.domain_limits.get(domain, self.min_interval)
        now = time.time()
        elapsed = now - self.request_times.get(domain, 0)
        if elapsed < interval:
            wait = interval - elapsed
            spider.logger.debug(f"Rate limiting: sleeping {wait:.2f}s for {domain}")
            time.sleep(wait)
        # Record when this domain was last hit.
        self.request_times[domain] = time.time()
        return None
    def set_domain_limit(self, domain, limit_seconds):
        """Override the minimum request interval for *domain*."""
        self.domain_limits[domain] = limit_seconds
class SessionMiddleware:
    """
    Per-domain cookie/session manager with periodic session rotation.
    """
    def __init__(self):
        self.sessions = {}                      # domain -> last rotation timestamp
        self.session_cookies = {}               # domain -> {cookie name: value}
        self.session_rotation_interval = 3600   # rotate sessions hourly
    def process_request(self, request, spider):
        """
        Attach the stored cookies for the request's domain, rotating
        (clearing) the session when it is older than the rotation interval.
        """
        domain = request.url.split('/')[2]
        current_time = time.time()
        last_rotation = self.sessions.get(domain, 0)
        if current_time - last_rotation > self.session_rotation_interval:
            # Rotate: drop stale cookies and restart the session clock.
            self.session_cookies.pop(domain, None)
            self.sessions[domain] = current_time
        if domain in self.session_cookies:
            for name, value in self.session_cookies[domain].items():
                # Fix: Scrapy's Request.cookies is a plain dict — the
                # original called the non-existent .set() and raised
                # AttributeError on every request with stored cookies.
                request.cookies[name] = value
        return None
    def process_response(self, request, response, spider):
        """
        Capture Set-Cookie values from the response so subsequent requests
        to the same domain reuse the session.
        """
        domain = request.url.split('/')[2]
        # Naive name=value parse of each Set-Cookie header (attributes
        # after the first ';' are discarded).
        new_cookies = {}
        for cookie_header in response.headers.getlist('Set-Cookie'):
            cookie_str = cookie_header.decode('utf-8')
            if '=' in cookie_str:
                name, value = cookie_str.split('=', 1)
                new_cookies[name.strip()] = value.split(';')[0].strip()
        self.session_cookies.setdefault(domain, {}).update(new_cookies)
        return response
#数据去重与清洗
#数据清洗工具
# ecommerce_spider/utils/data_cleaner.py
import re
import pandas as pd
from typing import Dict, List, Any, Optional
from datetime import datetime
class DataCleaner:
    """
    Normalises raw scraped product fields (titles, prices, ratings, …) and
    provides simple de-duplication and data-quality reporting helpers.
    """

    def __init__(self):
        # Currency patterns kept for callers' reference; the cleaning
        # methods below use their own targeted expressions.
        self.price_patterns = [
            r'[\d,]+\.?\d*',        # plain number with optional decimals
            r'¥([\d,]+\.?\d*)',     # CNY
            r'\$([\d,]+\.?\d*)',    # USD
            r'€([\d,]+\.?\d*)',     # EUR
        ]
        # Field name -> dedicated cleaning routine.
        self.cleaning_rules = {
            'title': self._clean_title,
            'description': self._clean_description,
            'price': self._clean_price,
            'rating': self._clean_rating,
            'review_count': self._clean_review_count,
            'specifications': self._clean_specifications,
        }

    def clean_product_data(self, product_data: Dict[str, Any]) -> Dict[str, Any]:
        """Run every field through its rule (or the generic one) and stamp it."""
        cleaned = {
            field: self.cleaning_rules.get(field, self._clean_generic_field)(value)
            for field, value in product_data.items()
        }
        cleaned['cleaned_at'] = datetime.now()
        return cleaned

    def _clean_title(self, title: str) -> str:
        """Collapse whitespace and strip unusual characters from a title."""
        if not title:
            return 'Unknown Product'
        collapsed = re.sub(r'\s+', ' ', title.strip())
        # Keep CJK, latin, digits and basic punctuation only.
        return re.sub(r'[^\u4e00-\u9fa5a-zA-Z0-9\s\-_.,!?;:()()【】\[\]]', '', collapsed)

    def _clean_description(self, description: str) -> str:
        """Strip HTML tags, collapse whitespace, drop unusual characters."""
        if not description:
            return ''
        text = re.sub(r'<[^>]+>', '', description)
        text = re.sub(r'\n+', '\n', text)
        text = re.sub(r'\s+', ' ', text)
        text = re.sub(r'[^\u4e00-\u9fa5a-zA-Z0-9\s\-_.,!?;:()()【】\[\]/]', '', text)
        return text.strip()

    def _clean_price(self, price: Any) -> Optional[float]:
        """Coerce a price of any shape to float; None when unparseable."""
        if price is None:
            return None
        if isinstance(price, (int, float)):
            return float(price)
        if not isinstance(price, str):
            return None
        digits = re.sub(r'[^\d.,]', '', price.strip())
        try:
            return float(digits.replace(',', ''))
        except ValueError:
            return None

    def _clean_rating(self, rating: Any) -> Optional[float]:
        """Coerce a rating to float, capped at 5.0; None when unparseable."""
        if rating is None:
            return None
        if isinstance(rating, (int, float)):
            return min(float(rating), 5.0)
        if isinstance(rating, str):
            try:
                value = float(re.search(r'(\d+(?:\.\d+)?)', rating).group(1))
            except (ValueError, AttributeError):
                return None
            return min(value, 5.0)
        return None

    def _clean_review_count(self, review_count: Any) -> Optional[int]:
        """Extract an integer review count from int or string input."""
        if review_count is None:
            return None
        if isinstance(review_count, int):
            return review_count
        if isinstance(review_count, str):
            digits = re.findall(r'\d+', review_count.replace(',', ''))
            if digits:
                return int(digits[0])
        return None

    def _clean_specifications(self, specs: Any) -> Dict[str, str]:
        """Clean keys and values of a spec dict, dropping empty entries."""
        if not specs or not isinstance(specs, dict):
            return {}
        cleaned = {}
        for raw_key, raw_value in specs.items():
            key = self._clean_generic_field(raw_key) if raw_key else ''
            value = self._clean_generic_field(raw_value) if raw_value else ''
            if key and value:
                cleaned[key] = value
        return cleaned

    def _clean_generic_field(self, field: Any) -> Any:
        """Collapse whitespace in strings; pass anything else through."""
        if not isinstance(field, str):
            return field
        return re.sub(r'\s+', ' ', field.strip())

    def remove_duplicates(self, products: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
        """Drop products whose (id, title, url) identity was already seen."""
        seen = set()
        unique = []
        for product in products:
            identifier = '{}_{}_{}'.format(
                product.get('product_id', 'unknown'),
                hash(product.get('title', 'Unknown Product')),
                hash(product.get('url', '')),
            )
            if identifier not in seen:
                seen.add(identifier)
                unique.append(product)
        return unique

    def validate_data_quality(self, products: List[Dict[str, Any]]) -> Dict[str, Any]:
        """Report completeness and invalid-value counts over *products*."""
        report = {
            'total_products': len(products),
            'complete_products': 0,
            'missing_fields': {},
            'invalid_values': {},
            'quality_score': 0.0,
        }
        required_fields = ['title', 'current_price', 'url']
        for product in products:
            is_complete = True
            for field in required_fields:
                if not product.get(field):
                    is_complete = False
                    report['missing_fields'][field] = report['missing_fields'].get(field, 0) + 1
            if is_complete:
                report['complete_products'] += 1
            for field, value in product.items():
                if self._is_invalid_value(field, value):
                    report['invalid_values'][field] = report['invalid_values'].get(field, 0) + 1
        # Quality score = share of complete products, as a percentage.
        if report['total_products'] > 0:
            report['quality_score'] = (
                report['complete_products'] / report['total_products']
            ) * 100
        return report

    def _is_invalid_value(self, field: str, value: Any) -> bool:
        """True for None, blank strings, placeholder text, or non-positive prices."""
        if value is None:
            return True
        if isinstance(value, str):
            if not value.strip():
                return True
            if value.lower() in ['unknown', 'unknown product', '暂无', 'null', 'none']:
                return True
        if field == 'current_price':
            if isinstance(value, (int, float)) and value <= 0:
                return True
        return False
class DeduplicationPipeline:
    """
    Pipeline that cleans each item and discards duplicates based on an
    (id, title, url) identity.
    """
    def __init__(self):
        self.data_cleaner = DataCleaner()
        self.seen_products = set()
        self.duplicates_count = 0
    def process_item(self, item, spider):
        """
        Clean the item, then drop it if an identical product was seen.

        Raises:
            DropItem: for duplicates. (Fix: the original returned None,
            which Scrapy passes on to downstream pipelines instead of
            discarding the item — DropItem is the documented mechanism.)
        """
        cleaned_item = self.data_cleaner.clean_product_data(dict(item))
        # Identity = product id + hashes of title and url.
        product_id = cleaned_item.get('product_id', 'unknown')
        title_hash = hash(cleaned_item.get('title', ''))
        url_hash = hash(cleaned_item.get('url', ''))
        identifier = f"{product_id}_{title_hash}_{url_hash}"
        if identifier in self.seen_products:
            self.duplicates_count += 1
            spider.logger.info(f"Duplicate product found: {cleaned_item.get('title', 'Unknown')}")
            # Imported lazily so the module stays importable without Scrapy.
            from scrapy.exceptions import DropItem
            raise DropItem(f"Duplicate product: {identifier}")
        self.seen_products.add(identifier)
        # Write the cleaned values back onto the item.
        for key, value in cleaned_item.items():
            item[key] = value
        return item
    def close_spider(self, spider):
        """Log de-duplication statistics at shutdown."""
        spider.logger.info(f"Removed {self.duplicates_count} duplicate products")
        spider.logger.info(f"Unique products processed: {len(self.seen_products)}")
class DataValidationPipeline:
    """
    Pipeline that validates required fields and basic value sanity,
    discarding items that fail.
    """
    def __init__(self):
        self.data_cleaner = DataCleaner()
        self.validation_errors = []
        self.valid_items = 0
        self.invalid_items = 0
    def process_item(self, item, spider):
        """
        Validate required fields and the price value.

        Raises:
            DropItem: for invalid items. (Fix: the original returned None,
            which Scrapy passes to downstream pipelines rather than
            discarding — DropItem is the documented discard mechanism.)
        """
        # Imported lazily so the module stays importable without Scrapy.
        required_fields = ['title', 'current_price', 'url']
        for field in required_fields:
            if not item.get(field):
                error_msg = f"Missing required field '{field}' in item: {item.get('title', 'Unknown')}"
                self.validation_errors.append(error_msg)
                spider.logger.warning(error_msg)
                self.invalid_items += 1
                from scrapy.exceptions import DropItem
                raise DropItem(error_msg)
        # Price must be a positive number when present.
        price = item.get('current_price')
        if price is not None and (isinstance(price, (int, float)) and price <= 0):
            error_msg = f"Invalid price value '{price}' for item: {item.get('title', 'Unknown')}"
            self.validation_errors.append(error_msg)
            spider.logger.warning(error_msg)
            self.invalid_items += 1
            from scrapy.exceptions import DropItem
            raise DropItem(error_msg)
        self.valid_items += 1
        return item
    def close_spider(self, spider):
        """Log validation statistics and a sample of errors at shutdown."""
        spider.logger.info(f"Validation completed:")
        spider.logger.info(f"Valid items: {self.valid_items}")
        spider.logger.info(f"Invalid items: {self.invalid_items}")
        spider.logger.info(f"Total errors: {len(self.validation_errors)}")
        if self.validation_errors:
            spider.logger.info("Sample validation errors:")
            for error in self.validation_errors[:5]:  # show at most five
                spider.logger.info(f" - {error}")
class StoragePipeline:
    """
    Storage pipeline supporting JSON Lines (default), CSV, MongoDB and
    MySQL backends, chosen from ``spider.storage_config['method']``.

    Fixes over the original:
    - open_spider called the non-existent ``_setup_csv`` (AttributeError
      for csv configs); the file path is now prepared inline.
    - JSON/CSV paths were re-stamped per item with second resolution,
      fragmenting one run across many files; the path is now fixed once.
    - Mongo/MySQL connections were never closed; close_spider now does.
    """
    def __init__(self):
        self.items_stored = 0
        self.storage_method = 'json'   # default backend
        self._output_path = None       # computed once per run (json/csv)
    def open_spider(self, spider):
        """Initialise counters and the configured storage backend."""
        self.items_stored = 0
        storage_config = getattr(spider, 'storage_config', {})
        self.storage_method = storage_config.get('method', 'json')
        if self.storage_method == 'mongodb':
            self._setup_mongodb(storage_config)
        elif self.storage_method == 'mysql':
            self._setup_mysql(storage_config)
        else:
            # json/csv: fix the output file once for the whole run.
            ext = 'csv' if self.storage_method == 'csv' else 'json'
            self._output_path = self._make_output_path(ext)
    def process_item(self, item, spider):
        """Dispatch the item to the configured backend and count it."""
        if self.storage_method == 'mongodb':
            self._store_mongodb(item)
        elif self.storage_method == 'mysql':
            self._store_mysql(item)
        elif self.storage_method == 'csv':
            self._store_csv(item)
        else:
            self._store_json(item)
        self.items_stored += 1
        return item
    def close_spider(self, spider):
        """Release database connections (the original leaked them)."""
        if hasattr(self, 'mongo_client'):
            self.mongo_client.close()
        if hasattr(self, 'mysql_connection'):
            self.mysql_connection.close()
    @staticmethod
    def _make_output_path(ext):
        """Create the output directory and return a run-stamped file path."""
        import os
        from datetime import datetime
        output_dir = 'output'
        os.makedirs(output_dir, exist_ok=True)
        timestamp = datetime.now().strftime('%Y%m%d_%H%M%S')
        return f'{output_dir}/ecommerce_data_{timestamp}.{ext}'
    def _ensure_output_path(self, ext):
        """Lazily create the path (fallback when open_spider was skipped)."""
        if self._output_path is None:
            self._output_path = self._make_output_path(ext)
        return self._output_path
    def _store_json(self, item):
        """Append the item as one JSON Lines record."""
        import json
        filename = self._ensure_output_path('json')
        with open(filename, 'a', encoding='utf-8') as f:
            f.write(json.dumps(dict(item), ensure_ascii=False, default=str) + '\n')
    def _store_csv(self, item):
        """Append the item as a CSV row, writing the header once per file."""
        import csv
        import os
        filename = self._ensure_output_path('csv')
        write_header = not os.path.exists(filename)
        with open(filename, 'a', newline='', encoding='utf-8') as f:
            writer = csv.DictWriter(f, fieldnames=item.keys())
            if write_header:
                writer.writeheader()
            writer.writerow(dict(item))
    def _setup_mongodb(self, config):
        """Connect to MongoDB and select database/collection from config."""
        try:
            from pymongo import MongoClient
            connection_string = config.get('connection_string', 'mongodb://localhost:27017/')
            database_name = config.get('database', 'ecommerce_db')
            collection_name = config.get('collection', 'products')
            self.mongo_client = MongoClient(connection_string)
            self.database = self.mongo_client[database_name]
            self.collection = self.database[collection_name]
        except ImportError:
            raise Exception("pymongo not installed. Install with: pip install pymongo")
    def _store_mongodb(self, item):
        """Insert the item with a storage timestamp."""
        from datetime import datetime
        if hasattr(self, 'collection'):
            item['stored_at'] = datetime.now()
            self.collection.insert_one(dict(item))
    def _setup_mysql(self, config):
        """Connect to MySQL from config and ensure the products table exists."""
        try:
            import pymysql
            host = config.get('host', 'localhost')
            user = config.get('user', 'root')
            password = config.get('password', '')
            database = config.get('database', 'ecommerce_db')
            self.mysql_connection = pymysql.connect(
                host=host,
                user=user,
                password=password,
                database=database,
                charset='utf8mb4'
            )
            self._create_mysql_table()
        except ImportError:
            raise Exception("pymysql not installed. Install with: pip install pymysql")
    def _create_mysql_table(self):
        """Create the products table (idempotent)."""
        cursor = self.mysql_connection.cursor()
        create_table_sql = """
        CREATE TABLE IF NOT EXISTS products (
            id INT AUTO_INCREMENT PRIMARY KEY,
            product_id VARCHAR(100),
            title TEXT,
            brand VARCHAR(200),
            category_path TEXT,
            current_price DECIMAL(10, 2),
            original_price DECIMAL(10, 2),
            discount_rate VARCHAR(20),
            specifications JSON,
            color_options JSON,
            size_options JSON,
            main_image TEXT,
            gallery_images JSON,
            rating DECIMAL(3, 2),
            review_count INT,
            stock_status VARCHAR(50),
            sales_volume VARCHAR(50),
            url TEXT,
            crawled_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
            source_website VARCHAR(200)
        ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;
        """
        cursor.execute(create_table_sql)
        self.mysql_connection.commit()
    def _store_mysql(self, item):
        """Insert the item, serialising dict/list fields to JSON columns."""
        import json
        if hasattr(self, 'mysql_connection'):
            cursor = self.mysql_connection.cursor()
            # Parameterised insert — values are never interpolated into SQL.
            insert_sql = """
            INSERT INTO products (
                product_id, title, brand, category_path, current_price,
                original_price, discount_rate, specifications, color_options,
                size_options, main_image, gallery_images, rating,
                review_count, stock_status, sales_volume, url, source_website
            ) VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s)
            """
            # JSON columns take serialised strings.
            specifications = item.get('specifications')
            if isinstance(specifications, dict):
                specifications = json.dumps(specifications, ensure_ascii=False)
            color_options = item.get('color_options')
            if isinstance(color_options, list):
                color_options = json.dumps(color_options, ensure_ascii=False)
            size_options = item.get('size_options')
            if isinstance(size_options, list):
                size_options = json.dumps(size_options, ensure_ascii=False)
            gallery_images = item.get('gallery_images')
            if isinstance(gallery_images, list):
                gallery_images = json.dumps(gallery_images, ensure_ascii=False)
            values = (
                item.get('product_id'), item.get('title'), item.get('brand'),
                item.get('category_path'), item.get('current_price'),
                item.get('original_price'), item.get('discount_rate'),
                specifications, color_options, size_options,
                item.get('main_image'), gallery_images,
                item.get('rating'), item.get('review_count'),
                item.get('stock_status'), item.get('sales_volume'),
                item.get('url'), item.get('source_website')
            )
            cursor.execute(insert_sql, values)
            self.mysql_connection.commit()
class MonitoringPipeline:
    """
    Lightweight progress/throughput monitor: logs every 100 items and a
    summary report at shutdown.
    """
    def __init__(self):
        self.items_processed = 0
        self.start_time = None
        self.errors = []
    def open_spider(self, spider):
        """Record the start time and reset the item counter."""
        self.start_time = datetime.now()
        self.items_processed = 0
        spider.logger.info("E-commerce spider started at: %s", self.start_time)
    def process_item(self, item, spider):
        """Count the item; log progress every 100 items."""
        self.items_processed += 1
        if self.items_processed % 100 == 0:
            elapsed_time = datetime.now() - self.start_time
            spider.logger.info(f"Processed {self.items_processed} items in {elapsed_time}")
        return item
    def close_spider(self, spider):
        """
        Emit the final monitoring report.

        Fixes: the original crashed with TypeError when open_spider never
        ran (start_time None) and could ZeroDivide on zero-second runtimes.
        """
        end_time = datetime.now()
        start = self.start_time or end_time
        total_time = end_time - start
        seconds = total_time.total_seconds()
        rate = self.items_processed / seconds if seconds > 0 else float(self.items_processed)
        spider.logger.info("=" * 50)
        spider.logger.info("E-COMMERCE SPIDER MONITORING REPORT")
        spider.logger.info("=" * 50)
        spider.logger.info(f"Start Time: {start}")
        spider.logger.info(f"End Time: {end_time}")
        spider.logger.info(f"Total Runtime: {total_time}")
        spider.logger.info(f"Items Processed: {self.items_processed}")
        spider.logger.info(f"Average Speed: {rate:.2f} items/sec")
        spider.logger.info("=" * 50)
## 性能优化技巧 \{#性能优化技巧}
### 并发与延迟优化
```python
# Scrapy settings tuned to raise crawl throughput safely
CUSTOM_SETTINGS = {
# Concurrency limits
'CONCURRENT_REQUESTS': 32,
'CONCURRENT_REQUESTS_PER_DOMAIN': 8,
'CONCURRENT_REQUESTS_PER_IP': 0,
# Base download delay (seconds) plus randomized jitter
'DOWNLOAD_DELAY': 1,
'RANDOMIZE_DOWNLOAD_DELAY': 0.5,
# AutoThrottle: adapt the delay to observed server latency
'AUTOTHROTTLE_ENABLED': True,
'AUTOTHROTTLE_START_DELAY': 1,
'AUTOTHROTTLE_MAX_DELAY': 10,
'AUTOTHROTTLE_TARGET_CONCURRENCY': 4.0,
'AUTOTHROTTLE_DEBUG': False,
# DNS cache
'DNSCACHE_ENABLED': True,
# Reactor thread-pool size
'REACTOR_THREADPOOL_MAXSIZE': 20,
# Retry policy
'RETRY_TIMES': 3,
'RETRY_HTTP_CODES': [500, 502, 503, 504, 408, 429],
# HTTP cache
'HTTPCACHE_ENABLED': False, # e-commerce data is too volatile to benefit from caching
}#内存优化
# Memory-optimisation settings
MEMORY_OPTIMIZATION = {
# Downloader response-size guards
'DOWNLOAD_MAXSIZE': 1073741824, # hard cap: 1GB per response
'DOWNLOAD_WARNSIZE': 33554432, # warn above 32MB
# Item pipeline that batches writes to limit resident memory
'ITEM_PIPELINES': {
'ecommerce_spider.pipelines.MemoryOptimizedPipeline': 300,
},
# Log verbosity
'LOG_LEVEL': 'INFO',
}
class MemoryOptimizedPipeline:
    """
    Buffers items and flushes them in batches to keep per-item storage
    overhead (and resident memory) low.
    """
    def __init__(self):
        self.buffer = []
        self.buffer_size = 1000   # flush threshold
    def process_item(self, item, spider):
        """Queue the item; flush once the buffer reaches its threshold."""
        self.buffer.append(dict(item))
        if len(self.buffer) >= self.buffer_size:
            self._flush_buffer()
        return item
    def _flush_buffer(self):
        """Persist and empty the buffer if it holds anything."""
        if not self.buffer:
            return
        self._batch_store(self.buffer)
        self.buffer.clear()
    def close_spider(self, spider):
        """Flush whatever remains when the spider shuts down."""
        self._flush_buffer()
    def _batch_store(self, items):
        """Batch-persistence hook — intentionally a no-op placeholder."""
        pass
#数据库优化
# Database connection-pool tuning for the storage backends
DATABASE_OPTIMIZATION = {
'mysql': {
'pool_size': 20,
'pool_recycle': 3600,
'pool_timeout': 20,
'max_overflow': 0,
'pool_pre_ping': True,
},
'mongodb': {
'max_pool_size': 50,
'min_pool_size': 10,
'max_idle_time_ms': 30000,
'server_selection_timeout_ms': 5000,
}
}#项目部署与监控
#Docker部署
# Dockerfile
FROM python:3.9-slim
WORKDIR /app
COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt
COPY . .
CMD ["scrapy", "crawl", "ecommerce"]

# docker-compose.yml
version: '3.8'
services:
ecommerce-spider:
build: .
volumes:
- ./output:/app/output
environment:
- SCRAPY_SETTINGS_MODULE=ecommerce_spider.settings
networks:
- spider-network
mongodb:
image: mongo:4.4
ports:
- "27017:27017"
volumes:
- mongodb_data:/data/db
networks:
- spider-network
redis:
image: redis:6.2
ports:
- "6379:6379"
networks:
- spider-network
networks:
spider-network:
driver: bridge
volumes:
mongodb_data:

#部署脚本
#!/bin/bash
# deploy.sh - 电商爬虫部署脚本
set -e
echo "开始部署电商爬虫..."
# 检查依赖
echo "检查依赖..."
pip install -r requirements.txt
# 创建输出目录
mkdir -p output logs
# 设置环境变量
export SCRAPY_SETTINGS_MODULE=ecommerce_spider.settings
# 运行爬虫
echo "启动电商爬虫..."
scrapy crawl ecommerce \
-s LOG_FILE=logs/ecommerce_$(date +%Y%m%d_%H%M%S).log \
-s JOBDIR=crawls/ecommerce_job
echo "部署完成!"

#监控脚本
# monitoring.py - 爬虫监控脚本
import psutil
import time
import logging
from datetime import datetime
class SpiderMonitor:
    """
    Process-level resource monitor that periodically logs CPU, memory and
    thread metrics for the current (spider) process via psutil.
    """
    def __init__(self):
        self.logger = self._setup_logger()
        self.process = psutil.Process()
    def _setup_logger(self):
        """
        Create the 'SpiderMonitor' logger with a timestamped file handler.
        NOTE(review): each instantiation adds another handler to the same
        named logger — fine for a single long-lived monitor process.
        """
        logger = logging.getLogger('SpiderMonitor')
        logger.setLevel(logging.INFO)
        handler = logging.FileHandler(f'monitoring_{datetime.now().strftime("%Y%m%d_%H%M%S")}.log')
        formatter = logging.Formatter('%(asctime)s - %(levelname)s - %(message)s')
        handler.setFormatter(formatter)
        logger.addHandler(handler)
        return logger
    def collect_metrics(self):
        """
        Snapshot current process metrics.

        Fix: io_counters()/num_fds() are unavailable on some platforms
        (e.g. macOS) — the original called io_counters() unguarded (and
        twice), crashing there; both are now probed defensively.
        """
        try:
            disk_io = self.process.io_counters()._asdict()
        except (AttributeError, NotImplementedError, psutil.Error):
            disk_io = {}
        try:
            num_fds = self.process.num_fds()
        except (AttributeError, psutil.Error):
            num_fds = 0
        return {
            'timestamp': datetime.now().isoformat(),
            'cpu_percent': self.process.cpu_percent(),
            'memory_info': self.process.memory_info()._asdict(),
            'disk_io_counters': disk_io,
            'num_threads': self.process.num_threads(),
            'num_fds': num_fds,
        }
    def log_metrics(self, metrics):
        """Write the key metrics to the monitor log."""
        self.logger.info(f"CPU: {metrics['cpu_percent']}%")
        self.logger.info(f"Memory: {metrics['memory_info']['rss'] / 1024 / 1024:.2f} MB")
        self.logger.info(f"Threads: {metrics['num_threads']}")
    def start_monitoring(self, interval=60):
        """Collect and log metrics every *interval* seconds until Ctrl-C."""
        self.logger.info("开始监控爬虫...")
        try:
            while True:
                metrics = self.collect_metrics()
                self.log_metrics(metrics)
                time.sleep(interval)
        except KeyboardInterrupt:
            self.logger.info("监控已停止")
if __name__ == "__main__":
monitor = SpiderMonitor()
monitor.start_monitoring()

#SEO优化建议
#网站SEO分析
# seo_analyzer.py - 电商网站SEO分析工具
import requests
from bs4 import BeautifulSoup
import re
from urllib.parse import urljoin, urlparse
class SEOAnalyzer:
    """
    On-page SEO analyzer: fetches pages with a shared requests session and
    reports title/meta/heading/link statistics.
    """
    def __init__(self, base_url):
        self.base_url = base_url
        self.session = requests.Session()
        # Identify ourselves honestly to the target site.
        self.session.headers.update({
            'User-Agent': 'Mozilla/5.0 (compatible; SEOAnalyzer/1.0)'
        })
    def analyze_page(self, url):
        """
        Fetch *url* and return a dict of SEO metrics; on failure returns
        ``{'url': url, 'error': str(e)}`` instead of raising.
        """
        try:
            response = self.session.get(url)
            soup = BeautifulSoup(response.content, 'html.parser')
            analysis = {
                'url': url,
                'status_code': response.status_code,
                'title': self._get_title(soup),
                'meta_description': self._get_meta_description(soup),
                'meta_keywords': self._get_meta_keywords(soup),
                'h1_count': len(soup.find_all('h1')),
                'h2_h6_count': len(soup.find_all(['h2', 'h3', 'h4', 'h5', 'h6'])),
                'image_alt_count': self._count_images_with_alt(soup),
                'internal_links': self._count_internal_links(soup, url),
                'external_links': self._count_external_links(soup, url),
                'page_size': len(response.content),
                'load_time': response.elapsed.total_seconds(),
            }
            return analysis
        except Exception as e:
            # Boundary: report the failure in the result rather than raise.
            return {'url': url, 'error': str(e)}
    def _get_title(self, soup):
        """Return the <title> text, or None when absent."""
        title_tag = soup.find('title')
        return title_tag.get_text().strip() if title_tag else None
    def _get_meta_description(self, soup):
        """Return the meta description content, or None when absent."""
        desc_tag = soup.find('meta', attrs={'name': 'description'})
        return desc_tag.get('content', '').strip() if desc_tag else None
    def _get_meta_keywords(self, soup):
        """Return the meta keywords content, or None when absent."""
        keywords_tag = soup.find('meta', attrs={'name': 'keywords'})
        return keywords_tag.get('content', '').strip() if keywords_tag else None
    def _count_images_with_alt(self, soup):
        """Count <img> elements carrying a non-empty alt attribute."""
        images = soup.find_all('img')
        return sum(1 for img in images if img.get('alt'))
    def _count_links(self, soup, base_url, internal):
        """
        Shared link counter: count anchors whose resolved domain matches
        (internal=True) or differs from (internal=False) the base domain.
        Replaces the duplicated internal/external loops of the original.
        """
        base_domain = urlparse(base_url).netloc
        count = 0
        for link in soup.find_all('a', href=True):
            link_domain = urlparse(urljoin(base_url, link['href'])).netloc
            if (link_domain == base_domain) == internal:
                count += 1
        return count
    def _count_internal_links(self, soup, base_url):
        """Count same-domain links."""
        return self._count_links(soup, base_url, True)
    def _count_external_links(self, soup, base_url):
        """Count off-domain links."""
        return self._count_links(soup, base_url, False)
    def analyze_sitemap(self, sitemap_url):
        """
        Return the list of <loc> URLs in a sitemap, or an error dict on
        failure. NOTE(review): the 'xml' BeautifulSoup parser requires
        lxml to be installed — confirm the deployment image includes it.
        """
        try:
            response = self.session.get(sitemap_url)
            soup = BeautifulSoup(response.content, 'xml')
            urls = []
            for url_elem in soup.find_all('url'):
                loc = url_elem.find('loc')
                if loc:
                    urls.append(loc.text)
            return urls
        except Exception as e:
            return {'error': str(e)}
# Usage example — guarded so importing this module does not trigger
# network I/O (the original ran the request at import time).
if __name__ == "__main__":
    analyzer = SEOAnalyzer('https://example-ecommerce.com')
    analysis_result = analyzer.analyze_page('https://example-ecommerce.com/products/123')
    print(analysis_result)
#爬虫SEO最佳实践
# 爬虫SEO最佳实践配置
CRAWLER_SEO_BEST_PRACTICES = {
# Honour robots.txt
'ROBOTSTXT_OBEY': True,
# Polite crawl rate (seconds) with randomized jitter
'DOWNLOAD_DELAY': 2,
'RANDOMIZE_DOWNLOAD_DELAY': 0.5,
# Identifiable User-Agent with a contact URL
'USER_AGENT': 'EcommerceBot/1.0 (+http://yourdomain.com/bot-info)',
# Conservative concurrency
'CONCURRENT_REQUESTS': 8,
'CONCURRENT_REQUESTS_PER_DOMAIN': 2,
# Browser-like default request headers
'DEFAULT_REQUEST_HEADERS': {
'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8',
'Accept-Language': 'zh-CN,zh;q=0.9,en;q=0.8',
'Accept-Encoding': 'gzip, deflate',
'Connection': 'keep-alive',
'Referer': 'https://www.google.com/',
},
# Monitoring and logging
'LOG_LEVEL': 'INFO',
'LOGSTATS_INTERVAL': 60,
# Retry / error handling
'RETRY_TIMES': 3,
'RETRY_HTTP_CODES': [500, 502, 503, 504, 408, 429],
}

通过本项目的实施,我们构建了一个功能完整的电商全站爬取系统,涵盖了从基础架构到高级功能的各个方面。这个系统不仅能够高效地抓取电商网站数据,还具备良好的可扩展性、稳定性和维护性,为电商数据分析提供了强有力的技术支撑。

