#Downloader Middleware完全指南 - 请求响应拦截与反爬策略详解
📂 所属阶段:第三阶段 — 攻防演练(中间件与反爬篇)
🔗 相关章节:Spider 实战 · Pipeline管道实战
#目录
- Middleware基础概念
- Middleware生命周期
- 请求处理方法
- 响应处理方法
- 异常处理方法
- User-Agent轮换策略
- 代理IP管理
- Cookies管理
- 请求延迟与限速
- 高级Middleware技巧
- 性能优化策略
- 常见问题与解决方案
- SEO优化建议
#Middleware基础概念
Downloader Middleware是Scrapy框架中的核心组件,位于Engine和Downloader之间,用于拦截请求和响应。它是实现反爬策略的关键工具。
#Middleware的作用与优势
"""
Downloader Middleware的主要作用:
1. 请求拦截:修改请求头、添加认证信息、设置代理等
2. 响应处理:处理响应内容、解密数据、重试失败请求等
3. 异常处理:处理下载异常、重试机制、错误恢复等
4. 反爬策略:实现User-Agent轮换、IP代理、请求频率控制等
"""#Middleware与其他组件的关系
"""
Scrapy请求处理流程:
Engine -> Scheduler -> Engine -> Downloader Middleware -> Downloader ->
Downloader Middleware -> Engine -> Spider Middleware -> Spider
"""#Middleware生命周期
Downloader Middleware具有完整的生命周期方法,可以在不同阶段对请求和响应进行处理。
#基础Middleware结构
class BaseMiddleware:
"""
基础Downloader Middleware示例
"""
def __init__(self):
"""
初始化方法,在爬虫启动时调用一次
"""
pass
@classmethod
def from_crawler(cls, crawler):
"""
从crawler实例创建middleware的方法
用于访问settings等配置信息
"""
return cls()
def process_request(self, request, spider):
"""
处理请求的方法
在请求被Downloader处理之前调用
"""
# 返回None表示继续处理
# 返回Response对象表示终止请求,直接返回响应
# 返回Request对象表示发起新的请求
# 抛出IgnoreRequest异常表示忽略此请求
return None
def process_response(self, request, response, spider):
"""
处理响应的方法
在响应被Spider处理之前调用
"""
        # 必须返回Response对象或Request对象,或抛出IgnoreRequest异常
return response
def process_exception(self, request, exception, spider):
"""
处理异常的方法
在下载处理过程中发生异常时调用
"""
# 返回Response对象表示异常已处理
# 返回Request对象表示发起新的请求
# 返回None表示异常未处理,会抛给Scrapy进行后续处理
        return None
#Middleware配置
# settings.py
DOWNLOADER_MIDDLEWARES = {
# 格式:'路径.到.中间件类': 优先级数字
'myproject.middlewares.UserAgentMiddleware': 543,
'myproject.middlewares.ProxyMiddleware': 544,
'myproject.middlewares.CookiesMiddleware': 545,
}
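补充一个常见做法:启用自定义中间件时,通常需要把与之功能重复的内置中间件禁用(把优先级设为None)。下面是一个示意(假设项目名为myproject):
# settings.py(示意,假设项目名为myproject)
DOWNLOADER_MIDDLEWARES = {
    'myproject.middlewares.UserAgentMiddleware': 543,
    # 设为None可禁用与自定义UA中间件功能重复的内置中间件
    'scrapy.downloadermiddlewares.useragent.UserAgentMiddleware': None,
}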
# 优先级说明:
# 数字越小,越靠近Engine:process_request按数字从小到大依次执行,
# process_response和process_exception则按数字从大到小依次执行
# Scrapy内置下载中间件的优先级大致分布在100-900之间
# 自定义中间件可参考内置中间件的位置,选择合适的数字以决定执行顺序
#请求处理方法
process_request方法在请求被Downloader处理之前调用,是最常用的中间件方法之一。
#基础请求处理
class BasicRequestMiddleware:
"""
基础请求处理中间件
"""
def process_request(self, request, spider):
"""
基础请求处理逻辑
"""
# 添加通用请求头
request.headers.setdefault('Accept', 'text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8')
request.headers.setdefault('Accept-Language', 'zh-CN,zh;q=0.8,en-US;q=0.5,en;q=0.3')
request.headers.setdefault('Accept-Encoding', 'gzip, deflate')
request.headers.setdefault('Connection', 'keep-alive')
request.headers.setdefault('Upgrade-Insecure-Requests', '1')
        return None
#请求修改示例
import time
class RequestModificationMiddleware:
    """
    请求修改中间件示例
    """
    def process_request(self, request, spider):
        """
        修改请求的各种示例
        """
        modified = False
        # 1. 修改请求方法
        if request.method == 'GET' and request.meta.get('force_post'):
            request = request.replace(method='POST')
            modified = True
        # 2. 添加自定义元数据(原地修改即可,无需返回新请求)
        request.meta.setdefault('custom_timestamp', time.time())
        # 3. 修改请求URL(谨慎使用)
        if request.meta.get('rewrite_url'):
            request = request.replace(url=request.meta['rewrite_url'])
            modified = True
        # 4. 添加认证信息
        auth_token = spider.crawler.settings.get('AUTH_TOKEN')
        if auth_token:
            request.headers['Authorization'] = f'Bearer {auth_token}'
        # 返回新Request对象会让Scrapy重新调度该请求;未替换时返回None,继续正常下载流程
        return request if modified else None
#请求终止示例
from scrapy.http import HtmlResponse
from scrapy.exceptions import IgnoreRequest
class RequestTerminationMiddleware:
"""
请求终止中间件示例
"""
def process_request(self, request, spider):
"""
根据条件终止请求的示例
"""
# 1. 返回自定义响应
if request.meta.get('use_cached_response'):
cached_content = self.get_cached_content(request.url)
return HtmlResponse(
url=request.url,
body=cached_content,
encoding='utf-8',
request=request
)
# 2. 忽略特定请求
blocked_domains = spider.crawler.settings.getlist('BLOCKED_DOMAINS')
if any(domain in request.url for domain in blocked_domains):
raise IgnoreRequest("Domain blocked")
# 3. 重定向请求
redirect_map = spider.crawler.settings.get('REDIRECT_MAP', {})
for old_url, new_url in redirect_map.items():
if old_url in request.url:
return request.replace(url=new_url)
        return None
    def get_cached_content(self, url):
        """
        返回缓存内容的占位实现,实际项目中可替换为本地文件、Redis等缓存后端
        """
        return b'<html><body>cached placeholder</body></html>'
#响应处理方法
process_response方法在响应被Downloader处理后、Spider处理前调用,用于处理响应内容。
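除了下文的自定义重试中间件,较新版本的Scrapy(2.5+)还提供了get_retry_request工具函数,可以在process_response中复用内置重试的计数与统计逻辑。下面是一个简化示意:
from scrapy.downloadermiddlewares.retry import get_retry_request

class RetryOn403Middleware:
    def process_response(self, request, response, spider):
        if response.status == 403:
            # 返回新Request则交回调度器重试;超过重试上限时返回None
            new_request = get_retry_request(request, spider=spider, reason='403 forbidden')
            if new_request is not None:
                return new_request
        return response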
#基础响应处理
import time
class BasicResponseMiddleware:
"""
基础响应处理中间件
"""
def process_response(self, request, response, spider):
"""
基础响应处理逻辑
"""
# 检查响应状态码
if response.status in [403, 404, 500]:
spider.logger.warning(f"Received status {response.status} for {request.url}")
# 检查响应内容类型
content_type = response.headers.get('Content-Type', b'').decode('utf-8').lower()
if 'text/html' not in content_type:
spider.logger.debug(f"Non-HTML content type: {content_type}")
        # 在request.meta中记录处理信息(下载中间件阶段response.request可能尚未绑定,使用request.meta更可靠)
        request.meta['processed_by'] = 'BasicResponseMiddleware'
        request.meta['process_time'] = time.time()
        return response
#响应内容处理
import gzip
import zlib
class ContentProcessingMiddleware:
"""
响应内容处理中间件
"""
def process_response(self, request, response, spider):
"""
处理响应内容,如解压、解密等
"""
# 处理gzip压缩内容
content_encoding = response.headers.get('Content-Encoding', b'').decode('utf-8').lower()
if 'gzip' in content_encoding:
try:
decompressed_body = gzip.decompress(response.body)
response = response.replace(body=decompressed_body)
# 移除Content-Encoding头
response.headers.pop('Content-Encoding', None)
except Exception as e:
spider.logger.error(f"Gzip decompression failed: {e}")
# 处理deflate压缩内容
elif 'deflate' in content_encoding:
try:
decompressed_body = zlib.decompress(response.body)
response = response.replace(body=decompressed_body)
response.headers.pop('Content-Encoding', None)
except Exception as e:
spider.logger.error(f"Deflate decompression failed: {e}")
        return response
#响应重试逻辑
from scrapy.http import TextResponse
class RetryResponseMiddleware:
"""
响应重试中间件
"""
def process_response(self, request, response, spider):
"""
根据响应内容判断是否需要重试
"""
        # 检查是否是反爬页面(仅对文本/HTML响应检测,避免对二进制响应访问.text时报错)
        if not isinstance(response, TextResponse):
            return response
        response_text = response.text.lower()
anti_crawl_indicators = [
'访问过于频繁', '请稍后重试', 'blocked', 'forbidden',
'验证码', 'captcha', 'rate limit', 'too many requests'
]
if any(indicator in response_text for indicator in anti_crawl_indicators):
retry_times = request.meta.get('retry_times', 0)
max_retries = spider.crawler.settings.getint('MAX_RETRY_TIMES', 3)
if retry_times < max_retries:
spider.logger.info(f"Retrying {request.url}, attempt {retry_times + 1}")
# 增加重试次数并重新调度请求
new_request = request.copy()
new_request.meta['retry_times'] = retry_times + 1
new_request.dont_filter = True # 允许重复请求
# 可以添加延迟
delay = spider.crawler.settings.getfloat('DOWNLOAD_DELAY', 1)
new_request.meta['download_delay'] = delay * (retry_times + 1)
return new_request
else:
spider.logger.error(f"Max retries exceeded for {request.url}")
        return response
#异常处理方法
process_exception方法在下载过程中发生异常时调用,用于处理各种网络异常。
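在自定义process_exception之前,也可以先调整Scrapy内置RetryMiddleware的相关设置,很多超时和5xx场景仅靠配置即可覆盖(数值仅供参考):
# settings.py — 内置RetryMiddleware相关设置(示例值)
RETRY_ENABLED = True
RETRY_TIMES = 3  # 除首次请求外的最大重试次数
RETRY_HTTP_CODES = [500, 502, 503, 504, 408, 429]
DOWNLOAD_TIMEOUT = 30  # 下载超时时间(秒)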
#基础异常处理
import time
from twisted.internet import defer
from twisted.internet.error import TimeoutError, DNSLookupError
from scrapy.core.downloader.handlers.http11 import TunnelError
class BasicExceptionMiddleware:
"""
基础异常处理中间件
"""
def process_exception(self, request, exception, spider):
"""
处理常见的下载异常
"""
if isinstance(exception, TimeoutError):
spider.logger.warning(f"Timeout for {request.url}")
# 可以返回新的请求尝试重试
return self.handle_retry(request, exception, spider)
elif isinstance(exception, DNSLookupError):
spider.logger.error(f"DNS lookup failed for {request.url}")
# DNS错误通常不应该重试
return None
elif isinstance(exception, TunnelError):
spider.logger.warning(f"Tunnel error for {request.url}")
return self.handle_retry(request, exception, spider)
elif isinstance(exception, ConnectionRefusedError):
spider.logger.warning(f"Connection refused for {request.url}")
return self.handle_retry(request, exception, spider)
# 对于其他异常,返回None让Scrapy处理
return None
def handle_retry(self, request, exception, spider):
"""
处理重试逻辑
"""
retry_times = request.meta.get('retry_times', 0)
max_retries = spider.crawler.settings.getint('MAX_RETRY_TIMES', 2)
if retry_times < max_retries:
spider.logger.info(f"Retrying {request.url} due to {type(exception).__name__}")
new_request = request.copy()
new_request.meta['retry_times'] = retry_times + 1
new_request.dont_filter = True
# 添加指数退避延迟
delay = spider.crawler.settings.getfloat('DOWNLOAD_DELAY', 1)
new_request.meta['download_delay'] = delay * (2 ** retry_times)
return new_request
        return None
#高级异常处理
import socket
import errno
from twisted.internet.error import TimeoutError
from scrapy.exceptions import IgnoreRequest
class AdvancedExceptionMiddleware:
"""
高级异常处理中间件
"""
def process_exception(self, request, exception, spider):
"""
处理更复杂的异常情况
"""
exception_type = type(exception).__name__
# 处理网络连接相关异常
if isinstance(exception, (socket.error, ConnectionResetError)):
error_code = getattr(exception, 'errno', None)
if error_code in [errno.ECONNRESET, errno.ECONNREFUSED]:
spider.logger.debug(f"Connection reset/refused for {request.url}")
return self.schedule_retry(request, spider)
# 处理SSL相关异常
elif 'SSL' in exception_type or 'Certificate' in exception_type:
spider.logger.warning(f"SSL error for {request.url}: {exception}")
# SSL错误可能需要特殊的处理策略
if request.meta.get('retry_ssl_errors'):
return self.schedule_retry(request, spider)
# 处理HTTP协议异常
elif 'HttpError' in exception_type:
spider.logger.error(f"HTTP error for {request.url}: {exception}")
# 处理超时异常(更详细的处理)
elif isinstance(exception, TimeoutError):
timeout_type = getattr(exception, '__class__', None)
spider.logger.warning(f"Timeout for {request.url}, type: {timeout_type}")
return self.handle_timeout_retry(request, spider)
# 记录异常但不处理
spider.logger.error(f"Unhandled exception for {request.url}: {exception_type}")
return None
def schedule_retry(self, request, spider):
"""
安排重试
"""
retry_times = request.meta.get('retry_times', 0)
max_retries = spider.crawler.settings.getint('MAX_RETRY_TIMES', 3)
if retry_times < max_retries:
# 增加重试延迟
base_delay = spider.crawler.settings.getfloat('DOWNLOAD_DELAY', 1)
exponential_delay = base_delay * (2 ** retry_times)
new_request = request.copy()
new_request.meta.update({
'retry_times': retry_times + 1,
'download_delay': exponential_delay
})
new_request.dont_filter = True
return new_request
return None
def handle_timeout_retry(self, request, spider):
"""
处理超时重试,可能需要更换代理
"""
retry_times = request.meta.get('retry_times', 0)
if retry_times < 2: # 超时重试次数较少
new_request = request.copy()
new_request.meta['retry_times'] = retry_times + 1
new_request.dont_filter = True
# 考虑更换代理
if 'proxy' in new_request.meta:
new_request.meta['change_proxy'] = True
return new_request
        return None
#User-Agent轮换策略
User-Agent轮换是反爬的基础策略之一,可以有效避免被识别为爬虫。
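作为补充示意,UA列表也可以维护在settings中,由中间件通过from_crawler读取,便于在不同项目间复用(其中USER_AGENT_LIST是假设的自定义设置名):
import random

class SettingsUserAgentMiddleware:
    def __init__(self, user_agents):
        self.user_agents = user_agents
    @classmethod
    def from_crawler(cls, crawler):
        # USER_AGENT_LIST为假设的自定义设置项,在settings.py中配置UA字符串列表
        return cls(crawler.settings.getlist('USER_AGENT_LIST'))
    def process_request(self, request, spider):
        if self.user_agents:
            request.headers['User-Agent'] = random.choice(self.user_agents)
        return None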
#基础User-Agent轮换
import random
class UserAgentMiddleware:
"""
基础User-Agent轮换中间件
"""
def __init__(self):
self.user_agents = [
# Chrome on Windows
'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36',
'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/119.0.0.0 Safari/537.36',
# Chrome on Mac
'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36',
# Firefox on Windows
'Mozilla/5.0 (Windows NT 10.0; Win64; x64; rv:109.0) Gecko/20100101 Firefox/121.0',
# Safari on Mac
'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/605.1.15 (KHTML, like Gecko) Version/17.1 Safari/605.1.15',
# Mobile - iPhone
'Mozilla/5.0 (iPhone; CPU iPhone OS 17_1 like Mac OS X) AppleWebKit/605.1.15 (KHTML, like Gecko) Version/17.1 Mobile/15E148 Safari/604.1',
# Mobile - Android
'Mozilla/5.0 (Linux; Android 13; SM-G960U) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.6099.129 Mobile Safari/537.36',
]
def process_request(self, request, spider):
"""
随机选择User-Agent
"""
ua = random.choice(self.user_agents)
request.headers['User-Agent'] = ua
        return None
#高级User-Agent轮换
import random
import itertools
class AdvancedUserAgentMiddleware:
"""
高级User-Agent轮换中间件
"""
def __init__(self):
# 按浏览器类型分组的User-Agent
self.ua_groups = {
'chrome_desktop': [
'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36',
'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/119.0.0.0 Safari/537.36',
'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36',
'Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36',
],
'firefox_desktop': [
'Mozilla/5.0 (Windows NT 10.0; Win64; x64; rv:109.0) Gecko/20100101 Firefox/121.0',
'Mozilla/5.0 (Macintosh; Intel Mac OS X 10.15; rv:109.0) Gecko/20100101 Firefox/121.0',
'Mozilla/5.0 (X11; Linux x86_64; rv:109.0) Gecko/20100101 Firefox/121.0',
],
'safari_desktop': [
'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/605.1.15 (KHTML, like Gecko) Version/17.1 Safari/605.1.15',
'Mozilla/5.0 (Macintosh; Intel Mac OS X 14_1) AppleWebKit/605.1.15 (KHTML, like Gecko) Version/17.1 Safari/605.1.15',
],
'mobile': [
'Mozilla/5.0 (iPhone; CPU iPhone OS 17_1 like Mac OS X) AppleWebKit/605.1.15 (KHTML, like Gecko) Version/17.1 Mobile/15E148 Safari/604.1',
'Mozilla/5.0 (iPhone; CPU iPhone OS 16_7_7 like Mac OS X) AppleWebKit/605.1.15 (KHTML, like Gecko) Version/17.1 Mobile/15E148 Safari/604.1',
'Mozilla/5.0 (Linux; Android 13; SM-G960U) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.6099.129 Mobile Safari/537.36',
'Mozilla/5.0 (Linux; Android 13; Pixel 6) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.6099.129 Mobile Safari/537.36',
]
}
# 创建轮换器
self.iterators = {group: itertools.cycle(uas) for group, uas in self.ua_groups.items()}
def process_request(self, request, spider):
"""
根据请求特征选择合适的User-Agent
"""
# 根据请求的URL或其他特征选择User-Agent类型
ua_type = self._select_ua_type(request, spider)
ua = next(self.iterators[ua_type])
request.headers['User-Agent'] = ua
# 同时设置其他相关头信息
self._set_related_headers(request, ua)
return None
def _select_ua_type(self, request, spider):
"""
根据请求特征选择User-Agent类型
"""
# 检查是否是移动端请求
if any(mobile_indicator in request.url.lower() for mobile_indicator in ['m.', 'mobile', 'wap']):
return 'mobile'
# 检查是否指定了特定的UA类型
ua_preference = request.meta.get('ua_preference')
if ua_preference in self.ua_groups:
return ua_preference
# 随机选择(可以实现更复杂的策略)
return random.choice(list(self.ua_groups.keys()))
def _set_related_headers(self, request, ua):
"""
设置与User-Agent相关的其他请求头
"""
if 'Chrome' in ua:
request.headers['Accept'] = 'text/html,application/xhtml+xml,application/xml;q=0.9,image/avif,image/webp,image/apng,*/*;q=0.8'
request.headers['Accept-Language'] = 'en-US,en;q=0.9,zh-CN;q=0.8,zh;q=0.7'
request.headers['Sec-Ch-Ua'] = '"Not_A Brand";v="8", "Chromium";v="120", "Google Chrome";v="120"'
request.headers['Sec-Ch-Ua-Mobile'] = '?0'
request.headers['Sec-Ch-Ua-Platform'] = '"Windows"'
elif 'Firefox' in ua:
request.headers['Accept'] = 'text/html,application/xhtml+xml,application/xml;q=0.9,image/webp,*/*;q=0.8'
request.headers['Accept-Language'] = 'en-US,en;q=0.5'
elif 'Safari' in ua and 'Version' in ua:
request.headers['Accept'] = 'text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8'
            request.headers['Accept-Language'] = 'en-us,en;q=0.5'
#代理IP管理
代理IP管理是应对IP封禁的重要策略,包括代理池管理和智能切换。
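在编写自定义代理中间件之前,可以先了解最简单的用法:Scrapy内置的HttpProxyMiddleware会读取request.meta['proxy']并自动应用代理。下面是一个最小示意(代理地址为占位值):
import scrapy

class ProxyDemoSpider(scrapy.Spider):
    name = 'proxy_demo'
    def start_requests(self):
        # 内置HttpProxyMiddleware会读取meta中的proxy字段
        yield scrapy.Request(
            url='https://httpbin.org/ip',
            meta={'proxy': 'http://127.0.0.1:8080'},  # 占位代理地址
            callback=self.parse,
        )
    def parse(self, response):
        self.logger.info(response.text)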
#基础代理管理
import random
import time
from collections import defaultdict
class ProxyMiddleware:
"""
基础代理管理中间件
"""
def __init__(self):
# 代理池
self.proxy_pool = [
'http://proxy1:port',
'http://proxy2:port',
'http://proxy3:port',
]
# 代理使用统计
self.proxy_stats = defaultdict(lambda: {
'success_count': 0,
'failure_count': 0,
'last_used': 0,
'ban_count': 0
})
# 代理健康度阈值
self.failure_threshold = 5
self.ban_threshold = 3
def process_request(self, request, spider):
"""
为请求分配代理
"""
# 如果请求已经指定了代理,直接使用
if request.meta.get('proxy'):
return None
# 选择健康的代理
proxy = self._select_proxy()
if proxy:
request.meta['proxy'] = proxy
self.proxy_stats[proxy]['last_used'] = time.time()
return None
def process_response(self, request, response, spider):
"""
处理响应,更新代理状态
"""
proxy = request.meta.get('proxy')
if proxy:
if response.status in [200, 301, 302]:
self.proxy_stats[proxy]['success_count'] += 1
elif response.status in [403, 404, 429, 503]:
self.proxy_stats[proxy]['failure_count'] += 1
if response.status == 403: # 可能是IP被封
self.proxy_stats[proxy]['ban_count'] += 1
return response
def process_exception(self, request, exception, spider):
"""
处理异常,更新代理状态
"""
proxy = request.meta.get('proxy')
if proxy:
self.proxy_stats[proxy]['failure_count'] += 1
# 某些异常可能意味着代理失效
if 'timeout' in str(exception).lower():
self.proxy_stats[proxy]['ban_count'] += 1
def _select_proxy(self):
"""
选择一个健康的代理
"""
healthy_proxies = []
for proxy, stats in self.proxy_stats.items():
# 检查代理是否健康
if (stats['failure_count'] < self.failure_threshold and
stats['ban_count'] < self.ban_threshold):
healthy_proxies.append(proxy)
# 如果没有健康代理,从池中随机选择一个
if not healthy_proxies:
return random.choice(self.proxy_pool) if self.proxy_pool else None
# 随机选择一个健康代理
        return random.choice(healthy_proxies)
#高级代理管理
import random
import time
import requests
from concurrent.futures import ThreadPoolExecutor
from collections import OrderedDict
class AdvancedProxyMiddleware:
"""
高级代理管理中间件
"""
def __init__(self):
self.proxy_pool = OrderedDict()
self.proxy_quality = {} # 代理质量评分
self.proxy_last_check = {} # 最后检查时间
self.check_interval = 300 # 5分钟检查间隔
self.min_quality_score = 0.5 # 最低质量分数
# 从外部API获取代理(示例)
self._load_proxies_from_api()
def _load_proxies_from_api(self):
"""
从API加载代理(示例)
"""
# 这里应该是实际的API调用
sample_proxies = [
'http://127.0.0.1:8080',
'http://127.0.0.1:8081',
'http://127.0.0.1:8082',
]
for proxy in sample_proxies:
self.proxy_pool[proxy] = {
'status': 'unknown', # unknown, good, bad
'speed': float('inf'), # 响应时间
'location': 'unknown',
'protocol': 'http',
'anonymous': True
}
self.proxy_quality[proxy] = 1.0 # 初始质量分数
def process_request(self, request, spider):
"""
智能分配代理
"""
# 检查是否需要更换代理
if (request.meta.get('change_proxy') or
not request.meta.get('proxy')):
proxy = self._smart_select_proxy(request, spider)
if proxy:
request.meta['proxy'] = proxy
return None
def process_response(self, request, response, spider):
"""
更新代理质量评分
"""
proxy = request.meta.get('proxy')
if proxy:
quality_change = self._evaluate_proxy_quality(response.status)
self._update_proxy_quality(proxy, quality_change)
return response
def process_exception(self, request, exception, spider):
"""
处理异常,降低代理质量评分
"""
proxy = request.meta.get('proxy')
if proxy:
self._update_proxy_quality(proxy, -0.3) # 异常时大幅降低质量分
def _smart_select_proxy(self, request, spider):
"""
智能选择代理
"""
# 获取高质量代理
quality_proxies = [
proxy for proxy, quality in self.proxy_quality.items()
if quality >= self.min_quality_score and
self.proxy_pool[proxy]['status'] != 'bad'
]
if not quality_proxies:
# 如果没有高质量代理,选择最新的代理
recent_proxies = list(self.proxy_pool.keys())[-5:] # 最近的5个
quality_proxies = [p for p in recent_proxies if self.proxy_pool[p]['status'] != 'bad']
if quality_proxies:
# 按质量分数加权随机选择
weights = [self.proxy_quality[p] for p in quality_proxies]
return random.choices(quality_proxies, weights=weights)[0]
return None
def _evaluate_proxy_quality(self, status_code):
"""
根据状态码评估代理质量
"""
if status_code == 200:
return 0.1 # 质量提升
elif status_code in [403, 404, 429, 503]:
return -0.2 # 质量下降
elif status_code >= 500:
return -0.1 # 服务器错误,轻微下降
else:
return 0 # 其他情况不变
def _update_proxy_quality(self, proxy, change):
"""
更新代理质量评分
"""
current_quality = self.proxy_quality.get(proxy, 1.0)
new_quality = max(0, min(1, current_quality + change)) # 限制在0-1之间
self.proxy_quality[proxy] = new_quality
# 根据质量更新状态
if new_quality < 0.3:
self.proxy_pool[proxy]['status'] = 'bad'
elif new_quality < 0.7:
self.proxy_pool[proxy]['status'] = 'caution'
else:
self.proxy_pool[proxy]['status'] = 'good'
def _validate_proxy(self, proxy):
"""
验证代理可用性
"""
try:
start_time = time.time()
response = requests.get(
'http://httpbin.org/ip',
proxies={'http': proxy, 'https': proxy},
timeout=10
)
response_time = time.time() - start_time
if response.status_code == 200:
return True, response_time
        except requests.RequestException:
            pass
        return False, float('inf')
#Cookies管理
Cookies管理对于维持会话和处理需要登录的网站很重要。
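先说明一点:Scrapy内置的CookiesMiddleware默认就会自动维护会话cookies,通过meta中的cookiejar键还可以维持多个相互独立的会话。下面是一个最小示意:
import scrapy

class SessionDemoSpider(scrapy.Spider):
    name = 'session_demo'
    def start_requests(self):
        # 不同的cookiejar编号对应相互独立的会话
        for i, url in enumerate(['https://example.com/a', 'https://example.com/b']):
            yield scrapy.Request(url, meta={'cookiejar': i}, callback=self.parse)
    def parse(self, response):
        # 后续请求沿用同一个cookiejar,会话cookies会自动携带
        yield response.follow(
            'next',
            meta={'cookiejar': response.meta['cookiejar']},
            callback=self.parse,
        )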
#基础Cookies管理
import json
import time
from urllib.parse import urlparse
class CookiesMiddleware:
"""
基础Cookies管理中间件
"""
def __init__(self):
self.domain_cookies = {} # 按域名存储cookies
self.cookie_jar = {} # cookie jar模拟
self.max_cookie_age = 3600 * 24 * 7 # 7天过期
def process_request(self, request, spider):
"""
为请求添加cookies
"""
domain = self._extract_domain(request.url)
# 获取该域名的cookies
cookies = self.domain_cookies.get(domain, {})
# 过滤过期cookies
current_time = time.time()
valid_cookies = {}
for name, cookie_data in cookies.items():
if current_time - cookie_data.get('timestamp', 0) < self.max_cookie_age:
valid_cookies[name] = cookie_data
else:
spider.logger.debug(f"Cookie expired: {name} for {domain}")
self.domain_cookies[domain] = valid_cookies
# 将cookies添加到请求中
if valid_cookies:
cookie_header = '; '.join([f"{name}={data['value']}"
for name, data in valid_cookies.items()])
request.headers['Cookie'] = cookie_header
return None
def process_response(self, request, response, spider):
"""
从响应中提取cookies
"""
domain = self._extract_domain(request.url)
# 从响应头中提取Set-Cookie
set_cookies = response.headers.getlist('Set-Cookie')
for cookie_header in set_cookies:
cookie_data = self._parse_cookie(cookie_header.decode('utf-8'))
if cookie_data:
if domain not in self.domain_cookies:
self.domain_cookies[domain] = {}
self.domain_cookies[domain][cookie_data['name']] = {
'value': cookie_data['value'],
'domain': cookie_data.get('domain', domain),
'path': cookie_data.get('path', '/'),
'expires': cookie_data.get('expires'),
'timestamp': time.time()
}
return response
def _extract_domain(self, url):
"""
从URL提取域名
"""
parsed = urlparse(url)
return parsed.netloc
def _parse_cookie(self, cookie_str):
"""
解析cookie字符串
"""
parts = cookie_str.split(';')
if not parts:
return None
name_value = parts[0].split('=', 1)
if len(name_value) != 2:
return None
cookie_data = {
'name': name_value[0].strip(),
'value': name_value[1].strip()
}
# 解析其他属性
for part in parts[1:]:
part = part.strip()
if '=' in part:
key, value = part.split('=', 1)
cookie_data[key.lower()] = value.strip()
else:
cookie_data[part.lower()] = True
        return cookie_data
#高级Cookies管理
import os
import pickle
from datetime import datetime, timedelta
from urllib.parse import urlparse
from scrapy import signals
class AdvancedCookiesMiddleware:
"""
高级Cookies管理中间件
"""
def __init__(self, storage_path='cookies_storage'):
self.storage_path = storage_path
self.session_cookies = {} # 会话级别cookies
self.persistent_cookies = {} # 持久化cookies
self.cookie_domains = set() # 已知域名
# 加载持久化cookies
self._load_persistent_cookies()
    @classmethod
    def from_crawler(cls, crawler):
        storage_path = crawler.settings.get('COOKIES_STORAGE_PATH', 'cookies_storage')
        middleware = cls(storage_path)
        # 连接spider_closed信号,确保爬虫结束时能够保存cookies
        crawler.signals.connect(middleware.spider_closed, signal=signals.spider_closed)
        return middleware
def process_request(self, request, spider):
"""
智能处理请求cookies
"""
domain = self._extract_domain(request.url)
# 获取适用于当前请求的cookies
applicable_cookies = self._get_applicable_cookies(domain, request.url)
if applicable_cookies:
# 构建Cookie头
cookie_pairs = [f"{name}={value}" for name, value in applicable_cookies.items()]
cookie_header = '; '.join(cookie_pairs)
request.headers['Cookie'] = cookie_header
return None
def process_response(self, request, response, spider):
"""
处理响应中的cookies
"""
domain = self._extract_domain(request.url)
# 提取Set-Cookie头
set_cookies = response.headers.getlist('Set-Cookie')
for cookie_header in set_cookies:
cookie_obj = self._parse_advanced_cookie(cookie_header.decode('utf-8'), domain)
if cookie_obj:
self._store_cookie(cookie_obj, domain)
return response
def _get_applicable_cookies(self, domain, url):
"""
获取适用于当前请求的cookies
"""
applicable = {}
# 检查当前域名的cookies
if domain in self.persistent_cookies:
for name, cookie_data in self.persistent_cookies[domain].items():
if self._is_cookie_applicable(cookie_data, url):
applicable[name] = cookie_data['value']
# 检查父域名的cookies
parent_domains = self._get_parent_domains(domain)
for parent_domain in parent_domains:
if parent_domain in self.persistent_cookies:
for name, cookie_data in self.persistent_cookies[parent_domain].items():
if (self._is_cookie_applicable(cookie_data, url) and
cookie_data.get('domain_attr') and # 确认是为父域名设置的
name not in applicable): # 避免覆盖
applicable[name] = cookie_data['value']
return applicable
    def _is_cookie_applicable(self, cookie_data, url):
        """
        检查cookie是否适用于当前URL
        """
        parsed = urlparse(url)
        # 检查路径匹配
        cookie_path = cookie_data.get('path', '/')
        if not (parsed.path or '/').startswith(cookie_path):
            return False
        # 检查过期时间
        expires = cookie_data.get('expires')
        if expires and datetime.now() > datetime.fromisoformat(expires):
            return False
        # 检查Secure属性
        if cookie_data.get('secure') and parsed.scheme != 'https':
            return False
        return True
def _store_cookie(self, cookie_obj, domain):
"""
存储cookie
"""
if domain not in self.persistent_cookies:
self.persistent_cookies[domain] = {}
# 检查是否应该存储(根据过期时间和domain策略)
if self._should_store_cookie(cookie_obj):
self.persistent_cookies[domain][cookie_obj['name']] = cookie_obj
self.cookie_domains.add(domain)
def _should_store_cookie(self, cookie_obj):
"""
判断是否应该存储cookie
"""
# 检查过期时间
expires = cookie_obj.get('expires')
if expires and datetime.now() > datetime.fromisoformat(expires):
return False # 已经过期
# 检查HttpOnly属性(如果需要特殊处理)
if cookie_obj.get('httponly'):
# 根据需要决定是否存储HttpOnly cookies
pass
return True
def _parse_advanced_cookie(self, cookie_str, default_domain):
"""
解析高级cookie格式
"""
parts = cookie_str.split(';')
if not parts:
return None
name_value = parts[0].split('=', 1)
if len(name_value) != 2:
return None
cookie_obj = {
'name': name_value[0].strip(),
'value': name_value[1].strip(),
'domain': default_domain,
'domain_attr': False, # 是否显式设置了domain属性
'path': '/',
'expires': None,
'secure': False,
'httponly': False,
'samesite': None
}
for part in parts[1:]:
part = part.strip()
if '=' in part:
key, value = part.split('=', 1)
key = key.strip().lower()
value = value.strip()
if key == 'domain':
cookie_obj['domain'] = value.lstrip('.')
cookie_obj['domain_attr'] = True
elif key == 'path':
cookie_obj['path'] = value
elif key == 'expires':
try:
expires_dt = datetime.strptime(value, '%a, %d %b %Y %H:%M:%S %Z')
cookie_obj['expires'] = expires_dt.isoformat()
except:
pass # 忽略无效的过期时间
elif key == 'max-age':
try:
max_age = int(value)
expires_dt = datetime.now() + timedelta(seconds=max_age)
cookie_obj['expires'] = expires_dt.isoformat()
except:
pass
elif key == 'samesite':
cookie_obj['samesite'] = value.lower()
else:
key = part.strip().lower()
if key in ['secure', 'httponly']:
cookie_obj[key] = True
return cookie_obj
def _get_parent_domains(self, domain):
"""
获取父域名列表
"""
parts = domain.split('.')
parent_domains = []
for i in range(1, len(parts)):
parent = '.'.join(parts[i:])
if parent in self.cookie_domains:
parent_domains.append(parent)
return parent_domains
def _extract_domain(self, url):
"""
从URL提取域名
"""
from urllib.parse import urlparse
parsed = urlparse(url)
return parsed.netloc
def _load_persistent_cookies(self):
"""
从存储中加载持久化cookies
"""
if os.path.exists(self.storage_path):
try:
with open(self.storage_path, 'rb') as f:
loaded_data = pickle.load(f)
self.persistent_cookies = loaded_data.get('cookies', {})
self.cookie_domains = loaded_data.get('domains', set())
except:
pass # 如果加载失败,使用空的cookies
def _save_persistent_cookies(self):
"""
保存持久化cookies到存储
"""
        storage_dir = os.path.dirname(self.storage_path)
        if storage_dir:
            os.makedirs(storage_dir, exist_ok=True)
data = {
'cookies': self.persistent_cookies,
'domains': self.cookie_domains
}
try:
with open(self.storage_path, 'wb') as f:
pickle.dump(data, f)
except:
pass # 如果保存失败,不影响正常运行
    def spider_closed(self, spider):
        """
        爬虫关闭时保存cookies(由from_crawler中连接的spider_closed信号触发)
        """
        self._save_persistent_cookies()
#请求延迟与限速
请求延迟和限速是避免被反爬检测的重要策略。
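在自己实现限速中间件之前,优先考虑Scrapy内置的延迟与自动限速机制,多数场景通过settings即可满足需求(数值仅供参考):
# settings.py — 内置延迟与自动限速设置(示例值)
DOWNLOAD_DELAY = 2
RANDOMIZE_DOWNLOAD_DELAY = True
CONCURRENT_REQUESTS_PER_DOMAIN = 4
AUTOTHROTTLE_ENABLED = True
AUTOTHROTTLE_START_DELAY = 1
AUTOTHROTTLE_MAX_DELAY = 30
AUTOTHROTTLE_TARGET_CONCURRENCY = 2.0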
#基础延迟管理
import time
import random
from collections import defaultdict
class RateLimitMiddleware:
"""
基础速率限制中间件
"""
def __init__(self):
self.domain_last_request = defaultdict(float)
self.global_last_request = 0
self.request_count = defaultdict(int)
self.domain_delays = {} # 每个域名的延迟设置
def process_request(self, request, spider):
"""
处理请求延迟
"""
domain = self._extract_domain(request.url)
# 获取延迟设置
min_delay = spider.crawler.settings.getfloat('DOWNLOAD_DELAY', 1)
randomize_delay = spider.crawler.settings.getbool('RANDOMIZE_DOWNLOAD_DELAY', True)
# 计算延迟
delay = min_delay
if randomize_delay:
delay = random.uniform(min_delay * 0.5, min_delay * 1.5)
# 检查是否需要等待
current_time = time.time()
domain_last = self.domain_last_request[domain]
global_last = self.global_last_request
wait_time = max(
(domain_last + delay) - current_time, # 域名级延迟
(global_last + delay) - current_time # 全局延迟
)
        if wait_time > 0:
            # 注意:time.sleep会阻塞Twisted reactor,进而阻塞所有并发请求,
            # 生产环境建议优先使用DOWNLOAD_DELAY、AutoThrottle等内置机制
            time.sleep(wait_time)
# 更新最后请求时间
self.domain_last_request[domain] = time.time()
self.global_last_request = time.time()
self.request_count[domain] += 1
return None
def _extract_domain(self, url):
"""
提取域名
"""
from urllib.parse import urlparse
        return urlparse(url).netloc
#高级限速管理
import time
import random
from collections import defaultdict, deque
import threading
class AdvancedRateLimitMiddleware:
"""
高级速率限制中间件
"""
def __init__(self):
self.domain_stats = defaultdict(lambda: {
'requests': deque(maxlen=100), # 最近100个请求的时间戳
'error_count': 0,
'slow_requests': 0,
'adaptive_delay': 1.0
})
self.global_stats = {
'total_requests': 0,
'start_time': time.time()
}
self.lock = threading.Lock()
def process_request(self, request, spider):
"""
高级请求延迟处理
"""
domain = self._extract_domain(request.url)
with self.lock:
domain_stats = self.domain_stats[domain]
# 自适应延迟计算
adaptive_delay = self._calculate_adaptive_delay(domain_stats, spider)
# 检查是否需要延迟
current_time = time.time()
if domain_stats['requests']:
last_request_time = domain_stats['requests'][-1]
time_since_last = current_time - last_request_time
if time_since_last < adaptive_delay:
sleep_time = adaptive_delay - time_since_last
time.sleep(sleep_time)
# 记录请求时间
domain_stats['requests'].append(time.time())
self.global_stats['total_requests'] += 1
return None
def process_response(self, request, response, spider):
"""
根据响应调整限速策略
"""
domain = self._extract_domain(request.url)
with self.lock:
domain_stats = self.domain_stats[domain]
# 根据响应状态调整策略
if response.status in [429, 503, 403]: # 限速或封禁响应
domain_stats['error_count'] += 1
domain_stats['adaptive_delay'] *= 1.5 # 增加延迟
spider.logger.warning(f"Increasing delay for {domain} due to status {response.status}")
elif response.status == 200:
# 成功响应,逐渐减少延迟
if domain_stats['adaptive_delay'] > 1.0:
domain_stats['adaptive_delay'] *= 0.95
domain_stats['adaptive_delay'] = max(domain_stats['adaptive_delay'], 0.5)
return response
def process_exception(self, request, exception, spider):
"""
处理异常,调整限速策略
"""
domain = self._extract_domain(request.url)
with self.lock:
domain_stats = self.domain_stats[domain]
domain_stats['error_count'] += 1
domain_stats['adaptive_delay'] *= 2.0 # 异常时大幅增加延迟
return None
def _calculate_adaptive_delay(self, domain_stats, spider):
"""
计算自适应延迟
"""
base_delay = spider.crawler.settings.getfloat('DOWNLOAD_DELAY', 1.0)
# 基于错误率调整
recent_requests = len(domain_stats['requests'])
if recent_requests > 0:
error_rate = domain_stats['error_count'] / recent_requests
if error_rate > 0.1: # 错误率超过10%
base_delay *= (1 + error_rate * 10)
# 应用自适应延迟因子
adaptive_factor = domain_stats['adaptive_delay']
# 随机化(避免过于规律的请求模式)
random_factor = random.uniform(0.8, 1.2)
final_delay = base_delay * adaptive_factor * random_factor
return max(final_delay, 0.1) # 至少0.1秒延迟
def _extract_domain(self, url):
"""
提取域名
"""
from urllib.parse import urlparse
return urlparse(url).netloc
    def spider_closed(self, spider):
        """
        爬虫关闭时输出统计信息(需在from_crawler中连接signals.spider_closed信号后才会被调用)
        """
        total_time = time.time() - self.global_stats['start_time']
        total_requests = self.global_stats['total_requests']
        rps = total_requests / total_time if total_time > 0 else 0.0
        spider.logger.info(
            f"Rate limiting stats - Total requests: {total_requests}, "
            f"Duration: {total_time:.2f}s, RPS: {rps:.2f}"
        )
#高级Middleware技巧
#中间件链式处理
import random
class ChainedMiddleware:
"""
链式处理中间件
"""
def __init__(self):
self.middlewares = [
self._process_user_agent,
self._process_proxy,
self._process_cookies,
self._process_headers
]
def process_request(self, request, spider):
"""
链式处理请求
"""
for middleware_func in self.middlewares:
result = middleware_func(request, spider)
if result is not None:
return result
return None
def _process_user_agent(self, request, spider):
"""
处理User-Agent
"""
if not request.headers.get('User-Agent'):
ua = random.choice([
'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36',
'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36'
])
request.headers['User-Agent'] = ua
def _process_proxy(self, request, spider):
"""
处理代理
"""
if request.meta.get('need_proxy') and not request.meta.get('proxy'):
# 分配代理的逻辑
pass
def _process_cookies(self, request, spider):
"""
处理cookies
"""
# cookies处理逻辑
pass
def _process_headers(self, request, spider):
"""
处理其他请求头
"""
        request.headers.setdefault('Accept', 'text/html,application/xhtml+xml,*/*;q=0.9')
#条件中间件
import random
class ConditionalMiddleware:
"""
条件中间件
"""
def __init__(self):
self.strategies = {
'ecommerce': self._handle_ecommerce,
'news': self._handle_news,
'social': self._handle_social
}
def process_request(self, request, spider):
"""
根据网站类型应用不同策略
"""
site_type = self._classify_site(request.url)
if site_type in self.strategies:
return self.strategies[site_type](request, spider)
return None
def _classify_site(self, url):
"""
分类网站类型
"""
ecommerce_keywords = ['shop', 'buy', 'cart', 'product', 'amazon', 'taobao']
news_keywords = ['news', 'article', 'blog', 'journal']
social_keywords = ['facebook', 'twitter', 'instagram', 'weibo']
url_lower = url.lower()
for keyword in ecommerce_keywords:
if keyword in url_lower:
return 'ecommerce'
for keyword in news_keywords:
if keyword in url_lower:
return 'news'
for keyword in social_keywords:
if keyword in url_lower:
return 'social'
return 'general'
def _handle_ecommerce(self, request, spider):
"""
处理电商网站
"""
# 电商网站特殊处理
request.headers['Referer'] = self._get_random_referer()
request.meta.setdefault('download_timeout', 30)
return None
def _handle_news(self, request, spider):
"""
处理新闻网站
"""
# 新闻网站特殊处理
request.headers['Accept-Encoding'] = 'gzip, deflate'
return None
def _handle_social(self, request, spider):
"""
处理社交媒体
"""
# 社交媒体特殊处理
request.meta.setdefault('dont_redirect', True)
return None
def _get_random_referer(self):
"""
获取随机Referer
"""
referers = [
'https://www.google.com/',
'https://www.baidu.com/',
'https://www.bing.com/',
'https://www.yahoo.com/'
]
        return random.choice(referers)
#性能优化策略
#缓存优化
import random
from functools import lru_cache
class OptimizedMiddleware:
"""
性能优化中间件
"""
def __init__(self):
self.ua_cache = {}
self.proxy_cache = {}
self.domain_cache = {}
@lru_cache(maxsize=1000)
def _get_domain_from_url(self, url):
"""
缓存域名解析结果
"""
from urllib.parse import urlparse
return urlparse(url).netloc
def process_request(self, request, spider):
"""
优化的请求处理
"""
# 使用缓存的域名
domain = self._get_domain_from_url(request.url)
# 使用LRU缓存的User-Agent
if not request.headers.get('User-Agent'):
ua = self._get_cached_user_agent(domain)
request.headers['User-Agent'] = ua
return None
def _get_cached_user_agent(self, domain):
"""
获取缓存的User-Agent
"""
if domain not in self.ua_cache:
self.ua_cache[domain] = random.choice([
f'Mozilla/5.0 (compatible; DomainSpecificBot/{random.randint(1, 9)}.{random.randint(0, 9)})',
f'SpecializedCrawler-{domain}/{random.randint(1, 9)}.{random.randint(0, 9)}'
])
        return self.ua_cache[domain]
#异步处理优化
import asyncio
import aiohttp
from concurrent.futures import ThreadPoolExecutor
class AsyncMiddleware:
"""
异步处理中间件
"""
def __init__(self):
self.executor = ThreadPoolExecutor(max_workers=10)
self.session = None
async def _initialize_session(self):
"""
初始化异步会话
"""
if self.session is None:
self.session = aiohttp.ClientSession()
def process_request(self, request, spider):
"""
异步处理请求(同步接口)
"""
# 对于复杂的异步操作,可以使用线程池
if request.meta.get('async_processing'):
# 将异步操作提交到线程池
future = self.executor.submit(self._sync_async_operation, request, spider)
# 可以选择等待或不等待
pass
return None
def _sync_async_operation(self, request, spider):
"""
同步包装的异步操作
"""
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
try:
result = loop.run_until_complete(self._async_operation(request, spider))
return result
finally:
loop.close()
async def _async_operation(self, request, spider):
"""
实际的异步操作
"""
await self._initialize_session()
# 异步操作逻辑
        pass
#常见问题与解决方案
#问题1: 中间件不生效
现象: 配置了中间件但没有执行
解决方案:
# 检查settings.py中的配置
DOWNLOADER_MIDDLEWARES = {
'myproject.middlewares.MyMiddleware': 543, # 确保路径正确
}
# 确保中间件类名正确且可导入
# 检查是否有语法错误
#问题2: 请求被无限重定向
现象: 请求在中间件中被不断重定向
解决方案:
class SafeRedirectMiddleware:
    def process_request(self, request, spider):
        # 检查重定向次数
        redirect_times = request.meta.get('redirect_times', 0)
        if redirect_times > 5:  # 限制重定向次数
            return None  # 不再重定向
        # 你的重定向逻辑:只有计算出新URL且与原URL不同时才发起新请求
        new_url = self.get_redirect_url(request)
        if not new_url or new_url == request.url:
            return None
        new_request = request.replace(url=new_url)
        new_request.meta['redirect_times'] = redirect_times + 1
        return new_request
    def get_redirect_url(self, request):
        # 示例占位:根据业务规则返回新URL,无需重定向时返回None
        return request.meta.get('rewrite_url')
#问题3: 代理切换不及时
现象: 代理IP失效后仍继续使用
解决方案:
class SmartProxyMiddleware:
    def __init__(self):
        self.failed_proxies = set()
    def process_response(self, request, response, spider):
        # 检测代理是否失效
        if response.status in [403, 429, 503]:
            # 标记当前代理为失效
            current_proxy = request.meta.get('proxy')
            if current_proxy:
                self.mark_proxy_failed(current_proxy)
            # 立即更换代理
            new_request = request.copy()
            new_request.meta['change_proxy'] = True
            new_request.dont_filter = True
            return new_request
        return response
    def mark_proxy_failed(self, proxy):
        # 记录失效代理,供代理选择逻辑后续跳过
        self.failed_proxies.add(proxy)
#最佳实践建议
#设计原则
- 模块化: 将不同功能分离到不同的中间件中
- 可配置: 通过settings.py配置中间件参数(见本节末尾的from_crawler示例)
- 健壮性: 妥善处理异常,避免影响整个爬虫
- 性能考虑: 避免在中间件中进行耗时操作
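下面给出"可配置"原则的最小示意:中间件通过from_crawler读取设置,未启用时抛出NotConfigured让Scrapy跳过加载(其中MY_MW_ENABLED、MY_MW_DELAY为假设的设置名):
from scrapy.exceptions import NotConfigured

class ConfigurableMiddleware:
    def __init__(self, delay):
        self.delay = delay
    @classmethod
    def from_crawler(cls, crawler):
        # MY_MW_ENABLED、MY_MW_DELAY均为假设的自定义设置项
        if not crawler.settings.getbool('MY_MW_ENABLED', True):
            raise NotConfigured
        return cls(crawler.settings.getfloat('MY_MW_DELAY', 1.0))
    def process_request(self, request, spider):
        request.meta.setdefault('download_delay', self.delay)
        return None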
#安全考虑
- 隐私保护: 注意处理敏感信息
- 合规性: 遵守网站的robots.txt和使用条款
- 频率控制: 合理控制请求频率,避免对目标服务器造成压力(相关设置见下方示例)
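与合规性和频率控制相关的常用设置示例如下(取值需结合目标网站情况调整):
# settings.py — 合规与频率控制相关设置(示例值)
ROBOTSTXT_OBEY = True
DOWNLOAD_DELAY = 1
CONCURRENT_REQUESTS_PER_DOMAIN = 8
USER_AGENT = 'mybot (+https://example.com/contact)'  # 表明身份,便于站点方联系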
💡 核心要点: Downloader Middleware是Scrapy反爬策略的核心组件,通过合理配置和使用中间件,可以有效应对各种反爬机制。记住要根据目标网站的特点选择合适的策略。
#SEO优化建议
为了提高这篇Downloader Middleware教程在搜索引擎中的排名,以下是几个关键的SEO优化建议:
#标题优化
- 主标题: 包含核心关键词"Downloader Middleware", "反爬策略", "代理IP"
- 二级标题: 每个章节标题都包含相关的长尾关键词
- H1-H6层次结构: 保持正确的标题层级,便于搜索引擎理解内容结构
#内容优化
- 关键词密度: 在内容中自然地融入关键词如"Scrapy", "Downloader Middleware", "反爬策略", "User-Agent", "Cookies管理", "代理IP"等
- 元描述: 在文章开头的元数据中包含吸引人的描述
- 内部链接: 链接到其他相关教程,如Spider 实战等
- 外部权威链接: 引用官方文档和权威资源
#技术SEO
- 页面加载速度: 优化代码块和图片加载
- 移动端适配: 确保在移动设备上良好显示
- 结构化数据: 使用适当的HTML标签和语义化元素
#用户体验优化
- 内容可读性: 使用清晰的段落结构和代码示例
- 互动元素: 提供实际可运行的代码示例
- 更新频率: 定期更新内容以保持时效性
🔗 相关教程推荐
- Spider 实战 - 爬虫逻辑实现
- Pipeline管道实战 - 数据处理管道
- Selector 选择器 - 数据提取技术
- Item 与 Item Loader - 数据结构定义
- Downloader Middleware - 请求响应拦截
🏷️ 标签云: Scrapy Downloader Middleware 反爬策略 代理IP User-Agent Cookies管理 请求处理 爬虫框架 网络爬虫 Python爬虫

