# Scrapy反爬对抗实战完全指南 - 验证码破解与全方位反检测技术详解
📂 所属阶段:第三阶段 — 攻防演练(中间件与反爬篇)
🔗 相关章节:Downloader Middleware · Selenium与Playwright集成 · 代理IP池集成
## 目录
- 反爬机制概述
- 验证码识别与破解
- 请求头伪装技术
- 浏览器指纹反检测
- IP封禁规避策略
- 频率限制绕过
- Cookie与Session管理
- JavaScript反爬绕过
- 分布式爬虫部署
- 反爬检测与监控
- 法律合规与道德规范
- SEO优化建议
## 反爬机制概述

现代网站采用了多层次的反爬虫机制,了解这些机制是制定有效对策的基础。

### 常见反爬技术分类

```python
"""
反爬技术分类:
1. 基于请求特征的检测:
- User-Agent检测
- IP频率限制
- 请求头完整性检查
- Cookie验证
2. 基于行为的检测:
- 访问频率分析
- 页面停留时间
- 鼠标轨迹分析
- 点击模式检测
3. 基于技术的检测:
- JavaScript执行检测
- 浏览器指纹识别
- 设备指纹分析
- 网络栈特征检测
4. 基于内容的检测:
- 验证码挑战
- 滑块验证
- 图像识别验证
- 行为验证
"""#反爬检测原理
"""
反爬检测工作原理:
1. 请求分析层:
- 检查请求头是否完整
- 验证User-Agent合法性
- 分析请求模式
2. 行为分析层:
- 监控访问频率
- 分析用户行为模式
- 检测自动化特征
3. 技术检测层:
- JavaScript环境检测
- 浏览器特性验证
- 设备指纹匹配
4. 内容保护层:
- 动态内容生成
- 验证码挑战
- 人机验证
"""#验证码识别与破解
### 图像验证码处理
```python
import cv2
import numpy as np
from PIL import Image
import pytesseract
# from captcha_solver import CaptchaSolver  # 可选:第三方打码服务SDK,未安装时保持注释


class ImageCaptchaHandler:
    """
    图像验证码处理器
    """

    def __init__(self):
        self.solvers = {
            'tesseract': self._solve_with_tesseract,
            'opencv': self._solve_with_opencv,
            'ml_model': self._solve_with_ml_model
        }

    def preprocess_image(self, image_path):
        """
        图像预处理
        """
        # 读取图像
        img = cv2.imread(image_path)
        # 转换为灰度图
        gray = cv2.cvtColor(img, cv2.COLOR_BGR2GRAY)
        # 去噪
        denoised = cv2.medianBlur(gray, 5)
        # 二值化
        _, binary = cv2.threshold(denoised, 0, 255, cv2.THRESH_BINARY + cv2.THRESH_OTSU)
        # 形态学操作
        kernel = np.ones((2, 2), np.uint8)
        processed = cv2.morphologyEx(binary, cv2.MORPH_CLOSE, kernel)
        return processed

    def _solve_with_tesseract(self, image_path):
        """
        使用Tesseract OCR识别
        """
        processed_img = self.preprocess_image(image_path)
        text = pytesseract.image_to_string(processed_img, config='--psm 8')
        return text.strip()

    def _solve_with_opencv(self, image_path):
        """
        使用OpenCV模板匹配
        """
        img = cv2.imread(image_path, 0)
        # 这里可以实现字符分割和模板匹配逻辑
        # 简化示例
        text = pytesseract.image_to_string(img)
        return text.strip()

    def _solve_with_ml_model(self, image_path):
        """
        使用机器学习模型
        """
        # 加载预训练模型
        # model = load_model('captcha_model.h5')
        # prediction = model.predict(image_array)
        # return decode_prediction(prediction)
        pass

    def solve_captcha(self, image_path, method='tesseract'):
        """
        识别验证码
        """
        solver = self.solvers.get(method)
        if solver:
            return solver(image_path)
        else:
            raise ValueError(f"Unknown solver method: {method}")


class AdvancedCaptchaHandler(ImageCaptchaHandler):
    """
    高级验证码处理器
    """

    def __init__(self):
        super().__init__()
        self.ml_models = {}

    def handle_distorted_captcha(self, image_path):
        """
        处理扭曲验证码
        """
        img = cv2.imread(image_path)
        # 几何校正
        corrected = self._correct_geometry(img)
        # 字符分割
        characters = self._segment_characters(corrected)
        # 逐个识别
        result = ""
        for char_img in characters:
            recognized = self._recognize_character(char_img)
            result += recognized
        return result

    def _correct_geometry(self, img):
        """
        几何校正
        """
        # 实现透视变换等几何校正算法
        return img

    def _segment_characters(self, img):
        """
        字符分割
        """
        # 实现字符分割算法
        return [img]  # 简化返回

    def _recognize_character(self, char_img):
        """
        字符识别
        """
        # 使用训练好的字符识别模型
        text = pytesseract.image_to_string(char_img, config='--psm 10')
        return text.strip()[0] if text.strip() else ""


# 验证码中间件
class CaptchaMiddleware:
    """
    验证码处理中间件
    """

    def __init__(self):
        self.captcha_handler = AdvancedCaptchaHandler()
        self.captcha_urls = set()

    def process_response(self, request, response, spider):
        """
        处理响应,检测验证码
        """
        if self._detect_captcha(response):
            # 保存验证码图片
            captcha_img_path = self._save_captcha_image(response)
            # 识别验证码
            captcha_text = self.captcha_handler.solve_captcha(captcha_img_path)
            # 重新发起请求,带上验证码参数
            new_request = self._create_captcha_request(request, captcha_text)
            return new_request
        return response

    def _detect_captcha(self, response):
        """
        检测验证码
        """
        # 检查响应中是否包含验证码特征
        captcha_indicators = [
            'captcha', 'verification', 'validate', 'auth', 'security'
        ]
        for indicator in captcha_indicators:
            if indicator.lower() in response.text.lower():
                return True
        # 检查是否有验证码图片
        import re
        captcha_patterns = [
            r'captcha.*?\.(jpg|jpeg|png|gif)',
            r'verification.*?\.(jpg|jpeg|png|gif)',
            r'/auth/image'
        ]
        for pattern in captcha_patterns:
            if re.search(pattern, response.text, re.IGNORECASE):
                return True
        return False

    def _save_captcha_image(self, response):
        """
        保存验证码图片
        """
        import tempfile
        import os
        import re
        # 查找验证码图片URL
        img_pattern = r'<img[^>]*src=["\']([^"\']*captcha[^"\'>]*)["\'][^>]*>'
        matches = re.findall(img_pattern, response.text, re.IGNORECASE)
        if matches:
            captcha_url = matches[0]
            if not captcha_url.startswith('http'):
                from urllib.parse import urljoin
                captcha_url = urljoin(response.url, captcha_url)
            # 下载验证码图片
            import requests
            img_response = requests.get(captcha_url)
            # 保存到临时文件
            temp_dir = tempfile.mkdtemp()
            img_path = os.path.join(temp_dir, 'captcha.png')
            with open(img_path, 'wb') as f:
                f.write(img_response.content)
            return img_path
        return None

    def _create_captcha_request(self, original_request, captcha_text):
        """
        创建带验证码的请求
        """
        # 构造包含验证码的新请求
        # 这里需要根据具体网站的表单结构来实现
        pass
```
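上面的 `_create_captcha_request` 留空,是因为提交方式因站点而异。下面给出一种可能的实现思路(仅为示意,`captcha_code` 字段名是假设,需要抓包确认目标站点的实际表单结构后调整):

```python
import scrapy

def _create_captcha_request(self, original_request, captcha_text):
    """构造携带验证码的重试请求(示意实现,字段名为假设)"""
    return scrapy.FormRequest(
        url=original_request.url,
        formdata={'captcha_code': captcha_text},  # 字段名需按实际表单调整
        meta={**original_request.meta, 'captcha_retried': True},
        callback=original_request.callback,
        dont_filter=True,  # 避免重试请求被去重过滤器丢弃
    )
```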
### 滑块验证码处理

```python
import time
import random

import cv2
import numpy as np
from selenium import webdriver
from selenium.webdriver.common.action_chains import ActionChains
from selenium.webdriver.common.by import By
from selenium.common.exceptions import NoSuchElementException


class SliderCaptchaHandler:
    """
    滑块验证码处理器
    """

    def __init__(self, driver):
        self.driver = driver

    def solve_slider_captcha(self, slider_element, track_element=None):
        """
        解决滑块验证码
        """
        # 获取滑块位置与尺寸(可用于更精细的轨迹计算)
        slider_location = slider_element.location
        slider_size = slider_element.size
        # 模拟人类滑动轨迹
        track = self._generate_human_like_track(slider_element)
        # 执行滑动
        self._perform_slide(slider_element, track)
        # 验证结果
        time.sleep(2)
        return self._verify_result()

    def _generate_human_like_track(self, slider_element):
        """
        生成类似人类的滑动轨迹(返回相对位移序列)
        """
        # 计算滑动距离
        slider_width = slider_element.size['width']
        # 生成不规则的滑动路径
        track = []
        current_pos = 0
        target_pos = slider_width
        # 分段滑动,模拟人类操作
        segments = random.randint(3, 6)
        segment_distance = target_pos / segments
        for i in range(segments):
            # 每段的速度不同
            speed_factor = random.uniform(0.8, 1.2)
            segment_steps = int(segment_distance * speed_factor)
            for j in range(segment_steps):
                step = 1 / speed_factor
                current_pos += step
                # move_by_offset按增量移动,因此记录每步的位移,并叠加随机抖动
                track.append((step + random.uniform(-0.5, 0.5), random.uniform(-1, 1)))
        return track

    def _perform_slide(self, slider_element, track):
        """
        执行滑动操作
        """
        actions = ActionChains(self.driver)
        # 点击并按住滑块
        actions.click_and_hold(slider_element).perform()
        # 按轨迹滑动
        for x_offset, y_offset in track:
            actions.move_by_offset(x_offset, y_offset).perform()
            # 随机暂停,模拟人类操作
            time.sleep(random.uniform(0.01, 0.05))
        # 释放滑块
        actions.release().perform()

    def _verify_result(self):
        """
        验证滑动结果
        """
        # 检查是否有验证成功的标志
        try:
            self.driver.find_element(By.CLASS_NAME, "verify-success")
            return True
        except NoSuchElementException:
            # 检查是否有验证失败的标志
            try:
                self.driver.find_element(By.CLASS_NAME, "verify-fail")
                return False
            except NoSuchElementException:
                # 不确定结果,假定成功
                return True


class GeometricCaptchaHandler:
    """
    几何图形验证码处理器
    """

    def __init__(self):
        self.template_matching = cv2.TM_CCOEFF_NORMED

    def solve_geometric_captcha(self, bg_image_path, target_image_path):
        """
        解决几何图形验证码(如拼图验证码)
        """
        # 读取背景图和目标图
        bg_img = cv2.imread(bg_image_path, 0)
        target_img = cv2.imread(target_image_path, 0)
        # 模板匹配找到目标位置
        result = cv2.matchTemplate(bg_img, target_img, self.template_matching)
        _, _, _, max_loc = cv2.minMaxLoc(result)
        # 计算偏移量
        offset_x = max_loc[0]
        return offset_x

    def solve_rotation_captcha(self, image_path):
        """
        解决旋转验证码
        """
        img = cv2.imread(image_path)
        # 计算图像的角度
        angle = self._detect_rotation_angle(img)
        # 可按该角度反向旋转图像进行校验
        rotated = self._rotate_image(img, -angle)
        return angle

    def _detect_rotation_angle(self, img):
        """
        检测图像旋转角度
        """
        # 使用霍夫变换检测直线,从而计算角度
        gray = cv2.cvtColor(img, cv2.COLOR_BGR2GRAY)
        edges = cv2.Canny(gray, 50, 150, apertureSize=3)
        lines = cv2.HoughLines(edges, 1, np.pi / 180, threshold=100)
        if lines is not None:
            angles = []
            for rho, theta in lines[:, 0]:
                angle = theta * 180 / np.pi
                angles.append(angle)
            # 计算平均角度
            avg_angle = np.mean(angles)
            return avg_angle
        return 0

    def _rotate_image(self, img, angle):
        """
        旋转图像
        """
        center = tuple(np.array(img.shape[1::-1]) / 2)
        rot_mat = cv2.getRotationMatrix2D(center, angle, 1.0)
        rotated = cv2.warpAffine(img, rot_mat, img.shape[1::-1], flags=cv2.INTER_LINEAR)
        return rotated
```
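以下是 `SliderCaptchaHandler` 的一个调用示意(URL与元素的class名均为假设,实际定位方式需按目标页面结构调整):

```python
from selenium import webdriver
from selenium.webdriver.common.by import By

driver = webdriver.Chrome()
try:
    driver.get('https://example.com/login')  # 示例URL
    # 'slider-button'为示意的滑块元素class名,需按实际页面修改
    slider = driver.find_element(By.CLASS_NAME, 'slider-button')
    handler = SliderCaptchaHandler(driver)
    if handler.solve_slider_captcha(slider):
        print('滑块验证通过')
finally:
    driver.quit()
```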
## 请求头伪装技术

### 动态请求头生成
```python
import random
import platform
import uuid
from fake_useragent import UserAgent


class DynamicHeadersGenerator:
    """
    动态请求头生成器
    """

    def __init__(self):
        self.ua = UserAgent()
        self.accept_headers = [
            'text/html,application/xhtml+xml,application/xml;q=0.9,image/webp,*/*;q=0.8',
            'text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8',
            'text/html,application/xhtml+xml,application/xml;q=0.9,image/webp,image/apng,*/*;q=0.8',
            'application/json, text/plain, */*',
            '*/*'
        ]
        self.accept_languages = [
            'zh-CN,zh;q=0.9,en;q=0.8',
            'en-US,en;q=0.9,zh-CN;q=0.8',
            'zh;q=0.9,en;q=0.8,en-US;q=0.7',
            'en-GB,en;q=0.9,en-US;q=0.8,de;q=0.7',
            'fr-CH, fr;q=0.9, en;q=0.8, de;q=0.7'
        ]
        self.accept_encodings = [
            'gzip, deflate, br',
            'gzip, deflate',
            'deflate',
            'identity'
        ]

    def generate_headers(self, url=None):
        """
        生成随机请求头
        """
        headers = {
            'User-Agent': self._get_random_user_agent(),
            'Accept': random.choice(self.accept_headers),
            'Accept-Language': random.choice(self.accept_languages),
            'Accept-Encoding': random.choice(self.accept_encodings),
            'Connection': 'keep-alive',
            'Upgrade-Insecure-Requests': '1',
            'Sec-Fetch-Dest': 'document',
            'Sec-Fetch-Mode': 'navigate',
            'Sec-Fetch-Site': 'none',
            'Cache-Control': 'max-age=0'
        }
        # 根据URL添加特定头部
        if url:
            domain = self._extract_domain(url)
            specific_headers = self._get_domain_specific_headers(domain)
            headers.update(specific_headers)
        return headers

    def _get_random_user_agent(self):
        """
        获取随机User-Agent
        """
        try:
            return self.ua.random
        except Exception:
            # 备用User-Agent列表
            backup_uas = [
                'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36',
                'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36',
                'Mozilla/5.0 (Windows NT 10.0; Win64; x64; rv:89.0) Gecko/20100101 Firefox/89.0',
                'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/605.1.15 (KHTML, like Gecko) Version/14.1.1 Safari/605.1.15'
            ]
            return random.choice(backup_uas)

    def _extract_domain(self, url):
        """
        提取域名
        """
        from urllib.parse import urlparse
        return urlparse(url).netloc

    def _get_domain_specific_headers(self, domain):
        """
        获取特定域名的头部
        """
        domain_headers = {
            'google.com': {
                'Referer': 'https://www.google.com/',
                'Origin': 'https://www.google.com'
            },
            'baidu.com': {
                'Referer': 'https://www.baidu.com/',
                'X-Requested-With': 'XMLHttpRequest'
            }
        }
        # 用endswith匹配,使www.google.com这类子域名也能命中
        for known_domain, specific in domain_headers.items():
            if domain.endswith(known_domain):
                return specific
        return {}


class HeadersRotatorMiddleware:
    """
    请求头轮换中间件
    """

    def __init__(self):
        self.header_generator = DynamicHeadersGenerator()
        self.used_headers = set()

    def process_request(self, request, spider):
        """
        处理请求,轮换请求头
        """
        # 生成新的请求头
        new_headers = self.header_generator.generate_headers(request.url)
        # 更新请求头
        for key, value in new_headers.items():
            request.headers[key] = value
        # 记录使用过的请求头(用于统计)
        header_signature = self._get_header_signature(new_headers)
        self.used_headers.add(header_signature)
        return None

    def _get_header_signature(self, headers):
        """
        获取请求头签名
        """
        ua = headers.get('User-Agent', '')
        accept = headers.get('Accept', '')
        language = headers.get('Accept-Language', '')
        return f"{ua[:50]}_{accept[:30]}_{language[:20]}"


class FingerprintSpoofingMiddleware:
    """
    指纹伪装中间件
    """

    def __init__(self):
        self.device_fingerprints = [
            {
                'screen_resolution': '1920x1080',
                'color_depth': '24',
                'timezone_offset': '480',
                'language': 'zh-CN',
                'platform': 'Win32',
                'cookies_enabled': True,
                'javascript_enabled': True
            },
            {
                'screen_resolution': '1366x768',
                'color_depth': '24',
                'timezone_offset': '480',
                'language': 'en-US',
                'platform': 'Win32',
                'cookies_enabled': True,
                'javascript_enabled': True
            },
            {
                'screen_resolution': '1440x900',
                'color_depth': '24',
                'timezone_offset': '480',
                'language': 'en-GB',
                'platform': 'MacIntel',
                'cookies_enabled': True,
                'javascript_enabled': True
            }
        ]

    def process_request(self, request, spider):
        """
        处理请求,伪装设备指纹
        """
        # 随机选择一个设备指纹
        fingerprint = random.choice(self.device_fingerprints)
        # 添加指纹相关信息到请求
        request.meta['device_fingerprint'] = fingerprint
        # 注意:以下X-开头的自定义头仅作演示,真实浏览器并不会发送它们
        request.headers['X-Screen-Resolution'] = fingerprint['screen_resolution']
        request.headers['X-Color-Depth'] = fingerprint['color_depth']
        request.headers['X-Timezone-Offset'] = fingerprint['timezone_offset']
        return None
```
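要让上述中间件生效,需在Scrapy项目的settings.py中注册(模块路径 `myproject.middlewares` 为示意,优先级数字可按项目实际情况调整):

```python
# settings.py
DOWNLOADER_MIDDLEWARES = {
    # 关闭Scrapy内置的UA中间件,避免覆盖我们生成的User-Agent
    'scrapy.downloadermiddlewares.useragent.UserAgentMiddleware': None,
    'myproject.middlewares.HeadersRotatorMiddleware': 400,
    'myproject.middlewares.FingerprintSpoofingMiddleware': 410,
}
```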
## 浏览器指纹反检测

### 浏览器指纹伪装
```python
class BrowserFingerprintSpoofing:
    """
    浏览器指纹伪装类
    """

    def __init__(self):
        self.browser_configs = {
            'chrome': self._configure_chrome_spoofing,
            'firefox': self._configure_firefox_spoofing,
            'safari': self._configure_safari_spoofing
        }

    def _configure_chrome_spoofing(self, options):
        """
        Chrome浏览器指纹伪装配置
        """
        # 屏蔽webdriver检测
        options.add_experimental_option("excludeSwitches", ["enable-automation"])
        options.add_experimental_option('useAutomationExtension', False)
        # 禁用Blink的自动化控制特征
        options.add_argument("--disable-blink-features=AutomationControlled")
        # 设置窗口大小
        options.add_argument("--window-size=1920,1080")
        # 禁用某些可能暴露自动化的特性
        options.add_argument("--disable-dev-shm-usage")
        options.add_argument("--no-sandbox")
        options.add_argument("--disable-gpu")
        return options

    def _configure_firefox_spoofing(self, options):
        """
        Firefox浏览器指纹伪装配置
        """
        # 设置Firefox特定的伪装选项
        options.set_preference("dom.webdriver.enabled", False)
        options.set_preference("useAutomationExtension", False)
        # 覆盖User-Agent
        options.set_preference(
            "general.useragent.override",
            "Mozilla/5.0 (Windows NT 10.0; Win64; x64; rv:89.0) Gecko/20100101 Firefox/89.0"
        )
        return options

    def _configure_safari_spoofing(self, options):
        """
        Safari浏览器指纹伪装配置
        """
        # Safari配置较为复杂,通常使用真实Safari浏览器
        pass

    def apply_stealth_script(self, driver):
        """
        应用反检测JavaScript脚本
        """
        stealth_script = """
        // 隐藏webdriver属性
        Object.defineProperty(navigator, 'webdriver', {
            get: () => undefined,
        });
        // 模拟插件
        Object.defineProperty(navigator, 'plugins', {
            get: () => {
                return [
                    { filename: 'internal-pdf-viewer', description: 'Portable Document Format' },
                    { filename: 'internal-nacl-plugin', description: 'Native Client' }
                ];
            },
        });
        // 模拟语言
        Object.defineProperty(navigator, 'languages', {
            get: () => ['zh-CN', 'zh', 'en'],
        });
        // 模拟平台
        Object.defineProperty(navigator, 'platform', {
            get: () => 'Win32',
        });
        // 隐藏Chrome属性
        Object.defineProperty(window, 'chrome', {
            value: new Proxy({}, {
                get(target, prop) {
                    if (prop === 'runtime') return {};
                    return target[prop];
                }
            }),
            writable: false,
        });
        // 隐藏toString的篡改痕迹
        const originalToString = Function.prototype.toString;
        Function.prototype.toString = function() {
            if (this === window.cdc_adoQpoasnfa76pfcZLmcfl_Array) {
                return 'function Array() { [native code] }';
            }
            return originalToString.call(this);
        };
        // 修改WebGL指纹
        const getParameter = WebGLRenderingContext.prototype.getParameter;
        WebGLRenderingContext.prototype.getParameter = function(parameter) {
            if (parameter === 37445) {
                return 'Intel Inc.';
            } else if (parameter === 37446) {
                return 'Intel Iris OpenGL Engine';
            }
            return getParameter.call(this, parameter);
        };
        """
        driver.execute_script(stealth_script)


class AdvancedStealthMiddleware:
    """
    高级反检测中间件
    """

    def __init__(self):
        self.spoofing_techniques = [
            self._spoof_canvas_fingerprint,
            self._spoof_webgl_fingerprint,
            self._spoof_audio_fingerprint,
            self._spoof_font_fingerprint
        ]

    def process_request(self, request, spider):
        """
        处理请求,应用反检测技术
        """
        if request.meta.get('use_selenium') or request.meta.get('use_playwright'):
            # 在浏览器自动化请求中应用反检测
            request.meta['apply_stealth'] = True
        return None

    def _spoof_canvas_fingerprint(self, driver):
        """
        伪造Canvas指纹
        """
        canvas_script = """
        const originalToDataURL = HTMLCanvasElement.prototype.toDataURL;
        const originalToBlob = HTMLCanvasElement.prototype.toBlob;
        HTMLCanvasElement.prototype.toDataURL = function() {
            // 可在此对canvas输出加入微小噪声以扰乱指纹
            return originalToDataURL.apply(this, arguments);
        };
        HTMLCanvasElement.prototype.toBlob = function() {
            // 可在此对blob输出加入微小噪声以扰乱指纹
            return originalToBlob.apply(this, arguments);
        };
        """
        driver.execute_script(canvas_script)

    def _spoof_webgl_fingerprint(self, driver):
        """
        伪造WebGL指纹
        """
        webgl_script = """
        const getParameter = WebGLRenderingContext.prototype.getParameter;
        WebGLRenderingContext.prototype.getParameter = function(parameter) {
            switch (parameter) {
                case 37445: // UNMASKED_VENDOR_WEBGL
                    return 'Intel Inc.';
                case 37446: // UNMASKED_RENDERER_WEBGL
                    return 'Intel Iris OpenGL Engine';
                case 7936: // VERSION
                    return 'WebGL 1.0 (OpenGL ES 2.0 Chromium)';
                case 34076: // SHADING_LANGUAGE_VERSION
                    return 'WebGL GLSL ES 1.0 (OpenGL ES GLSL ES 1.0 Chromium)';
                default:
                    return getParameter.call(this, parameter);
            }
        };
        """
        driver.execute_script(webgl_script)

    def _spoof_audio_fingerprint(self, driver):
        """
        伪造Audio指纹
        """
        audio_script = """
        const audioContextProto = window.AudioContext || window.webkitAudioContext;
        if (audioContextProto) {
            const originalCreateAnalyser = audioContextProto.prototype.createAnalyser;
            audioContextProto.prototype.createAnalyser = function() {
                const analyser = originalCreateAnalyser.call(this);
                // 可在此修改音频分析器的行为以扰乱指纹
                return analyser;
            };
        }
        """
        driver.execute_script(audio_script)

    def _spoof_font_fingerprint(self, driver):
        """
        伪造字体指纹
        """
        font_script = """
        // 确保字体检测的一致性
        const testFonts = [
            'Arial', 'Courier New', 'Georgia', 'Times New Roman', 'Verdana',
            'Helvetica', 'Impact', 'Comic Sans MS', 'Trebuchet MS', 'Consolas'
        ];
        // 这里可以添加字体检测的标准化逻辑
        """
        driver.execute_script(font_script)
```
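需要注意,用 `execute_script` 注入的脚本只对当前已加载的页面生效。对Chromium系浏览器,可以借助CDP命令 `Page.addScriptToEvaluateOnNewDocument` 在每个新文档创建之前注入,覆盖更彻底(下面是一个简化示意,与上文的 `BrowserFingerprintSpoofing` 配合使用):

```python
from selenium import webdriver
from selenium.webdriver.chrome.options import Options

spoofer = BrowserFingerprintSpoofing()
options = spoofer._configure_chrome_spoofing(Options())
driver = webdriver.Chrome(options=options)
# 在每个新页面的文档创建之前注入脚本(仅Chromium系浏览器支持CDP)
driver.execute_cdp_cmd('Page.addScriptToEvaluateOnNewDocument', {
    'source': "Object.defineProperty(navigator, 'webdriver', {get: () => undefined});"
})
driver.get('https://example.com')
```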
## IP封禁规避策略

### 智能IP轮换
```python
import time
import random
from collections import defaultdict, deque

import requests


class IntelligentIPManager:
    """
    智能IP管理器
    """

    def __init__(self, proxy_list=None):
        self.proxy_list = proxy_list or []
        self.ip_stats = defaultdict(lambda: {
            'success_count': 0,
            'failure_count': 0,
            'score': 100,
            'last_used': 0,
            'ban_status': False,
            'ban_time': 0
        })
        # 为初始代理建立统计条目(defaultdict在首次访问时创建默认值)
        for proxy in self.proxy_list:
            _ = self.ip_stats[proxy]
        self.current_ip_index = 0
        self.rotation_strategy = 'smart'

    def add_proxy(self, proxy):
        """
        添加代理IP
        """
        if proxy not in self.proxy_list:
            self.proxy_list.append(proxy)
            self.ip_stats[proxy] = {
                'success_count': 0,
                'failure_count': 0,
                'score': 100,
                'last_used': 0,
                'ban_status': False,
                'ban_time': 0
            }

    def get_best_proxy(self):
        """
        获取最佳代理IP
        """
        if self.rotation_strategy == 'random':
            return self._get_random_proxy()
        elif self.rotation_strategy == 'round_robin':
            return self._get_round_robin_proxy()
        elif self.rotation_strategy == 'smart':
            return self._get_smart_proxy()
        else:
            return self._get_random_proxy()

    def _get_random_proxy(self):
        """
        随机获取代理
        """
        available_proxies = [
            proxy for proxy, stats in self.ip_stats.items()
            if not stats['ban_status'] and self._is_proxy_valid(proxy)
        ]
        if available_proxies:
            return random.choice(available_proxies)
        else:
            return None

    def _get_round_robin_proxy(self):
        """
        轮询获取代理
        """
        available_proxies = [
            proxy for proxy, stats in self.ip_stats.items()
            if not stats['ban_status'] and self._is_proxy_valid(proxy)
        ]
        if available_proxies:
            proxy = available_proxies[self.current_ip_index % len(available_proxies)]
            self.current_ip_index += 1
            return proxy
        else:
            return None

    def _get_smart_proxy(self):
        """
        智能选择最佳代理
        """
        # 计算每个代理的综合评分
        scored_proxies = []
        for proxy, stats in self.ip_stats.items():
            if stats['ban_status']:
                # 检查是否解除封禁
                if time.time() - stats['ban_time'] > 1800:  # 30分钟解封
                    stats['ban_status'] = False
                else:
                    continue
            if not self._is_proxy_valid(proxy):
                continue
            # 计算综合评分
            success_rate = stats['success_count'] / max(1, stats['success_count'] + stats['failure_count'])
            recency_factor = 1.0 if time.time() - stats['last_used'] > 300 else 0.8  # 5分钟冷却
            score = stats['score'] * success_rate * recency_factor
            scored_proxies.append((proxy, score))
        if scored_proxies:
            # 返回评分最高的代理
            best_proxy = max(scored_proxies, key=lambda x: x[1])[0]
            return best_proxy
        else:
            return None

    def _is_proxy_valid(self, proxy):
        """
        检查代理是否有效
        """
        # 检查代理格式
        if not proxy.startswith(('http://', 'https://', 'socks5://', 'socks4://')):
            return False
        # 可以添加更多验证逻辑
        return True

    def update_proxy_stats(self, proxy, success=True):
        """
        更新代理统计信息
        """
        stats = self.ip_stats[proxy]
        stats['last_used'] = time.time()
        if success:
            stats['success_count'] += 1
            stats['score'] = min(100, stats['score'] + 5)
        else:
            stats['failure_count'] += 1
            stats['score'] = max(0, stats['score'] - 10)

    def mark_proxy_banned(self, proxy):
        """
        标记代理被封禁
        """
        self.ip_stats[proxy]['ban_status'] = True
        self.ip_stats[proxy]['ban_time'] = time.time()
        self.ip_stats[proxy]['score'] = 0


class ProxyRotationMiddleware:
    """
    代理轮换中间件
    """

    def __init__(self):
        self.ip_manager = IntelligentIPManager()
        self.request_count = 0
        self.max_requests_per_ip = 10

    def process_request(self, request, spider):
        """
        处理请求,应用代理轮换
        """
        # 获取最佳代理
        proxy = self.ip_manager.get_best_proxy()
        if proxy:
            request.meta['proxy'] = proxy
            spider.logger.debug(f"Using proxy: {proxy}")
        # 记录请求开始时间
        request.meta['request_start_time'] = time.time()
        return None

    def process_response(self, request, response, spider):
        """
        处理响应,更新代理状态
        """
        proxy = request.meta.get('proxy')
        if proxy:
            # 检查响应状态,判断是否被封禁
            if self._is_banned_response(response):
                spider.logger.warning(f"Proxy {proxy} might be banned (status: {response.status})")
                self.ip_manager.mark_proxy_banned(proxy)
                self.ip_manager.update_proxy_stats(proxy, success=False)
            else:
                self.ip_manager.update_proxy_stats(proxy, success=True)
        return response

    def process_exception(self, request, exception, spider):
        """
        处理异常,更新代理状态
        """
        proxy = request.meta.get('proxy')
        if proxy:
            spider.logger.error(f"Exception with proxy {proxy}: {str(exception)}")
            self.ip_manager.update_proxy_stats(proxy, success=False)
            # 尝试使用新代理重试
            new_request = request.copy()
            new_proxy = self.ip_manager.get_best_proxy()
            if new_proxy and new_proxy != proxy:
                new_request.meta['proxy'] = new_proxy
                new_request.dont_filter = True
                return new_request

    def _is_banned_response(self, response):
        """
        检查是否为封禁响应
        """
        banned_statuses = [403, 429, 503]
        banned_texts = [
            'blocked', 'forbidden', 'access denied', 'rate limit',
            'too many requests', 'captcha', 'verification'
        ]
        if response.status in banned_statuses:
            return True
        response_text = response.text.lower()
        for text in banned_texts:
            if text in response_text:
                return True
        return False


class GeographicIPManager:
    """
    地理位置IP管理器
    """

    def __init__(self):
        self.location_proxies = defaultdict(list)
        self.country_codes = {
            'US': '美国',
            'CN': '中国',
            'JP': '日本',
            'KR': '韩国',
            'DE': '德国',
            'FR': '法国',
            'GB': '英国'
        }

    def add_location_proxy(self, country_code, proxy):
        """
        添加指定地理位置的代理
        """
        self.location_proxies[country_code].append(proxy)

    def get_location_proxy(self, country_code):
        """
        获取指定地理位置的代理
        """
        if country_code in self.location_proxies:
            return random.choice(self.location_proxies[country_code])
        else:
            # 如果没有指定国家的代理,返回任意代理
            all_proxies = []
            for proxies in self.location_proxies.values():
                all_proxies.extend(proxies)
            return random.choice(all_proxies) if all_proxies else None

    def validate_proxy_location(self, proxy, expected_country):
        """
        验证代理IP的地理位置
        """
        try:
            response = requests.get(
                'http://ip-api.com/json',
                proxies={'http': proxy, 'https': proxy},
                timeout=10
            )
            data = response.json()
            return data.get('countryCode') == expected_country
        except Exception:
            return False
```
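`IntelligentIPManager` 也可以脱离中间件单独使用,一个最小的使用示意如下(代理地址均为占位值):

```python
manager = IntelligentIPManager(proxy_list=[
    'http://127.0.0.1:8001',  # 占位代理地址,替换为真实代理
    'http://127.0.0.1:8002',
])
proxy = manager.get_best_proxy()
try:
    # ...使用该代理发起请求...
    manager.update_proxy_stats(proxy, success=True)
except Exception:
    manager.update_proxy_stats(proxy, success=False)
```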
## 频率限制绕过

### 智能频率控制
```python
import time
import random
from datetime import datetime, timedelta
from collections import defaultdict, deque


class IntelligentRateLimiter:
    """
    智能频率限制器
    """

    def __init__(self):
        self.request_history = defaultdict(deque)
        self.domain_limits = {}
        self.global_limit = 1  # 默认每秒1个请求
        self.max_history_size = 100  # 最大历史记录数

    def set_domain_limit(self, domain, requests_per_second):
        """
        设置特定域名的请求频率限制
        """
        self.domain_limits[domain] = requests_per_second

    def can_make_request(self, url):
        """
        检查是否可以发起请求
        """
        from urllib.parse import urlparse
        domain = urlparse(url).netloc
        # 获取该域名的限制
        limit = self.domain_limits.get(domain, self.global_limit)
        # 获取最近的请求记录
        recent_requests = [
            req_time for req_time in self.request_history[domain]
            if time.time() - req_time < 1  # 最近1秒内的请求
        ]
        # 检查是否超过限制
        if len(recent_requests) >= limit:
            return False
        return True

    def record_request(self, url):
        """
        记录请求
        """
        from urllib.parse import urlparse
        domain = urlparse(url).netloc
        current_time = time.time()
        self.request_history[domain].append(current_time)
        # 限制历史记录大小
        while len(self.request_history[domain]) > self.max_history_size:
            self.request_history[domain].popleft()

    def get_wait_time(self, url):
        """
        获取需要等待的时间
        """
        from urllib.parse import urlparse
        domain = urlparse(url).netloc
        limit = self.domain_limits.get(domain, self.global_limit)
        if limit <= 0:
            return 0
        recent_requests = [
            req_time for req_time in self.request_history[domain]
            if time.time() - req_time < 1
        ]
        if len(recent_requests) >= limit:
            # 计算最早的一个请求到现在的间隔
            oldest_recent = min(recent_requests) if recent_requests else time.time()
            wait_time = 1 - (time.time() - oldest_recent)
            return max(0, wait_time)
        return 0


class HumanBehaviorSimulator:
    """
    人类行为模拟器
    """

    def __init__(self):
        self.activity_patterns = {
            'morning': {'activity': 0.3, 'speed': 1.2},     # 早上低活跃度,较快
            'work_hours': {'activity': 0.8, 'speed': 0.9},  # 工作时间高活跃度,正常
            'evening': {'activity': 0.6, 'speed': 1.1},     # 晚上中等活跃度,稍快
            'night': {'activity': 0.2, 'speed': 1.5}        # 夜晚低活跃度,较慢
        }

    def get_current_activity_level(self):
        """
        获取当前活跃度级别
        """
        current_hour = datetime.now().hour
        if 6 <= current_hour < 9:  # 早上
            return self.activity_patterns['morning']
        elif 9 <= current_hour < 18:  # 工作时间
            return self.activity_patterns['work_hours']
        elif 18 <= current_hour < 22:  # 晚上
            return self.activity_patterns['evening']
        else:  # 夜晚
            return self.activity_patterns['night']

    def simulate_human_delay(self, base_delay=1):
        """
        模拟人类延迟
        """
        activity = self.get_current_activity_level()
        # 基于活跃度调整延迟
        adjusted_delay = base_delay * activity['speed']
        # 添加随机波动(模拟人类行为的不确定性)
        variation = random.uniform(-0.3, 0.3)
        final_delay = max(0.1, adjusted_delay + variation)
        return final_delay

    def simulate_browsing_pattern(self):
        """
        模拟浏览模式
        """
        # 模拟页面停留时间
        stay_duration = random.uniform(10, 60)  # 10-60秒停留
        # 模拟滚动行为
        scroll_events = random.randint(1, 5)
        # 模拟点击行为
        click_events = random.randint(0, 3)
        return {
            'stay_duration': stay_duration,
            'scroll_events': scroll_events,
            'click_events': click_events
        }


class AdvancedFrequencyMiddleware:
    """
    高级频率控制中间件

    注意:time.sleep会阻塞Scrapy的事件循环,此处仅作原理演示,
    生产环境建议优先使用DOWNLOAD_DELAY或AutoThrottle扩展。
    """

    def __init__(self):
        self.rate_limiter = IntelligentRateLimiter()
        self.behavior_simulator = HumanBehaviorSimulator()
        self.request_queue = []
        self.active_requests = 0

    def process_request(self, request, spider):
        """
        处理请求,应用频率控制
        """
        # 检查是否可以立即发送请求
        if self.rate_limiter.can_make_request(request.url):
            # 记录请求
            self.rate_limiter.record_request(request.url)
            # 应用人类行为延迟
            delay = self.behavior_simulator.simulate_human_delay(1)
            if delay > 0:
                time.sleep(delay)
            return None
        else:
            # 需要等待,计算等待时间
            wait_time = self.rate_limiter.get_wait_time(request.url)
            if wait_time > 0:
                spider.logger.info(f"Waiting {wait_time:.2f}s before next request to {request.url}")
                time.sleep(wait_time)
            # 重新检查并记录请求
            self.rate_limiter.record_request(request.url)
            return None

    def process_response(self, request, response, spider):
        """
        处理响应
        """
        # 模拟人类浏览行为
        browsing_pattern = self.behavior_simulator.simulate_browsing_pattern()
        # 根据模式应用延迟
        time.sleep(random.uniform(0.5, 2.0))
        return response


class BurstProtectionMiddleware:
    """
    突发请求保护中间件
    """

    def __init__(self):
        self.sliding_window = defaultdict(deque)
        self.window_size = 60  # 60秒窗口
        self.max_requests_per_window = 20

    def process_request(self, request, spider):
        """
        处理请求,防止突发
        """
        from urllib.parse import urlparse
        domain = urlparse(request.url).netloc
        current_time = time.time()
        # 清理过期的请求记录
        while (self.sliding_window[domain] and
               current_time - self.sliding_window[domain][0] > self.window_size):
            self.sliding_window[domain].popleft()
        # 检查是否超过限制
        if len(self.sliding_window[domain]) >= self.max_requests_per_window:
            # 需要延迟
            oldest_time = self.sliding_window[domain][0]
            sleep_time = self.window_size - (current_time - oldest_time) + 1
            spider.logger.info(f"Burst protection: sleeping {sleep_time:.2f}s for {domain}")
            time.sleep(sleep_time)
        # 记录当前请求
        self.sliding_window[domain].append(current_time)
        return None
```
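`IntelligentRateLimiter` 同样可以单独使用,基本流程是先查询、必要时等待、再记录(示例域名为占位值):

```python
import time

limiter = IntelligentRateLimiter()
limiter.set_domain_limit('example.com', 2)  # 该域名每秒最多2个请求

url = 'https://example.com/page'
if not limiter.can_make_request(url):
    time.sleep(limiter.get_wait_time(url))  # 等待滑动窗口滑过
limiter.record_request(url)
# ...随后发起实际请求...
```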
## Cookie与Session管理

### 高级Cookie管理
```python
import json
import pickle
import os
import time
from http.cookies import SimpleCookie
from scrapy.http.cookies import CookieJar


class AdvancedCookieManager:
    """
    高级Cookie管理器
    """

    def __init__(self, storage_path='./cookies'):
        self.storage_path = storage_path
        self.cookie_jars = {}
        self.session_storage = {}
        self.cookie_policies = {}
        # 确保存储目录存在
        os.makedirs(storage_path, exist_ok=True)

    def create_cookie_jar(self, domain, policy='default'):
        """
        为特定域名创建Cookie jar
        """
        cookie_jar = CookieJar()
        self.cookie_jars[domain] = {
            'jar': cookie_jar,
            'policy': policy,
            'last_used': time.time(),
            'requests_count': 0
        }
        return cookie_jar

    def get_cookie_jar(self, domain):
        """
        获取域名对应的Cookie jar
        """
        if domain not in self.cookie_jars:
            return self.create_cookie_jar(domain)
        return self.cookie_jars[domain]['jar']

    def save_cookies(self, domain, filename=None):
        """
        保存Cookie到文件
        """
        if filename is None:
            filename = f"{domain.replace('.', '_').replace(':', '_')}_cookies.pkl"
        filepath = os.path.join(self.storage_path, filename)
        if domain in self.cookie_jars:
            with open(filepath, 'wb') as f:
                pickle.dump(self.cookie_jars[domain]['jar'], f)

    def load_cookies(self, domain, filename=None):
        """
        从文件加载Cookie
        """
        if filename is None:
            filename = f"{domain.replace('.', '_').replace(':', '_')}_cookies.pkl"
        filepath = os.path.join(self.storage_path, filename)
        if os.path.exists(filepath):
            with open(filepath, 'rb') as f:
                cookie_jar = pickle.load(f)
            self.cookie_jars[domain] = {
                'jar': cookie_jar,
                'policy': 'default',
                'last_used': time.time(),
                'requests_count': 0
            }
            return cookie_jar

    def rotate_cookies(self, domain):
        """
        轮换Cookie(使用不同的Cookie集合)
        """
        # 可以维护多个Cookie集合,随机选择使用
        cookie_sets = self._get_cookie_sets(domain)
        if cookie_sets:
            import random
            chosen_set = random.choice(cookie_sets)
            return chosen_set
        return None

    def _get_cookie_sets(self, domain):
        """
        获取域名的多个Cookie集合
        """
        sets = []
        prefix = domain.replace('.', '_').replace(':', '_')
        for filename in os.listdir(self.storage_path):
            if filename.startswith(prefix) and filename.endswith('_cookies.pkl'):
                filepath = os.path.join(self.storage_path, filename)
                with open(filepath, 'rb') as f:
                    sets.append(pickle.load(f))
        return sets

    def validate_cookies(self, domain):
        """
        验证Cookie的有效性
        """
        cookie_jar = self.get_cookie_jar(domain)
        # 检查Cookie是否过期
        current_time = time.time()
        valid_cookies = []
        for cookie in cookie_jar.jar:
            if cookie.expires is None or cookie.expires > current_time:
                valid_cookies.append(cookie)
        # 重建Cookie jar(通过底层http.cookiejar写入)
        new_jar = CookieJar()
        for cookie in valid_cookies:
            new_jar.jar.set_cookie(cookie)
        self.cookie_jars[domain]['jar'] = new_jar
        return len(valid_cookies) > 0


class SessionManagementMiddleware:
    """
    会话管理中间件
    """

    def __init__(self):
        self.cookie_manager = AdvancedCookieManager()
        self.session_rotation_interval = 3600  # 1小时轮换
        self.active_sessions = {}

    def process_request(self, request, spider):
        """
        处理请求,管理会话
        """
        from urllib.parse import urlparse
        domain = urlparse(request.url).netloc
        # 获取或创建Cookie jar
        cookie_jar = self.cookie_manager.get_cookie_jar(domain)
        # 添加Cookie到请求(request.cookies是普通字典)
        cookies = self._extract_cookies_for_request(cookie_jar, request.url)
        for cookie in cookies:
            request.cookies[cookie.name] = cookie.value
        # 检查是否需要轮换会话
        if self._should_rotate_session(domain):
            self._rotate_session(domain, request, spider)
        return None

    def process_response(self, request, response, spider):
        """
        处理响应,更新Cookie
        """
        from urllib.parse import urlparse
        domain = urlparse(request.url).netloc
        # 从响应中提取新的Cookie
        cookie_jar = self.cookie_manager.get_cookie_jar(domain)
        cookie_jar.extract_cookies(response, request)
        # 保存更新后的Cookie
        self.cookie_manager.save_cookies(domain)
        return response

    def _extract_cookies_for_request(self, cookie_jar, url):
        """
        为请求提取合适的Cookie(按域名粗略匹配底层jar中的条目)
        """
        from urllib.parse import urlparse
        host = urlparse(url).netloc
        return [
            cookie for cookie in cookie_jar.jar
            if host.endswith(cookie.domain.lstrip('.'))
        ]

    def _should_rotate_session(self, domain):
        """
        检查是否应该轮换会话
        """
        if domain not in self.active_sessions:
            self.active_sessions[domain] = time.time()
            return False
        last_rotation = self.active_sessions[domain]
        return time.time() - last_rotation > self.session_rotation_interval

    def _rotate_session(self, domain, request, spider):
        """
        轮换会话
        """
        # 尝试加载新的Cookie集合
        new_cookies = self.cookie_manager.rotate_cookies(domain)
        if new_cookies:
            spider.logger.info(f"Rotating session for {domain}")
            self.active_sessions[domain] = time.time()
        # 验证现有Cookie
        is_valid = self.cookie_manager.validate_cookies(domain)
        if not is_valid:
            spider.logger.warning(f"Cookies for {domain} are invalid")


class CookieStealingMiddleware:
    """
    Cookie窃取中间件(用于登录后获取认证Cookie)
    """

    def __init__(self):
        self.login_domains = set()
        self.stolen_cookies = {}

    def add_login_domain(self, domain):
        """
        添加需要登录的域名
        """
        self.login_domains.add(domain)

    def process_response(self, request, response, spider):
        """
        处理响应,提取Cookie
        """
        from urllib.parse import urlparse
        domain = urlparse(request.url).netloc
        if domain in self.login_domains:
            # 提取所有Cookie
            cookies = {}
            for header in response.headers.getlist('Set-Cookie'):
                cookie = SimpleCookie()
                cookie.load(header.decode('utf-8'))
                for key, morsel in cookie.items():
                    cookies[key] = morsel.value
            if cookies:
                self.stolen_cookies[domain] = {
                    'cookies': cookies,
                    'timestamp': time.time(),
                    'url': request.url
                }
                spider.logger.info(f"Stole cookies from {domain}: {list(cookies.keys())}")
        return response

    def get_stolen_cookies(self, domain):
        """
        获取已提取的Cookie
        """
        return self.stolen_cookies.get(domain, {}).get('cookies', {})
```
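`AdvancedCookieManager` 的典型用法是在会话建立后持久化、在下次启动时恢复(域名为示例值):

```python
manager = AdvancedCookieManager(storage_path='./cookies')
jar = manager.get_cookie_jar('example.com')
# ...爬取过程中jar被extract_cookies逐步填充...
manager.save_cookies('example.com')             # 持久化到 ./cookies/example_com_cookies.pkl
restored = manager.load_cookies('example.com')  # 下次启动时恢复会话
```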
## JavaScript反爬绕过

### JavaScript环境伪造
```python
import time

from scrapy.http import HtmlResponse


class JavaScriptBypassMiddleware:
    """
    JavaScript反爬绕过中间件
    """

    def __init__(self):
        self.js_execution_envs = {
            'headless-chrome': self._setup_headless_chrome,
            'real-browser': self._setup_real_browser,
            'js-interpreter': self._setup_js_interpreter
        }
        self.execution_timeout = 30

    def _setup_headless_chrome(self):
        """
        设置无头Chrome环境
        """
        from selenium import webdriver
        from selenium.webdriver.chrome.options import Options
        chrome_options = Options()
        chrome_options.add_argument('--headless')
        chrome_options.add_argument('--no-sandbox')
        chrome_options.add_argument('--disable-dev-shm-usage')
        chrome_options.add_argument('--disable-blink-features=AutomationControlled')
        chrome_options.add_experimental_option("excludeSwitches", ["enable-automation"])
        chrome_options.add_experimental_option('useAutomationExtension', False)
        driver = webdriver.Chrome(options=chrome_options)
        # 隐藏webdriver属性
        driver.execute_script("Object.defineProperty(navigator, 'webdriver', {get: () => undefined})")
        return driver

    def _setup_real_browser(self):
        """
        设置有头真实浏览器环境(留作扩展)
        """
        raise NotImplementedError

    def _setup_js_interpreter(self):
        """
        设置纯JS解释器执行环境(留作扩展)
        """
        raise NotImplementedError

    def process_request(self, request, spider):
        """
        处理需要JavaScript执行的请求
        """
        if request.meta.get('js_render'):
            # 使用浏览器渲染页面(注意:每个请求新建driver开销较大,生产环境应复用实例)
            driver = self._setup_headless_chrome()
            try:
                driver.set_page_load_timeout(self.execution_timeout)
                driver.get(request.url)
                # 执行自定义JavaScript
                custom_js = request.meta.get('custom_js', '')
                if custom_js:
                    driver.execute_script(custom_js)
                # 等待页面完全加载
                time.sleep(request.meta.get('wait_time', 2))
                # 获取渲染后的内容
                body = driver.page_source.encode('utf-8')
                # 创建新的响应对象
                response = HtmlResponse(
                    url=request.url,
                    body=body,
                    encoding='utf-8',
                    request=request
                )
                return response
            except Exception as e:
                spider.logger.error(f"JavaScript rendering failed: {str(e)}")
                # 渲染失败时重新调度原请求(注意:生产环境应设置重试上限,避免死循环)
                return request
            finally:
                driver.quit()
        return None


class DynamicContentMiddleware:
    """
    动态内容处理中间件
    """

    def __init__(self):
        self.loading_detectors = [
            self._detect_react_app,
            self._detect_vue_app,
            self._detect_ajax_loader,
            self._detect_infinite_scroll
        ]

    def process_request(self, request, spider):
        """
        处理请求,检测动态内容
        """
        # 检查是否需要特殊处理
        if self._has_dynamic_content(request.url):
            request.meta['js_render'] = True
            request.meta['wait_time'] = 3  # 等待3秒让内容加载
        return None

    def _has_dynamic_content(self, url):
        """
        检测URL是否包含动态内容
        """
        dynamic_indicators = [
            '/spa/', '/app/', '/dashboard/',
            'single-page', 'angular', 'react', 'vue'
        ]
        url_lower = url.lower()
        return any(indicator in url_lower for indicator in dynamic_indicators)

    def _detect_react_app(self, response_text):
        """
        检测React应用
        """
        return 'react' in response_text.lower() or 'id="root"' in response_text

    def _detect_vue_app(self, response_text):
        """
        检测Vue应用
        """
        return 'vue' in response_text.lower() or 'id="app"' in response_text

    def _detect_ajax_loader(self, response_text):
        """
        检测AJAX加载器
        """
        loader_patterns = [
            'loading', 'spinner', 'progress', 'ajax'
        ]
        return any(pattern in response_text.lower() for pattern in loader_patterns)

    def _detect_infinite_scroll(self, response_text):
        """
        检测无限滚动
        """
        scroll_patterns = [
            'infinite-scroll', 'load-more', 'pagination'
        ]
        return any(pattern in response_text.lower() for pattern in scroll_patterns)


class ObfuscationBypassMiddleware:
    """
    代码混淆绕过中间件
    """

    def __init__(self):
        self.deobfuscators = {
            'string_concat': self._deobfuscate_string_concat,
            'array_decode': self._deobfuscate_array_decode,
            'function_rename': self._deobfuscate_function_rename
        }

    def process_request(self, request, spider):
        """
        处理请求,绕过代码混淆
        """
        # 这里可以添加JavaScript反混淆逻辑
        # 由于Scrapy本身不执行JavaScript,这主要适用于预处理场景
        # 标记需要特殊处理的请求
        if self._has_obfuscated_js(request.url):
            request.meta['needs_deobfuscation'] = True
        return None

    def _has_obfuscated_js(self, url):
        """
        检测是否包含混淆的JavaScript
        """
        # 常见的混淆特征(需要先下载并分析页面内容才能判断,此处恒返回False)
        obfuscation_indicators = [
            'eval(', 'fromCharCode', 'charCodeAt', 'split("")',
            'String.fromCharCode', 'Function(', 'setTimeout('
        ]
        return False

    def _deobfuscate_string_concat(self, js_code):
        """
        解混淆字符串拼接
        """
        # 实现字符串拼接解混淆逻辑
        pass

    def _deobfuscate_array_decode(self, js_code):
        """
        解混淆数组解码
        """
        # 实现数组解码解混淆逻辑
        pass

    def _deobfuscate_function_rename(self, js_code):
        """
        解混淆函数重命名
        """
        # 实现函数重命名解混淆逻辑
        pass
```
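在爬虫侧,只需通过meta打上 `js_render` 标记即可让 `JavaScriptBypassMiddleware` 接管渲染(爬虫名与URL均为示意):

```python
import scrapy

class RenderedSpider(scrapy.Spider):
    name = 'rendered_demo'
    start_urls = ['https://example.com/spa/']

    def start_requests(self):
        for url in self.start_urls:
            # js_render与wait_time由上文JavaScriptBypassMiddleware消费
            yield scrapy.Request(url, meta={'js_render': True, 'wait_time': 3})

    def parse(self, response):
        yield {'title': response.css('title::text').get()}
```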
## 分布式爬虫部署

### 分布式部署策略
```python
import multiprocessing
import threading
import time
import json
from queue import Queue

import redis
from scrapy.crawler import CrawlerProcess


class DistributedSpiderManager:
    """
    分布式爬虫管理器
    """

    def __init__(self, redis_host='localhost', redis_port=6379):
        self.redis_client = redis.Redis(host=redis_host, port=redis_port, decode_responses=True)
        self.task_queue = 'spider_tasks'
        self.result_queue = 'spider_results'
        self.worker_registry = 'spider_workers'
        self.monitoring = True

    def add_task(self, spider_name, urls, **kwargs):
        """
        添加爬虫任务
        """
        task = {
            'spider_name': spider_name,
            'urls': urls if isinstance(urls, list) else [urls],
            'kwargs': kwargs,
            'timestamp': time.time(),
            'status': 'pending'
        }
        self.redis_client.rpush(self.task_queue, json.dumps(task))
        return task

    def get_task(self):
        """
        获取爬虫任务
        """
        task_json = self.redis_client.blpop(self.task_queue, timeout=5)
        if task_json:
            return json.loads(task_json[1])
        return None

    def complete_task(self, task_id, results):
        """
        完成任务并存储结果
        """
        result = {
            'task_id': task_id,
            'results': results,
            'completed_at': time.time(),
            'status': 'completed'
        }
        self.redis_client.rpush(self.result_queue, json.dumps(result))

    def register_worker(self, worker_id, capabilities):
        """
        注册爬虫工作节点
        """
        worker_info = {
            'worker_id': worker_id,
            'capabilities': capabilities,
            'registered_at': time.time(),
            'last_heartbeat': time.time()
        }
        self.redis_client.hset(self.worker_registry, worker_id, json.dumps(worker_info))

    def heartbeat(self, worker_id):
        """
        工作节点心跳
        """
        worker_info = self.redis_client.hget(self.worker_registry, worker_id)
        if worker_info:
            info = json.loads(worker_info)
            info['last_heartbeat'] = time.time()
            self.redis_client.hset(self.worker_registry, worker_id, json.dumps(info))

    def get_active_workers(self):
        """
        获取活跃的工作节点
        """
        workers = self.redis_client.hgetall(self.worker_registry)
        active_workers = {}
        for worker_id, worker_info in workers.items():
            info = json.loads(worker_info)
            # 如果心跳在5分钟内,则认为是活跃的
            if time.time() - info['last_heartbeat'] < 300:
                active_workers[worker_id] = info
        return active_workers

    def start_monitoring(self):
        """
        开始监控
        """
        def monitor():
            while self.monitoring:
                # 监控任务队列长度
                task_count = self.redis_client.llen(self.task_queue)
                result_count = self.redis_client.llen(self.result_queue)
                print(f"Distributed Spider Monitor - Tasks: {task_count}, Results: {result_count}")
                time.sleep(10)

        monitor_thread = threading.Thread(target=monitor, daemon=True)
        monitor_thread.start()

    def stop_monitoring(self):
        """
        停止监控
        """
        self.monitoring = False


class LoadBalancerMiddleware:
    """
    负载均衡中间件
    """

    def __init__(self):
        self.node_manager = DistributedSpiderManager()
        self.request_distribution = {}
        self.current_node_index = 0

    def process_request(self, request, spider):
        """
        处理请求,实现负载均衡
        """
        # 获取活跃节点
        active_nodes = self.node_manager.get_active_workers()
        if active_nodes:
            # 轮询分配请求到不同节点
            nodes = list(active_nodes.keys())
            target_node = nodes[self.current_node_index % len(nodes)]
            self.current_node_index += 1
            # 标记请求需要转发到特定节点
            request.meta['target_node'] = target_node
            spider.logger.info(f"Routing request to node: {target_node}")
        return None


class GeoDistributedMiddleware:
    """
    地理分布中间件
    """

    def __init__(self):
        self.region_nodes = {
            'us': [],
            'eu': [],
            'asia': [],
            'global': []
        }

    def process_request(self, request, spider):
        """
        根据目标网站地理位置分配节点
        """
        from urllib.parse import urlparse
        domain = urlparse(request.url).netloc
        # 简单的地理分配逻辑
        region = self._get_target_region(domain)
        if region in self.region_nodes and self.region_nodes[region]:
            target_node = self.region_nodes[region][0]  # 简化:选择第一个节点
            request.meta['target_region'] = region
            request.meta['target_node'] = target_node
        return None

    def _get_target_region(self, domain):
        """
        获取目标区域
        """
        # 根据域名后缀判断区域(注意:需先匹配.com.cn等复合后缀;通用后缀并不能准确反映服务器位置,仅作演示)
        if any(tld in domain for tld in ['.com.cn', '.jp', '.kr']):
            return 'asia'
        elif any(tld in domain for tld in ['.co.uk', '.de', '.fr']):
            return 'eu'
        elif any(tld in domain for tld in ['.com', '.org', '.net']):
            return 'us'
        else:
            return 'global'


class ScalableSpiderFramework:
    """
    可扩展爬虫框架
    """

    def __init__(self):
        self.cluster_manager = DistributedSpiderManager()
        self.auto_scaling_enabled = True
        self.scaling_thresholds = {
            'queue_length': 100,
            'response_time': 5.0,
            'error_rate': 0.1
        }

    def scale_up(self):
        """
        水平扩展:增加爬虫节点
        """
        print("Scaling up: Adding new spider nodes...")
        # 这里可以启动新的爬虫进程或容器

    def scale_down(self):
        """
        水平收缩:减少爬虫节点
        """
        print("Scaling down: Removing spider nodes...")
        # 这里可以停止一些爬虫进程或容器

    def monitor_cluster(self):
        """
        监控集群状态
        """
        while True:
            # 检查队列长度
            queue_length = self.cluster_manager.redis_client.llen(self.cluster_manager.task_queue)
            # 检查错误率
            recent_results = self.cluster_manager.redis_client.lrange(
                self.cluster_manager.result_queue, 0, 10
            )
            error_count = sum(1 for result in recent_results if '"error":' in result)
            error_rate = error_count / max(1, len(recent_results))
            # 根据指标决定是否扩缩容
            if queue_length > self.scaling_thresholds['queue_length']:
                self.scale_up()
            elif (queue_length < self.scaling_thresholds['queue_length'] * 0.3 and
                  error_rate < self.scaling_thresholds['error_rate']):
                self.scale_down()
            time.sleep(30)  # 每30秒检查一次
```
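一个工作节点的主循环示意如下(节点ID与能力描述均为示意;上文 `complete_task` 需要task_id,而 `add_task` 并未生成独立ID,这里暂用任务的timestamp充当):

```python
manager = DistributedSpiderManager(redis_host='localhost')
manager.register_worker('worker-1', {'spiders': ['demo']})

while True:
    manager.heartbeat('worker-1')
    task = manager.get_task()  # blpop超时5秒,无任务时返回None
    if task is None:
        continue
    # ...此处调度Scrapy运行task['spider_name']对应的爬虫并收集结果...
    manager.complete_task(task['timestamp'], results=[])
```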
## 反爬检测与监控

### 检测系统实现
```python
import logging
import random
import time
import statistics
from collections import defaultdict, deque


class AntiDetectionMonitor:
    """
    反爬检测监控系统
    """

    def __init__(self):
        self.event_log = deque(maxlen=1000)
        self.detection_patterns = {
            'status_403': self._detect_403_ban,
            'status_429': self._detect_rate_limit,
            'captcha_page': self._detect_captcha,
            'suspicious_content': self._detect_suspicious_content
        }
        self.alert_callbacks = []
        self.monitoring_enabled = True

    def log_event(self, event_type, details, severity='info'):
        """
        记录事件
        """
        event = {
            'timestamp': time.time(),
            'type': event_type,
            'details': details,
            'severity': severity
        }
        self.event_log.append(event)
        # 触发告警(如果是严重事件)
        if severity in ['warning', 'critical']:
            self._trigger_alert(event)

    def _detect_403_ban(self, response):
        """
        检测403封禁
        """
        if response.status == 403:
            return {
                'type': 'ip_ban',
                'confidence': 0.9,
                'description': 'Received 403 Forbidden response'
            }
        return None

    def _detect_rate_limit(self, response):
        """
        检测频率限制
        """
        if response.status == 429:
            return {
                'type': 'rate_limit',
                'confidence': 0.95,
                'description': 'Received 429 Too Many Requests response'
            }
        # 检查响应头中的限速信息
        retry_after = response.headers.get('Retry-After')
        if retry_after:
            return {
                'type': 'rate_limit',
                'confidence': 0.8,
                'description': f'Rate limit with Retry-After: {retry_after}'
            }
        return None

    def _detect_captcha(self, response):
        """
        检测验证码页面
        """
        content = response.text.lower()
        captcha_indicators = [
            'captcha', 'verification', 'validate', 'auth', 'security',
            'are you a human', 'robot check', 'prove you are human'
        ]
        for indicator in captcha_indicators:
            if indicator in content:
                return {
                    'type': 'captcha_detected',
                    'confidence': 0.85,
                    'description': f'Captcha indicator found: {indicator}'
                }
        return None

    def _detect_suspicious_content(self, response):
        """
        检测可疑内容
        """
        content = response.text.lower()
        suspicious_patterns = [
            'access denied', 'blocked', 'forbidden', 'not allowed',
            'suspicious activity', 'bot detected', 'automated access'
        ]
        for pattern in suspicious_patterns:
            if pattern in content:
                return {
                    'type': 'suspicious_content',
                    'confidence': 0.8,
                    'description': f'Suspicious pattern found: {pattern}'
                }
        return None

    def analyze_response(self, response, request):
        """
        分析响应,检测反爬特征
        """
        detections = []
        for pattern_name, detector in self.detection_patterns.items():
            detection = detector(response)
            if detection:
                detections.append(detection)
                self.log_event(
                    'anti_detection_trigger',
                    {
                        'detection': detection,
                        'url': request.url,
                        'status': response.status
                    },
                    'warning'
                )
        return detections

    def _trigger_alert(self, event):
        """
        触发告警
        """
        for callback in self.alert_callbacks:
            try:
                callback(event)
            except Exception as e:
                print(f"Alert callback error: {e}")

    def add_alert_callback(self, callback):
        """
        添加告警回调函数
        """
        self.alert_callbacks.append(callback)

    def get_detection_summary(self):
        """
        获取检测摘要
        """
        summary = {
            'total_events': len(self.event_log),
            'recent_events': list(self.event_log)[-10:],  # 最近10个事件
            'detection_types': defaultdict(int),
            'severities': defaultdict(int)
        }
        for event in self.event_log:
            if event['type'] == 'anti_detection_trigger':
                detection = event['details']['detection']
                summary['detection_types'][detection['type']] += 1
            summary['severities'][event['severity']] += 1
        return summary


class PerformanceMonitor:
    """
    性能监控系统
    """

    def __init__(self):
        self.metrics = {
            'request_times': deque(maxlen=1000),
            'success_rates': deque(maxlen=100),
            'error_rates': deque(maxlen=100),
            'throughput': deque(maxlen=100)
        }
        self.start_time = time.time()

    def record_request_time(self, request_time):
        """
        记录请求时间
        """
        self.metrics['request_times'].append(request_time)

    def record_success(self):
        """
        记录成功请求
        """
        # 计算最近的成功率
        if len(self.metrics['success_rates']) == 0:
            self.metrics['success_rates'].append(1.0)
        else:
            # 简化的成功率计算
            current_success = self.metrics['success_rates'][-1] if self.metrics['success_rates'] else 1.0
            self.metrics['success_rates'].append(current_success)

    def record_error(self):
        """
        记录错误请求
        """
        if len(self.metrics['error_rates']) == 0:
            self.metrics['error_rates'].append(1.0)
        else:
            current_error = self.metrics['error_rates'][-1] if self.metrics['error_rates'] else 0.0
            self.metrics['error_rates'].append(current_error)

    def get_performance_metrics(self):
        """
        获取性能指标
        """
        metrics = {}
        if self.metrics['request_times']:
            metrics['avg_response_time'] = statistics.mean(self.metrics['request_times'])
            metrics['max_response_time'] = max(self.metrics['request_times'])
            metrics['min_response_time'] = min(self.metrics['request_times'])
            metrics['response_time_std'] = statistics.stdev(self.metrics['request_times']) if len(self.metrics['request_times']) > 1 else 0
        if self.metrics['success_rates']:
            metrics['recent_success_rate'] = statistics.mean(self.metrics['success_rates'])
        if self.metrics['error_rates']:
            metrics['recent_error_rate'] = statistics.mean(self.metrics['error_rates'])
        metrics['uptime'] = time.time() - self.start_time
        return metrics


class DetectionCountermeasuresMiddleware:
    """
    检测反制中间件
    """

    def __init__(self):
        self.monitor = AntiDetectionMonitor()
        self.performance_monitor = PerformanceMonitor()
        self.countermeasures = {
            'ip_rotation': self._apply_ip_rotation,
            'delay_increase': self._apply_delay_increase,
            'user_agent_rotation': self._apply_user_agent_rotation,
            'request_modification': self._apply_request_modification
        }

    def process_request(self, request, spider):
        """
        处理请求,应用反制措施
        """
        # 检查是否触发了反爬检测
        if request.meta.get('anti_detection_triggered'):
            countermeasure = request.meta.get('countermeasure', 'delay_increase')
            action = self.countermeasures.get(countermeasure)
            if action:
                action(request, spider)
        return None

    def _apply_ip_rotation(self, request, spider):
        """
        应用IP轮换
        """
        # 让请求使用新的IP地址
        spider.logger.info("Applying IP rotation countermeasure")
        # 这里可以实现具体的IP轮换逻辑

    def _apply_delay_increase(self, request, spider):
        """
        应用延迟增加
        """
        base_delay = request.meta.get('current_delay', 1)
        new_delay = min(base_delay * 1.5, 10)  # 最大延迟10秒
        request.meta['current_delay'] = new_delay
        spider.logger.info(f"Increasing delay to {new_delay} seconds")
        time.sleep(new_delay)

    def _apply_user_agent_rotation(self, request, spider):
        """
        应用User-Agent轮换
        """
        from fake_useragent import UserAgent
        ua = UserAgent()
        new_ua = ua.random
        request.headers['User-Agent'] = new_ua
        spider.logger.info(f"Rotating User-Agent to: {new_ua}")

    def _apply_request_modification(self, request, spider):
        """
        应用请求修改
        """
        # 修改请求的各个方面以绕过检测
        request.headers['Accept-Language'] = random.choice([
            'zh-CN,zh;q=0.9,en;q=0.8',
            'en-US,en;q=0.9,zh-CN;q=0.8',
            'en-GB,en;q=0.9,en-US;q=0.8,de;q=0.7'
        ])
        spider.logger.info("Modifying request headers")


class BehavioralAnalysisMiddleware:
    """
    行为分析中间件
    """

    def __init__(self):
        self.pattern_analyzer = PatternAnalyzer()
        self.anomaly_detector = AnomalyDetector()

    def process_request(self, request, spider):
        """
        分析请求行为模式
        """
        behavior_score = self.pattern_analyzer.analyze_request_pattern(request)
        if behavior_score > 0.8:  # 高度疑似自动化行为
            request.meta['behavior_score'] = behavior_score
            request.meta['anti_detection_triggered'] = True
            request.meta['countermeasure'] = 'delay_increase'
            spider.logger.warning(f"Behavioral anomaly detected (score: {behavior_score})")
        return None


class PatternAnalyzer:
    """
    模式分析器
    """

    def __init__(self):
        self.normal_patterns = {
            'request_intervals': [],  # 正常的请求间隔
            'user_agents': set(),     # 正常的User-Agent
            'accept_headers': set()   # 正常的Accept头
        }

    def analyze_request_pattern(self, request):
        """
        分析请求模式,检测异常
        """
        # 检查请求频率模式
        frequency_score = self._check_frequency_pattern(request)
        # 检查请求头模式
        header_score = self._check_header_pattern(request)
        # 检查路径访问模式
        path_score = self._check_path_pattern(request)
        # 综合评分
        total_score = (frequency_score + header_score + path_score) / 3
        return total_score

    def _check_frequency_pattern(self, request):
        """
        检查请求频率模式
        """
        # 这里可以分析请求的时间间隔模式
        # 如果时间间隔过于规律,可能是自动化请求
        return 0.5  # 简化返回

    def _check_header_pattern(self, request):
        """
        检查请求头模式
        """
        # 检查User-Agent、Accept等头部是否符合正常用户模式
        return 0.5  # 简化返回

    def _check_path_pattern(self, request):
        """
        检查路径访问模式
        """
        # 检查URL访问路径是否符合正常用户浏览模式
        return 0.5  # 简化返回


class AnomalyDetector:
    """
    异常检测器
    """

    def __init__(self):
        self.known_good_patterns = []
        self.threshold = 0.7

    def detect_anomaly(self, request_features):
        """
        检测请求特征中的异常
        """
        # 使用机器学习或统计方法检测异常
        anomaly_score = self._calculate_anomaly_score(request_features)
        return anomaly_score > self.threshold

    def _calculate_anomaly_score(self, features):
        """
        计算异常分数
        """
        # 简化实现
        return random.random()
```
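监控系统通过回调对外告警,下面演示注册一个简单的回调(实际项目中可在回调里接入邮件、钉钉、Slack等渠道):

```python
monitor = AntiDetectionMonitor()

def on_alert(event):
    print(f"[ALERT] {event['type']} (severity={event['severity']}): {event['details']}")

monitor.add_alert_callback(on_alert)
# severity为warning或critical时会触发上面的回调
monitor.log_event('anti_detection_trigger', {'url': 'https://example.com'}, severity='warning')
```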
## 法律合规与道德规范 {#法律合规与道德规范}
在进行反爬对抗时,必须遵守相关法律法规和道德准则:
### 法律法规遵循
```python
"""
重要的法律考虑因素:
1. 知识产权保护:
- 尊重网站版权和知识产权
- 避免商业用途的数据滥用
- 遵守相关数据保护法律
2. 网络安全法规:
- 遵守《网络安全法》
- 遵守《数据安全法》
- 遵守《个人信息保护法》
3. 合同义务:
- 遵守网站服务条款
- 尊重robots.txt协议
- 避免违反网站使用协议
"""#道德原则
"""
爬虫道德准则:
1. 适度原则:
- 控制请求频率,避免对目标服务器造成过大压力
- 尊重网站的资源和带宽限制
- 避免恶意消耗网站资源
2. 透明原则:
- 明确标注爬虫身份(User-Agent)
- 尽可能公开爬虫目的和数据使用方式
- 遵守网站的robots.txt规则
3. 合规原则:
- 仅获取公开可访问的数据
- 不获取受保护的私人信息
- 遵守数据使用授权
"""#最佳实践建议
"""
反爬对抗最佳实践:
1. 技术层面:
- 优先使用官方API而非爬虫
- 实现智能请求节流
- 使用合法的User-Agent
- 尊重网站的Rate Limit
- 实现错误重试机制
2. 法律层面:
- 事先审查目标网站的使用条款
- 获取必要的法律许可
- 建立合规审查机制
- 定期更新合规策略
3. 商业层面:
- 评估数据获取的商业价值
- 考虑与网站方合作的可能性
- 建立可持续的数据获取策略
"""#最佳实践总结
反爬对抗是一个复杂的技术领域,需要平衡技术实现、法律合规和商业需求。
### 综合策略

```python
"""
综合反爬对抗策略:
1. 预防为主:
- 在设计阶段考虑反爬因素
- 建立全面的检测机制
- 准备多种应对方案
2. 分层防御:
- 请求层:IP轮换、请求头伪装
- 应用层:行为模拟、会话管理
- 网络层:代理池、分布式部署
3. 持续优化:
- 监控反爬效果
- 根据检测结果调整策略
- 不断更新技术手段
"""💡 核心要点: 反爬对抗是一个攻防博弈的过程,需要持续关注新技术、新策略的发展,并始终保持法律合规意识。
## SEO优化建议
为了提高这篇反爬对抗实战教程在搜索引擎中的排名,以下是几个关键的SEO优化建议:
### 标题优化
- 主标题: 包含核心关键词"反爬对抗", "验证码破解", "反检测", "IP轮换", "请求头伪装"
- 二级标题: 每个章节标题都包含相关的长尾关键词
- H1-H6层次结构: 保持正确的标题层级,便于搜索引擎理解内容结构
### 内容优化
- 关键词密度: 在内容中自然地融入关键词如"Scrapy", "反爬虫", "验证码", "IP轮换", "请求头伪造", "浏览器指纹", "爬虫防护", "反检测"等
- 元描述: 在文章开头的元数据中包含吸引人的描述
- 内部链接: 链接到其他相关教程,如Downloader Middleware等
- 外部权威链接: 引用官方文档和权威资源
### 技术SEO
- 页面加载速度: 优化代码块和图片加载
- 移动端适配: 确保在移动设备上良好显示
- 结构化数据: 使用适当的HTML标签和语义化元素
### 用户体验优化
- 内容可读性: 使用清晰的段落结构和代码示例
- 互动元素: 提供实际可运行的代码示例
- 更新频率: 定期更新内容以保持时效性
🔗 相关教程推荐
- Downloader Middleware - 中间件基础
- Selenium与Playwright集成 - 浏览器自动化
- 代理IP池集成 - IP管理
- 自动限速AutoThrottle - 频率控制
- 数据去重与增量更新 - 数据处理
🏷️ 标签云: Scrapy 反爬虫 验证码破解 IP轮换 请求头伪装 浏览器指纹 反检测 爬虫安全 反爬对抗 数据采集

