#Spider Middleware完全指南 - 数据预处理与后处理技术详解
📂 所属阶段:第五阶段 — 战力升级(分布式与进阶篇)
🔗 相关章节:Downloader Middleware · Pipeline管道实战 · 分布式去重与调度
#目录
- Spider Middleware基础概念
- Spider Middleware生命周期
- 输入处理方法
- 输出处理方法
- 异常处理方法
- 信号系统集成
- 数据预处理技术
- 数据后处理技术
- 高级中间件技巧
- 性能优化策略
- 分布式环境下的中间件
- 常见问题与解决方案
- 最佳实践总结
#Spider Middleware基础概念
Spider Middleware是Scrapy框架中位于Engine和Spider之间的组件,用于拦截Spider的输入响应和输出Items/Requests。它在数据进入Spider之前和离开Spider之后提供处理机会。
#Spider Middleware的作用与优势
"""
Spider Middleware的主要作用:
1. 输入预处理:在响应到达Spider之前处理响应
2. 输出后处理:在Items/Requests离开Spider后处理
3. 异常处理:处理Spider处理过程中的异常
4. 信号拦截:监听和响应爬虫事件
5. 数据过滤:过滤不需要的Items或Requests
"""#Spider Middleware与其他组件的关系
"""
Scrapy请求处理流程:
Engine -> Scheduler -> Engine -> Downloader -> Engine ->
Spider Middleware -> Spider -> Spider Middleware -> Engine -> Pipeline
"""#Spider Middleware生命周期
Spider Middleware具有完整的生命周期方法,可以在不同阶段对响应和结果进行处理。
#基础Spider Middleware结构
class BaseSpiderMiddleware:
"""
基础Spider Middleware示例
"""
def __init__(self):
"""
初始化方法,在爬虫启动时调用一次
"""
pass
@classmethod
def from_crawler(cls, crawler):
"""
从crawler实例创建middleware的方法
用于访问settings等配置信息
"""
return cls()
def process_spider_input(self, response, spider):
"""
处理Spider输入的方法
在响应被Spider处理之前调用
"""
# 返回None表示继续处理
# 抛出异常表示停止处理
return None
def process_spider_output(self, response, result, spider):
"""
处理Spider输出的方法
在Spider产生Items/Requests后调用
"""
# 必须返回可迭代对象
for item_or_request in result:
yield item_or_request
def process_spider_exception(self, response, exception, spider):
"""
处理异常的方法
在Spider处理过程中发生异常时调用
"""
# 返回None表示异常未处理,交给下一个中间件
# 返回可迭代对象表示异常已处理
return None
#Spider Middleware配置
# settings.py
SPIDER_MIDDLEWARES = {
# 格式:'路径.到.中间件类': 优先级数字
'myproject.middlewares.CustomSpiderMiddleware': 543,
'myproject.middlewares.SignalMiddleware': 544,
}
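除了注册自定义中间件,也可以在 SPIDER_MIDDLEWARES 中把某个内置中间件的值设为 None 来禁用它(内置中间件及其默认优先级定义在 SPIDER_MIDDLEWARES_BASE 中)。下面是一个示意,具体默认优先级以所用 Scrapy 版本为准:
```python
# settings.py —— 调整内置Spider Middleware的示例
SPIDER_MIDDLEWARES = {
    # 将值设为None即可禁用对应的内置中间件(此处以深度限制中间件为例)
    'scrapy.spidermiddlewares.depth.DepthMiddleware': None,
    # 自定义中间件插入到内置HttpErrorMiddleware(默认50)与RefererMiddleware(默认700)之间
    'myproject.middlewares.CustomSpiderMiddleware': 543,
}
```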
# 优先级说明:
# 数字越小越靠近Engine,数字越大越靠近Spider
# process_spider_input按数字从小到大依次调用,process_spider_output按数字从大到小依次调用
# Scrapy内置Spider Middleware的优先级分布在0~1000之间(定义在SPIDER_MIDDLEWARES_BASE中)
# 自定义中间件建议使用500-1000之间的数字
#输入处理方法
process_spider_input方法在响应被Spider处理之前调用,用于预处理响应数据。
#基础输入处理
import time
class BasicInputMiddleware:
"""
基础输入处理中间件
"""
def process_spider_input(self, response, spider):
"""
基础输入处理逻辑
"""
spider.logger.info(f"Processing response: {response.url}")
# 检查响应状态
if response.status != 200:
spider.logger.warning(f"Response status {response.status} for {response.url}")
# 检查响应内容类型
content_type = response.headers.get('Content-Type', b'').decode('utf-8').lower()
if 'text/html' not in content_type:
spider.logger.debug(f"Non-HTML content: {content_type}")
# 添加自定义元数据到响应
response.meta['processed_by'] = 'BasicInputMiddleware'
response.meta['process_time'] = time.time()
return None
#响应预处理
import json
import re
class ResponsePreprocessingMiddleware:
"""
响应预处理中间件
"""
def process_spider_input(self, response, spider):
"""
响应预处理逻辑
"""
# 检查是否为JSON响应
content_type = response.headers.get('Content-Type', b'').decode('utf-8').lower()
if 'application/json' in content_type:
try:
# 预解析JSON内容
json_data = json.loads(response.text)
response.meta['parsed_json'] = json_data
except json.JSONDecodeError:
spider.logger.error(f"Failed to parse JSON response: {response.url}")
# 检查是否为XML响应
elif 'application/xml' in content_type or 'text/xml' in content_type:
try:
# 预处理XML内容
response.meta['xml_content'] = response.text
except Exception as e:
spider.logger.error(f"Failed to process XML response: {e}")
# 检查页面编码
if response.encoding != 'utf-8':
spider.logger.debug(f"Response encoding: {response.encoding}")
# 检查页面大小
content_length = len(response.body)
if content_length > 10 * 1024 * 1024: # 10MB
spider.logger.warning(f"Large response size: {content_length} bytes")
return None
#输入验证中间件
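下面的验证中间件通过 spider.crawler.settings 读取自定义设置 MAX_RESPONSE_SIZE(单位:字节)。这是一个假设的 settings 片段,阈值仅作示例:
```python
# settings.py —— 供InputValidationMiddleware读取的自定义设置(示例值)
MAX_RESPONSE_SIZE = 10 * 1024 * 1024  # 超过10MB的响应将被忽略
```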
from scrapy.exceptions import IgnoreRequest
class InputValidationMiddleware:
"""
输入验证中间件
"""
def process_spider_input(self, response, spider):
"""
验证输入响应的有效性
"""
# 检查响应大小(防止过大响应)
if len(response.body) > spider.crawler.settings.getint('MAX_RESPONSE_SIZE', 10 * 1024 * 1024):
raise IgnoreRequest("Response too large")
# 检查响应内容
response_text = response.text.lower()
# 检查反爬页面
anti_crawl_indicators = [
'访问过于频繁', '请稍后重试', 'blocked', 'forbidden',
'验证码', 'captcha', 'rate limit', 'too many requests'
]
if any(indicator in response_text for indicator in anti_crawl_indicators):
spider.logger.warning(f"Anti-crawling detected for {response.url}")
# 可以选择抛出异常或记录日志
response.meta['anti_crawl_detected'] = True
# 检查页面完整性
if '<html' not in response_text or '</html>' not in response_text:
spider.logger.warning(f"Potentially incomplete HTML for {response.url}")
return None
#输出处理方法
process_spider_output方法在Spider产生Items/Requests后调用,用于后处理输出数据。
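输出结果中既可能是Item(dict、Item子类、dataclass等),也可能是新的Request。除了下文使用的hasattr判断,也可以用isinstance配合itemadapter.is_item来区分类型,下面是一个最小示意(假设已安装itemadapter库):
```python
from scrapy import Request
from itemadapter import is_item

class TypeAwareOutputMiddleware:
    """区分Request与Item后再分别处理的最小示意"""

    def process_spider_output(self, response, result, spider):
        for obj in result:
            if isinstance(obj, Request):
                # 新请求:补充meta标记,便于后续追踪
                obj.meta.setdefault('from_middleware', True)
            elif is_item(obj):
                # Item:dict、scrapy.Item子类、attrs/dataclass对象都会被is_item识别
                spider.logger.debug("Item produced from %s", response.url)
            yield obj
```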
#基础输出处理
import time
from itemadapter import ItemAdapter
class BasicOutputMiddleware:
"""
基础输出处理中间件
"""
def process_spider_output(self, response, result, spider):
"""
基础输出处理逻辑
"""
for item_or_request in result:
if isinstance(item_or_request, dict):
# 处理字典类型的Item
item_or_request['processed_at'] = time.time()
item_or_request['source_url'] = response.url
yield item_or_request
elif hasattr(item_or_request, 'fields'): # Scrapy Item
# 处理Scrapy Item(注意:若是scrapy.Item子类,需预先声明processed_at、source_url字段,否则ItemAdapter赋值会抛出KeyError)
item_adapter = ItemAdapter(item_or_request)
item_adapter['processed_at'] = time.time()
item_adapter['source_url'] = response.url
yield item_or_request
else:
# 处理Request
item_or_request.meta['processed_at'] = time.time()
yield item_or_request
#数据过滤中间件
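下面的过滤中间件会从 settings 的 FILTER_KEYWORDS 列表读取关键词,命中即丢弃。一个假设的配置示例:
```python
# settings.py —— 供DataFilteringMiddleware使用的过滤关键词(示例)
FILTER_KEYWORDS = ['测试数据', '广告', 'placeholder']
```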
class DataFilteringMiddleware:
"""
数据过滤中间件
"""
def __init__(self):
self.filtered_count = 0
def process_spider_output(self, response, result, spider):
"""
过滤不需要的数据
"""
for item_or_request in result:
if self._should_filter(item_or_request, spider):
self.filtered_count += 1
spider.logger.debug(f"Filtered item: {item_or_request}")
continue
yield item_or_request
def _should_filter(self, item_or_request, spider):
"""
判断是否应该过滤该项目
"""
if isinstance(item_or_request, dict):
# 过滤空数据
if not item_or_request:
return True
# 过滤包含特定关键词的数据
filter_keywords = spider.crawler.settings.getlist('FILTER_KEYWORDS', [])
for keyword in filter_keywords:
if any(keyword.lower() in str(value).lower()
for value in item_or_request.values() if isinstance(value, str)):
return True
elif hasattr(item_or_request, 'fields'):
# 过滤Scrapy Item
adapter = ItemAdapter(item_or_request)
if not adapter.asdict():
return True
return False
#数据增强中间件
import hashlib
import time
from itemadapter import ItemAdapter
class DataEnrichmentMiddleware:
"""
数据增强中间件
"""
def process_spider_output(self, response, result, spider):
"""
增强数据,添加额外信息
"""
for item_or_request in result:
if isinstance(item_or_request, dict):
# 添加数据增强信息
enriched_item = self._enrich_item(item_or_request, response)
yield enriched_item
elif hasattr(item_or_request, 'fields'):
# 处理Scrapy Item
adapter = ItemAdapter(item_or_request)
enriched_adapter = self._enrich_item(adapter.asdict(), response)
# 更新原始Item
for key, value in enriched_adapter.items():
adapter[key] = value
yield item_or_request
else:
# 处理Request
item_or_request.meta['enriched_at'] = time.time()
yield item_or_request
def _enrich_item(self, item_dict, response):
"""
增强单个项目
"""
enriched = item_dict.copy()
# 添加来源信息
enriched['source_url'] = response.url
enriched['crawl_timestamp'] = time.time()
enriched['crawl_date'] = time.strftime('%Y-%m-%d %H:%M:%S')
# 添加数据唯一标识
content_str = str(sorted(item_dict.items()))
enriched['data_hash'] = hashlib.md5(content_str.encode()).hexdigest()
# 添加响应相关信息
enriched['response_status'] = response.status
enriched['response_size'] = len(response.body)
# 添加分类信息(如果适用)
if 'url' in enriched:
enriched['domain'] = self._extract_domain(enriched['url'])
return enriched
def _extract_domain(self, url):
"""
从URL提取域名
"""
from urllib.parse import urlparse
parsed = urlparse(url)
return parsed.netloc
#异常处理方法
process_spider_exception方法在Spider回调或更靠近Spider的中间件的process_spider_output方法抛出异常时调用。
#基础异常处理
import traceback
class BasicExceptionMiddleware:
"""
基础异常处理中间件
"""
def process_spider_exception(self, response, exception, spider):
"""
处理Spider异常的基础方法
"""
spider.logger.error(f"Spider exception in {response.url}: {exception}")
spider.logger.error(f"Traceback: {traceback.format_exc()}")
# 根据异常类型决定处理方式
if isinstance(exception, ValueError):
# 对于ValueError,尝试返回一个空结果
spider.logger.warning(f"ValueError handled for {response.url}")
return []
elif isinstance(exception, KeyError):
# 对于KeyError,记录并继续
spider.logger.warning(f"KeyError handled for {response.url}")
return []
# 对于其他异常,返回None让其他中间件处理
return None
#高级异常处理
from scrapy.http import Request
class AdvancedExceptionMiddleware:
"""
高级异常处理中间件
"""
def process_spider_exception(self, response, exception, spider):
"""
高级异常处理逻辑
"""
exception_type = type(exception).__name__
spider.logger.error(f"Exception {exception_type} in {response.url}: {str(exception)}")
# 根据异常类型进行不同处理
if self._is_network_related_exception(exception):
return self._handle_network_exception(response, exception, spider)
elif self._is_parsing_exception(exception):
return self._handle_parsing_exception(response, exception, spider)
elif self._is_data_validation_exception(exception):
return self._handle_validation_exception(response, exception, spider)
# 如果无法处理特定异常,记录并返回空结果
spider.logger.error(f"Unhandled exception type: {exception_type}")
return []
def _is_network_related_exception(self, exception):
"""
检查是否为网络相关异常
"""
network_exceptions = [
'ConnectionError', 'TimeoutError', 'ConnectTimeout',
'ReadTimeout', 'TooManyRedirects'
]
return type(exception).__name__ in network_exceptions
def _is_parsing_exception(self, exception):
"""
检查是否为解析相关异常
"""
parsing_exceptions = [
'ValueError', 'TypeError', 'AttributeError',
'IndexError', 'KeyError'
]
return type(exception).__name__ in parsing_exceptions
def _is_data_validation_exception(self, exception):
"""
检查是否为数据验证相关异常
"""
validation_exceptions = [
'ValidationError', 'AssertionError'
]
return type(exception).__name__ in validation_exceptions
def _handle_network_exception(self, response, exception, spider):
"""
处理网络相关异常
"""
spider.logger.warning(f"Network exception, scheduling retry for {response.url}")
# 返回一个新的请求进行重试
retry_request = response.request.replace(
dont_filter=True,
meta={
**response.request.meta,
'retry_count': response.request.meta.get('retry_count', 0) + 1
}
)
return [retry_request]
def _handle_parsing_exception(self, response, exception, spider):
"""
处理解析相关异常
"""
spider.logger.warning(f"Parsing exception, returning empty result for {response.url}")
# 返回空结果,但记录异常信息
response.meta['parsing_error'] = str(exception)
return []
def _handle_validation_exception(self, response, exception, spider):
"""
处理数据验证相关异常
"""
spider.logger.warning(f"Validation exception, skipping invalid data from {response.url}")
# 返回空结果,表示跳过无效数据
return []
#信号系统集成
Spider Middleware可以与Scrapy的信号系统集成,监听爬虫生命周期事件。
#信号中间件基础
from scrapy import signals
from scrapy.exceptions import NotConfigured
class SignalMiddleware:
"""
信号处理中间件
"""
def __init__(self, crawler):
self.crawler = crawler
self.stats = crawler.stats
# 连接信号
crawler.signals.connect(self.spider_opened, signal=signals.spider_opened)
crawler.signals.connect(self.spider_closed, signal=signals.spider_closed)
crawler.signals.connect(self.spider_idle, signal=signals.spider_idle)
crawler.signals.connect(self.request_scheduled, signal=signals.request_scheduled)
crawler.signals.connect(self.response_received, signal=signals.response_received)
crawler.signals.connect(self.item_scraped, signal=signals.item_scraped)
@classmethod
def from_crawler(cls, crawler):
"""
从crawler实例创建中间件
"""
return cls(crawler)
def spider_opened(self, spider):
"""
爬虫开启时的处理
"""
spider.logger.info(f"Spider {spider.name} opened")
self.stats.inc_value('spider/opened_count')
def spider_closed(self, spider, reason):
"""
爬虫关闭时的处理
"""
spider.logger.info(f"Spider {spider.name} closed: {reason}")
self.stats.inc_value('spider/closed_count')
def spider_idle(self, spider):
"""
爬虫空闲时的处理(如需阻止爬虫因空闲而关闭,可在此抛出scrapy.exceptions.DontCloseSpider)
"""
spider.logger.debug(f"Spider {spider.name} is idle")
self.stats.inc_value('spider/idle_count')
def request_scheduled(self, request, spider):
"""
请求调度时的处理
"""
spider.logger.debug(f"Request scheduled: {request.url}")
self.stats.inc_value('requests/scheduled')
def response_received(self, response, request, spider):
"""
响应接收时的处理
"""
spider.logger.debug(f"Response received: {response.url} (Status: {response.status})")
self.stats.inc_value('responses/received')
self.stats.inc_value(f'responses/status_count/{response.status}')
def item_scraped(self, item, response, spider):
"""
Item被抓取时的处理
"""
spider.logger.debug(f"Item scraped from {response.url}")
self.stats.inc_value('items/scraped')
#高级信号处理
import time
from scrapy import signals
from collections import defaultdict, deque
class AdvancedSignalMiddleware:
"""
高级信号处理中间件
"""
def __init__(self, crawler):
self.crawler = crawler
self.stats = crawler.stats
self.performance_metrics = defaultdict(deque)
self.start_time = time.time()
# 连接信号
self._connect_signals()
def _connect_signals(self):
"""
连接所有需要的信号
"""
signal_connections = [
(self.spider_opened, signals.spider_opened),
(self.spider_closed, signals.spider_closed),
(self.request_scheduled, signals.request_scheduled),
(self.response_received, signals.response_received),
(self.item_scraped, signals.item_scraped),
(self.request_dropped, signals.request_dropped),
(self.response_downloaded, signals.response_downloaded),
]
for handler, signal in signal_connections:
self.crawler.signals.connect(handler, signal=signal)
def spider_opened(self, spider):
"""
爬虫开启时初始化性能监控
"""
spider.logger.info(f"Performance monitoring started for {spider.name}")
self.stats.set_value('performance/start_time', time.time())
def spider_closed(self, spider, reason):
"""
爬虫关闭时输出性能报告
"""
total_time = time.time() - self.start_time
spider.logger.info(f"Performance report for {spider.name}:")
spider.logger.info(f" Total time: {total_time:.2f}s")
# 输出统计信息
requests_count = self.stats.get_value('requests/scheduled', 0)
responses_count = self.stats.get_value('responses/received', 0)
items_count = self.stats.get_value('items/scraped', 0)
if total_time > 0:
spider.logger.info(f" Requests/sec: {requests_count/total_time:.2f}")
spider.logger.info(f" Responses/sec: {responses_count/total_time:.2f}")
spider.logger.info(f" Items/sec: {items_count/total_time:.2f}")
def request_scheduled(self, request, spider):
"""
记录请求调度时间
"""
request.meta['scheduled_time'] = time.time()
self.stats.inc_value('requests/scheduled')
def response_received(self, response, request, spider):
"""
处理响应接收事件
"""
scheduled_time = request.meta.get('scheduled_time', time.time())
download_time = time.time() - scheduled_time
# 记录下载时间
self.performance_metrics['download_times'].append(download_time)
self.stats.inc_value('responses/received')
self.stats.inc_value('responses/download_time', download_time)
def response_downloaded(self, response, request, spider):
"""
处理响应下载事件
"""
download_start_time = request.meta.get('download_start_time', time.time())
actual_download_time = time.time() - download_start_time
self.stats.inc_value('responses/actual_download_time', actual_download_time)
def item_scraped(self, item, response, spider):
"""
处理Item抓取事件
"""
self.stats.inc_value('items/scraped')
# 记录抓取延迟
if hasattr(response, 'meta') and 'scheduled_time' in response.meta:
scraping_time = time.time() - response.meta['scheduled_time']
self.stats.inc_value('items/scraping_time', scraping_time)
def request_dropped(self, request, spider):
"""
处理请求被丢弃事件
"""
self.stats.inc_value('requests/dropped')
spider.logger.warning(f"Request dropped: {request.url}")
@classmethod
def from_crawler(cls, crawler):
"""
从crawler实例创建中间件
"""
return cls(crawler)
#数据预处理技术
数据预处理是在数据进入Spider之前进行的处理,可以提高数据质量和处理效率。
#内容预处理中间件
import re
import json
from lxml import etree
class ContentPreprocessingMiddleware:
"""
内容预处理中间件
"""
def process_spider_input(self, response, spider):
"""
预处理响应内容
"""
content_type = response.headers.get('Content-Type', b'').decode('utf-8').lower()
if 'application/json' in content_type:
self._preprocess_json(response, spider)
elif 'text/html' in content_type:
self._preprocess_html(response, spider)
elif 'application/xml' in content_type or 'text/xml' in content_type:
self._preprocess_xml(response, spider)
return None
def _preprocess_json(self, response, spider):
"""
预处理JSON响应
"""
try:
data = json.loads(response.text)
# 预处理JSON数据
if isinstance(data, dict):
# 扁平化嵌套结构
flattened = self._flatten_dict(data)
response.meta['flattened_json'] = flattened
response.meta['json_data'] = data
response.meta['json_processed'] = True
except json.JSONDecodeError as e:
spider.logger.error(f"JSON decode error: {e}")
response.meta['json_processed'] = False
def _preprocess_html(self, response, spider):
"""
预处理HTML响应
"""
try:
# 解析HTML文档
tree = etree.HTML(response.text)
# 提取基本信息
title = tree.xpath('//title/text()')
response.meta['page_title'] = title[0] if title else ''
# 检查页面结构
links = tree.xpath('//a/@href')
response.meta['link_count'] = len(links)
# 检查是否为列表页或详情页
if self._is_list_page(tree):
response.meta['page_type'] = 'list'
elif self._is_detail_page(tree):
response.meta['page_type'] = 'detail'
else:
response.meta['page_type'] = 'other'
response.meta['html_processed'] = True
except Exception as e:
spider.logger.error(f"HTML preprocessing error: {e}")
response.meta['html_processed'] = False
def _preprocess_xml(self, response, spider):
"""
预处理XML响应
"""
try:
tree = etree.fromstring(response.body)  # 直接使用原始字节,避免编码声明与重新编码不一致
# 提取XML信息
root_tag = tree.tag
response.meta['xml_root'] = root_tag
response.meta['xml_elements_count'] = len(list(tree.iter()))
response.meta['xml_processed'] = True
except Exception as e:
spider.logger.error(f"XML preprocessing error: {e}")
response.meta['xml_processed'] = False
def _flatten_dict(self, d, parent_key='', sep='_'):
"""
扁平化嵌套字典
"""
items = []
for k, v in d.items():
new_key = f"{parent_key}{sep}{k}" if parent_key else k
if isinstance(v, dict):
items.extend(self._flatten_dict(v, new_key, sep=sep).items())
else:
items.append((new_key, v))
return dict(items)
def _is_list_page(self, tree):
"""
判断是否为列表页
"""
# 检查是否有列表元素
list_elements = tree.xpath('//ul/li | //ol/li | //div[contains(@class, "item")]')
return len(list_elements) > 5 # 假设超过5个列表项为列表页
def _is_detail_page(self, tree):
"""
判断是否为详情页
"""
# 检查是否有详情页特征
detail_elements = tree.xpath('//div[contains(@class, "detail")] | //div[contains(@class, "content")]')
return len(detail_elements) > 0
#编码预处理中间件
import chardet
class EncodingPreprocessingMiddleware:
"""
编码预处理中间件
"""
def process_spider_input(self, response, spider):
"""
预处理响应编码
"""
# 检测实际编码
detected_encoding = chardet.detect(response.body)
response.meta['detected_encoding'] = detected_encoding
# 检查响应声明/推断的编码(Scrapy会优先采用Content-Type头或页面meta中声明的编码)
declared_encoding = response.encoding
response.meta['declared_encoding'] = declared_encoding
# 如果编码不匹配,尝试使用检测到的编码
if detected_encoding['confidence'] > 0.8:
if (declared_encoding and
declared_encoding.lower() != detected_encoding['encoding'].lower()):
spider.logger.warning(
f"Encoding mismatch: declared={declared_encoding}, detected={detected_encoding['encoding']}"
)
# 尝试使用检测到的编码重新解码
try:
decoded_text = response.body.decode(detected_encoding['encoding'])
# 创建新的Response对象
from scrapy.http import HtmlResponse
new_response = HtmlResponse(
url=response.url,
body=decoded_text.encode('utf-8'),
encoding='utf-8',
request=response.request
)
# 注意:process_spider_input无法替换响应对象;
# 若要真正修正编码,应在Downloader Middleware的process_response中返回response.replace(encoding=检测到的编码)
response.meta['corrected_encoding'] = detected_encoding['encoding']
except UnicodeDecodeError:
spider.logger.error(f"Failed to decode with detected encoding: {detected_encoding['encoding']}")
# 记录编码信息
response.meta['encoding_confidence'] = detected_encoding['confidence']
return None
#数据后处理技术
数据后处理是在数据离开Spider之后进行的处理,用于数据清洗和格式化。
#数据清洗后处理
import re
from itemadapter import ItemAdapter
class DataCleaningPostprocessingMiddleware:
"""
数据清洗后处理中间件
"""
def __init__(self):
# 定义清洗规则
self.cleaning_rules = {
'text': self._clean_text,
'number': self._clean_number,
'url': self._clean_url,
'email': self._clean_email,
}
def process_spider_output(self, response, result, spider):
"""
后处理输出数据
"""
for item_or_request in result:
if isinstance(item_or_request, (dict, type(None))):
if item_or_request is not None:
cleaned_item = self._clean_item(item_or_request, spider)
yield cleaned_item
else:
yield item_or_request
elif hasattr(item_or_request, 'fields'):
# 处理Scrapy Item
adapter = ItemAdapter(item_or_request)
cleaned_data = self._clean_item(adapter.asdict(), spider)
# 更新原始Item
for key, value in cleaned_data.items():
adapter[key] = value
yield item_or_request
else:
yield item_or_request
def _clean_item(self, item_dict, spider):
"""
清洗单个项目
"""
cleaned = {}
for field_name, value in item_dict.items():
if value is None:
cleaned[field_name] = None
continue
field_type = self._detect_field_type(field_name, value)
if field_type in self.cleaning_rules:
cleaned[field_name] = self.cleaning_rules[field_type](value)
else:
# 默认处理
if isinstance(value, str):
cleaned[field_name] = self._clean_text(value)
else:
cleaned[field_name] = value
return cleaned
def _detect_field_type(self, field_name, value):
"""
检测字段类型
"""
field_name_lower = field_name.lower()
# 基于字段名判断类型
if any(keyword in field_name_lower for keyword in ['name', 'title', 'description', 'content']):
return 'text'
elif any(keyword in field_name_lower for keyword in ['price', 'amount', 'count', 'number']):
return 'number'
elif any(keyword in field_name_lower for keyword in ['url', 'link', 'href']):
return 'url'
elif 'email' in field_name_lower:
return 'email'
# 基于值判断类型
if isinstance(value, (int, float)):
return 'number'
elif isinstance(value, str):
if self._looks_like_url(value):
return 'url'
elif self._looks_like_email(value):
return 'email'
return 'text' # 默认为文本
def _clean_text(self, text):
"""
清洗文本数据
"""
if not isinstance(text, str):
return text
# 去除多余空白
cleaned = re.sub(r'\s+', ' ', text.strip())
# 去除特殊字符(保留中文、英文、数字、基本标点)
cleaned = re.sub(r'[^\w\s\u4e00-\u9fff.,!?;:()""''-]', '', cleaned)
return cleaned
def _clean_number(self, number):
"""
清洗数字数据
"""
if isinstance(number, (int, float)):
return number
if isinstance(number, str):
# 提取数字
numbers = re.findall(r'-?\d+\.?\d*', number.replace(',', ''))
if numbers:
try:
if '.' in numbers[0]:
return float(numbers[0])
else:
return int(numbers[0])
except ValueError:
pass
return number
def _clean_url(self, url):
"""
清洗URL数据
"""
if not isinstance(url, str):
return url
# 去除首尾空白
url = url.strip()
# 处理相对URL
if url.startswith('//'):
url = 'https:' + url
elif url.startswith('/'):
# 需要基础URL来处理相对路径
pass
return url
def _clean_email(self, email):
"""
清洗邮箱数据
"""
if not isinstance(email, str):
return email
email = email.strip().lower()
# 验证邮箱格式
import re
if re.match(r'^[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}$', email):
return email
return email
def _looks_like_url(self, text):
"""
检查文本是否像URL
"""
return bool(re.match(r'^https?://', text.strip()))
def _looks_like_email(self, text):
"""
检查文本是否像邮箱
"""
return bool(re.match(r'^[^@]+@[^@]+\.[^@]+$', text.strip()))
#数据验证后处理
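下面的验证中间件从 Spider 的 validation_config 属性读取验证规则(strict_mode 与 default_values 也在其中配置)。先给出一个假设的 Spider 配置示例,字段名与阈值仅作演示:
```python
import scrapy

class ProductSpider(scrapy.Spider):
    name = 'product_demo'

    # 供DataValidationPostprocessingMiddleware读取的验证配置(示例)
    validation_config = {
        'title': [
            {'type': 'required'},
            {'type': 'max_length', 'params': {'length': 200}},
        ],
        'price': [
            {'type': 'range', 'params': {'min_val': 0, 'max_val': 100000}},
        ],
        'strict_mode': False,             # 为True时任一字段验证失败将丢弃整条数据
        'default_values': {'price': 0},   # 非严格模式下验证失败字段的默认值
    }
```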
from itemadapter import ItemAdapter
class DataValidationPostprocessingMiddleware:
"""
数据验证后处理中间件
"""
def __init__(self):
self.validation_rules = {
'required': self._validate_required,
'min_length': self._validate_min_length,
'max_length': self._validate_max_length,
'pattern': self._validate_pattern,
'range': self._validate_range,
}
def process_spider_output(self, response, result, spider):
"""
验证输出数据
"""
for item_or_request in result:
if isinstance(item_or_request, (dict, type(None))):
if item_or_request is not None:
validated_item = self._validate_item(item_or_request, spider)
if validated_item is not None:
yield validated_item
else:
yield item_or_request
elif hasattr(item_or_request, 'fields'):
# 处理Scrapy Item
adapter = ItemAdapter(item_or_request)
validated_data = self._validate_item(adapter.asdict(), spider)
if validated_data is not None:
# 更新原始Item
for key, value in validated_data.items():
adapter[key] = value
yield item_or_request
else:
yield item_or_request
def _validate_item(self, item_dict, spider):
"""
验证单个项目
"""
# 获取验证配置
validation_config = getattr(spider, 'validation_config', {})
validated = {}
errors = []
for field_name, value in item_dict.items():
field_validations = validation_config.get(field_name, [])
is_valid = True
for validation_rule in field_validations:
rule_type = validation_rule['type']
rule_params = validation_rule.get('params', {})
if rule_type in self.validation_rules:
valid, error_msg = self.validation_rules[rule_type](value, **rule_params)
if not valid:
is_valid = False
errors.append(f"{field_name}: {error_msg}")
break
if is_valid:
validated[field_name] = value
else:
# 如果字段验证失败,可以选择跳过整个item或设置默认值
if validation_config.get('strict_mode', False):
spider.logger.error(f"Validation failed for item: {errors}")
return None # 跳过整个item
else:
# 设置默认值或跳过该字段
default_value = validation_config.get('default_values', {}).get(field_name)
if default_value is not None:
validated[field_name] = default_value
# 验证完成后可以添加验证状态
validated['_validation_passed'] = len(errors) == 0
validated['_validation_errors'] = errors
return validated
def _validate_required(self, value, **kwargs):
"""
验证必需字段
"""
if value is None or (isinstance(value, str) and not value.strip()):
return False, "Field is required"
return True, ""
def _validate_min_length(self, value, length=0, **kwargs):
"""
验证最小长度
"""
if isinstance(value, str) and len(value) < length:
return False, f"Length must be at least {length}"
return True, ""
def _validate_max_length(self, value, length=1000, **kwargs):
"""
验证最大长度
"""
if isinstance(value, str) and len(value) > length:
return False, f"Length must be at most {length}"
return True, ""
def _validate_pattern(self, value, pattern='', **kwargs):
"""
验证正则表达式模式
"""
import re
if isinstance(value, str) and not re.match(pattern, value):
return False, f"Does not match pattern: {pattern}"
return True, ""
def _validate_range(self, value, min_val=None, max_val=None, **kwargs):
"""
验证数值范围
"""
try:
num_value = float(value)
if min_val is not None and num_value < min_val:
return False, f"Value must be at least {min_val}"
if max_val is not None and num_value > max_val:
return False, f"Value must be at most {max_val}"
return True, ""
except (ValueError, TypeError):
return False, "Value must be a number"#高级中间件技巧
#条件中间件
class ConditionalMiddleware:
"""
条件中间件 - 根据不同条件应用不同处理逻辑
"""
def __init__(self):
self.strategies = {
'ecommerce': self._handle_ecommerce,
'news': self._handle_news,
'social': self._handle_social,
'forum': self._handle_forum
}
def process_spider_input(self, response, spider):
"""
根据网站类型选择处理策略
"""
site_type = self._classify_site(response.url)
if site_type in self.strategies:
return self.strategies[site_type](response, spider, 'input')
return None
def process_spider_output(self, response, result, spider):
"""
根据网站类型选择输出处理策略
"""
site_type = self._classify_site(response.url)
for item_or_request in result:
if site_type in self.strategies:
# 输出阶段:将单个结果包装成列表传给对应策略(与策略方法的data参数对应)
processed = self.strategies[site_type]([item_or_request], spider, 'output')
yield from processed
else:
yield item_or_request
def _classify_site(self, url):
"""
分类网站类型
"""
url_lower = url.lower()
ecommerce_keywords = ['shop', 'buy', 'cart', 'product', 'mall', 'store', 'taobao', 'jd', 'amazon', 'aliexpress']
news_keywords = ['news', 'article', 'blog', 'journal', 'media', 'press']
social_keywords = ['facebook', 'twitter', 'instagram', 'weibo', 'tiktok', 'youtube']
forum_keywords = ['forum', 'bbs', 'discuss', 'thread', 'topic']
for keyword in ecommerce_keywords:
if keyword in url_lower:
return 'ecommerce'
for keyword in news_keywords:
if keyword in url_lower:
return 'news'
for keyword in social_keywords:
if keyword in url_lower:
return 'social'
for keyword in forum_keywords:
if keyword in url_lower:
return 'forum'
return 'general'
def _handle_ecommerce(self, data, spider, phase):
"""
处理电商网站
"""
if phase == 'input':
# 电商网站输入处理
spider.logger.debug(f"E-commerce input processing for {data.url}")
data.meta['site_type'] = 'ecommerce'
return None
else:
# 电商网站输出处理
result = data # data is the result iterator
processed = []
for item in result:
if isinstance(item, dict):
# 电商特定字段处理
item['category_type'] = 'ecommerce'
item['processing_strategy'] = 'detailed'
processed.append(item)
return processed
def _handle_news(self, data, spider, phase):
"""
处理新闻网站
"""
if phase == 'input':
spider.logger.debug(f"News input processing for {data.url}")
data.meta['site_type'] = 'news'
return None
else:
result = data
processed = []
for item in result:
if isinstance(item, dict):
item['category_type'] = 'news'
item['processing_strategy'] = 'fast'
processed.append(item)
return processed
def _handle_social(self, data, spider, phase):
"""
处理社交媒体
"""
if phase == 'input':
spider.logger.debug(f"Social input processing for {data.url}")
data.meta['site_type'] = 'social'
return None
else:
result = data
processed = []
for item in result:
if isinstance(item, dict):
item['category_type'] = 'social'
item['processing_strategy'] = 'compliance'
processed.append(item)
return processed
def _handle_forum(self, data, spider, phase):
"""
处理论坛网站
"""
if phase == 'input':
spider.logger.debug(f"Forum input processing for {data.url}")
data.meta['site_type'] = 'forum'
return None
else:
result = data
processed = []
for item in result:
if isinstance(item, dict):
item['category_type'] = 'forum'
item['processing_strategy'] = 'contextual'
processed.append(item)
return processed
#缓存中间件
import hashlib
import pickle
import os
from datetime import datetime, timedelta
class CachingMiddleware:
"""
缓存中间件 - 缓存处理结果以提高效率
"""
def __init__(self, cache_dir='spider_cache'):
self.cache_dir = cache_dir
self.cache_ttl = timedelta(hours=24) # 缓存有效期24小时
os.makedirs(cache_dir, exist_ok=True)
def process_spider_input(self, response, spider):
"""
检查是否有缓存结果
"""
cache_key = self._generate_cache_key(response.url, response.body)
cache_file = os.path.join(self.cache_dir, f"{cache_key}.pkl")
if os.path.exists(cache_file):
cache_time = datetime.fromtimestamp(os.path.getmtime(cache_file))
if datetime.now() - cache_time < self.cache_ttl:
try:
with open(cache_file, 'rb') as f:
cached_result = pickle.load(f)
spider.logger.info(f"Cache hit for {response.url}")
response.meta['cached_result'] = cached_result
response.meta['cache_hit'] = True
return None
except Exception:
# 缓存损坏,删除文件
os.remove(cache_file)
response.meta['cache_hit'] = False
return None
def process_spider_output(self, response, result, spider):
"""
缓存处理结果
"""
if response.meta.get('cache_hit'):
# 如果是缓存命中,直接返回缓存结果
cached_result = response.meta.get('cached_result', [])
yield from cached_result
else:
# 正常处理并缓存结果
results = list(result) # 转换为列表以便缓存(注意:带回调的Request对象可能无法被pickle序列化)
# 生成缓存键并保存
cache_key = self._generate_cache_key(response.url, response.body)
cache_file = os.path.join(self.cache_dir, f"{cache_key}.pkl")
try:
with open(cache_file, 'wb') as f:
pickle.dump(results, f)
spider.logger.info(f"Result cached for {response.url}")
except Exception as e:
spider.logger.error(f"Failed to cache result: {e}")
yield from results
def _generate_cache_key(self, url, content):
"""
生成缓存键
"""
content_hash = hashlib.md5(content).hexdigest()
url_hash = hashlib.md5(url.encode()).hexdigest()
return f"{url_hash}_{content_hash[:8]}"#性能优化策略
#内存优化中间件
import gc
import sys
from collections import defaultdict
class MemoryOptimizedMiddleware:
"""
内存优化中间件
"""
def __init__(self, max_items_before_gc=1000):
self.max_items_before_gc = max_items_before_gc
self.processed_count = 0
self.memory_monitor = defaultdict(int)
def process_spider_input(self, response, spider):
"""
输入处理时的内存优化
"""
# 监控响应大小
response_size = sys.getsizeof(response.body)
self.memory_monitor['total_response_size'] += response_size
# 检查是否需要垃圾回收
if self.processed_count > 0 and self.processed_count % self.max_items_before_gc == 0:
collected = gc.collect()
spider.logger.debug(f"Garbage collected: {collected} objects")
self.processed_count += 1
return None
def process_spider_output(self, response, result, spider):
"""
输出处理时的内存优化
"""
for item_or_request in result:
# 限制单个项目的大小
item_size = sys.getsizeof(str(item_or_request))
if item_size > 1024 * 1024: # 1MB
spider.logger.warning(f"Large item detected: {item_size} bytes")
# 限制字符串长度
if isinstance(item_or_request, dict):
for key, value in item_or_request.items():
if isinstance(value, str) and len(value) > 10000: # 10KB
item_or_request[key] = value[:10000] + "...[truncated]"
yield item_or_request
def spider_closed(self, spider):
"""
爬虫关闭时的内存清理(需在from_crawler中通过crawler.signals.connect连接signals.spider_closed后才会被调用)
"""
spider.logger.info(f"Memory usage summary: {dict(self.memory_monitor)}")
gc.collect()
#并发优化中间件
import asyncio
import threading
from concurrent.futures import ThreadPoolExecutor
class ConcurrentOptimizedMiddleware:
"""
并发优化中间件
"""
def __init__(self):
self.executor = ThreadPoolExecutor(max_workers=4)
self.semaphore = threading.Semaphore(10) # 限制并发数
def process_spider_input(self, response, spider):
"""
并发优化的输入处理
"""
with self.semaphore:
# 执行CPU密集型预处理任务
processed_response = self._cpu_intensive_preprocessing(response)
return None
def _cpu_intensive_preprocessing(self, response):
"""
CPU密集型预处理任务
"""
# 例如:复杂的文本分析、图像处理等
# 使用线程池避免阻塞
future = self.executor.submit(self._heavy_computation, response)
# 可以选择同步等待或异步处理
return future.result(timeout=5) # 5秒超时
def _heavy_computation(self, response):
"""
模拟重计算任务
"""
# 实际的计算逻辑
return response
#分布式环境下的中间件
在分布式爬虫环境中,Spider Middleware需要考虑跨节点协作、数据一致性、负载均衡等问题。以下是一些适用于分布式环境的中间件设计模式和实现方案。
#分布式任务协调中间件
import redis
import json
import time
from scrapy import signals
from scrapy.exceptions import IgnoreRequest
class DistributedTaskCoordinationMiddleware:
"""
分布式任务协调中间件 - 确保任务在分布式环境中的协调处理
"""
def __init__(self, crawler):
self.crawler = crawler
self.redis_client = redis.from_url(crawler.settings.get('REDIS_URL', 'redis://localhost:6379'))
self.task_coordination_key = 'distributed_spider:task_coordination'
self.node_id = crawler.settings.get('NODE_ID', 'default_node')
# 连接信号
crawler.signals.connect(self.spider_opened, signal=signals.spider_opened)
crawler.signals.connect(self.spider_closed, signal=signals.spider_closed)
@classmethod
def from_crawler(cls, crawler):
return cls(crawler)
def spider_opened(self, spider):
"""
爬虫开启时注册节点
"""
node_info = {
'node_id': self.node_id,
'started_at': time.time(),
'status': 'active'
}
self.redis_client.hset(
f'{self.task_coordination_key}:nodes',
self.node_id,
json.dumps(node_info)
)
spider.logger.info(f"Node {self.node_id} registered in distributed coordination")
def spider_closed(self, spider, reason):
"""
爬虫关闭时注销节点
"""
self.redis_client.hdel(
f'{self.task_coordination_key}:nodes',
self.node_id
)
spider.logger.info(f"Node {self.node_id} unregistered from distributed coordination")
def process_spider_input(self, response, spider):
"""
处理输入前检查任务分配
"""
url = response.url
task_key = f"{self.task_coordination_key}:assigned_tasks:{url}"
# 检查该任务是否已被其他节点处理
assigned_node = self.redis_client.get(task_key)
if assigned_node and assigned_node.decode() != self.node_id:
# 任务已被其他节点分配,跳过
spider.logger.info(f"Task {url} already assigned to node {assigned_node.decode()}, skipping")
raise IgnoreRequest(f"Task already assigned to another node")
# 分配任务给当前节点
self.redis_client.setex(task_key, 3600, self.node_id) # 1小时过期
response.meta['distributed_task_assigned'] = True
return None
def process_spider_output(self, response, result, spider):
"""
处理输出时更新任务状态
"""
for item_or_request in result:
# 标记任务处理完成
if response.meta.get('distributed_task_assigned'):
url = response.url
task_complete_key = f"{self.task_coordination_key}:completed_tasks:{url}"
self.redis_client.setex(task_complete_key, 86400, self.node_id) # 24小时过期
yield item_or_request
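本节的几个分布式中间件都依赖 REDIS_URL 与 NODE_ID 两个设置,并需要在 SPIDER_MIDDLEWARES 中注册。下面是一个假设的 settings 片段(类路径、优先级按实际项目调整,每个爬虫节点应使用唯一的 NODE_ID):
```python
# settings.py —— 分布式Spider Middleware的基础配置(示例)
REDIS_URL = 'redis://redis-master:6379/0'
NODE_ID = 'worker-01'  # 每个节点配置唯一ID

SPIDER_MIDDLEWARES = {
    'myproject.middlewares.DistributedTaskCoordinationMiddleware': 520,
    'myproject.middlewares.DistributedRateLimitMiddleware': 530,
    'myproject.middlewares.DistributedDataConsistencyMiddleware': 540,
}
```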
class DistributedRateLimitMiddleware:
"""
分布式速率限制中间件 - 在多个节点间协调请求频率
"""
def __init__(self, crawler):
self.crawler = crawler
self.redis_client = redis.from_url(crawler.settings.get('REDIS_URL', 'redis://localhost:6379'))
self.node_id = crawler.settings.get('NODE_ID', 'default_node')
self.rate_limits = {}
# 从配置中加载速率限制规则
rate_limit_config = crawler.settings.get('DISTRIBUTED_RATE_LIMITS', {})
for domain, limit_info in rate_limit_config.items():
self.rate_limits[domain] = {
'requests': limit_info.get('requests', 10),
'window': limit_info.get('window', 60)
}
@classmethod
def from_crawler(cls, crawler):
return cls(crawler)
def process_spider_input(self, response, spider):
"""
检查分布式速率限制
"""
from urllib.parse import urlparse
domain = urlparse(response.url).netloc
if domain in self.rate_limits:
limit_info = self.rate_limits[domain]
requests_allowed = limit_info['requests']
window = limit_info['window']
# 使用Redis的滑动窗口算法检查速率限制
current_time = int(time.time())
window_start = current_time - window
# 获取当前窗口内的请求数
requests_key = f"distributed_rate_limit:{domain}:requests"
pipe = self.redis_client.pipeline()
pipe.zremrangebyscore(requests_key, 0, window_start)
pipe.zcard(requests_key)
pipe.zadd(requests_key, {f"{self.node_id}:{current_time}": current_time})
pipe.expire(requests_key, window * 2) # 设置过期时间
results = pipe.execute()
current_requests = results[1]
if current_requests > requests_allowed:
# 超出速率限制,延迟处理
delay_needed = window - (current_time % window)
spider.logger.warning(f"Distributed rate limit exceeded for {domain}, delaying for {delay_needed}s")
# 将请求重新排队(需要自定义调度器配合)
response.meta['distributed_rate_limited'] = True
response.meta['delay_until'] = current_time + delay_needed
return None
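上面的速率限制中间件从 settings 的 DISTRIBUTED_RATE_LIMITS 读取各域名的限制规则(requests 为窗口内允许的请求数,window 为窗口秒数)。一个假设的配置示例:
```python
# settings.py —— 分布式速率限制规则(示例)
DISTRIBUTED_RATE_LIMITS = {
    'example.com': {'requests': 10, 'window': 60},        # 每60秒最多10个请求
    'api.example.com': {'requests': 100, 'window': 60},
}
```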
class DistributedDataConsistencyMiddleware:
"""
分布式数据一致性中间件 - 确保跨节点数据处理的一致性
"""
def __init__(self, crawler):
self.crawler = crawler
self.redis_client = redis.from_url(crawler.settings.get('REDIS_URL', 'redis://localhost:6379'))
self.node_id = crawler.settings.get('NODE_ID', 'default_node')
self.consistency_key = 'distributed_spider:data_consistency'
@classmethod
def from_crawler(cls, crawler):
return cls(crawler)
def process_spider_output(self, response, result, spider):
"""
处理输出时确保数据一致性
"""
for item_or_request in result:
if isinstance(item_or_request, dict):
# 为数据项生成全局唯一ID
import hashlib
item_content = str(sorted(item_or_request.items()))
global_item_id = hashlib.md5(item_content.encode()).hexdigest()
consistency_check_key = f"{self.consistency_key}:processed:{global_item_id}"
# 使用Redis分布式锁确保一致性
lock_key = f"{consistency_check_key}:lock"
lock_timeout = 30 # 30秒锁超时
# 尝试获取分布式锁
if self._acquire_lock(lock_key, self.node_id, lock_timeout):
try:
# 检查是否已处理过
already_processed_by = self.redis_client.get(consistency_check_key)
if already_processed_by:
spider.logger.info(f"Item already processed by node {already_processed_by.decode()}")
continue # 跳过已处理的项目
# 标记为当前节点处理
self.redis_client.setex(
consistency_check_key,
86400 * 7, # 7天过期
self.node_id
)
# 添加处理节点信息
item_or_request['distributed_processed_by'] = self.node_id
item_or_request['distributed_process_time'] = time.time()
yield item_or_request
finally:
# 释放锁
self._release_lock(lock_key, self.node_id)
else:
spider.logger.warning(f"Could not acquire lock for item {global_item_id}, skipping")
else:
yield item_or_request
def _acquire_lock(self, lock_key, node_id, timeout):
"""
获取分布式锁
"""
lua_acquire = """
if redis.call("GET", KEYS[1]) == false then
redis.call("SET", KEYS[1], ARGV[1])
redis.call("EXPIRE", KEYS[1], ARGV[2])
return 1
else
return 0
end
"""
return bool(self.redis_client.eval(lua_acquire, 1, lock_key, node_id, timeout))
def _release_lock(self, lock_key, node_id):
"""
释放分布式锁
"""
lua_release = """
if redis.call("GET", KEYS[1]) == ARGV[1] then
return redis.call("DEL", KEYS[1])
else
return 0
end
"""
return bool(self.redis_client.eval(lua_release, 1, lock_key, node_id))
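获取锁除了用上面的 Lua 脚本,也可以直接用 redis-py 的 set(nx=True, ex=...)(等价于 Redis 的 SET key value NX EX)原子地完成;释放锁仍建议用 Lua 校验持有者。下面是一个等价的获取方式示意(可作为该中间件的方法):
```python
def _acquire_lock_simple(self, lock_key, node_id, timeout):
    """使用SET NX EX获取分布式锁的简化写法(与上面的Lua脚本等价)"""
    # set()在键已存在时返回None,成功设置时返回True
    return bool(self.redis_client.set(lock_key, node_id, nx=True, ex=timeout))
```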
class DistributedLoadBalancingMiddleware:
"""
分布式负载均衡中间件 - 根据节点负载动态调整任务分配
"""
def __init__(self, crawler):
self.crawler = crawler
self.redis_client = redis.from_url(crawler.settings.get('REDIS_URL', 'redis://localhost:6379'))
self.node_id = crawler.settings.get('NODE_ID', 'default_node')
self.load_balance_key = 'distributed_spider:load_balance'
self.task_assignment_key = f"{self.load_balance_key}:assignments:{self.node_id}"
@classmethod
def from_crawler(cls, crawler):
return cls(crawler)
def process_spider_input(self, response, spider):
"""
根据负载情况决定是否处理请求
"""
# 更新节点负载信息
self._update_node_load(spider)
# 检查当前节点负载是否过高
current_load = self._get_current_node_load()
max_load = self.crawler.settings.get('NODE_MAX_LOAD', 100)
if current_load > max_load:
spider.logger.warning(f"Node {self.node_id} load too high ({current_load}), considering task offloading")
# 可以实现任务重新分配逻辑
# 这里简单地记录负载过高
response.meta['node_load_high'] = True
return None
def process_spider_output(self, response, result, spider):
"""
处理完任务后更新负载信息
"""
# 减少当前节点的任务计数
self.redis_client.decr(self.task_assignment_key, 1)
for item_or_request in result:
yield item_or_request
def _update_node_load(self, spider):
"""
更新节点负载信息
"""
# 增加当前节点的任务计数
self.redis_client.incr(self.task_assignment_key, 1)
# 更新负载时间戳
self.redis_client.setex(
f"{self.load_balance_key}:node_last_active:{self.node_id}",
300, # 5分钟过期
time.time()
)
def _get_current_node_load(self):
"""
获取当前节点负载
"""
task_count = int(self.redis_client.get(self.task_assignment_key) or 0)
return task_count
## 常见问题与解决方案
### 问题1: 中间件执行顺序错误
**现象**: 中间件没有按照预期顺序执行
**解决方案**:
```python
# 确保在settings.py中按调用方向设置优先级
# 对于process_spider_output,数字越大越靠近Spider、越先处理Spider输出
SPIDER_MIDDLEWARES = {
'myproject.middlewares.ValidationMiddleware': 600,  # 最先处理Spider输出,先做验证
'myproject.middlewares.CleaningMiddleware': 500,    # 其次清洗
'myproject.middlewares.EnrichmentMiddleware': 400,  # 最后增强
}
```

### 问题2: 内存泄漏
**现象**: 长时间运行后内存占用过高
**解决方案**:
```python
class MemorySafeMiddleware:
def __init__(self):
self.item_cache = {} # 使用弱引用或限制大小
self.max_cache_size = 1000
def process_spider_output(self, response, result, spider):
# 控制缓存大小
if len(self.item_cache) > self.max_cache_size:
self.item_cache.clear()
for item in result:
yield item
```

### 问题3: 异常处理不当
**现象**: 异常导致整个爬虫停止
**解决方案**:
```python
class RobustExceptionMiddleware:
def process_spider_exception(self, response, exception, spider):
spider.logger.error(f"Exception occurred: {exception}", exc_info=True)
# 记录错误但不中断爬虫
response.meta['error_occurred'] = True
response.meta['error_message'] = str(exception)
# 返回空结果继续处理
return []
```
#最佳实践总结
#设计原则
- 单一职责: 每个中间件只处理一种类型的逻辑,如输入预处理、输出后处理、异常处理等
- 可配置性: 通过settings.py配置中间件参数,提高灵活性(在from_crawler中读取配置,示例见下)
- 健壮性: 妥善处理异常,避免影响整个爬虫运行
- 性能考虑: 避免在中间件中进行耗时操作,考虑异步处理
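针对"可配置性"一条,最常见的写法是在 from_crawler 中读取 settings,并在缺少必要配置时抛出 NotConfigured 让 Scrapy 跳过该中间件。下面是一个最小示意(ENABLE_MY_MIDDLEWARE、MY_MIDDLEWARE_KEYWORDS 为假设的自定义设置名):
```python
from scrapy.exceptions import NotConfigured

class ConfigurableMiddleware:
    """通过settings配置参数的Spider Middleware最小示意"""

    def __init__(self, keywords):
        self.keywords = keywords

    @classmethod
    def from_crawler(cls, crawler):
        if not crawler.settings.getbool('ENABLE_MY_MIDDLEWARE', True):
            raise NotConfigured  # 抛出NotConfigured后,Scrapy会跳过该中间件
        keywords = crawler.settings.getlist('MY_MIDDLEWARE_KEYWORDS', [])
        return cls(keywords)

    def process_spider_output(self, response, result, spider):
        for obj in result:
            yield obj
```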
#性能优化策略
- 内存管理: 及时清理不必要的数据,使用生成器而非列表
- 缓存机制: 对于重复处理的数据使用缓存
- 并发处理: 在适当场景使用并发处理提高效率
- 资源限制: 设置合理的资源使用上限
#分布式环境最佳实践
- 状态管理: 使用Redis等外部存储管理分布式状态
- 一致性保证: 实现分布式锁确保数据一致性
- 负载均衡: 根据节点负载动态分配任务
- 容错机制: 设计故障转移和恢复机制
#部署建议
- 监控: 实施适当的监控和日志记录
- 测试: 充分测试各种边界情况
- 文档: 为自定义中间件编写清晰的文档
- 版本控制: 管理中间件的版本变更
💡 核心要点: Spider Middleware是Scrapy数据处理流程中的关键组件,通过合理使用中间件,可以实现复杂的数据预处理和后处理逻辑,提高爬虫的灵活性和数据质量。
🔗 相关教程推荐
- Downloader Middleware - 请求响应处理
- Pipeline管道实战 - 数据处理管道
- Selector 选择器 - 数据提取技术
- Item 与 Item Loader - 数据结构定义
- Scrapy-Redis分布式架构 - 分布式爬虫实现
🏷️ 标签云: Scrapy Spider Middleware 数据预处理 数据后处理 异常处理 信号系统 爬虫框架 网络爬虫 Python爬虫 分布式爬虫

