Scrapy with Selenium/Playwright: A Complete Integration Guide — JavaScript Rendering and Browser Automation in Depth

📂 Stage: Stage 3 — Offense and Defense Drills (Middleware and Anti-Bot)
🔗 Related chapters: Downloader Middleware · Hands-On Anti-Bot Countermeasures · Deduplication and Incremental Updates

Table of Contents

Selenium and Playwright Overview
Selenium Integration
Playwright Integration
Browser Driver Management
Anti-Detection Strategies
Performance Optimization Tips

Selenium and Playwright Overview

Selenium and Playwright are the two mainstream tools for rendering JavaScript-heavy pages. Integrated into a Scrapy crawler, they make it possible to handle complex front-end interaction scenarios.

Selenium vs. Playwright

"""
Selenium vs. Playwright:

Selenium:
- Strengths: mature and stable, large community, supports many browsers
- Weaknesses: slower startup, higher resource usage, more verbose API
- Best for: existing Selenium code bases, demanding cross-browser compatibility

Playwright:
- Strengths: fast startup, good performance, concise API, built-in auto-waiting
- Weaknesses: comparatively new, some features still maturing
- Best for: new projects, high-performance requirements, modern crawlers
"""

When to Use Browser Automation

"""
Scenarios that call for browser automation:
1. JavaScript-rendered content
2. Single-page application (SPA) scraping
3. Complex user-interaction simulation
4. Canvas/WebGL content capture
5. CAPTCHA handling
6. Dynamic form submission
7. WebSocket traffic monitoring
"""

Selenium Integration

Basic Selenium Integration

from scrapy import signals
from scrapy.http import HtmlResponse
from selenium import webdriver
from selenium.webdriver.chrome.options import Options
from selenium.webdriver.common.by import By
from selenium.webdriver.support.ui import WebDriverWait
from selenium.webdriver.support import expected_conditions as EC
import time

class SeleniumMiddleware:
    """
    Basic Selenium middleware.
    """

    def __init__(self):
        self.driver = self._create_driver()

    @classmethod
    def from_crawler(cls, crawler):
        # Connect spider_closed explicitly — Scrapy does not call
        # spider_closed on a middleware unless it is wired to the signal.
        middleware = cls()
        crawler.signals.connect(middleware.spider_closed, signal=signals.spider_closed)
        return middleware

    def _create_driver(self):
        """
        Create a Chrome driver.
        """
        chrome_options = Options()
        chrome_options.add_argument('--headless')  # headless mode
        chrome_options.add_argument('--no-sandbox')
        chrome_options.add_argument('--disable-dev-shm-usage')
        chrome_options.add_argument('--disable-blink-features=AutomationControlled')
        chrome_options.add_experimental_option("excludeSwitches", ["enable-automation"])
        chrome_options.add_experimental_option('useAutomationExtension', False)

        driver = webdriver.Chrome(options=chrome_options)

        # Hide the webdriver property on every new document. A plain
        # execute_script here would only patch the current (blank) page.
        driver.execute_cdp_cmd('Page.addScriptToEvaluateOnNewDocument', {
            'source': "Object.defineProperty(navigator, 'webdriver', {get: () => undefined})"
        })

        return driver

    def process_request(self, request, spider):
        """
        Handle requests that need Selenium.
        """
        if request.meta.get('use_selenium'):
            try:
                self.driver.get(request.url)

                # Wait for the page to load
                wait = WebDriverWait(self.driver, 10)
                wait.until(EC.presence_of_element_located((By.TAG_NAME, "body")))

                # Wait for dynamic content. A fixed sleep is simplistic —
                # prefer explicit waits on concrete selectors in production.
                time.sleep(2)

                body = self.driver.page_source.encode('utf-8')
                return HtmlResponse(
                    url=request.url,
                    body=body,
                    encoding='utf-8',
                    request=request
                )
            except Exception as e:
                spider.logger.error(f"Selenium error for {request.url}: {e}")
                # Fall back to Scrapy's default downloader for this request
                return None

    def spider_closed(self, spider):
        """
        Clean up the driver when the spider closes.
        """
        if self.driver:
            self.driver.quit()
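To activate the middleware, register it in the project settings. A minimal sketch — the module path `myproject.middlewares` and the priority value are placeholders to adapt to your own project:

```python
# settings.py — enable the Selenium middleware.
# 'myproject.middlewares' is a placeholder module path; 543 is a typical
# downloader-middleware priority, not a required value.
DOWNLOADER_MIDDLEWARES = {
    'myproject.middlewares.SeleniumMiddleware': 543,
}
```

Requests that set `meta={'use_selenium': True}` will then be rendered by the browser, while all other requests go through Scrapy's normal downloader untouched.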

Advanced Selenium Integration

from scrapy import signals
from scrapy.http import HtmlResponse
from selenium import webdriver
from selenium.webdriver.chrome.options import Options
from selenium.webdriver.common.by import By
from selenium.webdriver.support.ui import WebDriverWait
from selenium.webdriver.support import expected_conditions as EC
import time

class AdvancedSeleniumMiddleware:
    """
    Advanced Selenium middleware with a driver pool.
    """

    def __init__(self):
        self.drivers = []      # driver pool
        self.max_drivers = 5   # pool size
        self.create_initial_drivers()

    @classmethod
    def from_crawler(cls, crawler):
        middleware = cls()
        crawler.signals.connect(middleware.spider_closed, signal=signals.spider_closed)
        return middleware

    def create_initial_drivers(self):
        """
        Pre-create the driver pool.
        """
        for _ in range(self.max_drivers):
            self.drivers.append(self._create_driver())

    def _create_driver(self):
        """
        Create a Chrome driver with tuned options.
        """
        chrome_options = Options()

        # Base options
        chrome_options.add_argument('--headless')
        chrome_options.add_argument('--no-sandbox')
        chrome_options.add_argument('--disable-dev-shm-usage')
        chrome_options.add_argument('--disable-gpu')

        # Anti-detection options
        chrome_options.add_argument('--disable-blink-features=AutomationControlled')
        chrome_options.add_experimental_option("excludeSwitches", ["enable-automation"])
        chrome_options.add_experimental_option('useAutomationExtension', False)

        # Performance options. Note: do NOT pass --disable-javascript here —
        # rendering JavaScript is the whole point of this middleware. Also
        # avoid a fixed --remote-debugging-port, which would collide across
        # the pooled drivers.
        chrome_options.add_argument('--disable-background-timer-throttling')
        chrome_options.add_argument('--disable-renderer-backgrounding')
        chrome_options.add_argument('--disable-extensions')
        chrome_options.add_argument('--blink-settings=imagesEnabled=false')  # skip images

        driver = webdriver.Chrome(options=chrome_options)

        # Hide automation fingerprints on every new document
        driver.execute_cdp_cmd('Page.addScriptToEvaluateOnNewDocument', {'source': """
            Object.defineProperty(navigator, 'webdriver', {
                get: () => undefined
            });
            Object.defineProperty(navigator, 'plugins', {
                get: () => [1, 2, 3, 4, 5]
            });
        """})

        return driver

    def get_available_driver(self):
        """
        Borrow a driver from the pool, creating one if the pool is empty.
        """
        if self.drivers:
            return self.drivers.pop(0)
        return self._create_driver()

    def return_driver(self, driver):
        """
        Return a driver to the pool, or quit it if the pool is full.
        """
        if len(self.drivers) < self.max_drivers:
            self.drivers.append(driver)
        else:
            driver.quit()

    def process_request(self, request, spider):
        """
        Handle Selenium-flagged requests.
        """
        if not request.meta.get('use_selenium'):
            return None

        driver = self.get_available_driver()

        try:
            # Page-load timeout
            driver.set_page_load_timeout(30)

            driver.get(request.url)

            # Run any custom JavaScript supplied by the spider
            for script in request.meta.get('selenium_js_scripts', []):
                driver.execute_script(script)

            # Wait for a specific element, or just for <body> by default
            wait_selector = request.meta.get('selenium_wait_for')
            if wait_selector:
                WebDriverWait(driver, 10).until(
                    EC.presence_of_element_located((By.CSS_SELECTOR, wait_selector))
                )
            else:
                WebDriverWait(driver, 10).until(
                    EC.presence_of_element_located((By.TAG_NAME, "body"))
                )

            # Scroll to trigger lazy loading
            scroll_pause_time = request.meta.get('selenium_scroll_pause', 1)
            for _ in range(request.meta.get('selenium_scroll_times', 0)):
                driver.execute_script("window.scrollTo(0, document.body.scrollHeight);")
                time.sleep(scroll_pause_time)

            # Click through any requested elements
            for selector in request.meta.get('selenium_click_selectors', []):
                try:
                    element = WebDriverWait(driver, 5).until(
                        EC.element_to_be_clickable((By.CSS_SELECTOR, selector))
                    )
                    element.click()
                    time.sleep(1)  # wait for the click to take effect
                except Exception:
                    spider.logger.warning(f"Could not click element: {selector}")

            # Capture the rendered page
            body = driver.page_source.encode('utf-8')

            response = HtmlResponse(
                url=request.url,
                body=body,
                encoding='utf-8',
                request=request
            )

            self.return_driver(driver)

            return response

        except Exception as e:
            spider.logger.error(f"Selenium error for {request.url}: {e}")

            # Make sure the driver goes back to the pool
            self.return_driver(driver)

            # Fall back to Scrapy's default downloader
            return None

    def spider_closed(self, spider):
        """
        Quit every pooled driver when the spider closes.
        """
        for driver in self.drivers:
            try:
                driver.quit()
            except Exception:
                pass
        self.drivers.clear()
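For reference, here are the knobs the advanced middleware reads from `request.meta`, gathered in one dict. The selector values are illustrative, not tied to any real site:

```python
# Every option the AdvancedSeleniumMiddleware understands, in one place.
# Selector strings below are examples only.
selenium_meta = {
    'use_selenium': True,
    'selenium_wait_for': '#product-list',        # CSS selector to wait for
    'selenium_scroll_times': 3,                  # scroll-to-bottom passes
    'selenium_scroll_pause': 1,                  # seconds between scrolls
    'selenium_click_selectors': ['.load-more'],  # elements to click in order
    'selenium_js_scripts': [],                   # extra JS to run after load
}
# Typical use in a spider: yield scrapy.Request(url, meta=selenium_meta)
```

Only `use_selenium` is required; every other key falls back to a sensible default inside `process_request`.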

Playwright Integration

Basic Playwright Integration

from playwright.sync_api import sync_playwright
from scrapy import signals
from scrapy.http import HtmlResponse

class PlaywrightMiddleware:
    """
    Basic Playwright middleware (sync API).
    """

    def __init__(self):
        self.playwright = None
        self.browser = None
        self.setup_browser()

    @classmethod
    def from_crawler(cls, crawler):
        middleware = cls()
        crawler.signals.connect(middleware.spider_closed, signal=signals.spider_closed)
        return middleware

    def setup_browser(self):
        """
        Start Playwright and launch Chromium.
        """
        self.playwright = sync_playwright().start()
        self.browser = self.playwright.chromium.launch(
            headless=True,
            args=[
                '--no-sandbox',
                '--disable-setuid-sandbox',
                '--disable-dev-shm-usage',
                '--disable-accelerated-2d-canvas',
                '--no-first-run',
                '--no-zygote',
                '--disable-gpu'
            ]
        )

    def process_request(self, request, spider):
        """
        Handle requests that need Playwright.
        """
        if request.meta.get('use_playwright'):
            page = None
            try:
                page = self.browser.new_page()

                # Set a user agent
                page.set_extra_http_headers({
                    "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36"
                })

                # Navigate. "networkidle" fires after "domcontentloaded",
                # so no additional load-state wait is needed.
                page.goto(request.url, wait_until="networkidle")

                # Capture the rendered page
                content = page.content()

                return HtmlResponse(
                    url=request.url,
                    body=content.encode('utf-8'),
                    encoding='utf-8',
                    request=request
                )
            except Exception as e:
                spider.logger.error(f"Playwright error for {request.url}: {e}")
                # Fall back to Scrapy's default downloader
                return None
            finally:
                if page:
                    page.close()

    def spider_closed(self, spider):
        """
        Clean up browser resources when the spider closes.
        """
        if self.browser:
            self.browser.close()
        if self.playwright:
            self.playwright.stop()
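One caveat with the sync API: it blocks Scrapy's Twisted reactor while a page renders, and it cannot run inside an asyncio event loop. If that becomes a bottleneck, the community-maintained scrapy-playwright package drives Playwright's async API through a download handler instead. A settings sketch, assuming that package is installed:

```python
# settings.py — integration via scrapy-playwright (a separate, community-
# maintained package) rather than the hand-rolled middleware above.
DOWNLOAD_HANDLERS = {
    "http": "scrapy_playwright.handler.ScrapyPlaywrightDownloadHandler",
    "https": "scrapy_playwright.handler.ScrapyPlaywrightDownloadHandler",
}
# scrapy-playwright requires the asyncio-based Twisted reactor
TWISTED_REACTOR = "twisted.internet.asyncioreactor.AsyncioSelectorReactor"
# Requests then opt in with meta={"playwright": True}
```

With that approach, rendering happens concurrently with the rest of the crawl instead of serializing it.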

Advanced Playwright Integration

from playwright.sync_api import sync_playwright
from scrapy.http import HtmlResponse
from typing import Dict, List, Optional
from dataclasses import dataclass

@dataclass
class BrowserConfig:
    """
    Browser context configuration.
    """
    headless: bool = True
    user_agent: str = ""
    viewport: Optional[Dict[str, int]] = None
    locale: str = "en-US"
    timezone_id: str = "Asia/Shanghai"
    geolocation: Optional[Dict[str, float]] = None
    permissions: Optional[List[str]] = None
    extra_http_headers: Optional[Dict[str, str]] = None

    def __post_init__(self):
        if self.viewport is None:
            self.viewport = {"width": 1920, "height": 1080}
        if self.geolocation is None:
            self.geolocation = {"longitude": 121.4737, "latitude": 31.2304}
        if self.permissions is None:
            self.permissions = ["geolocation", "notifications"]

class AdvancedPlaywrightMiddleware:
    """
    Advanced Playwright middleware with a context pool.
    """

    def __init__(self):
        self.playwright = None
        self.browser = None
        self.contexts = []       # browser context pool
        self.max_contexts = 10
        self.setup_environment()

    @classmethod
    def from_crawler(cls, crawler):
        from scrapy import signals

        middleware = cls()
        crawler.signals.connect(middleware.spider_closed, signal=signals.spider_closed)
        return middleware

    def setup_environment(self):
        """
        Start Playwright, launch the browser, and pre-create contexts.
        """
        self.playwright = sync_playwright().start()
        self.browser = self.playwright.chromium.launch(
            headless=True,
            args=[
                '--no-sandbox',
                '--disable-setuid-sandbox',
                '--disable-dev-shm-usage',
                '--disable-accelerated-2d-canvas',
                '--no-first-run',
                '--no-zygote',
                '--disable-gpu',
                '--disable-web-security',
                '--disable-features=VizDisplayCompositor'
            ]
        )

        # Pre-create the initial context pool
        for _ in range(3):
            self.contexts.append(self._create_context())

    def _create_context(self, config: Optional[BrowserConfig] = None):
        """
        Create a browser context from a BrowserConfig.
        """
        if config is None:
            config = BrowserConfig()

        context = self.browser.new_context(
            user_agent=config.user_agent or "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36",
            viewport=config.viewport,
            locale=config.locale,
            timezone_id=config.timezone_id,
            geolocation=config.geolocation,
            permissions=config.permissions,
            extra_http_headers=config.extra_http_headers or {},
            java_script_enabled=True,
        )

        # Anti-detection script, injected into every new document
        context.add_init_script("""
            Object.defineProperty(navigator, 'webdriver', {
                get: () => undefined,
            });
            Object.defineProperty(navigator, 'plugins', {
                get: () => [1, 2, 3, 4, 5],
            });
            Object.defineProperty(navigator, 'languages', {
                get: () => ['en-US', 'en'],
            });
        """)

        return context

    def get_available_context(self):
        """
        Borrow a context from the pool, creating one if the pool is empty.
        """
        if self.contexts:
            return self.contexts.pop(0)
        return self._create_context()

    def return_context(self, context):
        """
        Return a context to the pool, or close it if the pool is full.
        """
        if len(self.contexts) < self.max_contexts:
            # Close any pages left open in this context
            for page in context.pages:
                try:
                    page.close()
                except Exception:
                    pass
            self.contexts.append(context)
        else:
            context.close()

    def process_request(self, request, spider):
        """
        Handle Playwright-flagged requests.
        """
        if not request.meta.get('use_playwright'):
            return None

        context = self.get_available_context()

        try:
            page = context.new_page()

            # Optional request interception: abort matching resource URLs
            block_resources = request.meta.get('playwright_block_resources', [])
            if block_resources:
                def handle_route(route):
                    if any(resource in route.request.url for resource in block_resources):
                        route.abort()
                    else:
                        route.continue_()

                page.route("**/*", handle_route)

            page.goto(request.url, wait_until="networkidle")

            # Run any custom scripts supplied by the spider
            for script in request.meta.get('playwright_custom_scripts', []):
                page.evaluate(script)

            # Wait for a specific element if requested
            wait_for_selector = request.meta.get('playwright_wait_for_selector')
            if wait_for_selector:
                page.wait_for_selector(wait_for_selector, timeout=10000)

            # Scroll to trigger lazy loading
            scroll_pause = request.meta.get('playwright_scroll_pause', 0)
            if scroll_pause > 0:
                for _ in range(3):
                    page.evaluate("window.scrollTo(0, document.body.scrollHeight);")
                    page.wait_for_timeout(scroll_pause * 1000)

            # Click through any requested elements
            for selector in request.meta.get('playwright_click_selectors', []):
                try:
                    page.click(selector)
                    page.wait_for_timeout(1000)  # wait for the click to take effect
                except Exception:
                    spider.logger.warning(f"Could not click element: {selector}")

            content = page.content()

            page.close()
            self.return_context(context)

            return HtmlResponse(
                url=request.url,
                body=content.encode('utf-8'),
                encoding='utf-8',
                request=request
            )

        except Exception as e:
            spider.logger.error(f"Playwright error for {request.url}: {e}")

            # Make sure the context goes back to the pool
            self.return_context(context)

            # Fall back to Scrapy's default downloader
            return None

    def spider_closed(self, spider):
        """
        Close all contexts and the browser on spider close.
        """
        for context in self.contexts:
            try:
                context.close()
            except Exception:
                pass
        self.contexts.clear()

        if self.browser:
            self.browser.close()
        if self.playwright:
            self.playwright.stop()
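As with the Selenium variant, the middleware's full `request.meta` surface fits in one dict. The selector and resource patterns below are illustrative only:

```python
# Every option the AdvancedPlaywrightMiddleware understands.
# Values below are examples, not site-specific.
playwright_meta = {
    'use_playwright': True,
    'playwright_block_resources': ['.png', '.jpg', '.css'],  # abort matching URLs
    'playwright_wait_for_selector': '#content',              # wait before capture
    'playwright_scroll_pause': 1,                            # seconds between scrolls
    'playwright_click_selectors': ['.expand'],               # elements to click
    'playwright_custom_scripts': [],                         # JS run after load
}
# Typical use in a spider: yield scrapy.Request(url, meta=playwright_meta)
```

Blocking images and stylesheets via `playwright_block_resources` is usually the single biggest performance win, since those downloads rarely affect the extracted data.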

Browser Driver Management

Intelligent Driver Manager

import psutil
import time
from typing import List
from dataclasses import dataclass

@dataclass
class DriverInfo:
    """
    Snapshot of a browser process.
    """
    pid: int
    name: str
    memory_usage: float
    cpu_usage: float
    uptime: float
    status: str

class DriverManager:
    """
    Browser driver/process manager.
    """

    def __init__(self, max_memory_percent=80.0, max_cpu_percent=80.0):
        self.max_memory_percent = max_memory_percent
        self.max_cpu_percent = max_cpu_percent
        self.managed_processes = set()

    def monitor_system_resources(self):
        """
        Sample system-wide resource usage.
        """
        memory = psutil.virtual_memory()

        return {
            'memory_percent': memory.percent,
            'cpu_percent': psutil.cpu_percent(interval=1),
            'available_memory': memory.available,
            'total_memory': memory.total
        }

    def get_browser_processes(self) -> List[DriverInfo]:
        """
        Collect info on browser-related processes.
        """
        processes = []

        for proc in psutil.process_iter(['pid', 'name', 'memory_percent', 'cpu_percent']):
            try:
                # proc.info['name'] can be None on some platforms — guard it
                name = (proc.info['name'] or '').lower()
                if any(browser in name for browser in ['chrome', 'chromium', 'firefox', 'edge']):
                    process = psutil.Process(proc.info['pid'])
                    uptime = time.time() - process.create_time()

                    processes.append(DriverInfo(
                        pid=proc.info['pid'],
                        name=proc.info['name'],
                        memory_usage=proc.info['memory_percent'] or 0.0,
                        cpu_usage=proc.info['cpu_percent'] or 0.0,
                        uptime=uptime,
                        status=process.status()
                    ))
            except (psutil.NoSuchProcess, psutil.AccessDenied):
                continue

        return processes

    def cleanup_hanging_processes(self):
        """
        Terminate hung or runaway browser processes.
        """
        cleaned_count = 0

        for proc_info in self.get_browser_processes():
            try:
                process = psutil.Process(proc_info.pid)

                # Terminate zombies, memory hogs (>50% of RAM), and
                # processes running longer than one hour
                if (proc_info.status == psutil.STATUS_ZOMBIE or
                        proc_info.memory_usage > 50.0 or
                        proc_info.uptime > 3600):
                    process.terminate()
                    process.wait(timeout=5)
                    cleaned_count += 1
                    print(f"Terminated hanging process: {proc_info.name} (PID: {proc_info.pid})")
            except (psutil.NoSuchProcess, psutil.AccessDenied, psutil.TimeoutExpired):
                continue

        return cleaned_count

    def check_system_health(self) -> bool:
        """
        True if memory and CPU usage are both below their thresholds.
        """
        resources = self.monitor_system_resources()

        return (resources['memory_percent'] <= self.max_memory_percent and
                resources['cpu_percent'] <= self.max_cpu_percent)

    def optimize_system_resources(self):
        """
        Clean up processes and warn when the system is under pressure.
        """
        cleaned = self.cleanup_hanging_processes()

        if not self.check_system_health():
            print("System resources are under pressure, consider reducing concurrency")

        return cleaned
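The cleanup rule buried inside `cleanup_hanging_processes` can be extracted as a pure function, which makes the thresholds (50% memory, one hour of uptime) trivial to unit-test and tune. A sketch, where `'zombie'` corresponds to `psutil.STATUS_ZOMBIE`:

```python
# The same termination rule as cleanup_hanging_processes, with no psutil
# dependency: a process is terminated if it is a zombie, uses more than
# half of system memory, or has been alive longer than an hour.
def should_terminate(status: str, memory_percent: float, uptime_seconds: float) -> bool:
    return (
        status == 'zombie'
        or memory_percent > 50.0
        or uptime_seconds > 3600
    )
```

Keeping the policy separate from the psutil plumbing also makes it easy to swap in per-site thresholds later.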

class ResourceAwareMiddleware:
    """
    Resource-aware middleware: defers browser-backed requests when the
    system is under pressure.
    """

    def __init__(self):
        self.driver_manager = DriverManager()
        self.request_queue = []
        self.active_requests = 0
        self.max_active_requests = 5

    @classmethod
    def from_crawler(cls, crawler):
        from scrapy import signals

        middleware = cls()
        # spider_idle is a signal, not a middleware hook — it must be
        # connected explicitly or it will never fire.
        crawler.signals.connect(middleware.spider_idle, signal=signals.spider_idle)
        return middleware

    def process_request(self, request, spider):
        """
        Gate browser-backed requests on current system load.
        """
        from scrapy.exceptions import IgnoreRequest

        if not (request.meta.get('use_selenium') or request.meta.get('use_playwright')):
            return None

        # Under pressure: park the request and drop it from this pass
        # (returning None here would let it download anyway)
        if not self.driver_manager.check_system_health():
            self.request_queue.append(request)
            spider.logger.info(f"System under pressure, queuing request: {request.url}")
            raise IgnoreRequest(f"Deferred due to system load: {request.url}")

        # Enforce the concurrency cap
        if self.active_requests >= self.max_active_requests:
            self.request_queue.append(request)
            spider.logger.info(f"Max active requests reached, queuing request: {request.url}")
            raise IgnoreRequest(f"Deferred due to concurrency cap: {request.url}")

        # Let the request proceed
        self.active_requests += 1
        return None

    def process_response(self, request, response, spider):
        """
        Release a slot and, if possible, reschedule a queued request.
        """
        if request.meta.get('use_selenium') or request.meta.get('use_playwright'):
            self.active_requests = max(0, self.active_requests - 1)

        # Try to drain the queue
        if (self.request_queue and
                self.active_requests < self.max_active_requests and
                self.driver_manager.check_system_health()):
            queued_request = self.request_queue.pop(0)
            self.active_requests += 1
            # Returning a Request from process_response reschedules it;
            # dont_filter bypasses the dupefilter, which has already seen it
            return queued_request.replace(dont_filter=True)

        return response

    def spider_idle(self, spider):
        """
        Drain the queue while the spider is idle.
        """
        from scrapy.exceptions import DontCloseSpider

        if (self.request_queue and
                self.active_requests < self.max_active_requests and
                self.driver_manager.check_system_health()):
            queued_request = self.request_queue.pop(0)
            self.active_requests += 1
            # Scrapy >= 2.10: engine.crawl(request); older versions also
            # took the spider as a second argument
            spider.crawler.engine.crawl(queued_request.replace(dont_filter=True))
            # Keep the spider alive while requests remain queued
            raise DontCloseSpider
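One small design note: the middleware above dequeues with `list.pop(0)`, which is O(n) per call. For a large backlog, `collections.deque` gives O(1) FIFO operations plus an optional `maxlen` cap. A minimal sketch of the same queue discipline:

```python
from collections import deque

# maxlen caps the backlog: appending to a full deque silently discards
# the oldest (leftmost) entry
pending = deque(maxlen=1000)
pending.append('https://example.com/a')
pending.append('https://example.com/b')

first = pending.popleft()  # FIFO order, O(1)
```

Swapping `self.request_queue = []` for a `deque` requires only changing `pop(0)` to `popleft()`.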

Anti-Detection Strategies

Anti-Detection Configuration

import random
from typing import List

class AntiDetectionConfig:
    """
    Anti-detection configuration.
    """

    @staticmethod
    def get_stealth_args() -> List[str]:
        """
        Launch arguments that reduce automation fingerprints.
        """
        return [
            '--disable-blink-features=AutomationControlled',
            '--disable-dev-shm-usage',
            '--disable-gpu',
            '--disable-extensions',
            '--disable-plugins',
            '--disable-images',
            '--no-sandbox',
            '--disable-web-security',
            '--allow-running-insecure-content',
            '--disable-features=VizDisplayCompositor',
            '--disable-ipc-flooding-protection',
            '--disable-background-timer-throttling',
            '--disable-backgrounding-occluded-windows',
            '--disable-renderer-backgrounding'
        ]

    @staticmethod
    def get_stealth_script() -> str:
        """
        JavaScript injected to mask automation traits.
        """
        return """
        // Hide the webdriver property
        Object.defineProperty(navigator, 'webdriver', {
            get: () => undefined,
        });

        // Fake plugins
        Object.defineProperty(navigator, 'plugins', {
            get: () => [1, 2, 3, 4, 5],
        });

        // Fake languages
        Object.defineProperty(navigator, 'languages', {
            get: () => ['zh-CN', 'zh', 'en'],
        });

        // Fake platform
        Object.defineProperty(navigator, 'platform', {
            get: () => 'Win32',
        });

        // Scrub HeadlessChrome from the user agent. Capture the original
        // value first — reading navigator.userAgent inside its own getter
        // would recurse forever.
        const originalUA = navigator.userAgent;
        Object.defineProperty(navigator, 'userAgent', {
            get: () => originalUA.replace(/HeadlessChrome/i, 'Chrome'),
        });

        // Provide a window.chrome object
        Object.defineProperty(window, 'chrome', {
            value: new Proxy({}, {
                get(target, prop) {
                    if (prop === 'runtime') return {};
                    return target[prop];
                }
            }),
            writable: false,
        });

        // Mask toString on chromedriver's cdc_ marker function
        const originalToString = Function.prototype.toString;
        Function.prototype.toString = function() {
            if (this === window.cdc_adoQpoasnfa76pfcZLmcfl_Array) {
                return 'function Array() { [native code] }';
            }
            return originalToString.call(this);
        };
        """

    @staticmethod
    def get_realistic_user_agents() -> List[str]:
        """
        Realistic user-agent strings.
        """
        return [
            "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36",
            "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36",
            "Mozilla/5.0 (Windows NT 10.0; Win64; x64; rv:109.0) Gecko/20100101 Firefox/121.0",
            "Mozilla/5.0 (Macintosh; Intel Mac OS X 10.15; rv:109.0) Gecko/20100101 Firefox/121.0",
            "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/119.0.0.0 Safari/537.36 Edg/119.0.0.0",
            "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/605.1.15 (KHTML, like Gecko) Version/17.1 Safari/605.1.15"
        ]

class StealthSeleniumMiddleware:
    """
    Stealth Selenium middleware.
    """

    def __init__(self):
        self.config = AntiDetectionConfig()
        self.driver = self._create_stealth_driver()

    @classmethod
    def from_crawler(cls, crawler):
        from scrapy import signals

        middleware = cls()
        crawler.signals.connect(middleware.spider_closed, signal=signals.spider_closed)
        return middleware

    def _create_stealth_driver(self):
        """
        Create a stealth-configured driver.
        """
        from selenium import webdriver
        from selenium.webdriver.chrome.options import Options

        chrome_options = Options()

        # Anti-detection launch arguments
        for arg in self.config.get_stealth_args():
            chrome_options.add_argument(arg)

        # Random user agent
        user_agent = random.choice(self.config.get_realistic_user_agents())
        chrome_options.add_argument(f'--user-agent={user_agent}')

        # Randomized window size
        window_sizes = [
            {"width": 1366, "height": 768},
            {"width": 1920, "height": 1080},
            {"width": 1440, "height": 900},
            {"width": 1536, "height": 864}
        ]
        size = random.choice(window_sizes)
        chrome_options.add_argument(f'--window-size={size["width"]},{size["height"]}')

        driver = webdriver.Chrome(options=chrome_options)

        # Inject the stealth script into every new document (a one-off
        # execute_script would only patch the current blank page)
        driver.execute_cdp_cmd('Page.addScriptToEvaluateOnNewDocument', {
            'source': self.config.get_stealth_script()
        })

        return driver

    def process_request(self, request, spider):
        """
        Handle Selenium-flagged requests.
        """
        from scrapy.http import HtmlResponse

        if request.meta.get('use_selenium'):
            try:
                self.driver.get(request.url)

                # Simulate human behavior
                self._simulate_human_behavior()

                body = self.driver.page_source.encode('utf-8')
                return HtmlResponse(
                    url=request.url,
                    body=body,
                    encoding='utf-8',
                    request=request
                )
            except Exception as e:
                spider.logger.error(f"Stealth Selenium error: {e}")
                # Fall back to Scrapy's default downloader
                return None

    def _simulate_human_behavior(self):
        """
        Simulate human-like mouse movement and pauses.
        """
        import time
        from selenium.webdriver.common.action_chains import ActionChains

        # Small random mouse movement
        actions = ActionChains(self.driver)
        actions.move_by_offset(random.randint(-10, 10), random.randint(-10, 10))
        actions.perform()

        # Random pause
        time.sleep(random.uniform(1, 3))

    def spider_closed(self, spider):
        """
        Clean up the driver.
        """
        if self.driver:
            self.driver.quit()

class StealthPlaywrightMiddleware:
    """
    Stealth Playwright middleware.
    """

    def __init__(self):
        self.config = AntiDetectionConfig()
        self.playwright = None
        self.browser = None
        self.setup_stealth_browser()

    @classmethod
    def from_crawler(cls, crawler):
        from scrapy import signals

        middleware = cls()
        crawler.signals.connect(middleware.spider_closed, signal=signals.spider_closed)
        return middleware

    def setup_stealth_browser(self):
        """
        Launch a stealth-configured browser.
        """
        from playwright.sync_api import sync_playwright

        self.playwright = sync_playwright().start()
        self.browser = self.playwright.chromium.launch(
            headless=True,
            args=self.config.get_stealth_args()
        )

    def process_request(self, request, spider):
        """
        Handle Playwright-flagged requests.
        """
        from scrapy.http import HtmlResponse

        if not request.meta.get('use_playwright'):
            return None

        context = None
        try:
            # Random user agent
            user_agent = random.choice(self.config.get_realistic_user_agents())

            context = self.browser.new_context(
                user_agent=user_agent,
                viewport={"width": 1920, "height": 1080},
                locale="zh-CN",
                timezone_id="Asia/Shanghai"
            )

            # Inject the anti-detection script
            context.add_init_script(self.config.get_stealth_script())

            page = context.new_page()

            # Navigate first, then simulate human behavior on the loaded
            # page (scrolling a blank page before goto() achieves nothing)
            page.goto(request.url, wait_until="networkidle")
            self._simulate_human_behavior(page)

            content = page.content()

            return HtmlResponse(
                url=request.url,
                body=content.encode('utf-8'),
                encoding='utf-8',
                request=request
            )
        except Exception as e:
            spider.logger.error(f"Stealth Playwright error: {e}")
            # Fall back to Scrapy's default downloader
            return None
        finally:
            # Closing the context also closes its pages
            if context:
                context.close()

    def _simulate_human_behavior(self, page):
        """
        Simulate human-like pauses and scrolling.
        """
        # Random pause
        page.wait_for_timeout(random.randint(1000, 3000))

        # Small random scroll
        page.evaluate(f"window.scrollTo(0, {random.randint(100, 500)});")

    def spider_closed(self, spider):
        """
        Clean up browser resources.
        """
        if self.browser:
            self.browser.close()
        if self.playwright:
            self.playwright.stop()

Performance Optimization Tips

Performance Optimization Configuration

import time
from typing import Dict, Any, Optional
from dataclasses import dataclass

@dataclass
class PerformanceConfig:
    """
    Performance tuning options
    """
    # Resource limits
    max_pages: int = 10
    max_contexts: int = 5
    max_drivers: int = 3
    
    # Timeouts (seconds)
    page_load_timeout: int = 30
    script_timeout: int = 10
    wait_timeout: int = 10
    
    # Cache settings
    enable_cache: bool = True
    cache_size: int = 100
    cache_ttl: int = 300  # 5 minutes
    
    # Concurrency settings
    max_concurrent: int = 2
    request_delay: float = 1.0

class PerformanceOptimizedMiddleware:
    """
    Performance-optimized browser middleware
    """
    
    def __init__(self, config: Optional[PerformanceConfig] = None):
        self.config = config or PerformanceConfig()
        self.page_pool = []
        self.context_pool = []
        self.driver_pool = []
        self.cache = {}
        self.active_requests = 0
        self.request_timestamps = []
    
    def get_cached_result(self, url: str) -> Optional[str]:
        """
        Return cached page content for the URL, or None
        """
        if not self.config.enable_cache:
            return None
        
        cached = self.cache.get(url)
        if cached:
            content, timestamp = cached
            if time.time() - timestamp < self.config.cache_ttl:
                return content
            else:
                # Entry expired, evict it
                del self.cache[url]
        
        return None
    
    def cache_result(self, url: str, content: str):
        """
        Store page content in the cache
        """
        if not self.config.enable_cache:
            return
        
        # Enforce the cache size limit
        if len(self.cache) >= self.config.cache_size:
            # Evict the oldest entry
            oldest_key = min(self.cache.keys(), key=lambda k: self.cache[k][1])
            del self.cache[oldest_key]
        
        self.cache[url] = (content, time.time())
    
    def process_request(self, request, spider):
        """
        Process the request, with caching and a concurrency cap
        """
        if not (request.meta.get('use_selenium') or request.meta.get('use_playwright')):
            return None
        
        # Serve from cache if possible
        cached_content = self.get_cached_result(request.url)
        if cached_content:
            spider.logger.info(f"Using cached content for: {request.url}")
            return HtmlResponse(
                url=request.url,
                body=cached_content.encode('utf-8'),
                encoding='utf-8',
                request=request
            )
        
        # Enforce the concurrency limit
        if self.active_requests >= self.config.max_concurrent:
            spider.logger.info(f"Max concurrent limit reached, delaying request: {request.url}")
            time.sleep(self.config.request_delay)
        
        self.active_requests += 1
        
        try:
            if request.meta.get('use_selenium'):
                result = self._handle_selenium_request(request, spider)
            else:
                result = self._handle_playwright_request(request, spider)
            
            # Cache successful responses
            if isinstance(result, HtmlResponse):
                self.cache_result(request.url, result.text)
            
            return result
        finally:
            self.active_requests = max(0, self.active_requests - 1)
    
    def _handle_selenium_request(self, request, spider):
        """
        Handle a request with Selenium
        """
        from selenium import webdriver
        from selenium.webdriver.chrome.options import Options
        
        # Reuse a pooled driver or create a new one
        if self.driver_pool:
            driver = self.driver_pool.pop()
        else:
            chrome_options = Options()
            chrome_options.add_argument('--headless')
            chrome_options.add_argument('--no-sandbox')
            chrome_options.add_argument('--disable-dev-shm-usage')
            chrome_options.add_argument('--disable-gpu')
            # Performance: skip image loading (Blink setting; current Chrome
            # has no standalone --disable-images or --disable-javascript switch)
            chrome_options.add_argument('--blink-settings=imagesEnabled=false')
            
            driver = webdriver.Chrome(options=chrome_options)
        
        try:
            driver.set_page_load_timeout(self.config.page_load_timeout)
            driver.get(request.url)
            
            # Wait for dynamic content; tune for the target site
            time.sleep(2)
            
            content = driver.page_source
            
            # Return the healthy driver to the pool
            if len(self.driver_pool) < self.config.max_drivers:
                self.driver_pool.append(driver)
            else:
                driver.quit()
            
            return HtmlResponse(
                url=request.url,
                body=content.encode('utf-8'),
                encoding='utf-8',
                request=request
            )
        except Exception as e:
            spider.logger.error(f"Selenium error: {str(e)}")
            # Do not pool a driver that just failed
            driver.quit()
            return request
    
    def _handle_playwright_request(self, request, spider):
        """
        Handle a request with Playwright
        """
        from playwright.sync_api import sync_playwright
        
        # Reuse a pooled context or create a new one
        if self.context_pool:
            context = self.context_pool.pop()
        else:
            # Note: in production, keep references to playwright/browser
            # so they can be stopped in spider_closed
            playwright = sync_playwright().start()
            browser = playwright.chromium.launch(headless=True)
            context = browser.new_context()
        
        try:
            page = context.new_page()
            page.set_default_timeout(self.config.page_load_timeout * 1000)
            
            page.goto(request.url, wait_until="networkidle")
            
            content = page.content()
            
            page.close()
            
            # Return the context to the pool
            if len(self.context_pool) < self.config.max_contexts:
                self.context_pool.append(context)
            else:
                context.close()
            
            return HtmlResponse(
                url=request.url,
                body=content.encode('utf-8'),
                encoding='utf-8',
                request=request
            )
        except Exception as e:
            spider.logger.error(f"Playwright error: {str(e)}")
            
            # Make sure the context is closed on failure
            context.close()
            
            return request
    
    def spider_closed(self, spider):
        """
        Release all pooled resources
        """
        # Quit all pooled drivers
        for driver in self.driver_pool:
            try:
                driver.quit()
            except Exception:
                pass
        
        # Close all pooled contexts
        for context in self.context_pool:
            try:
                context.close()
            except Exception:
                pass
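
To wire a middleware like this into a project, it is registered in Scrapy's DOWNLOADER_MIDDLEWARES setting. A minimal sketch, assuming the class lives in a hypothetical `myproject.middlewares` module (module path and priority value are illustrative, not prescribed by Scrapy):

```python
# settings.py (sketch; module path and priority are assumptions)
DOWNLOADER_MIDDLEWARES = {
    # Mid-range priorities (~500-600) are a common slot for
    # custom download middlewares
    "myproject.middlewares.PerformanceOptimizedMiddleware": 543,
}

# Keep Scrapy's own concurrency in line with the browser pool size,
# otherwise requests queue up behind the max_concurrent cap
CONCURRENT_REQUESTS = 2
DOWNLOAD_TIMEOUT = 60
```

Matching CONCURRENT_REQUESTS to PerformanceConfig.max_concurrent avoids Scrapy scheduling far more requests than the browser pool can absorb.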

Resource Management and Lifecycle

Resource manager

import weakref
import gc
import threading
import time
from typing import Dict, Any, Callable
from contextlib import contextmanager
from scrapy.http import HtmlResponse

class ResourceManager:
    """
    Tracks live resources and periodically cleans up dead references
    """
    
    def __init__(self):
        self.resources = {}
        self.cleanup_callbacks = {}
        self.monitoring_thread = None
        self.monitoring = False
        self.lock = threading.Lock()
    
    def register_resource(self, name: str, resource: Any, cleanup_callback: Callable):
        """
        Register a resource; the callback fires when it is garbage-collected
        """
        with self.lock:
            self.resources[name] = weakref.ref(resource, cleanup_callback)
            self.cleanup_callbacks[name] = cleanup_callback
    
    def unregister_resource(self, name: str):
        """
        Unregister a resource
        """
        with self.lock:
            if name in self.resources:
                del self.resources[name]
            if name in self.cleanup_callbacks:
                del self.cleanup_callbacks[name]
    
    def cleanup_resources(self):
        """
        Drop entries whose referents have been garbage-collected
        """
        with self.lock:
            for name, ref in list(self.resources.items()):
                if ref() is None:  # referent has been collected
                    # Delete inline: calling unregister_resource() here would
                    # try to re-acquire the non-reentrant lock and deadlock
                    del self.resources[name]
                    self.cleanup_callbacks.pop(name, None)
    
    def start_monitoring(self, interval: int = 60):
        """
        Start the background monitoring thread
        """
        self.monitoring = True
        self.monitoring_thread = threading.Thread(target=self._monitor_loop, args=(interval,), daemon=True)
        self.monitoring_thread.start()
    
    def stop_monitoring(self):
        """
        Stop the monitoring thread
        """
        self.monitoring = False
        if self.monitoring_thread:
            self.monitoring_thread.join()
    
    def _monitor_loop(self, interval: int):
        """
        Periodically clean up dead references and force a GC pass
        """
        while self.monitoring:
            try:
                self.cleanup_resources()
                gc.collect()  # force garbage collection
                time.sleep(interval)
            except Exception as e:
                print(f"Resource manager monitoring error: {e}")
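
The weakref-based cleanup that ResourceManager relies on can be demonstrated in isolation: the callback passed to `weakref.ref` fires once the referent is garbage-collected. `FakeDriver` here is a stand-in for illustration, not a real WebDriver:

```python
import gc
import weakref

class FakeDriver:
    """Stand-in for a browser driver object."""
    def quit(self):
        pass

cleaned = []

def on_collect(ref):
    # The interpreter calls this when the referent is collected
    cleaned.append(ref)

driver = FakeDriver()
ref = weakref.ref(driver, on_collect)

del driver    # drop the only strong reference
gc.collect()  # make collection deterministic across interpreters

print(len(cleaned))  # the callback has fired exactly once
```

This is why cleanup_resources only has to drop dead entries: the per-resource callback already ran when the object died.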

class LifecycleAwareMiddleware:
    """
    Middleware that tracks browser sessions across requests
    """
    
    def __init__(self):
        self.resource_manager = ResourceManager()
        self.active_sessions = {}
        self.session_locks = {}
    
    @contextmanager
    def session_context(self, session_id: str):
        """
        Serialize access to a single session
        """
        if session_id not in self.session_locks:
            self.session_locks[session_id] = threading.Lock()
        
        lock = self.session_locks[session_id]
        lock.acquire()
        try:
            yield
        finally:
            lock.release()
    
    def process_request(self, request, spider):
        """
        Route the request to the matching session handler
        """
        session_id = request.meta.get('session_id', 'default')
        
        with self.session_context(session_id):
            if request.meta.get('use_selenium'):
                return self._handle_selenium_session(request, spider, session_id)
            elif request.meta.get('use_playwright'):
                return self._handle_playwright_session(request, spider, session_id)
        
        return None
    
    def _handle_selenium_session(self, request, spider, session_id: str):
        """
        Handle a request with a persistent Selenium session
        """
        # Reuse an existing session if one is active
        if session_id in self.active_sessions:
            session_info = self.active_sessions[session_id]
            driver = session_info.get('driver')
            
            if driver:
                try:
                    driver.get(request.url)
                    content = driver.page_source
                    
                    return HtmlResponse(
                        url=request.url,
                        body=content.encode('utf-8'),
                        encoding='utf-8',
                        request=request
                    )
                except Exception as e:
                    spider.logger.error(f"Selenium session error: {str(e)}")
                    # Tear down the failed session
                    self._cleanup_session(session_id)
        
        # Otherwise create a new session
        return self._create_selenium_session(request, spider, session_id)
    
    def _handle_playwright_session(self, request, spider, session_id: str):
        """
        Handle a Playwright session (mirrors the Selenium flow)
        """
        pass
    
    def _create_selenium_session(self, request, spider, session_id: str):
        """
        Create a new Selenium session and serve the first request with it
        """
        from selenium import webdriver
        from selenium.webdriver.chrome.options import Options
        
        chrome_options = Options()
        chrome_options.add_argument('--headless')
        chrome_options.add_argument('--no-sandbox')
        chrome_options.add_argument('--disable-dev-shm-usage')
        
        driver = webdriver.Chrome(options=chrome_options)
        
        # Store the session metadata
        self.active_sessions[session_id] = {
            'driver': driver,
            'created_at': time.time(),
            'last_used': time.time(),
            'request_count': 0
        }
        
        try:
            driver.get(request.url)
            content = driver.page_source
            
            # Update the session statistics
            session_info = self.active_sessions[session_id]
            session_info['last_used'] = time.time()
            session_info['request_count'] += 1
            
            return HtmlResponse(
                url=request.url,
                body=content.encode('utf-8'),
                encoding='utf-8',
                request=request
            )
        except Exception as e:
            spider.logger.error(f"Failed to create Selenium session: {str(e)}")
            self._cleanup_session(session_id)
            return request
    
    def _cleanup_session(self, session_id: str):
        """
        Quit the driver and drop the session
        """
        if session_id in self.active_sessions:
            session_info = self.active_sessions[session_id]
            driver = session_info.get('driver')
            
            if driver:
                try:
                    driver.quit()
                except Exception:
                    pass
            
            del self.active_sessions[session_id]
    
    def spider_opened(self, spider):
        """
        Start resource monitoring when the spider opens
        """
        self.resource_manager.start_monitoring()
    
    def spider_closed(self, spider):
        """
        Clean up all resources when the spider closes
        """
        self.resource_manager.stop_monitoring()
        
        # Tear down all active sessions
        for session_id in list(self.active_sessions.keys()):
            self._cleanup_session(session_id)
        
        # Drop the locks
        self.session_locks.clear()

Error Handling and Retry Mechanisms

Error-handling middleware

import time
import random
from typing import List, Tuple
from enum import Enum

class RetryStrategy(Enum):
    LINEAR = "linear"
    EXPONENTIAL = "exponential"
    FIXED = "fixed"

class ErrorHandlingMiddleware:
    """
    Error-handling middleware with configurable retry backoff
    """
    
    def __init__(self):
        self.retry_config = {
            'max_retries': 3,
            'strategy': RetryStrategy.EXPONENTIAL,
            'base_delay': 1.0,
            'max_delay': 60.0,
            'jitter': True
        }
        self.failed_requests = {}
    
    def calculate_retry_delay(self, attempt: int) -> float:
        """
        Compute the delay before the given retry attempt
        """
        if self.retry_config['strategy'] == RetryStrategy.LINEAR:
            delay = self.retry_config['base_delay'] * attempt
        elif self.retry_config['strategy'] == RetryStrategy.EXPONENTIAL:
            delay = self.retry_config['base_delay'] * (2 ** (attempt - 1))
        else:  # FIXED
            delay = self.retry_config['base_delay']
        
        # Cap at the maximum delay
        delay = min(delay, self.retry_config['max_delay'])
        
        # Add jitter to avoid synchronized retries
        if self.retry_config['jitter']:
            jitter = random.uniform(0.1, 0.3) * delay
            delay += jitter
        
        return delay
    
    def process_request(self, request, spider):
        """
        Gate requests that have exhausted their retry budget
        """
        if not (request.meta.get('use_selenium') or request.meta.get('use_playwright')):
            return None
        
        # Check the retry count
        retry_count = request.meta.get('retry_times', 0)
        
        if retry_count >= self.retry_config['max_retries']:
            spider.logger.error(f"Max retries exceeded for {request.url}")
            return None
        
        # The actual browser work happens in other middlewares; this
        # middleware returns None here and implements the retry logic
        # in process_exception when an error occurs
        return None
    
    def process_exception(self, request, exception, spider):
        """
        Handle exceptions and implement the retry logic
        """
        if not (request.meta.get('use_selenium') or request.meta.get('use_playwright')):
            return None
        
        retry_count = request.meta.get('retry_times', 0)
        
        if retry_count >= self.retry_config['max_retries']:
            spider.logger.error(f"Failed to process {request.url} after {retry_count} retries: {str(exception)}")
            return None
        
        # Compute the retry delay
        delay = self.calculate_retry_delay(retry_count + 1)
        
        spider.logger.warning(f"Retrying {request.url} in {delay:.2f}s (attempt {retry_count + 1}/{self.retry_config['max_retries']}): {str(exception)}")
        
        # Build the retry request
        retry_request = request.copy()
        retry_request.meta['retry_times'] = retry_count + 1
        retry_request.meta['retry_delay'] = delay
        retry_request.dont_filter = True  # allow the duplicate request through
        
        # Blocking delay; in production, prefer scheduling with a
        # download delay rather than sleeping inside the middleware
        time.sleep(delay)
        
        return retry_request
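
With jitter disabled, the delay schedule is easy to verify by hand. A standalone re-implementation of the same backoff formula:

```python
def retry_delay(attempt: int, base: float = 1.0, max_delay: float = 60.0,
                strategy: str = "exponential") -> float:
    """Mirror of calculate_retry_delay above, without jitter."""
    if strategy == "linear":
        delay = base * attempt
    elif strategy == "exponential":
        delay = base * (2 ** (attempt - 1))
    else:  # fixed
        delay = base
    return min(delay, max_delay)

# Exponential: 1s, 2s, 4s, ... capped at max_delay (attempt 7 would be 64s)
print([retry_delay(a) for a in (1, 2, 3, 7)])  # [1.0, 2.0, 4.0, 60.0]
```

The cap matters: without it, attempt 10 would already wait over eight minutes.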

class ComprehensiveErrorMiddleware:
    """
    Error middleware that dispatches on the error category
    """
    
    def __init__(self):
        # Patterns are lowercase: matching below is case-insensitive
        self.error_patterns = {
            'timeout': ['timeout', 'timed out'],
            'connection': ['connection', 'refused'],
            'browser_crash': ['browser closed', 'crashed'],
            'javascript_error': ['javascript', 'script', 'eval'],
            'captcha': ['captcha', 'verification', 'verify']
        }
        self.error_handlers = {
            'timeout': self._handle_timeout,
            'connection': self._handle_connection_error,
            'browser_crash': self._handle_browser_crash,
            'javascript_error': self._handle_javascript_error,
            'captcha': self._handle_captcha
        }
    
    def categorize_error(self, error_message: str) -> str:
        """
        Map an error message to a category
        """
        error_message_lower = error_message.lower()
        
        for category, patterns in self.error_patterns.items():
            if any(pattern in error_message_lower for pattern in patterns):
                return category
        
        return 'unknown'
    
    def _handle_timeout(self, request, spider, error):
        """
        Handle a timeout: retry with a longer timeout
        """
        spider.logger.info(f"Handling timeout for {request.url}")
        
        # Increase the timeout for the retry
        new_request = request.copy()
        new_request.meta['timeout'] = request.meta.get('timeout', 30) + 10
        new_request.dont_filter = True
        
        return new_request
    
    def _handle_connection_error(self, request, spider, error):
        """
        Handle a connection error: retry, flagging that a new IP may be needed
        """
        spider.logger.info(f"Handling connection error for {request.url}")
        
        # A proxy rotation middleware can act on this flag
        new_request = request.copy()
        new_request.meta['need_new_ip'] = True
        new_request.dont_filter = True
        
        return new_request
    
    def _handle_browser_crash(self, request, spider, error):
        """
        Handle a browser crash: retry with a fresh browser
        """
        spider.logger.info(f"Handling browser crash for {request.url}")
        
        # Signal the browser middleware to reinitialize
        new_request = request.copy()
        new_request.meta['reinit_browser'] = True
        new_request.dont_filter = True
        
        return new_request
    
    def _handle_javascript_error(self, request, spider, error):
        """
        Handle a JavaScript error: retry with JS disabled
        """
        spider.logger.info(f"Handling JS error for {request.url}")
        
        # Try again without executing JavaScript
        new_request = request.copy()
        new_request.meta['disable_javascript'] = True
        new_request.dont_filter = True
        
        return new_request
    
    def _handle_captcha(self, request, spider, error):
        """
        Handle a CAPTCHA: skip the request (or hand off for manual solving)
        """
        spider.logger.warning(f"CAPTCHA detected at {request.url}, skipping...")
        return None
    
    def process_exception(self, request, exception, spider):
        """
        Dispatch the exception to the matching handler
        """
        if not (request.meta.get('use_selenium') or request.meta.get('use_playwright')):
            return None
        
        error_category = self.categorize_error(str(exception))
        handler = self.error_handlers.get(error_category)
        
        if handler:
            return handler(request, spider, exception)
        else:
            # Default: plain retry, up to 3 attempts
            retry_times = request.meta.get('retry_times', 0)
            if retry_times < 3:
                new_request = request.copy()
                new_request.meta['retry_times'] = retry_times + 1
                new_request.dont_filter = True
                return new_request
        
        return None
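
The keyword-based categorization is simple enough to exercise standalone. A trimmed sketch of the same idea (patterns kept lowercase since the message is lowercased before matching; the sample messages are illustrative):

```python
ERROR_PATTERNS = {
    'timeout': ['timeout', 'timed out'],
    'connection': ['connection', 'refused'],
    'browser_crash': ['browser closed', 'crashed'],
    'captcha': ['captcha', 'verification'],
}

def categorize_error(message: str) -> str:
    """Return the first category whose pattern appears in the message."""
    msg = message.lower()
    for category, patterns in ERROR_PATTERNS.items():
        if any(p in msg for p in patterns):
            return category
    return 'unknown'

print(categorize_error("Navigation Timeout Exceeded: 30000ms"))  # timeout
print(categorize_error("net::ERR_CONNECTION_REFUSED"))           # connection
print(categorize_error("element not found"))                     # unknown
```

Note that dictionary order decides ties: a message containing both "timed out" and "connection" resolves to the first matching category.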

Asynchronous Processing

Asynchronous browser middleware

import asyncio
import concurrent.futures
import threading
import time
from typing import Optional
from scrapy.http import HtmlResponse
from playwright.async_api import async_playwright

class AsyncPlaywrightMiddleware:
    """
    Asynchronous Playwright middleware
    """
    
    def __init__(self):
        self.playwright = None
        self.browser = None
        self.semaphore = asyncio.Semaphore(3)  # cap concurrent pages
        self.executor = concurrent.futures.ThreadPoolExecutor(max_workers=5)
    
    async def setup_browser(self):
        """
        Launch the browser asynchronously
        """
        self.playwright = await async_playwright().start()
        self.browser = await self.playwright.chromium.launch(
            headless=True,
            args=['--no-sandbox', '--disable-dev-shm-usage']
        )
    
    async def process_request_async(self, request, spider):
        """
        Process a request asynchronously
        """
        if not request.meta.get('use_playwright'):
            return None
        
        async with self.semaphore:  # limit concurrency
            try:
                page = await self.browser.new_page()
                
                # Set the default timeout (milliseconds)
                page.set_default_timeout(30000)
                
                # Navigate to the page
                await page.goto(request.url, wait_until="networkidle")
                
                # Capture the rendered HTML
                content = await page.content()
                
                # Close the page
                await page.close()
                
                return HtmlResponse(
                    url=request.url,
                    body=content.encode('utf-8'),
                    encoding='utf-8',
                    request=request
                )
            except Exception as e:
                spider.logger.error(f"Async Playwright error: {str(e)}")
                return request
    
    def process_request(self, request, spider):
        """
        Synchronous wrapper around the async handler
        """
        if request.meta.get('use_playwright'):
            # Run the coroutine on an event loop
            try:
                loop = asyncio.get_event_loop()
            except RuntimeError:
                loop = asyncio.new_event_loop()
                asyncio.set_event_loop(loop)
            
            return loop.run_until_complete(self.process_request_async(request, spider))
        
        return None
    
    def spider_closed(self, spider):
        """
        Release browser resources
        """
        if self.browser:
            async def close_browser():
                await self.browser.close()
            
            try:
                loop = asyncio.get_event_loop()
                loop.run_until_complete(close_browser())
            except Exception:
                pass
        
        if self.playwright:
            async def stop_playwright():
                await self.playwright.stop()
            
            try:
                loop = asyncio.get_event_loop()
                loop.run_until_complete(stop_playwright())
            except Exception:
                pass
        
        self.executor.shutdown(wait=True)

class AsyncSeleniumMiddleware:
    """
    Selenium middleware offloaded to a thread pool
    """
    
    def __init__(self):
        self.executor = concurrent.futures.ThreadPoolExecutor(max_workers=3)
        self.semaphore = threading.Semaphore(3)
    
    def process_request(self, request, spider):
        """
        Run the Selenium request on a worker thread
        """
        if not request.meta.get('use_selenium'):
            return None
        
        # Submit the blocking work to the thread pool
        future = self.executor.submit(self._sync_process_request, request, spider)
        
        try:
            # Bound the total time we wait for the result
            result = future.result(timeout=60)  # 60-second timeout
            return result
        except concurrent.futures.TimeoutError:
            spider.logger.error(f"Selenium request timed out: {request.url}")
            return request
        except Exception as e:
            spider.logger.error(f"Selenium async error: {str(e)}")
            return request
    
    def _sync_process_request(self, request, spider):
        """
        Blocking request handler (runs on a worker thread)
        """
        with self.semaphore:  # limit concurrency
            from selenium import webdriver
            from selenium.webdriver.chrome.options import Options
            
            chrome_options = Options()
            chrome_options.add_argument('--headless')
            chrome_options.add_argument('--no-sandbox')
            chrome_options.add_argument('--disable-dev-shm-usage')
            
            driver = webdriver.Chrome(options=chrome_options)
            
            try:
                driver.get(request.url)
                time.sleep(2)  # wait for the page to load
                
                body = driver.page_source
                return HtmlResponse(
                    url=request.url,
                    body=body.encode('utf-8'),
                    encoding='utf-8',
                    request=request
                )
            except Exception as e:
                spider.logger.error(f"Selenium sync error: {str(e)}")
                return request
            finally:
                driver.quit()
    
    def spider_closed(self, spider):
        """
        Release thread pool resources
        """
        self.executor.shutdown(wait=True)
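
The offloading pattern used above — submit blocking work to a ThreadPoolExecutor and bound the wait with a timeout — works for any blocking call. `fetch_blocking` below is a stand-in for `driver.get()`, not real browser code:

```python
import concurrent.futures
import time

def fetch_blocking(url: str) -> str:
    """Stand-in for a blocking browser fetch."""
    time.sleep(0.1)  # simulate network/render latency
    return f"<html>{url}</html>"

executor = concurrent.futures.ThreadPoolExecutor(max_workers=3)
future = executor.submit(fetch_blocking, "https://example.com")

try:
    # Bound how long we wait, like the 60s cap in the middleware;
    # raises concurrent.futures.TimeoutError if exceeded
    result = future.result(timeout=5)
finally:
    executor.shutdown(wait=True)

print(result)  # <html>https://example.com</html>
```

The timeout only stops the caller from waiting; the worker thread itself keeps running, which is why the middleware still quits the driver in its own `finally` block.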

Containerized Deployment

Docker configuration

# Dockerfile
FROM python:3.9-slim

# Install system dependencies
RUN apt-get update && apt-get install -y \
    wget \
    unzip \
    xvfb \
    x11-utils \
    x11-xserver-utils \
    xdg-utils \
    libnss3 \
    libatk-bridge2.0-0 \
    libdrm2 \
    libxkbcommon0 \
    libxcomposite1 \
    libxdamage1 \
    libxrandr2 \
    libgbm1 \
    libxss1 \
    libasound2 \
    gnupg \
    ca-certificates \
    fonts-liberation \
    libappindicator3-1 \
    libsecret-1-0 \
    curl \
    && rm -rf /var/lib/apt/lists/*

# Install Chrome (note: apt-key is deprecated on newer Debian releases;
# use a signed-by keyring there)
RUN wget -q -O - https://dl.google.com/linux/linux_signing_key.pub | apt-key add - \
    && echo "deb [arch=amd64] http://dl.google.com/linux/chrome/deb/ stable main" >> /etc/apt/sources.list.d/google-chrome.list \
    && apt-get update \
    && apt-get install -y google-chrome-stable

# Set the working directory
WORKDIR /app

# Copy the dependency manifest
COPY requirements.txt .

# Install Python dependencies
RUN pip install --no-cache-dir -r requirements.txt

# Copy the application code
COPY . .

# Create a non-root user
RUN groupadd -r selenium && useradd -r -g selenium selenium

# Set ownership and drop privileges
RUN chown -R selenium:selenium /app
USER selenium

# Launch command
CMD ["scrapy", "crawl", "your_spider"]

Containerized deployment configuration

# docker_compose_config.py
DOCKER_COMPOSE_CONFIG = {
    'version': '3.8',
    'services': {
        'scraper': {
            'build': '.',
            'volumes': [
                './data:/app/data',
                '/dev/shm:/dev/shm'  # share host shared memory for performance
            ],
            'environment': [
                'SCRAPY_SETTINGS_MODULE=settings',
                'PLAYWRIGHT_BROWSERS_PATH=/app/.cache/ms-playwright'
            ],
            'shm_size': '2gb',  # enlarge shared memory for the browser
            'restart': 'unless-stopped',
            'logging': {
                'driver': 'json-file',
                'options': {
                    'max-size': '10m',
                    'max-file': '3'
                }
            }
        }
    }
}

class ContainerizedMiddleware:
    """
    Middleware aware of container resource limits
    """
    
    def __init__(self):
        self.in_container = self._is_running_in_container()
        self.container_resources = self._get_container_resources()
    
    def _is_running_in_container(self) -> bool:
        """
        Detect whether we are running inside a container
        """
        try:
            with open('/proc/1/cgroup', 'r') as f:
                cgroup = f.read()  # read once; a second read() would return ''
            return 'docker' in cgroup or 'containerd' in cgroup
        except OSError:
            return False
    
    def _get_container_resources(self) -> dict:
        """
        Read container resource limits (cgroup v1 paths)
        """
        resources = {
            'memory_limit': None,
            'cpu_limit': None
        }
        
        try:
            # Memory limit
            with open('/sys/fs/cgroup/memory/memory.limit_in_bytes', 'r') as f:
                memory_limit = int(f.read().strip())
                if memory_limit < 9223372036854771712:  # not the "unlimited" sentinel
                    resources['memory_limit'] = memory_limit
            
            # CPU shares
            with open('/sys/fs/cgroup/cpu/cpu.shares', 'r') as f:
                resources['cpu_limit'] = int(f.read().strip())
        except OSError:
            pass
        
        return resources
    
    def process_request(self, request, spider):
        """
        Adjust behavior based on container resource limits
        """
        if not (request.meta.get('use_selenium') or request.meta.get('use_playwright')):
            return None
        
        # Be more conservative when running inside a container
        if self.in_container:
            if request.meta.get('selenium_wait_for'):
                # Allow extra wait time to absorb resource throttling
                original_wait = request.meta['selenium_wait_for']
                request.meta['selenium_wait_for'] = original_wait + 2
        
        return None

Common Problems and Solutions

Problem 1: Browser fails to start

Symptom: ChromeDriver or Playwright cannot launch the browser. Solution:

# Selenium solution
from selenium import webdriver
from selenium.webdriver.chrome.options import Options

def create_driver_with_fallback():
    """
    Create a driver with a fallback configuration
    """
    chrome_options = Options()
    chrome_options.add_argument('--no-sandbox')
    chrome_options.add_argument('--disable-dev-shm-usage')
    chrome_options.add_argument('--disable-gpu')
    chrome_options.add_argument('--remote-debugging-port=9222')
    chrome_options.add_argument('--disable-extensions')
    chrome_options.add_argument('--disable-plugins')
    
    try:
        driver = webdriver.Chrome(options=chrome_options)
        return driver
    except Exception as e:
        print(f"Chrome failed: {e}")
        # Retry in headless mode (the initial options were not headless,
        # which often fails on servers without a display)
        chrome_options.add_argument('--headless')
        try:
            driver = webdriver.Chrome(options=chrome_options)
            return driver
        except Exception as e2:
            print(f"Chrome fallback failed: {e2}")
            return None

Problem 2: Memory leaks

Symptom: Memory usage keeps growing over long runs. Solution:

from scrapy.http import HtmlResponse

class MemoryEfficientMiddleware:
    """
    Middleware with explicit driver pooling to bound memory use
    """
    
    def __init__(self):
        self.driver_pool = []
        self.max_pool_size = 5
    
    def process_request(self, request, spider):
        """
        Process the request with careful driver lifecycle management
        """
        if request.meta.get('use_selenium'):
            driver = self._get_driver()
            if driver:
                try:
                    driver.get(request.url)
                    content = driver.page_source
                    response = HtmlResponse(
                        url=request.url,
                        body=content.encode('utf-8'),
                        encoding='utf-8',
                        request=request
                    )
                    self._return_driver(driver)
                    return response
                except Exception as e:
                    spider.logger.error(f"Memory efficient error: {e}")
                    self._dispose_driver(driver)  # destroy on failure
                    return request
        return None
    
    def _get_driver(self):
        """
        Take a driver from the pool or create one
        """
        if self.driver_pool:
            return self.driver_pool.pop()
        else:
            # Create a fresh driver
            from selenium import webdriver
            from selenium.webdriver.chrome.options import Options
            options = Options()
            options.add_argument('--headless')
            options.add_argument('--no-sandbox')
            options.add_argument('--disable-dev-shm-usage')
            return webdriver.Chrome(options=options)
    
    def _return_driver(self, driver):
        """
        Return a driver to the pool, or destroy it if the pool is full
        """
        if len(self.driver_pool) < self.max_pool_size:
            self.driver_pool.append(driver)
        else:
            self._dispose_driver(driver)
    
    def _dispose_driver(self, driver):
        """
        Quit the driver, ignoring shutdown errors
        """
        try:
            driver.quit()
        except Exception:
            pass

Problem 3: Anti-bot detection

Symptom: The site identifies the crawler as an automated tool. Solution:

class AntiDetectionMiddleware:
    """
    Anti-detection middleware
    """
    
    def process_request(self, request, spider):
        """
        Reduce the risk of automation detection
        """
        if request.meta.get('use_selenium'):
            # Anti-detection measures:
            # 1. Randomize request intervals
            import time
            import random
            time.sleep(random.uniform(1, 3))
            
            # 2. Set a realistic user agent
            # 3. Simulate human behavior
            pass
        
        return None

Problem 4: Performance bottlenecks

Symptom: Browser rendering is slow and throttles the whole crawl. Solution:

class HighPerformanceMiddleware:
    """
    High-performance middleware with a simple response cache
    """
    
    def __init__(self):
        self.cache = {}  # simple in-memory cache
        self.max_cache_size = 100
    
    def process_request(self, request, spider):
        """
        Serve cached responses when possible
        """
        # Check the cache (note: built-in hash() of a str is salted
        # per process, so this key is only valid within a single run)
        url_hash = hash(request.url)
        if url_hash in self.cache:
            cached_response = self.cache[url_hash]
            return HtmlResponse(
                url=request.url,
                body=cached_response,
                encoding='utf-8',
                request=request
            )
        
        # Otherwise render the page here...
        # ...and cache the result
        
        return None
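
One caveat with the sketch above: Python salts `hash()` for strings per process (PYTHONHASHSEED), so `hash(request.url)` changes between runs. If cache keys must survive restarts or be shared between processes, a digest is the safer choice:

```python
import hashlib

def cache_key(url: str) -> str:
    """Stable cache key: identical across processes and runs."""
    return hashlib.sha1(url.encode('utf-8')).hexdigest()

# Always the same 40-character hex string for the same URL
print(cache_key("https://example.com/page?id=1"))
```

This is also the approach Scrapy itself takes for request fingerprinting, which uses a hash digest rather than the built-in `hash()`.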

Best Practice Recommendations

Choosing the right tool

  1. Selenium: suited to complex interactions, with many ready-made solutions
  2. Playwright: suited to modern web apps, with better performance
  3. Priority order: static content → API endpoints → Selenium/Playwright
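
That priority order can be encoded as a small decision helper. A sketch (the flag names are illustrative; in practice they come from inspecting the target site):

```python
def choose_fetch_strategy(needs_js: bool, has_api: bool,
                          needs_interaction: bool) -> str:
    """Pick the cheapest strategy that can still get the data."""
    if not needs_js:
        return 'static'       # plain Scrapy request, no browser needed
    if has_api:
        return 'api'          # call the JSON/XHR endpoint directly
    if needs_interaction:
        return 'selenium'     # complex interaction flows
    return 'playwright'       # JS rendering without heavy interaction

print(choose_fetch_strategy(needs_js=True, has_api=False,
                            needs_interaction=False))  # playwright
```

The point is to fall through to a browser only when the cheaper options are genuinely ruled out, since a rendered page costs orders of magnitude more than a plain HTTP request.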

Performance optimization

  1. Resource reuse: reuse browser instances and pages
  2. Concurrency control: cap the number of browsers running at once
  3. Caching: cache responses for identical URLs
  4. Error handling: robust exception handling and retry mechanisms

💡 Key takeaway: Selenium and Playwright are powerful tools for JavaScript-rendered content, but they are also performance bottlenecks. Caching, connection pooling, and solid error handling significantly improve overall crawler performance.
