Scrapy Anti-Bot Countermeasures in Practice: The Complete Guide - CAPTCHA Solving and Full-Spectrum Anti-Detection Techniques

📂 Stage: Stage 3 — Attack & Defense Exercises (Middleware and Anti-Bot)
🔗 Related chapters: Downloader Middleware · Selenium and Playwright Integration · Proxy IP Pool Integration

Overview of Anti-Bot Mechanisms

Modern websites deploy anti-bot defenses in multiple layers; understanding these mechanisms is the foundation for devising effective countermeasures.

Common Categories of Anti-Bot Techniques

"""
反爬技术分类:

1. 基于请求特征的检测:
   - User-Agent检测
   - IP频率限制
   - 请求头完整性检查
   - Cookie验证

2. 基于行为的检测:
   - 访问频率分析
   - 页面停留时间
   - 鼠标轨迹分析
   - 点击模式检测

3. 基于技术的检测:
   - JavaScript执行检测
   - 浏览器指纹识别
   - 设备指纹分析
   - 网络栈特征检测

4. 基于内容的检测:
   - 验证码挑战
   - 滑块验证
   - 图像识别验证
   - 行为验证
"""

How Anti-Bot Detection Works

"""
反爬检测工作原理:

1. 请求分析层:
   - 检查请求头是否完整
   - 验证User-Agent合法性
   - 分析请求模式

2. 行为分析层:
   - 监控访问频率
   - 分析用户行为模式
   - 检测自动化特征

3. 技术检测层:
   - JavaScript环境检测
   - 浏览器特性验证
   - 设备指纹匹配

4. 内容保护层:
   - 动态内容生成
   - 验证码挑战
   - 人机验证
"""

CAPTCHA Recognition and Solving

Image CAPTCHA Handling

import cv2
import numpy as np
import pytesseract

class ImageCaptchaHandler:
    """
    Image CAPTCHA handler
    """

    def __init__(self):
        self.solvers = {
            'tesseract': self._solve_with_tesseract,
            'opencv': self._solve_with_opencv,
            'ml_model': self._solve_with_ml_model
        }

    def preprocess_image(self, image_path):
        """
        Image preprocessing
        """
        # Read the image
        img = cv2.imread(image_path)
        if img is None:
            raise ValueError(f"Could not read image: {image_path}")

        # Convert to grayscale
        gray = cv2.cvtColor(img, cv2.COLOR_BGR2GRAY)

        # Denoise
        denoised = cv2.medianBlur(gray, 5)

        # Binarize (Otsu picks the threshold automatically)
        _, binary = cv2.threshold(denoised, 0, 255, cv2.THRESH_BINARY + cv2.THRESH_OTSU)

        # Morphological close to join broken strokes
        kernel = np.ones((2, 2), np.uint8)
        processed = cv2.morphologyEx(binary, cv2.MORPH_CLOSE, kernel)

        return processed

    def _solve_with_tesseract(self, image_path):
        """
        Decode with Tesseract OCR
        """
        processed_img = self.preprocess_image(image_path)
        # --psm 8: treat the image as a single word
        text = pytesseract.image_to_string(processed_img, config='--psm 8')
        return text.strip()

    def _solve_with_opencv(self, image_path):
        """
        Decode with OpenCV template matching
        """
        img = cv2.imread(image_path, 0)
        # Character segmentation and template matching would go here;
        # the simplified example falls back to OCR
        text = pytesseract.image_to_string(img)
        return text.strip()

    def _solve_with_ml_model(self, image_path):
        """
        Decode with a machine-learning model
        """
        # Load a pre-trained model, e.g.:
        # model = load_model('captcha_model.h5')
        # prediction = model.predict(image_array)
        # return decode_prediction(prediction)
        pass

    def solve_captcha(self, image_path, method='tesseract'):
        """
        Solve the CAPTCHA
        """
        solver = self.solvers.get(method)
        if solver:
            return solver(image_path)
        else:
            raise ValueError(f"Unknown solver method: {method}")

class AdvancedCaptchaHandler(ImageCaptchaHandler):
    """
    Advanced CAPTCHA handler
    """

    def __init__(self):
        super().__init__()
        self.ml_models = {}

    def handle_distorted_captcha(self, image_path):
        """
        Handle distorted CAPTCHAs
        """
        img = cv2.imread(image_path)

        # Geometric correction
        corrected = self._correct_geometry(img)

        # Character segmentation
        characters = self._segment_characters(corrected)

        # Recognize character by character
        result = ""
        for char_img in characters:
            recognized = self._recognize_character(char_img)
            result += recognized

        return result

    def _correct_geometry(self, img):
        """
        Geometric correction
        """
        # Implement perspective transforms or similar corrections here
        return img

    def _segment_characters(self, img):
        """
        Character segmentation
        """
        # Implement a character-segmentation algorithm here
        return [img]  # simplified: return the whole image

    def _recognize_character(self, char_img):
        """
        Single-character recognition
        """
        # Use a trained character recognizer; --psm 10 = single character
        text = pytesseract.image_to_string(char_img, config='--psm 10')
        return text.strip()[0] if text.strip() else ""

# CAPTCHA middleware
class CaptchaMiddleware:
    """
    CAPTCHA-handling middleware
    """

    def __init__(self):
        self.captcha_handler = AdvancedCaptchaHandler()
        self.captcha_urls = set()

    def process_response(self, request, response, spider):
        """
        Inspect the response and detect CAPTCHAs
        """
        if self._detect_captcha(response):
            # Save the CAPTCHA image
            captcha_img_path = self._save_captcha_image(response)

            # Solve the CAPTCHA
            captcha_text = self.captcha_handler.solve_captcha(captcha_img_path)

            # Re-issue the request with the CAPTCHA answer attached
            new_request = self._create_captcha_request(request, captcha_text)
            return new_request

        return response

    def _detect_captcha(self, response):
        """
        Detect a CAPTCHA page
        """
        # Look for CAPTCHA keywords in the response body
        captcha_indicators = [
            'captcha', 'verification', 'validate', 'auth', 'security'
        ]

        for indicator in captcha_indicators:
            if indicator.lower() in response.text.lower():
                return True

        # Look for CAPTCHA image references
        import re
        captcha_patterns = [
            r'captcha.*?\.(jpg|jpeg|png|gif)',
            r'verification.*?\.(jpg|jpeg|png|gif)',
            r'/auth/image'
        ]

        for pattern in captcha_patterns:
            if re.search(pattern, response.text, re.IGNORECASE):
                return True

        return False

    def _save_captcha_image(self, response):
        """
        Save the CAPTCHA image to disk
        """
        import tempfile
        import os

        # Find the CAPTCHA image URL
        import re
        img_pattern = r'<img[^>]*src=["\']([^"\']*captcha[^"\'>]*)["\'][^>]*>'
        matches = re.findall(img_pattern, response.text, re.IGNORECASE)

        if matches:
            captcha_url = matches[0]
            if not captcha_url.startswith('http'):
                from urllib.parse import urljoin
                captcha_url = urljoin(response.url, captcha_url)

            # Download the CAPTCHA image
            import requests
            img_response = requests.get(captcha_url)

            # Save to a temporary file
            temp_dir = tempfile.mkdtemp()
            img_path = os.path.join(temp_dir, 'captcha.png')
            with open(img_path, 'wb') as f:
                f.write(img_response.content)

            return img_path

        return None

    def _create_captcha_request(self, original_request, captcha_text):
        """
        Build a new request carrying the CAPTCHA answer
        """
        # Must be implemented per target site: process_response has to return
        # a Request or Response, so fill in the site's form logic here
        raise NotImplementedError("Implement for the target site's form structure")
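
A quick way to exercise the handlers outside of Scrapy (captcha.png is a placeholder path):

# Standalone usage sketch; 'captcha.png' is a placeholder path
handler = AdvancedCaptchaHandler()

# Plain OCR pass
print(handler.solve_captcha('captcha.png', method='tesseract'))

# Segmentation plus per-character recognition for distorted images
print(handler.handle_distorted_captcha('captcha.png'))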

Slider CAPTCHA Handling

import time
import random
import cv2
import numpy as np
from selenium import webdriver
from selenium.webdriver.common.action_chains import ActionChains
from selenium.webdriver.common.by import By
from selenium.common.exceptions import NoSuchElementException

class SliderCaptchaHandler:
    """
    Slider CAPTCHA handler
    """

    def __init__(self, driver):
        self.driver = driver

    def solve_slider_captcha(self, slider_element, track_element=None):
        """
        Solve a slider CAPTCHA
        """
        # Generate a human-like slide track
        track = self._generate_human_like_track(slider_element)

        # Perform the slide
        self._perform_slide(slider_element, track)

        # Check the result
        time.sleep(2)
        return self._verify_result()

    def _generate_human_like_track(self, slider_element):
        """
        Generate a human-like slide track as per-step (dx, dy) offsets.
        Note: move_by_offset expects relative deltas, so the track stores
        step increments whose x components sum to the slide distance
        (the original cumulative-position version overshot badly).
        """
        # Total distance to slide
        slider_width = slider_element.size['width']

        track = []
        moved = 0.0

        # Slide in segments, each at a slightly different speed
        segments = random.randint(3, 6)
        segment_distance = slider_width / segments

        for _ in range(segments):
            speed_factor = random.uniform(0.8, 1.2)
            segment_steps = max(1, int(segment_distance * speed_factor))
            step = segment_distance / segment_steps

            for _ in range(segment_steps):
                # Small vertical jitter simulates hand tremor
                track.append((step, random.uniform(-1, 1)))
                moved += step

        # Correct rounding drift so the slider lands on target
        if moved < slider_width:
            track.append((slider_width - moved, 0))

        return track

    def _perform_slide(self, slider_element, track):
        """
        Perform the slide gesture
        """
        actions = ActionChains(self.driver)

        # Click and hold the slider
        actions.click_and_hold(slider_element).perform()

        # Follow the track
        for x_offset, y_offset in track:
            actions.move_by_offset(x_offset, y_offset).perform()
            # Random pauses mimic human motion
            time.sleep(random.uniform(0.01, 0.05))

        # Release the slider
        actions.release().perform()

    def _verify_result(self):
        """
        Verify the slide result
        """
        # Look for a success indicator
        try:
            self.driver.find_element(By.CLASS_NAME, "verify-success")
            return True
        except NoSuchElementException:
            # Look for a failure indicator
            try:
                self.driver.find_element(By.CLASS_NAME, "verify-fail")
                return False
            except NoSuchElementException:
                # Result unknown; assume success
                return True
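
A usage sketch; the login URL and the slider-button class name are assumptions to replace with the real page's selectors:

driver = webdriver.Chrome()
driver.get('https://example.com/login')  # placeholder URL

slider = driver.find_element(By.CLASS_NAME, 'slider-button')  # assumed selector
handler = SliderCaptchaHandler(driver)

if handler.solve_slider_captcha(slider):
    print('Slider verification passed')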

class GeometricCaptchaHandler:
    """
    Geometric-shape CAPTCHA handler
    """

    def __init__(self):
        self.template_matching = cv2.TM_CCOEFF_NORMED

    def solve_geometric_captcha(self, bg_image_path, target_image_path):
        """
        Solve a geometric CAPTCHA (e.g. a jigsaw/puzzle CAPTCHA)
        """
        # Read the background and the puzzle piece in grayscale
        bg_img = cv2.imread(bg_image_path, 0)
        target_img = cv2.imread(target_image_path, 0)

        # Template-match to locate the gap
        result = cv2.matchTemplate(bg_img, target_img, self.template_matching)
        _, _, _, max_loc = cv2.minMaxLoc(result)

        # The x coordinate of the best match is the slide offset
        offset_x = max_loc[0]

        return offset_x

    def solve_rotation_captcha(self, image_path):
        """
        Solve a rotation CAPTCHA
        """
        img = cv2.imread(image_path)

        # Estimate the rotation angle; _rotate_image(img, -angle) can
        # then be used to straighten the image for verification
        angle = self._detect_rotation_angle(img)

        return angle

    def _detect_rotation_angle(self, img):
        """
        Detect the rotation angle of an image
        """
        # Detect straight lines with the Hough transform and derive the angle
        gray = cv2.cvtColor(img, cv2.COLOR_BGR2GRAY)
        edges = cv2.Canny(gray, 50, 150, apertureSize=3)

        lines = cv2.HoughLines(edges, 1, np.pi/180, threshold=100)

        if lines is not None:
            angles = []
            for rho, theta in lines[:, 0]:
                angle = theta * 180 / np.pi
                angles.append(angle)

            # Average the detected angles
            avg_angle = np.mean(angles)
            return avg_angle

        return 0

    def _rotate_image(self, img, angle):
        """
        Rotate an image around its center
        """
        center = tuple(np.array(img.shape[1::-1]) / 2)
        rot_mat = cv2.getRotationMatrix2D(center, angle, 1.0)
        rotated = cv2.warpAffine(img, rot_mat, img.shape[1::-1], flags=cv2.INTER_LINEAR)
        return rotated
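
Combining the two handlers for a jigsaw CAPTCHA: template-match the gap, then drag the slider that far. The image paths and selector are placeholders, and real pages often need a scale correction between screenshot pixels and DOM pixels:

# Sketch: compute the gap offset, then drag the slider by that distance
geo = GeometricCaptchaHandler()
distance = geo.solve_geometric_captcha('bg.png', 'piece.png')  # placeholder paths

slider = driver.find_element(By.CLASS_NAME, 'slider-button')  # assumed selector
actions = ActionChains(driver)
actions.click_and_hold(slider).move_by_offset(distance, 0).release().perform()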

Request Header Spoofing Techniques

Dynamic Header Generation

import random
from fake_useragent import UserAgent

class DynamicHeadersGenerator:
    """
    Dynamic request-header generator
    """

    def __init__(self):
        self.ua = UserAgent()
        self.accept_headers = [
            'text/html,application/xhtml+xml,application/xml;q=0.9,image/webp,*/*;q=0.8',
            'text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8',
            'text/html,application/xhtml+xml,application/xml;q=0.9,image/webp,image/apng,*/*;q=0.8',
            'application/json, text/plain, */*',
            '*/*'
        ]

        self.accept_languages = [
            'zh-CN,zh;q=0.9,en;q=0.8',
            'en-US,en;q=0.9,zh-CN;q=0.8',
            'zh;q=0.9,en;q=0.8,en-US;q=0.7',
            'en-GB,en;q=0.9,en-US;q=0.8,de;q=0.7',
            'fr-CH, fr;q=0.9, en;q=0.8, de;q=0.7'
        ]

        self.accept_encodings = [
            'gzip, deflate, br',
            'gzip, deflate',
            'deflate',
            'identity'
        ]

    def generate_headers(self, url=None):
        """
        Generate a randomized header set
        """
        headers = {
            'User-Agent': self._get_random_user_agent(),
            'Accept': random.choice(self.accept_headers),
            'Accept-Language': random.choice(self.accept_languages),
            'Accept-Encoding': random.choice(self.accept_encodings),
            'Connection': 'keep-alive',
            'Upgrade-Insecure-Requests': '1',
            'Sec-Fetch-Dest': 'document',
            'Sec-Fetch-Mode': 'navigate',
            'Sec-Fetch-Site': 'none',
            'Cache-Control': 'max-age=0'
        }

        # Add site-specific headers based on the URL
        if url:
            domain = self._extract_domain(url)
            specific_headers = self._get_domain_specific_headers(domain)
            headers.update(specific_headers)

        return headers

    def _get_random_user_agent(self):
        """
        Pick a random User-Agent
        """
        try:
            return self.ua.random
        except Exception:
            # Fallback User-Agent list
            backup_uas = [
                'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36',
                'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36',
                'Mozilla/5.0 (Windows NT 10.0; Win64; x64; rv:89.0) Gecko/20100101 Firefox/89.0',
                'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/605.1.15 (KHTML, like Gecko) Version/14.1.1 Safari/605.1.15'
            ]
            return random.choice(backup_uas)

    def _extract_domain(self, url):
        """
        Extract the domain from a URL
        """
        from urllib.parse import urlparse
        return urlparse(url).netloc

    def _get_domain_specific_headers(self, domain):
        """
        Headers specific to certain domains
        """
        domain_headers = {
            'google.com': {
                'Referer': 'https://www.google.com/',
                'Origin': 'https://www.google.com'
            },
            'baidu.com': {
                'Referer': 'https://www.baidu.com/',
                'X-Requested-With': 'XMLHttpRequest'
            }
        }
        return domain_headers.get(domain, {})
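
Each call produces a different but internally coherent header set; a quick check:

generator = DynamicHeadersGenerator()
headers = generator.generate_headers('https://www.baidu.com/')
for name, value in headers.items():
    print(f'{name}: {value}')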

class HeadersRotatorMiddleware:
    """
    Header-rotation middleware
    """

    def __init__(self):
        self.header_generator = DynamicHeadersGenerator()
        self.used_headers = set()

    def process_request(self, request, spider):
        """
        Rotate headers on each outgoing request
        """
        # Generate a fresh header set
        new_headers = self.header_generator.generate_headers(request.url)

        # Apply it to the request
        for key, value in new_headers.items():
            request.headers[key] = value

        # Record the header set used (for statistics)
        header_signature = self._get_header_signature(new_headers)
        self.used_headers.add(header_signature)

        return None

    def _get_header_signature(self, headers):
        """
        Build a short signature of a header set
        """
        ua = headers.get('User-Agent', '')
        accept = headers.get('Accept', '')
        language = headers.get('Accept-Language', '')
        return f"{ua[:50]}_{accept[:30]}_{language[:20]}"

class FingerprintSpoofingMiddleware:
    """
    Fingerprint-spoofing middleware
    """

    def __init__(self):
        self.device_fingerprints = [
            {
                'screen_resolution': '1920x1080',
                'color_depth': '24',
                'timezone_offset': '480',
                'language': 'zh-CN',
                'platform': 'Win32',
                'cookies_enabled': True,
                'javascript_enabled': True
            },
            {
                'screen_resolution': '1366x768',
                'color_depth': '24',
                'timezone_offset': '480',
                'language': 'en-US',
                'platform': 'Win32',
                'cookies_enabled': True,
                'javascript_enabled': True
            },
            {
                'screen_resolution': '1440x900',
                'color_depth': '24',
                'timezone_offset': '480',
                'language': 'en-GB',
                'platform': 'MacIntel',
                'cookies_enabled': True,
                'javascript_enabled': True
            }
        ]

    def process_request(self, request, spider):
        """
        Attach a spoofed device fingerprint to the request
        """
        # Pick a random device fingerprint
        fingerprint = random.choice(self.device_fingerprints)

        # Store it in request meta for downstream components
        request.meta['device_fingerprint'] = fingerprint

        # Caution: nonstandard X- headers are themselves a bot signal; real
        # browsers expose these values via JavaScript, not request headers,
        # so treat the lines below as illustrative
        request.headers['X-Screen-Resolution'] = fingerprint['screen_resolution']
        request.headers['X-Color-Depth'] = fingerprint['color_depth']
        request.headers['X-Timezone-Offset'] = fingerprint['timezone_offset']

        return None

Browser Fingerprint Anti-Detection

Browser Fingerprint Spoofing

class BrowserFingerprintSpoofing:
    """
    Browser fingerprint spoofing
    """

    def __init__(self):
        self.browser_configs = {
            'chrome': self._configure_chrome_spoofing,
            'firefox': self._configure_firefox_spoofing,
            'safari': self._configure_safari_spoofing
        }

    def _configure_chrome_spoofing(self, options):
        """
        Chrome fingerprint-spoofing options
        """
        # Suppress the automation banner and extension
        options.add_experimental_option("excludeSwitches", ["enable-automation"])
        options.add_experimental_option('useAutomationExtension', False)

        # Hide navigator.webdriver at the Blink level
        options.add_argument("--disable-blink-features=AutomationControlled")

        # Set a realistic window size
        options.add_argument("--window-size=1920,1080")

        # Disable features that can betray automation in some environments
        options.add_argument("--disable-dev-shm-usage")
        options.add_argument("--no-sandbox")
        options.add_argument("--disable-gpu")

        return options

    def _configure_firefox_spoofing(self, options):
        """
        Firefox fingerprint-spoofing options
        """
        # Firefox-specific spoofing preferences
        options.set_preference("dom.webdriver.enabled", False)
        options.set_preference("useAutomationExtension", False)

        # Override the User-Agent string
        options.set_preference("general.useragent.override", 
                              "Mozilla/5.0 (Windows NT 10.0; Win64; x64; rv:89.0) Gecko/20100101 Firefox/89.0")

        return options

    def _configure_safari_spoofing(self, options):
        """
        Safari fingerprint-spoofing options
        """
        # Safari is harder to configure; a real Safari browser is usually used
        pass
    
    def apply_stealth_script(self, driver):
        """
        Inject anti-detection JavaScript
        """
        stealth_script = """
        // Hide the webdriver property
        Object.defineProperty(navigator, 'webdriver', {
            get: () => undefined,
        });

        // Fake a plausible plugin list
        Object.defineProperty(navigator, 'plugins', {
            get: () => {
                return [
                    { filename: 'internal-pdf-viewer', description: 'Portable Document Format' },
                    { filename: 'internal-nacl-plugin', description: 'Native Client' }
                ];
            },
        });

        // Fake the language list
        Object.defineProperty(navigator, 'languages', {
            get: () => ['zh-CN', 'zh', 'en'],
        });

        // Fake the platform
        Object.defineProperty(navigator, 'platform', {
            get: () => 'Win32',
        });

        // Provide a plausible window.chrome object
        Object.defineProperty(window, 'chrome', {
            value: new Proxy({}, {
                get(target, prop) {
                    if (prop === 'runtime') return {};
                    return target[prop];
                }
            }),
            writable: false,
        });

        // Mask toString of the chromedriver-injected function
        const originalToString = Function.prototype.toString;
        Function.prototype.toString = function() {
            if (this === window.cdc_adoQpoasnfa76pfcZLmcfl_Array) {
                return 'function Array() { [native code] }';
            }
            return originalToString.call(this);
        };

        // Spoof the WebGL fingerprint (note .call keeps `this` bound,
        // which the naive getParameter(parameter) form would lose)
        const getParameter = WebGLRenderingContext.prototype.getParameter;
        WebGLRenderingContext.prototype.getParameter = function(parameter) {
            if (parameter === 37445) { // UNMASKED_VENDOR_WEBGL
                return 'Intel Inc.';
            } else if (parameter === 37446) { // UNMASKED_RENDERER_WEBGL
                return 'Intel Iris OpenGL Engine';
            }
            return getParameter.call(this, parameter);
        };
        """
        
        driver.execute_script(stealth_script)
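
Note that execute_script only patches the current document. For Chrome, a script injected over the DevTools Protocol runs before any page script on every navigation, which is usually what stealth needs; a sketch using Selenium's execute_cdp_cmd:

# Inject a stealth snippet before any page JavaScript runs (Chrome only)
from selenium import webdriver
from selenium.webdriver.chrome.options import Options

options = Options()
spoofer = BrowserFingerprintSpoofing()
options = spoofer.browser_configs['chrome'](options)

driver = webdriver.Chrome(options=options)
driver.execute_cdp_cmd(
    "Page.addScriptToEvaluateOnNewDocument",
    {"source": "Object.defineProperty(navigator, 'webdriver', {get: () => undefined});"}
)
driver.get('https://bot.sannysoft.com/')  # public fingerprint test page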

class AdvancedStealthMiddleware:
    """
    Advanced anti-detection middleware
    """

    def __init__(self):
        self.spoofing_techniques = [
            self._spoof_canvas_fingerprint,
            self._spoof_webgl_fingerprint,
            self._spoof_audio_fingerprint,
            self._spoof_font_fingerprint
        ]

    def process_request(self, request, spider):
        """
        Mark browser-rendered requests for stealth treatment
        """
        if request.meta.get('use_selenium') or request.meta.get('use_playwright'):
            # Apply anti-detection to browser-automation requests
            request.meta['apply_stealth'] = True

        return None

    def _spoof_canvas_fingerprint(self, driver):
        """
        Spoof the Canvas fingerprint
        """
        canvas_script = """
        const originalToDataURL = HTMLCanvasElement.prototype.toDataURL;
        const originalToBlob = HTMLCanvasElement.prototype.toBlob;

        HTMLCanvasElement.prototype.toDataURL = function() {
            // Hook point: currently passes through unchanged; inject
            // per-pixel noise here to perturb the canvas fingerprint
            return originalToDataURL.apply(this, arguments);
        };

        HTMLCanvasElement.prototype.toBlob = function() {
            // Same hook point for the blob variant
            return originalToBlob.apply(this, arguments);
        };
        """
        
        driver.execute_script(canvas_script)

    def _spoof_webgl_fingerprint(self, driver):
        """
        Spoof the WebGL fingerprint
        """
        webgl_script = """
        const getParameter = WebGLRenderingContext.prototype.getParameter;
        WebGLRenderingContext.prototype.getParameter = function(parameter) {
            switch (parameter) {
                case 37445: // UNMASKED_VENDOR_WEBGL
                    return 'Intel Inc.';
                case 37446: // UNMASKED_RENDERER_WEBGL
                    return 'Intel Iris OpenGL Engine';
                case 7938:  // VERSION (0x1F02)
                    return 'WebGL 1.0 (OpenGL ES 2.0 Chromium)';
                case 35724: // SHADING_LANGUAGE_VERSION (0x8B8C)
                    return 'WebGL GLSL ES 1.0 (OpenGL ES GLSL ES 1.0 Chromium)';
                default:
                    return getParameter.call(this, parameter);
            }
        };
        """
        
        driver.execute_script(webgl_script)

    def _spoof_audio_fingerprint(self, driver):
        """
        Spoof the AudioContext fingerprint
        """
        audio_script = """
        const audioContextProto = window.AudioContext || window.webkitAudioContext;
        if (audioContextProto) {
            const originalCreateAnalyser = audioContextProto.prototype.createAnalyser;
            audioContextProto.prototype.createAnalyser = function() {
                const analyser = originalCreateAnalyser.call(this);
                // Hook point: perturb analyser output here to vary the
                // audio fingerprint; currently returned unchanged
                return analyser;
            };
        }
        """
        
        driver.execute_script(audio_script)

    def _spoof_font_fingerprint(self, driver):
        """
        Spoof the font fingerprint
        """
        font_script = """
        // Keep font detection consistent across sessions
        const testFonts = [
            'Arial', 'Courier New', 'Georgia', 'Times New Roman', 'Verdana',
            'Helvetica', 'Impact', 'Comic Sans MS', 'Trebuchet MS', 'Consolas'
        ];

        // Font-detection normalization logic would go here
        """
        
        driver.execute_script(font_script)

IP Ban Evasion Strategies

Intelligent IP Rotation

import time
import random
from collections import defaultdict, deque
import requests

class IntelligentIPManager:
    """
    Intelligent proxy IP manager
    """

    def __init__(self, proxy_list=None):
        self.proxy_list = proxy_list or []
        self.ip_stats = defaultdict(lambda: {
            'success_count': 0,
            'failure_count': 0,
            'score': 100,
            'last_used': 0,
            'ban_status': False,
            'ban_time': 0
        })
        self.current_ip_index = 0
        self.rotation_strategy = 'smart'

    def add_proxy(self, proxy):
        """
        Register a proxy
        """
        if proxy not in self.proxy_list:
            self.proxy_list.append(proxy)
            self.ip_stats[proxy] = {
                'success_count': 0,
                'failure_count': 0,
                'score': 100,
                'last_used': 0,
                'ban_status': False,
                'ban_time': 0
            }

    def get_best_proxy(self):
        """
        Pick the best proxy according to the rotation strategy
        """
        if self.rotation_strategy == 'random':
            return self._get_random_proxy()
        elif self.rotation_strategy == 'round_robin':
            return self._get_round_robin_proxy()
        elif self.rotation_strategy == 'smart':
            return self._get_smart_proxy()
        else:
            return self._get_random_proxy()

    def _get_random_proxy(self):
        """
        Pick a random proxy
        """
        available_proxies = [
            proxy for proxy, stats in self.ip_stats.items()
            if not stats['ban_status'] and self._is_proxy_valid(proxy)
        ]

        if available_proxies:
            return random.choice(available_proxies)
        else:
            return None

    def _get_round_robin_proxy(self):
        """
        Pick proxies in round-robin order
        """
        available_proxies = [
            proxy for proxy, stats in self.ip_stats.items()
            if not stats['ban_status'] and self._is_proxy_valid(proxy)
        ]

        if available_proxies:
            proxy = available_proxies[self.current_ip_index % len(available_proxies)]
            self.current_ip_index += 1
            return proxy
        else:
            return None

    def _get_smart_proxy(self):
        """
        Pick the best proxy by composite score
        """
        # Score every candidate proxy
        scored_proxies = []

        for proxy, stats in self.ip_stats.items():
            if stats['ban_status']:
                # Lift the ban after a cool-off period
                if time.time() - stats['ban_time'] > 1800:  # unban after 30 minutes
                    stats['ban_status'] = False
                else:
                    continue

            if not self._is_proxy_valid(proxy):
                continue

            # Composite score: base score x success rate x recency factor
            success_rate = stats['success_count'] / max(1, stats['success_count'] + stats['failure_count'])
            recency_factor = 1.0 if time.time() - stats['last_used'] > 300 else 0.8  # 5-minute cooldown

            score = stats['score'] * success_rate * recency_factor
            scored_proxies.append((proxy, score))

        if scored_proxies:
            # Return the highest-scoring proxy
            best_proxy = max(scored_proxies, key=lambda x: x[1])[0]
            return best_proxy
        else:
            return None

    def _is_proxy_valid(self, proxy):
        """
        Check whether a proxy URL is usable
        """
        # Validate the proxy scheme
        if not proxy.startswith(('http://', 'https://', 'socks5://', 'socks4://')):
            return False

        # More validation logic can go here
        return True

    def update_proxy_stats(self, proxy, success=True):
        """
        Update a proxy's statistics after a request
        """
        stats = self.ip_stats[proxy]
        stats['last_used'] = time.time()

        if success:
            stats['success_count'] += 1
            stats['score'] = min(100, stats['score'] + 5)
        else:
            stats['failure_count'] += 1
            stats['score'] = max(0, stats['score'] - 10)

    def mark_proxy_banned(self, proxy):
        """
        Mark a proxy as banned
        """
        self.ip_stats[proxy]['ban_status'] = True
        self.ip_stats[proxy]['ban_time'] = time.time()
        self.ip_stats[proxy]['score'] = 0
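
A usage sketch (the proxy URLs are placeholders):

manager = IntelligentIPManager()
manager.add_proxy('http://user:pass@10.0.0.1:8080')  # placeholder
manager.add_proxy('http://user:pass@10.0.0.2:8080')  # placeholder

proxy = manager.get_best_proxy()
# ... issue a request through `proxy`, then report the outcome so the
# scoring, cooldown and ban logic can react:
manager.update_proxy_stats(proxy, success=True)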

class ProxyRotationMiddleware:
    """
    Proxy-rotation middleware
    """

    def __init__(self):
        self.ip_manager = IntelligentIPManager()
        self.request_count = 0
        self.max_requests_per_ip = 10

    def process_request(self, request, spider):
        """
        Assign a proxy to each outgoing request
        """
        # Pick the best available proxy
        proxy = self.ip_manager.get_best_proxy()

        if proxy:
            request.meta['proxy'] = proxy
            spider.logger.debug(f"Using proxy: {proxy}")

        # Record when the request started
        request.meta['request_start_time'] = time.time()

        return None

    def process_response(self, request, response, spider):
        """
        Update proxy health from the response
        """
        proxy = request.meta.get('proxy')

        if proxy:
            # Check the response for ban indicators
            if self._is_banned_response(response):
                spider.logger.warning(f"Proxy {proxy} might be banned (status: {response.status})")
                self.ip_manager.mark_proxy_banned(proxy)
                self.ip_manager.update_proxy_stats(proxy, success=False)
            else:
                self.ip_manager.update_proxy_stats(proxy, success=True)

        return response

    def process_exception(self, request, exception, spider):
        """
        Update proxy health on exceptions and retry with a new proxy
        """
        proxy = request.meta.get('proxy')

        if proxy:
            spider.logger.error(f"Exception with proxy {proxy}: {str(exception)}")
            self.ip_manager.update_proxy_stats(proxy, success=False)

        # Retry through a different proxy
        new_request = request.copy()
        new_proxy = self.ip_manager.get_best_proxy()

        if new_proxy and new_proxy != proxy:
            new_request.meta['proxy'] = new_proxy
            new_request.dont_filter = True
            return new_request

    def _is_banned_response(self, response):
        """
        Heuristics for a ban/block response
        """
        banned_statuses = [403, 429, 503]
        banned_texts = [
            'blocked', 'forbidden', 'access denied', 'rate limit', 
            'too many requests', 'captcha', 'verification'
        ]

        if response.status in banned_statuses:
            return True

        response_text = response.text.lower()
        for text in banned_texts:
            if text in response_text:
                return True

        return False

class GeographicIPManager:
    """
    Geo-aware proxy manager
    """

    def __init__(self):
        self.location_proxies = defaultdict(list)
        self.country_codes = {
            'US': 'United States',
            'CN': 'China',
            'JP': 'Japan',
            'KR': 'South Korea',
            'DE': 'Germany',
            'FR': 'France',
            'GB': 'United Kingdom'
        }

    def add_location_proxy(self, country_code, proxy):
        """
        Register a proxy for a specific country
        """
        self.location_proxies[country_code].append(proxy)

    def get_location_proxy(self, country_code):
        """
        Get a proxy located in the requested country
        """
        if country_code in self.location_proxies:
            return random.choice(self.location_proxies[country_code])
        else:
            # No proxy for that country: fall back to any proxy
            all_proxies = []
            for proxies in self.location_proxies.values():
                all_proxies.extend(proxies)
            return random.choice(all_proxies) if all_proxies else None

    def validate_proxy_location(self, proxy, expected_country):
        """
        Verify a proxy's geographic location
        """
        try:
            response = requests.get(
                'http://ip-api.com/json',
                proxies={'http': proxy, 'https': proxy},
                timeout=10
            )
            data = response.json()
            return data.get('countryCode') == expected_country
        except Exception:
            return False

Rate Limit Evasion

Intelligent Rate Control

import time
import random
from datetime import datetime, timedelta
from collections import defaultdict, deque

class IntelligentRateLimiter:
    """
    Intelligent rate limiter
    """

    def __init__(self):
        self.request_history = defaultdict(deque)
        self.domain_limits = {}
        self.global_limit = 1  # default: 1 request per second
        self.max_history_size = 100  # maximum history entries per domain

    def set_domain_limit(self, domain, requests_per_second):
        """
        Set a per-domain request-rate limit
        """
        self.domain_limits[domain] = requests_per_second

    def can_make_request(self, url):
        """
        Check whether a request may be issued now
        """
        from urllib.parse import urlparse
        domain = urlparse(url).netloc

        # Limit for this domain
        limit = self.domain_limits.get(domain, self.global_limit)

        # Requests made within the last second
        recent_requests = [
            req_time for req_time in self.request_history[domain]
            if time.time() - req_time < 1  # last 1 second
        ]

        # Over the limit?
        if len(recent_requests) >= limit:
            return False

        return True

    def record_request(self, url):
        """
        Record a request
        """
        from urllib.parse import urlparse
        domain = urlparse(url).netloc

        current_time = time.time()
        self.request_history[domain].append(current_time)

        # Bound the history size
        while len(self.request_history[domain]) > self.max_history_size:
            self.request_history[domain].popleft()

    def get_wait_time(self, url):
        """
        Compute how long to wait before the next request
        """
        from urllib.parse import urlparse
        domain = urlparse(url).netloc

        limit = self.domain_limits.get(domain, self.global_limit)

        if limit <= 0:
            return 0

        recent_requests = [
            req_time for req_time in self.request_history[domain]
            if time.time() - req_time < 1
        ]

        if len(recent_requests) >= limit:
            # Wait until the oldest recent request falls out of the window
            oldest_recent = min(recent_requests) if recent_requests else time.time()
            wait_time = 1 - (time.time() - oldest_recent)
            return max(0, wait_time)

        return 0
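
A usage sketch: allow 2 requests per second to example.com and the global default elsewhere:

limiter = IntelligentRateLimiter()
limiter.set_domain_limit('example.com', 2)

url = 'https://example.com/page'
if not limiter.can_make_request(url):
    time.sleep(limiter.get_wait_time(url))
limiter.record_request(url)
# ... send the request here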

class HumanBehaviorSimulator:
    """
    Human behavior simulator
    """

    def __init__(self):
        # 'speed' is a delay multiplier: values above 1 mean slower requests
        self.activity_patterns = {
            'morning': {'activity': 0.3, 'speed': 1.2},    # low activity, slower
            'work_hours': {'activity': 0.8, 'speed': 0.9}, # high activity, near baseline
            'evening': {'activity': 0.6, 'speed': 1.1},    # medium activity
            'night': {'activity': 0.2, 'speed': 1.5}       # low activity, slowest
        }

    def get_current_activity_level(self):
        """
        Get the activity profile for the current hour
        """
        current_hour = datetime.now().hour

        if 6 <= current_hour < 9:      # morning
            return self.activity_patterns['morning']
        elif 9 <= current_hour < 18:   # working hours
            return self.activity_patterns['work_hours']
        elif 18 <= current_hour < 22:  # evening
            return self.activity_patterns['evening']
        else:                          # night
            return self.activity_patterns['night']

    def simulate_human_delay(self, base_delay=1):
        """
        Produce a human-like delay
        """
        activity = self.get_current_activity_level()

        # Scale the delay by the time-of-day profile
        adjusted_delay = base_delay * activity['speed']

        # Add random jitter (human timing is never exact)
        variation = random.uniform(-0.3, 0.3)
        final_delay = max(0.1, adjusted_delay + variation)

        return final_delay

    def simulate_browsing_pattern(self):
        """
        Simulate a browsing pattern
        """
        # Simulated page dwell time
        stay_duration = random.uniform(10, 60)  # stay 10-60 seconds

        # Simulated scrolling
        scroll_events = random.randint(1, 5)

        # Simulated clicks
        click_events = random.randint(0, 3)

        return {
            'stay_duration': stay_duration,
            'scroll_events': scroll_events,
            'click_events': click_events
        }

class AdvancedFrequencyMiddleware:
    """
    Advanced rate-control middleware.
    Caveat: time.sleep() blocks Scrapy's async engine; in production,
    prefer DOWNLOAD_DELAY / AutoThrottle for non-blocking pacing.
    """

    def __init__(self):
        self.rate_limiter = IntelligentRateLimiter()
        self.behavior_simulator = HumanBehaviorSimulator()
        self.request_queue = []
        self.active_requests = 0

    def process_request(self, request, spider):
        """
        Apply rate control to outgoing requests
        """
        # Can the request go out immediately?
        if self.rate_limiter.can_make_request(request.url):
            # Record the request
            self.rate_limiter.record_request(request.url)

            # Apply a human-like delay
            delay = self.behavior_simulator.simulate_human_delay(1)
            if delay > 0:
                time.sleep(delay)

            return None
        else:
            # Must wait; compute how long
            wait_time = self.rate_limiter.get_wait_time(request.url)
            if wait_time > 0:
                spider.logger.info(f"Waiting {wait_time:.2f}s before next request to {request.url}")
                time.sleep(wait_time)

            # Record the request after the wait
            self.rate_limiter.record_request(request.url)
            return None

    def process_response(self, request, response, spider):
        """
        Post-response pacing
        """
        # Simulate human browsing behavior
        browsing_pattern = self.behavior_simulator.simulate_browsing_pattern()

        # Apply a short delay between page fetches
        time.sleep(random.uniform(0.5, 2.0))

        return response

class BurstProtectionMiddleware:
    """
    Burst-protection middleware
    """

    def __init__(self):
        self.sliding_window = defaultdict(deque)
        self.window_size = 60  # 60-second window
        self.max_requests_per_window = 20

    def process_request(self, request, spider):
        """
        Throttle bursts of requests to the same domain
        """
        from urllib.parse import urlparse
        domain = urlparse(request.url).netloc

        current_time = time.time()

        # Evict expired entries from the window
        while (self.sliding_window[domain] and 
               current_time - self.sliding_window[domain][0] > self.window_size):
            self.sliding_window[domain].popleft()

        # Over the per-window limit?
        if len(self.sliding_window[domain]) >= self.max_requests_per_window:
            # Sleep until the oldest entry leaves the window
            oldest_time = self.sliding_window[domain][0]
            sleep_time = self.window_size - (current_time - oldest_time) + 1
            spider.logger.info(f"Burst protection: sleeping {sleep_time:.2f}s for {domain}")
            time.sleep(sleep_time)

        # Record this request
        self.sliding_window[domain].append(current_time)

        return None

Cookie and Session Management

Advanced Cookie Management

import json
import pickle
import os
from http.cookies import SimpleCookie
from scrapy.http.cookies import CookieJar
import time

class AdvancedCookieManager:
    """
    Advanced cookie manager
    """

    def __init__(self, storage_path='./cookies'):
        self.storage_path = storage_path
        self.cookie_jars = {}
        self.session_storage = {}
        self.cookie_policies = {}

        # Make sure the storage directory exists
        os.makedirs(storage_path, exist_ok=True)

    def create_cookie_jar(self, domain, policy='default'):
        """
        Create a cookie jar for a domain
        """
        cookie_jar = CookieJar()
        self.cookie_jars[domain] = {
            'jar': cookie_jar,
            'policy': policy,
            'last_used': time.time(),
            'requests_count': 0
        }
        return cookie_jar

    def get_cookie_jar(self, domain):
        """
        Get the cookie jar for a domain, creating it if needed
        """
        if domain not in self.cookie_jars:
            return self.create_cookie_jar(domain)
        return self.cookie_jars[domain]['jar']

    def save_cookies(self, domain, filename=None):
        """
        Persist cookies to disk
        """
        if filename is None:
            filename = f"{domain.replace('.', '_').replace(':', '_')}_cookies.pkl"

        filepath = os.path.join(self.storage_path, filename)

        if domain in self.cookie_jars:
            with open(filepath, 'wb') as f:
                pickle.dump(self.cookie_jars[domain]['jar'], f)

    def load_cookies(self, domain, filename=None):
        """
        Load cookies from disk
        """
        if filename is None:
            filename = f"{domain.replace('.', '_').replace(':', '_')}_cookies.pkl"

        filepath = os.path.join(self.storage_path, filename)

        if os.path.exists(filepath):
            with open(filepath, 'rb') as f:
                cookie_jar = pickle.load(f)
                self.cookie_jars[domain] = {
                    'jar': cookie_jar,
                    'policy': 'default',
                    'last_used': time.time(),
                    'requests_count': 0
                }
                return cookie_jar

    def rotate_cookies(self, domain):
        """
        Rotate cookies (switch between stored cookie sets)
        """
        # Several cookie sets can be kept per domain; pick one at random
        cookie_sets = self._get_cookie_sets(domain)
        if cookie_sets:
            import random
            chosen_set = random.choice(cookie_sets)
            return chosen_set
        return None

    def _get_cookie_sets(self, domain):
        """
        Load all stored cookie sets for a domain
        """
        sets = []
        for filename in os.listdir(self.storage_path):
            if filename.startswith(domain.replace('.', '_').replace(':', '_')) and filename.endswith('_cookies.pkl'):
                filepath = os.path.join(self.storage_path, filename)
                with open(filepath, 'rb') as f:
                    sets.append(pickle.load(f))
        return sets

    def validate_cookies(self, domain):
        """
        Drop expired cookies and report whether any remain
        """
        cookie_jar = self.get_cookie_jar(domain)

        # Check cookies for expiry
        current_time = time.time()
        valid_cookies = []

        for cookie in cookie_jar.jar:
            if cookie.expires is None or cookie.expires > current_time:
                valid_cookies.append(cookie)

        # Rebuild the jar from the valid cookies
        new_jar = CookieJar()
        for cookie in valid_cookies:
            new_jar.set_cookie(cookie)

        self.cookie_jars[domain]['jar'] = new_jar

        return len(valid_cookies) > 0
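
A usage sketch: persist a session at the end of a run and restore it on the next one:

manager = AdvancedCookieManager(storage_path='./cookies')

jar = manager.get_cookie_jar('example.com')
# ... responses populate the jar during the crawl ...
manager.save_cookies('example.com')

# Next run: restore and prune expired cookies
manager.load_cookies('example.com')
manager.validate_cookies('example.com')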

class SessionManagementMiddleware:
    """
    Session-management middleware
    """

    def __init__(self):
        self.cookie_manager = AdvancedCookieManager()
        self.session_rotation_interval = 3600  # rotate hourly
        self.active_sessions = {}

    def process_request(self, request, spider):
        """
        Attach session cookies to outgoing requests
        """
        from urllib.parse import urlparse
        domain = urlparse(request.url).netloc

        # Get or create the cookie jar
        cookie_jar = self.cookie_manager.get_cookie_jar(domain)

        # Attach cookies to the request (request.cookies is a plain dict)
        for cookie in self._extract_cookies_for_request(cookie_jar, domain):
            request.cookies[cookie.name] = cookie.value

        # Rotate the session if it is due
        if self._should_rotate_session(domain):
            self._rotate_session(domain, request, spider)

        return None

    def process_response(self, request, response, spider):
        """
        Update stored cookies from the response
        """
        from urllib.parse import urlparse
        domain = urlparse(request.url).netloc

        # Extract new cookies from the response
        cookie_jar = self.cookie_manager.get_cookie_jar(domain)
        cookie_jar.extract_cookies(response, request)

        # Persist the updated cookies
        self.cookie_manager.save_cookies(domain)

        return response

    def _extract_cookies_for_request(self, cookie_jar, domain):
        """
        Select the cookies that apply to the request's domain
        (simple suffix match; the jar itself holds the full policy)
        """
        return [c for c in cookie_jar.jar if domain.endswith(c.domain.lstrip('.'))]

    def _should_rotate_session(self, domain):
        """
        Is the session due for rotation?
        """
        if domain not in self.active_sessions:
            self.active_sessions[domain] = time.time()
            return False

        last_rotation = self.active_sessions[domain]
        return time.time() - last_rotation > self.session_rotation_interval

    def _rotate_session(self, domain, request, spider):
        """
        Rotate the session
        """
        # Try to load a different cookie set
        new_cookies = self.cookie_manager.rotate_cookies(domain)
        if new_cookies:
            spider.logger.info(f"Rotating session for {domain}")
            self.active_sessions[domain] = time.time()

        # Validate the current cookies
        is_valid = self.cookie_manager.validate_cookies(domain)
        if not is_valid:
            spider.logger.warning(f"Cookies for {domain} are invalid")

class CookieStealingMiddleware:
    """
    Cookie-capture middleware (harvests authentication cookies after login)
    """

    def __init__(self):
        self.login_domains = set()
        self.stolen_cookies = {}

    def add_login_domain(self, domain):
        """
        Register a domain that requires login
        """
        self.login_domains.add(domain)

    def process_response(self, request, response, spider):
        """
        Capture cookies from responses on login domains
        """
        from urllib.parse import urlparse
        domain = urlparse(request.url).netloc

        if domain in self.login_domains:
            # Extract all Set-Cookie headers
            cookies = {}
            for header in response.headers.getlist('Set-Cookie'):
                cookie = SimpleCookie()
                cookie.load(header.decode('utf-8'))
                for key, morsel in cookie.items():
                    cookies[key] = morsel.value

            if cookies:
                self.stolen_cookies[domain] = {
                    'cookies': cookies,
                    'timestamp': time.time(),
                    'url': request.url
                }
                spider.logger.info(f"Stole cookies from {domain}: {list(cookies.keys())}")

        return response

    def get_stolen_cookies(self, domain):
        """
        Return the captured cookies for a domain
        """
        return self.stolen_cookies.get(domain, {}).get('cookies', {})

Bypassing JavaScript Anti-Bot Measures

Faking the JavaScript Environment

from scrapy.http import HtmlResponse
import time

class JavaScriptBypassMiddleware:
    """
    JavaScript anti-bot bypass middleware
    """

    def __init__(self):
        # Only the headless-Chrome environment is implemented in this
        # example; a real browser or a pure-JS interpreter could be
        # registered here in the same way
        self.js_execution_envs = {
            'headless-chrome': self._setup_headless_chrome,
        }
        self.execution_timeout = 30

    def _setup_headless_chrome(self):
        """
        Set up a headless Chrome environment
        """
        from selenium import webdriver
        from selenium.webdriver.chrome.options import Options

        chrome_options = Options()
        chrome_options.add_argument('--headless')
        chrome_options.add_argument('--no-sandbox')
        chrome_options.add_argument('--disable-dev-shm-usage')
        chrome_options.add_argument('--disable-blink-features=AutomationControlled')
        chrome_options.add_experimental_option("excludeSwitches", ["enable-automation"])
        chrome_options.add_experimental_option('useAutomationExtension', False)

        driver = webdriver.Chrome(options=chrome_options)

        # Hide navigator.webdriver
        driver.execute_script("Object.defineProperty(navigator, 'webdriver', {get: () => undefined})")

        return driver

    def process_request(self, request, spider):
        """
        Handle requests that need JavaScript execution
        """
        if request.meta.get('js_render'):
            # Render the page in a browser
            driver = self._setup_headless_chrome()

            try:
                driver.set_page_load_timeout(self.execution_timeout)
                driver.get(request.url)

                # Run any custom JavaScript
                custom_js = request.meta.get('custom_js', '')
                if custom_js:
                    driver.execute_script(custom_js)

                # Give the page time to finish loading
                time.sleep(request.meta.get('wait_time', 2))

                # Capture the rendered HTML
                body = driver.page_source.encode('utf-8')

                # Build a new response object from it
                response = HtmlResponse(
                    url=request.url,
                    body=body,
                    encoding='utf-8',
                    request=request
                )

                return response

            except Exception as e:
                spider.logger.error(f"JavaScript rendering failed: {str(e)}")
                return None  # fall back to the normal, non-rendered download
            finally:
                driver.quit()

        return None
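
From a spider, rendering is opt-in via request meta; a minimal sketch (the spider name and URL are placeholders):

import scrapy

class RenderedSpider(scrapy.Spider):
    name = 'rendered_example'  # placeholder name

    def start_requests(self):
        yield scrapy.Request(
            'https://example.com/spa/',  # placeholder URL
            meta={'js_render': True, 'wait_time': 3},
            callback=self.parse,
        )

    def parse(self, response):
        yield {'title': response.css('title::text').get()}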

class DynamicContentMiddleware:
    """
    Dynamic-content handling middleware
    """

    def __init__(self):
        self.loading_detectors = [
            self._detect_react_app,
            self._detect_vue_app,
            self._detect_ajax_loader,
            self._detect_infinite_scroll
        ]

    def process_request(self, request, spider):
        """
        Flag requests that likely need JavaScript rendering
        """
        # Does this URL look like it serves dynamic content?
        if self._has_dynamic_content(request.url):
            request.meta['js_render'] = True
            request.meta['wait_time'] = 3  # give content 3 seconds to load

        return None

    def _has_dynamic_content(self, url):
        """
        Heuristic: does the URL suggest dynamic content?
        """
        dynamic_indicators = [
            '/spa/', '/app/', '/dashboard/',
            'single-page', 'angular', 'react', 'vue'
        ]

        url_lower = url.lower()
        return any(indicator in url_lower for indicator in dynamic_indicators)

    def _detect_react_app(self, response_text):
        """
        Detect a React app
        """
        return 'react' in response_text.lower() or 'id="root"' in response_text

    def _detect_vue_app(self, response_text):
        """
        Detect a Vue app
        """
        return 'vue' in response_text.lower() or 'id="app"' in response_text

    def _detect_ajax_loader(self, response_text):
        """
        Detect an AJAX loader
        """
        loader_patterns = [
            'loading', 'spinner', 'progress', 'ajax'
        ]
        return any(pattern in response_text.lower() for pattern in loader_patterns)

    def _detect_infinite_scroll(self, response_text):
        """
        Detect infinite scrolling
        """
        scroll_patterns = [
            'infinite-scroll', 'load-more', 'pagination'
        ]
        return any(pattern in response_text.lower() for pattern in scroll_patterns)

class ObfuscationBypassMiddleware:
    """
    Code-deobfuscation middleware
    """

    def __init__(self):
        self.deobfuscators = {
            'string_concat': self._deobfuscate_string_concat,
            'array_decode': self._deobfuscate_array_decode,
            'function_rename': self._deobfuscate_function_rename
        }

    def process_request(self, request, spider):
        """
        Flag requests whose pages contain obfuscated JavaScript
        """
        # JavaScript deobfuscation logic would go here. Scrapy does not
        # execute JavaScript itself, so this mainly suits preprocessing.

        # Mark requests that need special handling
        if self._has_obfuscated_js(request.url):
            request.meta['needs_deobfuscation'] = True

        return None

    def _has_obfuscated_js(self, url):
        """
        Detect obfuscated JavaScript
        """
        # Common obfuscation markers
        obfuscation_indicators = [
            'eval(', 'fromCharCode', 'charCodeAt', 'split("")',
            'String.fromCharCode', 'Function(', 'setTimeout('
        ]
        # The page body would have to be downloaded and scanned for these
        return False

    def _deobfuscate_string_concat(self, js_code):
        """
        Undo string-concatenation obfuscation
        """
        # Implement string-concatenation deobfuscation here
        pass

    def _deobfuscate_array_decode(self, js_code):
        """
        Undo array-encoding obfuscation
        """
        # Implement array-decoding deobfuscation here
        pass

    def _deobfuscate_function_rename(self, js_code):
        """
        Undo function-renaming obfuscation
        """
        # Implement function-rename deobfuscation here
        pass

Distributed Crawler Deployment

Distributed Deployment Strategies

import threading
import time
import json
import redis

class DistributedSpiderManager:
    """
    Distributed spider manager
    """

    def __init__(self, redis_host='localhost', redis_port=6379):
        self.redis_client = redis.Redis(host=redis_host, port=redis_port, decode_responses=True)
        self.task_queue = 'spider_tasks'
        self.result_queue = 'spider_results'
        self.worker_registry = 'spider_workers'
        self.monitoring = True

    def add_task(self, spider_name, urls, **kwargs):
        """
        Enqueue a crawl task
        """
        task = {
            'spider_name': spider_name,
            'urls': urls if isinstance(urls, list) else [urls],
            'kwargs': kwargs,
            'timestamp': time.time(),
            'status': 'pending'
        }

        self.redis_client.rpush(self.task_queue, json.dumps(task))
        return task

    def get_task(self):
        """
        Dequeue a crawl task (blocking, 5-second timeout)
        """
        task_json = self.redis_client.blpop(self.task_queue, timeout=5)
        if task_json:
            return json.loads(task_json[1])
        return None

    def complete_task(self, task_id, results):
        """
        Mark a task complete and store its results
        """
        result = {
            'task_id': task_id,
            'results': results,
            'completed_at': time.time(),
            'status': 'completed'
        }

        self.redis_client.rpush(self.result_queue, json.dumps(result))

    def register_worker(self, worker_id, capabilities):
        """
        Register a spider worker node
        """
        worker_info = {
            'worker_id': worker_id,
            'capabilities': capabilities,
            'registered_at': time.time(),
            'last_heartbeat': time.time()
        }

        self.redis_client.hset(self.worker_registry, worker_id, json.dumps(worker_info))

    def heartbeat(self, worker_id):
        """
        Record a worker heartbeat
        """
        worker_info = self.redis_client.hget(self.worker_registry, worker_id)
        if worker_info:
            info = json.loads(worker_info)
            info['last_heartbeat'] = time.time()
            self.redis_client.hset(self.worker_registry, worker_id, json.dumps(info))

    def get_active_workers(self):
        """
        List the active worker nodes
        """
        workers = self.redis_client.hgetall(self.worker_registry)
        active_workers = {}

        for worker_id, worker_info in workers.items():
            info = json.loads(worker_info)
            # A worker is active if it has sent a heartbeat within 5 minutes
            if time.time() - info['last_heartbeat'] < 300:
                active_workers[worker_id] = info

        return active_workers

    def start_monitoring(self):
        """
        Start the monitoring thread
        """
        def monitor():
            while self.monitoring:
                # Watch the queue lengths
                task_count = self.redis_client.llen(self.task_queue)
                result_count = self.redis_client.llen(self.result_queue)

                print(f"Distributed Spider Monitor - Tasks: {task_count}, Results: {result_count}")

                time.sleep(10)

        monitor_thread = threading.Thread(target=monitor, daemon=True)
        monitor_thread.start()

    def stop_monitoring(self):
        """
        Stop monitoring
        """
        self.monitoring = False
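
A usage sketch: one process enqueues work, a worker loop consumes it (using the task timestamp as a makeshift task id, since add_task does not assign one):

manager = DistributedSpiderManager(redis_host='localhost')
manager.register_worker('worker-1', capabilities=['news_spider'])
manager.add_task('news_spider', ['https://example.com/page1'])  # placeholder URL

while True:
    task = manager.get_task()
    if task is None:
        break  # queue drained
    # ... run the spider for task['urls'] here, then report:
    manager.complete_task(task['timestamp'], results=[])
    manager.heartbeat('worker-1')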

class LoadBalancerMiddleware:
    """
    Load-balancing middleware
    """

    def __init__(self):
        self.node_manager = DistributedSpiderManager()
        self.request_distribution = {}
        self.current_node_index = 0

    def process_request(self, request, spider):
        """
        Distribute requests across worker nodes
        """
        # Get the active nodes
        active_nodes = self.node_manager.get_active_workers()

        if active_nodes:
            # Round-robin requests across nodes
            nodes = list(active_nodes.keys())
            target_node = nodes[self.current_node_index % len(nodes)]
            self.current_node_index += 1

            # Mark the request for forwarding to that node
            request.meta['target_node'] = target_node

            spider.logger.info(f"Routing request to node: {target_node}")

        return None

class GeoDistributedMiddleware:
    """
    地理分布中间件
    """
    
    def __init__(self):
        self.region_nodes = {
            'us': [],
            'eu': [],
            'asia': [],
            'global': []
        }
    
    def process_request(self, request, spider):
        """
        根据目标网站地理位置分配节点
        """
        from urllib.parse import urlparse
        domain = urlparse(request.url).netloc
        
        # 简单的地理分配逻辑
        region = self._get_target_region(domain)
        
        if region in self.region_nodes and self.region_nodes[region]:
            target_node = self.region_nodes[region][0]  # 简化:选择第一个节点
            request.meta['target_region'] = region
            request.meta['target_node'] = target_node
        
        return None
    
    def _get_target_region(self, domain):
        """
        获取目标区域
        """
        # 根据域名后缀判断区域。注意先匹配更具体的多段后缀,
        # 否则 'example.com.cn' 会因包含 '.com' 被误判为 us
        if domain.endswith(('.com.cn', '.jp', '.kr')):
            return 'asia'
        elif domain.endswith(('.co.uk', '.de', '.fr')):
            return 'eu'
        elif domain.endswith(('.com', '.org', '.net')):
            return 'us'
        else:
            return 'global'

class ScalableSpiderFramework:
    """
    可扩展爬虫框架
    """
    
    def __init__(self):
        self.cluster_manager = DistributedSpiderManager()
        self.auto_scaling_enabled = True
        self.scaling_thresholds = {
            'queue_length': 100,
            'response_time': 5.0,
            'error_rate': 0.1
        }
    
    def scale_up(self):
        """
        水平扩展:增加爬虫节点
        """
        print("Scaling up: Adding new spider nodes...")
        # 这里可以启动新的爬虫进程或容器
        pass
    
    def scale_down(self):
        """
        水平收缩:减少爬虫节点
        """
        print("Scaling down: Removing spider nodes...")
        # 这里可以停止一些爬虫进程或容器
        pass
    
    def monitor_cluster(self):
        """
        监控集群状态
        """
        while True:
            # 检查队列长度
            queue_length = self.cluster_manager.redis_client.llen(self.cluster_manager.task_queue)
            
            # 检查错误率
            # 取最近10条结果(lrange 索引 0..9)
            recent_results = self.cluster_manager.redis_client.lrange(
                self.cluster_manager.result_queue, 0, 9
            )
            # redis-py 默认返回 bytes,先统一解码再做子串匹配
            recent_results = [
                r.decode('utf-8') if isinstance(r, bytes) else r
                for r in recent_results
            ]
            
            error_count = sum(1 for result in recent_results if '"error":' in result)
            error_rate = error_count / max(1, len(recent_results))
            
            # 根据指标决定是否扩缩容
            if queue_length > self.scaling_thresholds['queue_length']:
                self.scale_up()
            elif (queue_length < self.scaling_thresholds['queue_length'] * 0.3 and 
                  error_rate < self.scaling_thresholds['error_rate']):
                self.scale_down()
            
            time.sleep(30)  # 每30秒检查一次
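
scale_up / scale_down 在上面留作占位,下面是一个用子进程启动新爬虫实例的最小示意(进程方式与命令行参数为假设,容器化部署时可替换为 Docker / Kubernetes 的 API 调用):

import subprocess

class ProcessScaler:
    """
    用子进程模拟节点扩缩容的最小示意
    """
    
    def __init__(self, spider_name):
        self.spider_name = spider_name
        self.processes = []
    
    def scale_up(self):
        # 启动一个新的 scrapy crawl 进程充当额外节点
        proc = subprocess.Popen(['scrapy', 'crawl', self.spider_name])
        self.processes.append(proc)
    
    def scale_down(self):
        # 终止最近启动的一个节点进程
        if self.processes:
            self.processes.pop().terminate()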

反爬检测与监控

检测系统实现

import logging
import random
import time
from collections import defaultdict, deque
import statistics

class AntiDetectionMonitor:
    """
    反爬检测监控系统
    """
    
    def __init__(self):
        self.event_log = deque(maxlen=1000)
        self.detection_patterns = {
            'status_403': self._detect_403_ban,
            'status_429': self._detect_rate_limit,
            'captcha_page': self._detect_captcha,
            'suspicious_content': self._detect_suspicious_content
        }
        self.alert_callbacks = []
        self.monitoring_enabled = True
    
    def log_event(self, event_type, details, severity='info'):
        """
        记录事件
        """
        event = {
            'timestamp': time.time(),
            'type': event_type,
            'details': details,
            'severity': severity
        }
        self.event_log.append(event)
        
        # 触发告警(如果是严重事件)
        if severity in ['warning', 'critical']:
            self._trigger_alert(event)
    
    def _detect_403_ban(self, response):
        """
        检测403封禁
        """
        if response.status == 403:
            return {
                'type': 'ip_ban',
                'confidence': 0.9,
                'description': 'Received 403 Forbidden response'
            }
        return None
    
    def _detect_rate_limit(self, response):
        """
        检测频率限制
        """
        if response.status == 429:
            return {
                'type': 'rate_limit',
                'confidence': 0.95,
                'description': 'Received 429 Too Many Requests response'
            }
        
        # 检查响应头中的限速信息
        retry_after = response.headers.get('Retry-After')
        if retry_after:
            return {
                'type': 'rate_limit',
                'confidence': 0.8,
                'description': f'Rate limit with Retry-After: {retry_after}'
            }
        
        return None
    
    def _detect_captcha(self, response):
        """
        检测验证码页面
        """
        content = response.text.lower()
        
        # 指示词应足够具体:'auth'、'security'、'validate' 这类通用词
        # 几乎出现在所有页面上,会造成大量误报
        captcha_indicators = [
            'captcha', 'recaptcha', 'hcaptcha', 'verification code',
            'are you a human', 'robot check', 'prove you are human'
        ]
        
        for indicator in captcha_indicators:
            if indicator in content:
                return {
                    'type': 'captcha_detected',
                    'confidence': 0.85,
                    'description': f'Captcha indicator found: {indicator}'
                }
        
        return None
    
    def _detect_suspicious_content(self, response):
        """
        检测可疑内容
        """
        content = response.text.lower()
        
        suspicious_patterns = [
            'access denied', 'blocked', 'forbidden', 'not allowed',
            'suspicious activity', 'bot detected', 'automated access'
        ]
        
        for pattern in suspicious_patterns:
            if pattern in content:
                return {
                    'type': 'suspicious_content',
                    'confidence': 0.8,
                    'description': f'Suspicious pattern found: {pattern}'
                }
        
        return None
    
    def analyze_response(self, response, request):
        """
        分析响应,检测反爬特征
        """
        detections = []
        
        for pattern_name, detector in self.detection_patterns.items():
            detection = detector(response)
            if detection:
                detections.append(detection)
                self.log_event(
                    'anti_detection_trigger',
                    {
                        'detection': detection,
                        'url': request.url,
                        'status': response.status
                    },
                    'warning'
                )
        
        return detections
    
    def _trigger_alert(self, event):
        """
        触发告警
        """
        for callback in self.alert_callbacks:
            try:
                callback(event)
            except Exception as e:
                print(f"Alert callback error: {e}")
    
    def add_alert_callback(self, callback):
        """
        添加告警回调函数
        """
        self.alert_callbacks.append(callback)
    
    def get_detection_summary(self):
        """
        获取检测摘要
        """
        summary = {
            'total_events': len(self.event_log),
            'recent_events': list(self.event_log)[-10:],  # 最近10个事件
            'detection_types': defaultdict(int),
            'severities': defaultdict(int)
        }
        
        for event in self.event_log:
            if event['type'] == 'anti_detection_trigger':
                detection = event['details']['detection']
                summary['detection_types'][detection['type']] += 1
            
            summary['severities'][event['severity']] += 1
        
        return summary
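
把 AntiDetectionMonitor 挂进 Scrapy 下载中间件的一种用法示意(告警回调只做打印,触发检测后通过 meta 打标记,与本文反制中间件的约定保持一致):

class MonitoredResponseMiddleware:
    """
    在 process_response 中调用监控器分析每个响应(示意)
    """
    
    def __init__(self):
        self.monitor = AntiDetectionMonitor()
        self.monitor.add_alert_callback(
            lambda event: print(f"[ALERT] {event['type']}: {event['details']}")
        )
    
    def process_response(self, request, response, spider):
        detections = self.monitor.analyze_response(response, request)
        if detections:
            # 标记该请求已触发反爬检测,供后续反制中间件使用
            request.meta['anti_detection_triggered'] = True
        return response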

class PerformanceMonitor:
    """
    性能监控系统
    """
    
    def __init__(self):
        self.metrics = {
            'request_times': deque(maxlen=1000),
            'success_rates': deque(maxlen=100),
            'error_rates': deque(maxlen=100),
            'throughput': deque(maxlen=100)
        }
        self.start_time = time.time()
    
    def record_request_time(self, request_time):
        """
        记录请求时间
        """
        self.metrics['request_times'].append(request_time)
    
    def record_success(self):
        """
        记录成功请求
        """
        # 成功记 1.0、失败记 0.0;对窗口内取均值即为最近成功率/错误率
        self.metrics['success_rates'].append(1.0)
        self.metrics['error_rates'].append(0.0)
    
    def record_error(self):
        """
        记录错误请求
        """
        self.metrics['error_rates'].append(1.0)
        self.metrics['success_rates'].append(0.0)
    
    def get_performance_metrics(self):
        """
        获取性能指标
        """
        metrics = {}
        
        if self.metrics['request_times']:
            metrics['avg_response_time'] = statistics.mean(self.metrics['request_times'])
            metrics['max_response_time'] = max(self.metrics['request_times'])
            metrics['min_response_time'] = min(self.metrics['request_times'])
            metrics['response_time_std'] = statistics.stdev(self.metrics['request_times']) if len(self.metrics['request_times']) > 1 else 0
        
        if self.metrics['success_rates']:
            metrics['recent_success_rate'] = statistics.mean(self.metrics['success_rates'])
        
        if self.metrics['error_rates']:
            metrics['recent_error_rate'] = statistics.mean(self.metrics['error_rates'])
        
        metrics['uptime'] = time.time() - self.start_time
        
        return metrics
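
配合 Scrapy 使用时,可以在请求发出时记录时间戳、在响应返回时记录耗时与成败。下面是一个示意中间件(meta 中的 '_start_time' 键名为假设):

import time

class TimingMiddleware:
    """
    记录每个请求耗时与成功率的示意中间件
    """
    
    def __init__(self):
        self.perf = PerformanceMonitor()
    
    def process_request(self, request, spider):
        request.meta['_start_time'] = time.time()
        return None
    
    def process_response(self, request, response, spider):
        start = request.meta.get('_start_time')
        if start:
            self.perf.record_request_time(time.time() - start)
        # 以 4xx/5xx 作为失败的粗略判定
        if response.status < 400:
            self.perf.record_success()
        else:
            self.perf.record_error()
        return response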

class DetectionCountermeasuresMiddleware:
    """
    检测反制中间件
    """
    
    def __init__(self):
        self.monitor = AntiDetectionMonitor()
        self.performance_monitor = PerformanceMonitor()
        self.countermeasures = {
            'ip_rotation': self._apply_ip_rotation,
            'delay_increase': self._apply_delay_increase,
            'user_agent_rotation': self._apply_user_agent_rotation,
            'request_modification': self._apply_request_modification
        }
    
    def process_request(self, request, spider):
        """
        处理请求,应用反制措施
        """
        # 检查是否触发了反爬检测
        if request.meta.get('anti_detection_triggered'):
            countermeasure = request.meta.get('countermeasure', 'delay_increase')
            action = self.countermeasures.get(countermeasure)
            if action:
                action(request, spider)
        
        return None
    
    def _apply_ip_rotation(self, request, spider):
        """
        应用IP轮换
        """
        # 请求使用新的IP地址
        spider.logger.info("Applying IP rotation countermeasure")
        # 这里可以实现具体的IP轮换逻辑
    
    def _apply_delay_increase(self, request, spider):
        """
        应用延迟增加
        """
        base_delay = request.meta.get('current_delay', 1)
        new_delay = min(base_delay * 1.5, 10)  # 指数退避,最大延迟10秒
        request.meta['current_delay'] = new_delay
        
        spider.logger.info(f"Increasing delay to {new_delay} seconds")
        # 注意:time.sleep 会阻塞 Scrapy 的事件循环,仅作演示;
        # 生产环境应改用 DOWNLOAD_DELAY / AutoThrottle 等非阻塞机制
        time.sleep(new_delay)
    
    def _apply_user_agent_rotation(self, request, spider):
        """
        应用User-Agent轮换
        """
        # fake_useragent 初始化开销较大,生产中应在 __init__ 中复用同一实例
        from fake_useragent import UserAgent
        ua = UserAgent()
        new_ua = ua.random
        request.headers['User-Agent'] = new_ua
        
        spider.logger.info(f"Rotating User-Agent to: {new_ua}")
    
    def _apply_request_modification(self, request, spider):
        """
        应用请求修改
        """
        # 修改请求的各个方面以绕过检测
        request.headers['Accept-Language'] = random.choice([
            'zh-CN,zh;q=0.9,en;q=0.8',
            'en-US,en;q=0.9,zh-CN;q=0.8',
            'en-GB,en;q=0.9,en-US;q=0.8,de;q=0.7'
        ])
        
        spider.logger.info("Modifying request headers")

class BehavioralAnalysisMiddleware:
    """
    行为分析中间件
    """
    
    def __init__(self):
        self.pattern_analyzer = PatternAnalyzer()
        self.anomaly_detector = AnomalyDetector()
    
    def process_request(self, request, spider):
        """
        分析请求行为模式
        """
        behavior_score = self.pattern_analyzer.analyze_request_pattern(request)
        
        if behavior_score > 0.8:  # 高度疑似自动化行为
            request.meta['behavior_score'] = behavior_score
            request.meta['anti_detection_triggered'] = True
            request.meta['countermeasure'] = 'delay_increase'
            
            spider.logger.warning(f"Behavioral anomaly detected (score: {behavior_score})")
        
        return None

class PatternAnalyzer:
    """
    模式分析器
    """
    
    def __init__(self):
        self.normal_patterns = {
            'request_intervals': [],  # 正常的请求间隔
            'user_agents': set(),     # 正常的User-Agent
            'accept_headers': set()   # 正常的Accept头
        }
    
    def analyze_request_pattern(self, request):
        """
        分析请求模式,检测异常
        """
        # 检查请求频率模式
        frequency_score = self._check_frequency_pattern(request)
        
        # 检查请求头模式
        header_score = self._check_header_pattern(request)
        
        # 检查路径访问模式
        path_score = self._check_path_pattern(request)
        
        # 综合评分
        total_score = (frequency_score + header_score + path_score) / 3
        
        return total_score
    
    def _check_frequency_pattern(self, request):
        """
        检查请求频率模式
        """
        # 这里可以分析请求的时间间隔模式
        # 如果时间间隔过于规律,可能是自动化请求
        return 0.5  # 简化返回
    
    def _check_header_pattern(self, request):
        """
        检查请求头模式
        """
        # 检查User-Agent、Accept等头部是否符合正常用户模式
        return 0.5  # 简化返回
    
    def _check_path_pattern(self, request):
        """
        检查路径访问模式
        """
        # 检查URL访问路径是否符合正常用户浏览模式
        return 0.5  # 简化返回
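
_check_frequency_pattern 上面只返回固定值,下面给出一个可运行的示意:人工浏览的请求间隔波动较大,间隔过于均匀(变异系数很小)时给出更高的自动化评分。窗口大小与 0.1 的阈值均为经验假设:

import statistics
import time
from collections import deque

class IntervalRegularityChecker:
    """
    通过请求间隔的变异系数估计自动化程度的示意实现
    """
    
    def __init__(self, window=20):
        self.timestamps = deque(maxlen=window)
    
    def score(self):
        self.timestamps.append(time.time())
        if len(self.timestamps) < 3:
            return 0.0
        
        ts = list(self.timestamps)
        intervals = [b - a for a, b in zip(ts, ts[1:])]
        mean = statistics.mean(intervals)
        if mean == 0:
            return 1.0
        
        cv = statistics.stdev(intervals) / mean  # 变异系数
        # 间隔越规律(cv 越小),自动化嫌疑越高
        return 1.0 if cv < 0.1 else max(0.0, 0.5 - cv)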

class AnomalyDetector:
    """
    异常检测器
    """
    
    def __init__(self):
        self.known_good_patterns = []
        self.threshold = 0.7
    
    def detect_anomaly(self, request_features):
        """
        检测请求特征中的异常
        """
        # 使用机器学习或统计方法检测异常
        anomaly_score = self._calculate_anomaly_score(request_features)
        return anomaly_score > self.threshold
    
    def _calculate_anomaly_score(self, features):
        """
        计算异常分数
        """
        # 简化实现
        return random.random()
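
_calculate_anomaly_score 的占位实现返回随机数,下面换成一个基于 z-score 的统计示意(特征取值与"z 超过 3 视为异常"的阈值均为假设):

import statistics

def zscore_anomaly_score(value, history):
    """
    用历史样本的 z-score 归一化出 0~1 的异常分(示意)
    """
    if len(history) < 2:
        return 0.0
    
    mean = statistics.mean(history)
    std = statistics.stdev(history)
    if std == 0:
        return 0.0 if value == mean else 1.0
    
    z = abs(value - mean) / std
    # z 超过 3 视为明显异常,线性映射到 [0, 1]
    return min(z / 3.0, 1.0)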

法律合规与道德规范

在进行反爬对抗时,必须遵守相关法律法规和道德准则:

法律法规遵循

"""
重要的法律考虑因素:

1. 知识产权保护:
   - 尊重网站版权和知识产权
   - 避免商业用途的数据滥用
   - 遵守相关数据保护法律

2. 网络安全法规:
   - 遵守《网络安全法》
   - 遵守《数据安全法》
   - 遵守《个人信息保护法》

3. 合同义务:
   - 遵守网站服务条款
   - 尊重robots.txt协议
   - 避免违反网站使用协议
"""

道德原则

"""
爬虫道德准则:

1. 适度原则:
   - 控制请求频率,避免对目标服务器造成过大压力
   - 尊重网站的资源和带宽限制
   - 避免恶意消耗网站资源

2. 透明原则:
   - 明确标注爬虫身份(User-Agent)
   - 尽可能公开爬虫目的和数据使用方式
   - 遵守网站的robots.txt规则

3. 合规原则:
   - 仅获取公开可访问的数据
   - 不获取受保护的私人信息
   - 遵守数据使用授权
"""

最佳实践建议

"""
反爬对抗最佳实践:

1. 技术层面:
   - 优先使用官方API而非爬虫
   - 实现智能请求节流
   - 使用合法的User-Agent
   - 尊重网站的Rate Limit
   - 实现错误重试机制

2. 法律层面:
   - 事先审查目标网站的使用条款
   - 获取必要的法律许可
   - 建立合规审查机制
   - 定期更新合规策略

3. 商业层面:
   - 评估数据获取的商业价值
   - 考虑与网站方合作的可能性
   - 建立可持续的数据获取策略
"""

最佳实践总结

反爬对抗是一个复杂的技术领域,需要平衡技术实现、法律合规和商业需求。

综合策略

"""
综合反爬对抗策略:

1. 预防为主:
   - 在设计阶段考虑反爬因素
   - 建立全面的检测机制
   - 准备多种应对方案

2. 分层防御:
   - 请求层:IP轮换、请求头伪装
   - 应用层:行为模拟、会话管理
   - 网络层:代理池、分布式部署

3. 持续优化:
   - 监控反爬效果
   - 根据检测结果调整策略
   - 不断更新技术手段
"""

💡 核心要点: 反爬对抗是一个攻防博弈的过程,需要持续关注新技术、新策略的发展,并始终保持法律合规意识。


