#Scrapy与Selenium/Playwright集成完全指南 - JavaScript动态渲染处理与浏览器自动化技术详解
📂 所属阶段:第三阶段 — 攻防演练(中间件与反爬篇)
🔗 相关章节:Downloader Middleware · 反爬对抗实战 · 数据去重与增量更新
#目录
- Selenium与Playwright概述
- Selenium集成方案
- Playwright集成方案
- 浏览器驱动管理
- 反检测策略
- 性能优化技巧
- 资源管理与生命周期
- 错误处理与重试机制
- 异步处理方案
- 容器化部署
- 常见问题与解决方案
- SEO优化建议
#Selenium与Playwright概述
Selenium和Playwright是处理JavaScript动态渲染页面的两大主流工具,它们能够在Scrapy爬虫中处理复杂的前端交互场景。
#Selenium与Playwright对比
"""
Selenium vs Playwright 对比:
Selenium:
- 优势:成熟稳定,社区庞大,支持多种浏览器
- 劣势:启动较慢,资源占用高,API较为复杂
- 适用场景:已有Selenium代码,复杂浏览器兼容性需求
Playwright:
- 优势:启动快,性能好,API简洁,内置等待机制
- 劣势:相对较新,某些功能仍在完善
- 适用场景:新项目,高性能要求,现代化爬虫
"""#何时使用浏览器自动化
"""
需要使用浏览器自动化的场景:
1. JavaScript动态加载内容
2. 单页应用(SPA)内容获取
3. 复杂用户交互模拟
4. Canvas/WebGL内容抓取
5. 验证码处理
6. 动态表单提交
7. WebSocket通信监控
"""#Selenium集成方案
#基础Selenium集成
from selenium import webdriver
from selenium.webdriver.chrome.options import Options
from selenium.webdriver.common.by import By
from selenium.webdriver.support.ui import WebDriverWait
from selenium.webdriver.support import expected_conditions as EC
from scrapy.http import HtmlResponse
import time
class SeleniumMiddleware:
    """Basic Selenium downloader middleware.

    Renders requests flagged with ``request.meta['use_selenium']`` in a
    single shared headless Chrome instance and returns the rendered DOM
    as an ``HtmlResponse``.
    """

    def __init__(self):
        # One shared driver for the middleware's whole lifetime.
        self.driver = self._create_driver()

    def _create_driver(self):
        """Create a headless Chrome driver with basic anti-automation flags."""
        chrome_options = Options()
        chrome_options.add_argument('--headless')  # headless mode
        chrome_options.add_argument('--no-sandbox')
        chrome_options.add_argument('--disable-dev-shm-usage')
        chrome_options.add_argument('--disable-blink-features=AutomationControlled')
        chrome_options.add_experimental_option("excludeSwitches", ["enable-automation"])
        chrome_options.add_experimental_option('useAutomationExtension', False)
        driver = webdriver.Chrome(options=chrome_options)
        # Hide navigator.webdriver.
        # NOTE(review): this patches only the current (blank) page; the
        # property reappears after navigation — a CDP
        # Page.addScriptToEvaluateOnNewDocument hook would persist it.
        driver.execute_script("Object.defineProperty(navigator, 'webdriver', {get: () => undefined})")
        return driver

    def process_request(self, request, spider):
        """Render the page when ``use_selenium`` is set.

        Returns an ``HtmlResponse`` on success, the original ``request``
        on failure (so Scrapy re-schedules it), or ``None`` for requests
        that do not ask for Selenium.
        """
        if request.meta.get('use_selenium'):
            try:
                self.driver.get(request.url)
                # Wait until the <body> element exists.
                wait = WebDriverWait(self.driver, 10)
                wait.until(EC.presence_of_element_located((By.TAG_NAME, "body")))
                # Give dynamic content time to load.
                time.sleep(2)  # simple fixed wait; prefer an explicit condition in real code
                body = self.driver.page_source.encode('utf-8')
                return HtmlResponse(
                    url=request.url,
                    body=body,
                    encoding='utf-8',
                    request=request
                )
            except Exception as e:
                spider.logger.error(f"Selenium error for {request.url}: {str(e)}")
                # Hand the request back so it can be retried.
                return request

    def spider_closed(self, spider):
        """Release the browser when the spider closes."""
        if self.driver:
            self.driver.quit()
# Advanced Selenium integration
from selenium import webdriver
from selenium.webdriver.chrome.options import Options
from selenium.webdriver.common.by import By
from selenium.webdriver.support.ui import WebDriverWait
from selenium.webdriver.support import expected_conditions as EC
from selenium.webdriver.common.action_chains import ActionChains
from scrapy.http import HtmlResponse
import time
import random
class AdvancedSeleniumMiddleware:
    """Selenium middleware with a driver pool and per-request hooks
    (custom JS, waits, scrolling, clicks).

    Fixes over the naive version:
    - dropped the fixed ``--remote-debugging-port=9222`` flag: every pooled
      driver tried to bind the same port, so all but the first failed;
    - dropped ``--disable-javascript``: this middleware exists to render
      JS-driven pages, so JavaScript must stay enabled;
    - the stealth script is installed through CDP so it applies to every
      navigated document, not only the initial blank page;
    - bare ``except:`` clauses narrowed to ``except Exception``.
    """

    def __init__(self):
        self.drivers = []       # pool of idle drivers
        self.max_drivers = 5    # pool upper bound
        self.create_initial_drivers()

    def create_initial_drivers(self):
        """Pre-create the driver pool."""
        for _ in range(self.max_drivers):
            driver = self._create_driver()
            self.drivers.append(driver)

    def _create_driver(self):
        """Create a headless Chrome driver tuned for scraping."""
        chrome_options = Options()
        # Base headless flags.
        chrome_options.add_argument('--headless')
        chrome_options.add_argument('--no-sandbox')
        chrome_options.add_argument('--disable-dev-shm-usage')
        chrome_options.add_argument('--disable-gpu')
        # Anti-detection flags.
        chrome_options.add_argument('--disable-blink-features=AutomationControlled')
        chrome_options.add_experimental_option("excludeSwitches", ["enable-automation"])
        chrome_options.add_experimental_option('useAutomationExtension', False)
        # Performance flags: skip images, extensions and background throttling.
        chrome_options.add_argument('--disable-background-timer-throttling')
        chrome_options.add_argument('--disable-renderer-backgrounding')
        chrome_options.add_argument('--disable-extensions')
        chrome_options.add_argument('--disable-plugins')
        chrome_options.add_argument('--disable-images')
        chrome_options.add_argument('--blink-settings=imagesEnabled=false')
        driver = webdriver.Chrome(options=chrome_options)
        stealth_js = """
        Object.defineProperty(navigator, 'webdriver', {
            get: () => undefined
        });
        Object.defineProperty(navigator, 'plugins', {
            get: () => [1, 2, 3, 4, 5]
        });
        """
        try:
            # Persist the stealth patch across navigations via CDP.
            driver.execute_cdp_cmd('Page.addScriptToEvaluateOnNewDocument',
                                   {'source': stealth_js})
        except Exception:
            # Fallback for drivers without CDP support: one-shot patch.
            driver.execute_script(stealth_js)
        return driver

    def get_available_driver(self):
        """Pop a pooled driver, or create a new one when the pool is empty."""
        if self.drivers:
            return self.drivers.pop(0)
        return self._create_driver()

    def return_driver(self, driver):
        """Put *driver* back into the pool, or quit it when the pool is full."""
        if len(self.drivers) < self.max_drivers:
            self.drivers.append(driver)
        else:
            driver.quit()

    def process_request(self, request, spider):
        """Render ``use_selenium`` requests.

        Supported ``request.meta`` keys: ``selenium_js_scripts``,
        ``selenium_wait_for`` (CSS selector), ``selenium_scroll_pause``,
        ``selenium_scroll_times`` and ``selenium_click_selectors``.
        """
        if not request.meta.get('use_selenium'):
            return None
        driver = self.get_available_driver()
        try:
            driver.set_page_load_timeout(30)
            driver.get(request.url)
            # Run any caller-supplied JavaScript.
            js_scripts = request.meta.get('selenium_js_scripts', [])
            for script in js_scripts:
                driver.execute_script(script)
            # Wait for the requested selector, or at least for <body>.
            wait_selector = request.meta.get('selenium_wait_for', None)
            if wait_selector:
                wait = WebDriverWait(driver, 10)
                wait.until(EC.presence_of_element_located((By.CSS_SELECTOR, wait_selector)))
            else:
                WebDriverWait(driver, 10).until(
                    EC.presence_of_element_located((By.TAG_NAME, "body"))
                )
            # Scroll to trigger lazy-loaded content.
            scroll_pause_time = request.meta.get('selenium_scroll_pause', 1)
            scroll_times = request.meta.get('selenium_scroll_times', 0)
            for _ in range(scroll_times):
                driver.execute_script("window.scrollTo(0, document.body.scrollHeight);")
                time.sleep(scroll_pause_time)
            # Optional click interactions.
            click_selectors = request.meta.get('selenium_click_selectors', [])
            for selector in click_selectors:
                try:
                    element = WebDriverWait(driver, 5).until(
                        EC.element_to_be_clickable((By.CSS_SELECTOR, selector))
                    )
                    element.click()
                    time.sleep(1)  # let the click's effects render
                except Exception:
                    spider.logger.warning(f"Could not click element: {selector}")
            body = driver.page_source.encode('utf-8')
            response = HtmlResponse(
                url=request.url,
                body=body,
                encoding='utf-8',
                request=request
            )
            self.return_driver(driver)
            return response
        except Exception as e:
            spider.logger.error(f"Selenium error for {request.url}: {str(e)}")
            # Never leak the driver, even on failure.
            self.return_driver(driver)
            # Hand the request back for retry / other handling.
            return request

    def spider_closed(self, spider):
        """Quit every pooled driver on spider close."""
        for driver in self.drivers:
            try:
                driver.quit()
            except Exception:
                pass
        self.drivers.clear()
# Playwright integration
#基础Playwright集成
from playwright.sync_api import sync_playwright
from scrapy.http import HtmlResponse
import asyncio
from concurrent.futures import ThreadPoolExecutor
class PlaywrightMiddleware:
    """Basic Playwright middleware using the sync API: one shared headless
    Chromium browser, a fresh page per request."""

    def __init__(self):
        self.playwright = None
        self.browser = None
        self.setup_browser()

    def setup_browser(self):
        """Start Playwright and launch headless Chromium."""
        self.playwright = sync_playwright().start()
        self.browser = self.playwright.chromium.launch(
            headless=True,
            args=[
                '--no-sandbox',
                '--disable-setuid-sandbox',
                '--disable-dev-shm-usage',
                '--disable-accelerated-2d-canvas',
                '--no-first-run',
                '--no-zygote',
                '--disable-gpu'
            ]
        )

    def process_request(self, request, spider):
        """Render ``use_playwright`` requests and return an ``HtmlResponse``;
        on error, return the original request so Scrapy retries it."""
        if request.meta.get('use_playwright'):
            try:
                page = self.browser.new_page()
                # Spoof a desktop Chrome user agent.
                page.set_extra_http_headers({
                    "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36"
                })
                # Navigate and wait until the network goes quiet.
                response = page.goto(request.url, wait_until="networkidle")
                # Also wait for DOM readiness.
                page.wait_for_load_state("domcontentloaded")
                content = page.content()
                page.close()
                return HtmlResponse(
                    url=request.url,
                    body=content.encode('utf-8'),
                    encoding='utf-8',
                    request=request
                )
            except Exception as e:
                spider.logger.error(f"Playwright error for {request.url}: {str(e)}")
                return request

    def spider_closed(self, spider):
        """Shut down the browser and the Playwright driver."""
        if self.browser:
            self.browser.close()
        if self.playwright:
            self.playwright.stop()
# Advanced Playwright integration
from playwright.sync_api import sync_playwright, Browser
from scrapy.http import HtmlResponse
import time
import random
from typing import Optional, Dict, Any
from dataclasses import dataclass
@dataclass
class BrowserConfig:
    """Configuration for a Playwright browser context.

    Fix: fields that default to ``None`` are now annotated ``Optional[...]``
    (the originals claimed ``Dict``/``list`` while defaulting to ``None``,
    which is a typing error). ``__post_init__`` replaces ``None`` with
    per-instance defaults, so instances never share mutable state.
    """
    headless: bool = True
    user_agent: str = ""                                  # "" -> caller picks a fallback
    viewport: Optional[Dict[str, int]] = None             # filled in __post_init__
    locale: str = "en-US"
    timezone_id: str = "Asia/Shanghai"
    geolocation: Optional[Dict[str, float]] = None        # filled in __post_init__
    permissions: Optional[list] = None                    # filled in __post_init__
    extra_http_headers: Optional[Dict[str, str]] = None   # passed through as-is

    def __post_init__(self):
        """Replace None defaults with fresh, per-instance values."""
        if self.viewport is None:
            self.viewport = {"width": 1920, "height": 1080}
        if self.geolocation is None:
            # Shanghai, consistent with the default timezone_id.
            self.geolocation = {"longitude": 121.4737, "latitude": 31.2304}
        if self.permissions is None:
            self.permissions = ["geolocation", "notifications"]
class AdvancedPlaywrightMiddleware:
    """Playwright middleware with a pooled set of browser contexts plus
    per-request hooks: resource blocking, custom JS, waits, scrolling
    and clicks (configured through ``request.meta`` keys prefixed
    ``playwright_``)."""

    def __init__(self):
        self.playwright = None
        self.browser = None
        self.contexts = []      # idle browser-context pool
        self.max_contexts = 10  # pool upper bound
        self.setup_environment()

    def setup_environment(self):
        """Start Playwright, launch Chromium and warm up 3 contexts."""
        self.playwright = sync_playwright().start()
        self.browser = self.playwright.chromium.launch(
            headless=True,
            args=[
                '--no-sandbox',
                '--disable-setuid-sandbox',
                '--disable-dev-shm-usage',
                '--disable-accelerated-2d-canvas',
                '--no-first-run',
                '--no-zygote',
                '--disable-gpu',
                '--disable-web-security',
                '--disable-features=VizDisplayCompositor'
            ]
        )
        # Pre-create the initial context pool.
        for _ in range(3):
            context = self._create_context()
            self.contexts.append(context)

    def _create_context(self, config: Optional[BrowserConfig] = None):
        """Create a browser context from *config* (defaults when None)."""
        if config is None:
            config = BrowserConfig()
        context = self.browser.new_context(
            user_agent=config.user_agent or "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36",
            viewport=config.viewport,
            locale=config.locale,
            timezone_id=config.timezone_id,
            geolocation=config.geolocation,
            permissions=config.permissions,
            extra_http_headers=config.extra_http_headers or {},
            java_script_enabled=True,
        )
        # Anti-detection patches, injected before any page script runs.
        context.add_init_script("""
Object.defineProperty(navigator, 'webdriver', {
get: () => undefined,
});
Object.defineProperty(navigator, 'plugins', {
get: () => [1, 2, 3, 4, 5],
});
Object.defineProperty(navigator, 'languages', {
get: () => ['en-US', 'en'],
});
        """)
        return context

    def get_available_context(self):
        """Pop a pooled context, or create one when the pool is empty."""
        if self.contexts:
            return self.contexts.pop(0)
        else:
            return self._create_context()

    def return_context(self, context):
        """Return *context* to the pool (pages closed), or close it when full."""
        if len(self.contexts) < self.max_contexts:
            # Close leftover pages so pooled contexts start clean.
            for page in context.pages:
                try:
                    page.close()
                except:
                    pass
            self.contexts.append(context)
        else:
            context.close()

    def process_request(self, request, spider):
        """Render ``use_playwright`` requests with pooled contexts.

        Supported ``request.meta`` keys: ``playwright_block_resources``,
        ``playwright_custom_scripts``, ``playwright_wait_for_selector``,
        ``playwright_scroll_pause`` and ``playwright_click_selectors``.
        """
        if not request.meta.get('use_playwright'):
            return None
        context = self.get_available_context()
        try:
            page = context.new_page()
            # Optionally abort requests whose URL matches a blocked token.
            block_resources = request.meta.get('playwright_block_resources', [])
            if block_resources:
                def handle_route(route):
                    if any(resource in route.request.url for resource in block_resources):
                        route.abort()
                    else:
                        route.continue_()
                page.route("**/*", handle_route)
            page.goto(request.url, wait_until="networkidle")
            # Run any caller-supplied JavaScript.
            custom_scripts = request.meta.get('playwright_custom_scripts', [])
            for script in custom_scripts:
                page.evaluate(script)
            # Wait for a caller-specified selector.
            wait_for_selector = request.meta.get('playwright_wait_for_selector', None)
            if wait_for_selector:
                page.wait_for_selector(wait_for_selector, timeout=10000)
            # Scroll to trigger lazy loading (3 passes).
            scroll_pause = request.meta.get('playwright_scroll_pause', 0)
            if scroll_pause > 0:
                for _ in range(3):
                    page.evaluate("window.scrollTo(0, document.body.scrollHeight);")
                    page.wait_for_timeout(scroll_pause * 1000)
            # Optional click interactions.
            click_selectors = request.meta.get('playwright_click_selectors', [])
            for selector in click_selectors:
                try:
                    page.click(selector)
                    page.wait_for_timeout(1000)  # let the click's effects render
                except:
                    spider.logger.warning(f"Could not click element: {selector}")
            content = page.content()
            page.close()
            self.return_context(context)
            return HtmlResponse(
                url=request.url,
                body=content.encode('utf-8'),
                encoding='utf-8',
                request=request
            )
        except Exception as e:
            spider.logger.error(f"Playwright error for {request.url}: {str(e)}")
            # Never leak the context, even on failure.
            self.return_context(context)
            return request

    def spider_closed(self, spider):
        """Close every context, then the browser and the Playwright driver."""
        for context in self.contexts:
            try:
                context.close()
            except:
                pass
        self.contexts.clear()
        if self.browser:
            self.browser.close()
        if self.playwright:
            self.playwright.stop()
# Browser driver management
#智能驱动管理器
import subprocess
import psutil
import os
import time
from typing import List, Optional
from dataclasses import dataclass
@dataclass
class DriverInfo:
    """Snapshot of one browser-related OS process (filled from psutil)."""
    pid: int             # process id
    name: str            # executable name
    memory_usage: float  # percent of system memory used
    cpu_usage: float     # percent CPU used
    uptime: float        # seconds since process creation
    status: str          # psutil status string (e.g. running, zombie)
class DriverManager:
    """Watches system resources and browser processes via psutil: detects
    memory/CPU pressure, enumerates browser processes and terminates ones
    that look hung."""

    def __init__(self, max_memory_percent=80.0, max_cpu_percent=80.0):
        # Thresholds above which the system counts as unhealthy.
        self.max_memory_percent = max_memory_percent
        self.max_cpu_percent = max_cpu_percent
        self.managed_processes = set()  # NOTE(review): never populated in this class

    def monitor_system_resources(self):
        """Return a snapshot of current memory/CPU usage."""
        memory_percent = psutil.virtual_memory().percent
        cpu_percent = psutil.cpu_percent(interval=1)  # blocks ~1s to sample
        return {
            'memory_percent': memory_percent,
            'cpu_percent': cpu_percent,
            'available_memory': psutil.virtual_memory().available,
            'total_memory': psutil.virtual_memory().total
        }

    def get_browser_processes(self) -> List[DriverInfo]:
        """List processes whose name looks like a browser."""
        processes = []
        for proc in psutil.process_iter(['pid', 'name', 'memory_percent', 'cpu_percent']):
            try:
                if any(browser in proc.info['name'].lower()
                       for browser in ['chrome', 'chromium', 'firefox', 'edge']):
                    process = psutil.Process(proc.info['pid'])
                    uptime = time.time() - process.create_time()
                    info = DriverInfo(
                        pid=proc.info['pid'],
                        name=proc.info['name'],
                        memory_usage=proc.info['memory_percent'],
                        cpu_usage=proc.info['cpu_percent'],
                        uptime=uptime,
                        status=process.status()
                    )
                    processes.append(info)
            except (psutil.NoSuchProcess, psutil.AccessDenied):
                # Process vanished or is protected; skip it.
                continue
        return processes

    def cleanup_hanging_processes(self):
        """Terminate zombie / memory-heavy / long-running browser processes.

        NOTE(review): the thresholds are aggressive — ANY browser process
        using >50% memory or alive for >1 hour is killed, including ones
        owned by other applications. Confirm this is intended.
        """
        browser_processes = self.get_browser_processes()
        cleaned_count = 0
        for proc_info in browser_processes:
            try:
                process = psutil.Process(proc_info.pid)
                # Kill when zombie, memory-heavy (>50%) or older than 1 hour.
                if (proc_info.status == psutil.STATUS_ZOMBIE or
                    proc_info.memory_usage > 50.0 or
                    proc_info.uptime > 3600):
                    process.terminate()
                    process.wait(timeout=5)
                    cleaned_count += 1
                    print(f"Terminated hanging process: {proc_info.name} (PID: {proc_info.pid})")
            except (psutil.NoSuchProcess, psutil.AccessDenied, psutil.TimeoutExpired):
                continue
        return cleaned_count

    def check_system_health(self) -> bool:
        """True while memory and CPU stay below the configured thresholds."""
        resources = self.monitor_system_resources()
        if (resources['memory_percent'] > self.max_memory_percent or
            resources['cpu_percent'] > self.max_cpu_percent):
            return False
        return True

    def optimize_system_resources(self):
        """Clean hung processes; warn when the system is under pressure."""
        cleaned = self.cleanup_hanging_processes()
        if not self.check_system_health():
            print("System resources are under pressure, consider reducing concurrency")
        return cleaned
class ResourceAwareMiddleware:
    """Throttles browser-rendered requests based on system health.

    NOTE(review): returning ``None`` from ``process_request`` does NOT
    defer the request — Scrapy keeps processing it through the rest of
    the middleware chain, so requests appended to ``request_queue`` are
    processed immediately AND re-scheduled later (duplicated). Likewise,
    returning a Request from ``process_response`` replaces — and drops —
    the current response. Confirm the intended scheduling model.
    """

    def __init__(self):
        self.driver_manager = DriverManager()
        self.request_queue = []        # requests parked under pressure
        self.active_requests = 0       # browser requests currently in flight
        self.max_active_requests = 5

    def process_request(self, request, spider):
        """Park browser requests when the system is under pressure."""
        if not (request.meta.get('use_selenium') or request.meta.get('use_playwright')):
            return None
        # System under pressure: park the request in the local queue.
        if not self.driver_manager.check_system_health():
            self.request_queue.append(request)
            spider.logger.info(f"System under pressure, queuing request: {request.url}")
            return None
        # Too many in flight: park it as well.
        if self.active_requests >= self.max_active_requests:
            self.request_queue.append(request)
            spider.logger.info(f"Max active requests reached, queuing request: {request.url}")
            return None
        # Healthy: count it and let it continue down the chain.
        self.active_requests += 1
        return None

    def process_response(self, request, response, spider):
        """Release a slot and possibly re-schedule one parked request."""
        if (request.meta.get('use_selenium') or request.meta.get('use_playwright')):
            self.active_requests = max(0, self.active_requests - 1)
            # Re-schedule one parked request if capacity allows.
            if (self.request_queue and
                self.active_requests < self.max_active_requests and
                self.driver_manager.check_system_health()):
                queued_request = self.request_queue.pop(0)
                self.active_requests += 1
                # NOTE(review): returning a Request here drops `response`.
                return queued_request
        return response

    def spider_idle(self, spider):
        """Drain one parked request when the spider goes idle."""
        if (self.request_queue and
            self.active_requests < self.max_active_requests and
            self.driver_manager.check_system_health()):
            queued_request = self.request_queue.pop(0)
            self.active_requests += 1
            spider.crawler.engine.schedule(queued_request, spider)
# Anti-detection strategies
#反检测配置
import random
import string
from typing import List, Dict, Any
class AntiDetectionConfig:
    """Static providers of anti-bot-detection settings: Chromium launch
    flags, a JavaScript stealth patch, and realistic user-agent strings.

    NOTE(review): inside the stealth script, the ``navigator.userAgent``
    getter reads ``navigator.userAgent`` — its own patched property —
    which looks like infinite recursion; confirm against a real page.
    """

    @staticmethod
    def get_stealth_args() -> List[str]:
        """Chromium launch flags that reduce automation fingerprints."""
        return [
            '--disable-blink-features=AutomationControlled',
            '--disable-dev-shm-usage',
            '--disable-gpu',
            '--disable-extensions',
            '--disable-plugins',
            '--disable-images',
            '--no-sandbox',
            '--disable-web-security',
            '--allow-running-insecure-content',
            '--disable-features=VizDisplayCompositor',
            '--disable-ipc-flooding-protection',
            '--disable-background-timer-throttling',
            '--disable-backgrounding-occluded-windows',
            '--disable-renderer-backgrounding'
        ]

    @staticmethod
    def get_stealth_script() -> str:
        """JavaScript injected into pages to mask common automation tells
        (webdriver flag, plugin list, languages, platform, cdc_ globals)."""
        return """
// 隐藏webdriver属性
Object.defineProperty(navigator, 'webdriver', {
get: () => undefined,
});
// 模拟插件
Object.defineProperty(navigator, 'plugins', {
get: () => [1, 2, 3, 4, 5],
});
// 模拟语言
Object.defineProperty(navigator, 'languages', {
get: () => ['zh-CN', 'zh', 'en'],
});
// 模拟平台
Object.defineProperty(navigator, 'platform', {
get: () => 'Win32',
});
// 模拟用户代理
Object.defineProperty(navigator, 'userAgent', {
get: () => navigator.userAgent.replace(/HeadlessChrome/i, 'Chrome'),
});
// 隐藏Chrome属性
Object.defineProperty(window, 'chrome', {
value: new Proxy({}, {
get(target, prop) {
if (prop === 'runtime') return {};
return target[prop];
}
}),
writable: false,
});
// 隐藏eval toString
const originalToString = Function.prototype.toString;
Function.prototype.toString = function() {
if (this === window.cdc_adoQpoasnfa76pfcZLmcfl_Array) {
return 'function Array() { [native code] }';
}
return originalToString.call(this);
};
"""

    @staticmethod
    def get_realistic_user_agents() -> List[str]:
        """A small rotation pool of current desktop browser user agents."""
        return [
            "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36",
            "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36",
            "Mozilla/5.0 (Windows NT 10.0; Win64; x64; rv:109.0) Gecko/20100101 Firefox/121.0",
            "Mozilla/5.0 (Macintosh; Intel Mac OS X 10.15; rv:109.0) Gecko/20100101 Firefox/121.0",
            "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/119.0.0.0 Safari/537.36 Edg/119.0.0.0",
            "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/605.1.15 (KHTML, like Gecko) Version/17.1 Safari/605.1.15"
        ]
class StealthSeleniumMiddleware:
    """Selenium middleware that applies anti-detection settings and
    simulates simple human behavior (mouse jitter, random pauses)."""

    def __init__(self):
        self.config = AntiDetectionConfig()
        self.driver = self._create_stealth_driver()

    def _create_stealth_driver(self):
        """Create a Chrome driver with stealth flags, random UA and size."""
        from selenium import webdriver
        from selenium.webdriver.chrome.options import Options
        chrome_options = Options()
        # Anti-detection launch flags.
        for arg in self.config.get_stealth_args():
            chrome_options.add_argument(arg)
        # Random realistic user agent.
        user_agent = random.choice(self.config.get_realistic_user_agents())
        chrome_options.add_argument(f'--user-agent={user_agent}')
        # Randomized window size to vary the fingerprint.
        window_sizes = [
            {"width": 1366, "height": 768},
            {"width": 1920, "height": 1080},
            {"width": 1440, "height": 900},
            {"width": 1536, "height": 864}
        ]
        size = random.choice(window_sizes)
        chrome_options.add_argument(f'--window-size={size["width"]},{size["height"]}')
        driver = webdriver.Chrome(options=chrome_options)
        # NOTE(review): only patches the current page; lost on navigation.
        driver.execute_script(self.config.get_stealth_script())
        return driver

    def process_request(self, request, spider):
        """Render ``use_selenium`` requests with the stealth driver."""
        if request.meta.get('use_selenium'):
            try:
                self.driver.get(request.url)
                # Small random mouse move + pause to look human.
                self._simulate_human_behavior()
                body = self.driver.page_source.encode('utf-8')
                return HtmlResponse(
                    url=request.url,
                    body=body,
                    encoding='utf-8',
                    request=request
                )
            except Exception as e:
                spider.logger.error(f"Stealth Selenium error: {str(e)}")
                return request

    def _simulate_human_behavior(self):
        """Random small mouse movement and a 1-3 second pause."""
        import time
        from selenium.webdriver.common.action_chains import ActionChains
        # Jitter the mouse a few pixels.
        actions = ActionChains(self.driver)
        actions.move_by_offset(random.randint(-10, 10), random.randint(-10, 10))
        actions.perform()
        # Randomized pause between actions.
        time.sleep(random.uniform(1, 3))

    def spider_closed(self, spider):
        """Quit the driver on spider close."""
        if self.driver:
            self.driver.quit()
class StealthPlaywrightMiddleware:
    """Playwright middleware that renders each request in a fresh stealth
    context (random UA, init-script patches).

    Fixes over the original:
    - human-behavior simulation now runs *after* ``page.goto`` — it used
      to scroll/pause on the initial blank page before navigation, which
      accomplished nothing on the target page;
    - page and context are released in a ``finally`` block, so a failed
      navigation no longer leaks a browser context.
    """

    def __init__(self):
        self.config = AntiDetectionConfig()
        self.playwright = None
        self.browser = None
        self.setup_stealth_browser()

    def setup_stealth_browser(self):
        """Start Playwright and launch Chromium with stealth flags."""
        from playwright.sync_api import sync_playwright
        self.playwright = sync_playwright().start()
        self.browser = self.playwright.chromium.launch(
            headless=True,
            args=self.config.get_stealth_args()
        )

    def process_request(self, request, spider):
        """Render ``use_playwright`` requests in a one-shot stealth context."""
        if not request.meta.get('use_playwright'):
            return None
        context = None
        page = None
        try:
            # Randomize the browser fingerprint per request.
            user_agent = random.choice(self.config.get_realistic_user_agents())
            context = self.browser.new_context(
                user_agent=user_agent,
                viewport={"width": 1920, "height": 1080},
                locale="zh-CN",
                timezone_id="Asia/Shanghai"
            )
            # Patch automation tells before any page script runs.
            context.add_init_script(self.config.get_stealth_script())
            page = context.new_page()
            page.goto(request.url, wait_until="networkidle")
            # Simulate human behavior on the *loaded* page.
            self._simulate_human_behavior(page)
            content = page.content()
            return HtmlResponse(
                url=request.url,
                body=content.encode('utf-8'),
                encoding='utf-8',
                request=request
            )
        except Exception as e:
            spider.logger.error(f"Stealth Playwright error: {str(e)}")
            return request
        finally:
            # Always release browser resources, even on failure.
            if page is not None:
                try:
                    page.close()
                except Exception:
                    pass
            if context is not None:
                try:
                    context.close()
                except Exception:
                    pass

    def _simulate_human_behavior(self, page):
        """Random 1-3s pause plus a small scroll to look less bot-like."""
        page.wait_for_timeout(random.randint(1000, 3000))
        page.evaluate(f"window.scrollTo(0, {random.randint(100, 500)});")

    def spider_closed(self, spider):
        """Tear down the browser and the Playwright driver."""
        if self.browser:
            self.browser.close()
        if self.playwright:
            self.playwright.stop()
# Performance optimization techniques
#性能优化配置
import time
from typing import Dict, Any, Optional
from dataclasses import dataclass
@dataclass
class PerformanceConfig:
    """Tunable limits for the performance-optimized browser middleware."""
    # Resource pool limits
    max_pages: int = 10
    max_contexts: int = 5
    max_drivers: int = 3
    # Timeouts (seconds)
    page_load_timeout: int = 30
    script_timeout: int = 10
    wait_timeout: int = 10
    # Response cache
    enable_cache: bool = True
    cache_size: int = 100       # max cached URLs before eviction
    cache_ttl: int = 300        # seconds (5 minutes)
    # Concurrency
    max_concurrent: int = 2
    request_delay: float = 1.0  # seconds to back off when at the limit
class PerformanceOptimizedMiddleware:
    """Browser middleware with result caching, resource pooling and a soft
    concurrency limit.

    Fixes over the original:
    - ``--disable-javascript`` removed from the Selenium options — the
      point of browser rendering is executing JavaScript;
    - every Playwright engine/browser started on demand is now recorded
      and shut down in ``spider_closed`` (previously only contexts were
      pooled; the owning browser/playwright instances leaked);
    - bare ``except:`` clauses narrowed to ``except Exception``.
    """

    def __init__(self, config: Optional[PerformanceConfig] = None):
        self.config = config or PerformanceConfig()
        self.page_pool = []
        self.context_pool = []
        self.driver_pool = []
        self.cache = {}               # url -> (content, timestamp)
        self.active_requests = 0
        self.request_timestamps = []
        # Playwright engines/browsers created lazily; kept for shutdown.
        self._playwrights = []
        self._browsers = []

    def get_cached_result(self, url: str) -> Optional[str]:
        """Return cached content for *url*, or None if absent or expired."""
        if not self.config.enable_cache:
            return None
        cached = self.cache.get(url)
        if cached:
            content, timestamp = cached
            if time.time() - timestamp < self.config.cache_ttl:
                return content
            # Expired entry: evict it.
            del self.cache[url]
        return None

    def cache_result(self, url: str, content: str):
        """Store *content* for *url*, evicting the oldest entry when full."""
        if not self.config.enable_cache:
            return
        if len(self.cache) >= self.config.cache_size:
            oldest_key = min(self.cache.keys(), key=lambda k: self.cache[k][1])
            del self.cache[oldest_key]
        self.cache[url] = (content, time.time())

    def process_request(self, request, spider):
        """Serve from cache when possible; otherwise render and cache."""
        if not (request.meta.get('use_selenium') or request.meta.get('use_playwright')):
            return None
        # Cache hit: answer without touching a browser.
        cached_content = self.get_cached_result(request.url)
        if cached_content:
            spider.logger.info(f"Using cached content for: {request.url}")
            return HtmlResponse(
                url=request.url,
                body=cached_content.encode('utf-8'),
                encoding='utf-8',
                request=request
            )
        # Soft concurrency limit: back off once, then proceed anyway.
        if self.active_requests >= self.config.max_concurrent:
            spider.logger.info(f"Max concurrent limit reached, delaying request: {request.url}")
            time.sleep(self.config.request_delay)
        self.active_requests += 1
        try:
            if request.meta.get('use_selenium'):
                result = self._handle_selenium_request(request, spider)
            else:
                result = self._handle_playwright_request(request, spider)
            # Cache successful renders only.
            if isinstance(result, HtmlResponse):
                self.cache_result(request.url, result.text)
            return result
        finally:
            self.active_requests = max(0, self.active_requests - 1)

    def _handle_selenium_request(self, request, spider):
        """Render the request with a pooled (or fresh) Selenium driver."""
        from selenium import webdriver
        from selenium.webdriver.chrome.options import Options
        # Reuse a pooled driver or create one.
        if self.driver_pool:
            driver = self.driver_pool.pop()
        else:
            chrome_options = Options()
            chrome_options.add_argument('--headless')
            chrome_options.add_argument('--no-sandbox')
            chrome_options.add_argument('--disable-dev-shm-usage')
            chrome_options.add_argument('--disable-gpu')
            # Performance: skip image loading (JS stays enabled — we need it).
            chrome_options.add_argument('--disable-images')
            chrome_options.add_argument('--blink-settings=imagesEnabled=false')
            driver = webdriver.Chrome(options=chrome_options)
        try:
            driver.set_page_load_timeout(self.config.page_load_timeout)
            driver.get(request.url)
            # Let dynamic content settle.
            time.sleep(2)  # tune for the target site
            content = driver.page_source
            return HtmlResponse(
                url=request.url,
                body=content.encode('utf-8'),
                encoding='utf-8',
                request=request
            )
        except Exception as e:
            spider.logger.error(f"Selenium error: {str(e)}")
            return request
        finally:
            # Return the driver to the pool, or quit it when full.
            if len(self.driver_pool) < self.config.max_drivers:
                self.driver_pool.append(driver)
            else:
                driver.quit()

    def _handle_playwright_request(self, request, spider):
        """Render the request with a pooled (or fresh) Playwright context."""
        from playwright.sync_api import sync_playwright
        # Reuse a pooled context, or start a new engine + browser.
        if self.context_pool:
            context = self.context_pool.pop()
        else:
            playwright = sync_playwright().start()
            browser = playwright.chromium.launch(headless=True)
            # Track them so spider_closed can shut them down (was leaked).
            self._playwrights.append(playwright)
            self._browsers.append(browser)
            context = browser.new_context()
        try:
            page = context.new_page()
            page.set_default_timeout(self.config.page_load_timeout * 1000)
            page.goto(request.url, wait_until="networkidle")
            content = page.content()
            page.close()
            # Return the context to the pool, or close it when full.
            if len(self.context_pool) < self.config.max_contexts:
                self.context_pool.append(context)
            else:
                context.close()
            return HtmlResponse(
                url=request.url,
                body=content.encode('utf-8'),
                encoding='utf-8',
                request=request
            )
        except Exception as e:
            spider.logger.error(f"Playwright error: {str(e)}")
            # Failed context is not reusable; close it.
            context.close()
            return request

    def spider_closed(self, spider):
        """Release drivers, contexts, browsers and Playwright engines."""
        for driver in self.driver_pool:
            try:
                driver.quit()
            except Exception:
                pass
        for context in self.context_pool:
            try:
                context.close()
            except Exception:
                pass
        for browser in self._browsers:
            try:
                browser.close()
            except Exception:
                pass
        for playwright in self._playwrights:
            try:
                playwright.stop()
            except Exception:
                pass
# Resource management & lifecycle
#资源管理器
import weakref
import gc
import threading
import time
from typing import Dict, Any, Callable
from contextlib import contextmanager
class ResourceManager:
    """Registry of live resources tracked through weak references.

    A background thread periodically drops entries whose referents were
    garbage collected and forces a ``gc.collect()``.

    Fixes over the original:
    - ``self.lock`` is now an ``RLock``: ``cleanup_resources`` calls
      ``unregister_resource`` while already holding the lock, which
      deadlocked with the plain non-reentrant ``threading.Lock``;
    - ``stop_monitoring`` joins with a timeout so shutdown cannot block
      for a whole monitor interval while the loop sleeps.
    """

    def __init__(self):
        self.resources = {}           # name -> weakref.ref to the resource
        self.cleanup_callbacks = {}   # name -> finalizer callback
        self.monitoring_thread = None
        self.monitoring = False
        # Reentrant: cleanup_resources() -> unregister_resource() re-acquires.
        self.lock = threading.RLock()

    def register_resource(self, name: str, resource: Any, cleanup_callback: Callable):
        """Track *resource* under *name*.

        ``cleanup_callback`` is installed as the weakref finalizer; note
        that weakref invokes it with the dead reference object, not with
        the resource itself.
        """
        with self.lock:
            self.resources[name] = weakref.ref(resource, cleanup_callback)
            self.cleanup_callbacks[name] = cleanup_callback

    def unregister_resource(self, name: str):
        """Forget *name*; a no-op when it is unknown."""
        with self.lock:
            self.resources.pop(name, None)
            self.cleanup_callbacks.pop(name, None)

    def cleanup_resources(self):
        """Drop registry entries whose referents were garbage collected."""
        with self.lock:
            for name, ref in list(self.resources.items()):
                if ref() is None:  # referent already collected
                    self.unregister_resource(name)

    def start_monitoring(self, interval: int = 60):
        """Start the background cleanup thread."""
        self.monitoring = True
        self.monitoring_thread = threading.Thread(target=self._monitor_loop, args=(interval,), daemon=True)
        self.monitoring_thread.start()

    def stop_monitoring(self):
        """Signal the monitor thread to stop and wait briefly for it."""
        self.monitoring = False
        if self.monitoring_thread:
            # The loop may be mid-sleep; don't block shutdown indefinitely
            # (the thread is a daemon and dies with the process anyway).
            self.monitoring_thread.join(timeout=5)

    def _monitor_loop(self, interval: int):
        """Periodically prune dead entries and force a GC pass."""
        while self.monitoring:
            try:
                self.cleanup_resources()
                gc.collect()  # encourage collection of unused browser objects
                time.sleep(interval)
            except Exception as e:
                print(f"Resource manager monitoring error: {e}")
class LifecycleAwareMiddleware:
    """Keeps one long-lived browser session per ``request.meta['session_id']``,
    serializes access to each session with a lock, and runs a
    ResourceManager monitoring thread while the spider is open.

    NOTE(review): ``session_locks`` is only cleared at spider close, so
    many distinct session ids grow it unboundedly.
    """

    def __init__(self):
        self.resource_manager = ResourceManager()
        self.active_sessions = {}   # session_id -> {'driver', timestamps, count}
        self.session_locks = {}     # session_id -> threading.Lock

    @contextmanager
    def session_context(self, session_id: str):
        """Serialize access to one session's browser."""
        if session_id not in self.session_locks:
            self.session_locks[session_id] = threading.Lock()
        lock = self.session_locks[session_id]
        lock.acquire()
        try:
            yield
        finally:
            lock.release()

    def process_request(self, request, spider):
        """Dispatch browser requests to the matching per-session handler."""
        session_id = request.meta.get('session_id', 'default')
        with self.session_context(session_id):
            if request.meta.get('use_selenium'):
                return self._handle_selenium_session(request, spider, session_id)
            elif request.meta.get('use_playwright'):
                return self._handle_playwright_session(request, spider, session_id)
        return None

    def _handle_selenium_session(self, request, spider, session_id: str):
        """Serve the request from the session's driver; recreate on failure."""
        from selenium import webdriver
        from selenium.webdriver.chrome.options import Options
        # Reuse an existing session when possible.
        if session_id in self.active_sessions:
            session_info = self.active_sessions[session_id]
            driver = session_info.get('driver')
            if driver:
                try:
                    driver.get(request.url)
                    content = driver.page_source
                    return HtmlResponse(
                        url=request.url,
                        body=content.encode('utf-8'),
                        encoding='utf-8',
                        request=request
                    )
                except Exception as e:
                    spider.logger.error(f"Selenium session error: {str(e)}")
                    # Drop the broken session; a fresh one is created below.
                    self._cleanup_session(session_id)
        # No (usable) session yet: create one.
        return self._create_selenium_session(request, spider, session_id)

    def _handle_playwright_session(self, request, spider, session_id: str):
        """Playwright counterpart — placeholder, not implemented yet
        (returns None, letting the request continue down the chain)."""
        pass

    def _create_selenium_session(self, request, spider, session_id: str):
        """Create a driver for *session_id*, record it, serve the request."""
        from selenium import webdriver
        from selenium.webdriver.chrome.options import Options
        chrome_options = Options()
        chrome_options.add_argument('--headless')
        chrome_options.add_argument('--no-sandbox')
        chrome_options.add_argument('--disable-dev-shm-usage')
        driver = webdriver.Chrome(options=chrome_options)
        # Book-keeping used for reuse/expiry decisions.
        self.active_sessions[session_id] = {
            'driver': driver,
            'created_at': time.time(),
            'last_used': time.time(),
            'request_count': 0
        }
        try:
            driver.get(request.url)
            content = driver.page_source
            # Update usage statistics.
            session_info = self.active_sessions[session_id]
            session_info['last_used'] = time.time()
            session_info['request_count'] += 1
            return HtmlResponse(
                url=request.url,
                body=content.encode('utf-8'),
                encoding='utf-8',
                request=request
            )
        except Exception as e:
            spider.logger.error(f"Failed to create Selenium session: {str(e)}")
            self._cleanup_session(session_id)
            return request

    def _cleanup_session(self, session_id: str):
        """Quit the session's driver and forget the session."""
        if session_id in self.active_sessions:
            session_info = self.active_sessions[session_id]
            driver = session_info.get('driver')
            if driver:
                try:
                    driver.quit()
                except:
                    pass
            del self.active_sessions[session_id]

    def spider_opened(self, spider):
        """Start background resource monitoring."""
        self.resource_manager.start_monitoring()

    def spider_closed(self, spider):
        """Stop monitoring and tear down every session."""
        self.resource_manager.stop_monitoring()
        # Close every live browser session.
        for session_id in list(self.active_sessions.keys()):
            self._cleanup_session(session_id)
        self.session_locks.clear()
# Error handling & retry mechanisms
#错误处理中间件
import time
import random
from typing import List, Tuple
from enum import Enum
class RetryStrategy(Enum):
    """Back-off strategies for retrying failed browser requests."""
    LINEAR = "linear"            # delay grows proportionally with the attempt
    EXPONENTIAL = "exponential"  # delay doubles on every attempt
    FIXED = "fixed"              # constant delay between attempts
class ErrorHandlingMiddleware:
    """Retries failed browser-rendered requests with configurable back-off
    (linear / exponential / fixed, optional jitter)."""

    def __init__(self):
        # Retry policy shared by all requests.
        self.retry_config = {
            'max_retries': 3,
            'strategy': RetryStrategy.EXPONENTIAL,
            'base_delay': 1.0,
            'max_delay': 60.0,
            'jitter': True
        }
        self.failed_requests = {}  # NOTE(review): never written in this class

    def calculate_retry_delay(self, attempt: int) -> float:
        """Return the delay in seconds before retry *attempt* (1-based)."""
        if self.retry_config['strategy'] == RetryStrategy.LINEAR:
            delay = self.retry_config['base_delay'] * attempt
        elif self.retry_config['strategy'] == RetryStrategy.EXPONENTIAL:
            delay = self.retry_config['base_delay'] * (2 ** (attempt - 1))
        else:  # FIXED
            delay = self.retry_config['base_delay']
        # Clamp to the configured ceiling.
        delay = min(delay, self.retry_config['max_delay'])
        # Add 10-30% random jitter to avoid synchronized retries.
        if self.retry_config['jitter']:
            jitter = random.uniform(0.1, 0.3) * delay
            delay += jitter
        return delay

    def process_request(self, request, spider):
        """Log requests that exhausted their retry budget; pass others on."""
        if not (request.meta.get('use_selenium') or request.meta.get('use_playwright')):
            return None
        retry_count = request.meta.get('retry_times', 0)
        if retry_count >= self.retry_config['max_retries']:
            spider.logger.error(f"Max retries exceeded for {request.url}")
            # NOTE(review): returning None lets the request continue anyway;
            # raising IgnoreRequest would actually drop it.
            return None
        # The actual browser work happens in other middlewares; failures
        # come back to us through process_exception below.
        return None

    def process_exception(self, request, exception, spider):
        """Reschedule a failed browser request with back-off."""
        if not (request.meta.get('use_selenium') or request.meta.get('use_playwright')):
            return None
        retry_count = request.meta.get('retry_times', 0)
        if retry_count >= self.retry_config['max_retries']:
            spider.logger.error(f"Failed to process {request.url} after {retry_count} retries: {str(exception)}")
            return None
        # Back-off delay for the next attempt.
        delay = self.calculate_retry_delay(retry_count + 1)
        spider.logger.warning(f"Retrying {request.url} in {delay:.2f}s (attempt {retry_count + 1}/{self.retry_config['max_retries']}): {str(exception)}")
        # Copy the request and bump its retry counter.
        retry_request = request.copy()
        retry_request.meta['retry_times'] = retry_count + 1
        retry_request.meta['retry_delay'] = delay
        retry_request.dont_filter = True  # bypass the dupe filter for retries
        # NOTE(review): time.sleep blocks Scrapy's single-threaded reactor;
        # a Deferred/DOWNLOAD_DELAY-based delay would be non-blocking.
        time.sleep(delay)
        return retry_request
class ComprehensiveErrorMiddleware:
    """
    Categorize browser-automation failures and apply a category-specific
    recovery: longer timeout, fresh IP, browser re-init, JS off, or skip.
    """
    def __init__(self):
        # Substring patterns, checked in insertion order; first match wins.
        self.error_patterns = {
            'timeout': ['timeout', 'timed out', 'Timeout'],
            'connection': ['connection', 'Connection', 'refused', 'Refused'],
            'browser_crash': ['Browser closed', 'browser closed', 'crashed'],
            'javascript_error': ['JavaScript', 'script', 'eval'],
            'captcha': ['captcha', 'verification', 'verify']
        }
        # Category name -> recovery handler.
        self.error_handlers = {
            'timeout': self._handle_timeout,
            'connection': self._handle_connection_error,
            'browser_crash': self._handle_browser_crash,
            'javascript_error': self._handle_javascript_error,
            'captcha': self._handle_captcha
        }
    def categorize_error(self, error_message: str) -> str:
        """Map an exception message to an error category, or 'unknown'."""
        haystack = error_message.lower()
        for name, needles in self.error_patterns.items():
            for needle in needles:
                if needle.lower() in haystack:
                    return name
        return 'unknown'
    def _reissue(self, request, **meta_updates):
        """Return a non-deduplicated copy of `request` with extra meta flags."""
        reissued = request.copy()
        reissued.meta.update(meta_updates)
        reissued.dont_filter = True
        return reissued
    def _handle_timeout(self, request, spider, error):
        """Retry with a page-load timeout 10s larger than before."""
        spider.logger.info(f"Handling timeout for {request.url}")
        return self._reissue(request, timeout=request.meta.get('timeout', 30) + 10)
    def _handle_connection_error(self, request, spider, error):
        """Retry after flagging that a fresh IP / proxy is needed."""
        spider.logger.info(f"Handling connection error for {request.url}")
        return self._reissue(request, need_new_ip=True)
    def _handle_browser_crash(self, request, spider, error):
        """Retry after flagging that the browser must be re-initialized."""
        spider.logger.info(f"Handling browser crash for {request.url}")
        return self._reissue(request, reinit_browser=True)
    def _handle_javascript_error(self, request, spider, error):
        """Retry with JavaScript execution disabled."""
        spider.logger.info(f"Handling JS error for {request.url}")
        return self._reissue(request, disable_javascript=True)
    def _handle_captcha(self, request, spider, error):
        """Captchas cannot be solved automatically here: log and drop."""
        spider.logger.info(f"Captcha detected for {request.url}")
        spider.logger.warning(f"CAPTCHA detected at {request.url}, skipping...")
        return None
    def process_exception(self, request, exception, spider):
        """Dispatch the failure to its category handler, or default-retry."""
        if not (request.meta.get('use_selenium') or request.meta.get('use_playwright')):
            return None
        category = self.categorize_error(str(exception))
        handler = self.error_handlers.get(category)
        if handler is not None:
            return handler(request, spider, exception)
        # Unknown errors: plain retry, at most 3 attempts.
        attempts = request.meta.get('retry_times', 0)
        if attempts >= 3:
            return None
        return self._reissue(request, retry_times=attempts + 1)#异步处理方案
#异步浏览器中间件
import asyncio
import concurrent.futures
from typing import Optional
from scrapy.http import HtmlResponse
from playwright.async_api import async_playwright
class AsyncPlaywrightMiddleware:
    """
    Playwright-based downloader middleware using the async API.

    Requests with ``meta['use_playwright']`` are rendered in a shared
    headless Chromium; at most three pages render concurrently.
    """
    def __init__(self):
        self.playwright = None
        self.browser = None
        self.semaphore = asyncio.Semaphore(3)  # cap concurrent pages
        self.executor = concurrent.futures.ThreadPoolExecutor(max_workers=5)
    async def setup_browser(self):
        """Start Playwright and launch the shared headless Chromium."""
        self.playwright = await async_playwright().start()
        self.browser = await self.playwright.chromium.launch(
            headless=True,
            args=['--no-sandbox', '--disable-dev-shm-usage']
        )
    async def process_request_async(self, request, spider):
        """
        Render the request in a Playwright page.

        Returns an HtmlResponse on success, the request itself on error,
        or None for requests that do not ask for Playwright.
        """
        if not request.meta.get('use_playwright'):
            return None
        async with self.semaphore:  # limit concurrency
            page = None
            try:
                # Bug fix: the browser was never launched before first use
                # (setup_browser had no caller), so self.browser was None.
                # Initialize it lazily on the first request.
                if self.browser is None:
                    await self.setup_browser()
                page = await self.browser.new_page()
                page.set_default_timeout(30000)
                await page.goto(request.url, wait_until="networkidle")
                content = await page.content()
                return HtmlResponse(
                    url=request.url,
                    body=content.encode('utf-8'),
                    encoding='utf-8',
                    request=request
                )
            except Exception as e:
                spider.logger.error(f"Async Playwright error: {str(e)}")
                return request
            finally:
                # Bug fix: close the page even on error; it used to leak
                # whenever goto()/content() raised.
                if page is not None:
                    await page.close()
    def process_request(self, request, spider):
        """
        Synchronous entry point Scrapy calls; drives the async renderer.

        NOTE(review): run_until_complete blocks Scrapy's (Twisted) reactor
        for the whole render; consider scrapy-playwright for a fully
        asynchronous integration — confirm before production use.
        """
        if not request.meta.get('use_playwright'):
            return None
        try:
            loop = asyncio.get_event_loop()
        except RuntimeError:
            loop = asyncio.new_event_loop()
            asyncio.set_event_loop(loop)
        return loop.run_until_complete(self.process_request_async(request, spider))
    def spider_closed(self, spider):
        """Dispose of the browser, Playwright, and the thread pool."""
        async def _shutdown():
            if self.browser:
                await self.browser.close()
            if self.playwright:
                await self.playwright.stop()
        try:
            loop = asyncio.get_event_loop()
            loop.run_until_complete(_shutdown())
        except Exception:
            # Best-effort cleanup; the process is shutting down anyway.
            pass
        self.executor.shutdown(wait=True)
class AsyncSeleniumMiddleware:
    """
    Selenium downloader middleware that keeps Scrapy responsive by running
    each blocking WebDriver session on a thread pool (max 3 concurrent).
    """
    def __init__(self):
        # Bug fix: `threading` was used here without being imported in this
        # sample's import block (NameError at init); import it locally.
        import threading
        self.executor = concurrent.futures.ThreadPoolExecutor(max_workers=3)
        self.semaphore = threading.Semaphore(3)
    def process_request(self, request, spider):
        """
        Submit the Selenium fetch to the thread pool and wait for it.

        Returns the HtmlResponse produced by the worker, or the request
        itself on timeout/error; None for non-Selenium requests.
        """
        if not request.meta.get('use_selenium'):
            return None
        future = self.executor.submit(self._sync_process_request, request, spider)
        try:
            # Hard 60s cap so a hung browser cannot stall the crawl forever.
            return future.result(timeout=60)
        except concurrent.futures.TimeoutError:
            spider.logger.error(f"Selenium request timed out: {request.url}")
            return request
        except Exception as e:
            spider.logger.error(f"Selenium async error: {str(e)}")
            return request
    def _sync_process_request(self, request, spider):
        """Blocking worker: fetch `request.url` with a fresh headless Chrome."""
        # Local imports so the worker is self-contained (and `time` is not
        # guaranteed to be imported by this sample's own import block).
        import time
        from selenium import webdriver
        from selenium.webdriver.chrome.options import Options
        with self.semaphore:  # at most 3 live browsers at once
            chrome_options = Options()
            chrome_options.add_argument('--headless')
            chrome_options.add_argument('--no-sandbox')
            chrome_options.add_argument('--disable-dev-shm-usage')
            driver = webdriver.Chrome(options=chrome_options)
            try:
                driver.get(request.url)
                time.sleep(2)  # crude wait for JS; prefer WebDriverWait
                return HtmlResponse(
                    url=request.url,
                    body=driver.page_source.encode('utf-8'),
                    encoding='utf-8',
                    request=request
                )
            except Exception as e:
                spider.logger.error(f"Selenium sync error: {str(e)}")
                return request
            finally:
                driver.quit()
    def spider_closed(self, spider):
        """Wait for in-flight Selenium jobs, then release the thread pool."""
        self.executor.shutdown(wait=True)#容器化部署
#Docker配置
# Dockerfile
FROM python:3.9-slim
# System libraries required by headless Chrome / Playwright browsers.
RUN apt-get update && apt-get install -y \
    wget \
    unzip \
    xvfb \
    x11-utils \
    x11-xserver-utils \
    xdg-utils \
    libnss3 \
    libatk-bridge2.0-0 \
    libdrm2 \
    libxkbcommon0 \
    libxcomposite1 \
    libxdamage1 \
    libxrandr2 \
    libgbm1 \
    libxss1 \
    libasound2 \
    gnupg \
    ca-certificates \
    fonts-liberation \
    libappindicator3-1 \
    libsecret-1-0 \
    curl \
    && rm -rf /var/lib/apt/lists/*
# Install Google Chrome from the official repository.
# NOTE(review): apt-key is deprecated on current Debian releases; consider
# storing the key under /etc/apt/keyrings and using signed-by — verify on
# the target base image.
RUN wget -q -O - https://dl.google.com/linux/linux_signing_key.pub | apt-key add - \
    && echo "deb [arch=amd64] http://dl.google.com/linux/chrome/deb/ stable main" >> /etc/apt/sources.list.d/google-chrome.list \
    && apt-get update \
    && apt-get install -y google-chrome-stable
# Set the working directory.
WORKDIR /app
# Copy the dependency manifest first to leverage Docker layer caching.
COPY requirements.txt .
# Install Python dependencies.
RUN pip install --no-cache-dir -r requirements.txt
# Copy the application code.
COPY . .
# Create an unprivileged user to run the crawler.
RUN groupadd -r selenium && useradd -r -g selenium selenium
# Hand ownership of the app directory to that user.
RUN chown -R selenium:selenium /app
USER selenium
# Default command (replace your_spider with the real spider name).
CMD ["scrapy", "crawl", "your_spider"]#容器化部署配置
# docker_compose_config.py
# docker-compose service definition, kept as a Python dict so it can be
# generated or serialized programmatically (e.g. via yaml.dump).
DOCKER_COMPOSE_CONFIG = {
    'version': '3.8',
    'services': {
        'scraper': {
            'build': '.',
            'volumes': [
                './data:/app/data',
                '/dev/shm:/dev/shm'  # share host shared memory for better browser performance
            ],
            'environment': [
                'SCRAPY_SETTINGS_MODULE=settings',
                'PLAYWRIGHT_BROWSERS_PATH=/app/.cache/ms-playwright'
            ],
            'shm_size': '2gb',  # enlarge shared memory (headless Chrome needs it)
            'restart': 'unless-stopped',
            'logging': {
                'driver': 'json-file',
                'options': {
                    'max-size': '10m',
                    'max-file': '3'
                }
            }
        }
    }
}
class ContainerizedMiddleware:
    """
    Adjusts browser-request behaviour when the spider runs inside a
    container (detected via cgroups), where CPU/memory are constrained.
    """
    def __init__(self):
        self.in_container = self._is_running_in_container()
        self.container_resources = self._get_container_resources()
    def _is_running_in_container(self) -> bool:
        """
        Best-effort container detection via /proc/1/cgroup.
        """
        try:
            with open('/proc/1/cgroup', 'r') as f:
                # Bug fix: the original called f.read() twice; the second
                # read returned '' so 'containerd' was never matched.
                content = f.read()
            return 'docker' in content or 'containerd' in content
        except OSError:
            # File missing (non-Linux host) -> assume not containerized.
            return False
    def _get_container_resources(self) -> dict:
        """
        Read the container's memory/CPU limits, if present.

        NOTE(review): these are cgroup v1 paths; on cgroup v2 hosts the
        limits live elsewhere (e.g. /sys/fs/cgroup/memory.max) — verify on
        the target runtime.
        """
        resources = {
            'memory_limit': None,
            'cpu_limit': None
        }
        try:
            with open('/sys/fs/cgroup/memory/memory.limit_in_bytes', 'r') as f:
                memory_limit = int(f.read().strip())
            # The kernel reports a huge sentinel value when no limit is set.
            if memory_limit < 9223372036854771712:
                resources['memory_limit'] = memory_limit
            with open('/sys/fs/cgroup/cpu/cpu.shares', 'r') as f:
                resources['cpu_limit'] = int(f.read().strip())
        except (OSError, ValueError):
            # Missing/unreadable cgroup files: leave limits as None.
            pass
        return resources
    def process_request(self, request, spider):
        """
        Pad Selenium wait times when running under container resource limits.
        Always returns None so the request continues through the chain.
        """
        if not (request.meta.get('use_selenium') or request.meta.get('use_playwright')):
            return None
        if self.in_container:
            # Containers are often CPU-throttled: give pages 2 extra seconds.
            if request.meta.get('selenium_wait_for'):
                request.meta['selenium_wait_for'] = request.meta['selenium_wait_for'] + 2
        return None#常见问题与解决方案
#问题1: 浏览器启动失败
现象:ChromeDriver 或 Playwright 无法启动浏览器。解决方案:
# Selenium解决方案
from selenium import webdriver
from selenium.webdriver.chrome.options import Options
def create_driver_with_fallback():
    """
    Create a Chrome WebDriver, retrying once with a headless configuration.

    Returns the driver on success, or None if both attempts fail.
    """
    chrome_options = Options()
    chrome_options.add_argument('--no-sandbox')
    chrome_options.add_argument('--disable-dev-shm-usage')
    chrome_options.add_argument('--disable-gpu')
    chrome_options.add_argument('--remote-debugging-port=9222')
    chrome_options.add_argument('--disable-extensions')
    chrome_options.add_argument('--disable-plugins')
    try:
        return webdriver.Chrome(options=chrome_options)
    except Exception as e:
        print(f"Chrome failed: {e}")
    # Bug fix: the original fallback re-added '--no-sandbox' and
    # '--disable-dev-shm-usage' (already set above), so the retry used an
    # identical configuration and could only fail the same way. Retry in
    # headless mode instead, which is what display-less environments
    # (CI, servers) typically need.
    chrome_options.add_argument('--headless')
    try:
        return webdriver.Chrome(options=chrome_options)
    except Exception as e2:
        print(f"Chrome fallback failed: {e2}")
        return None#问题2: 内存泄漏
现象:长时间运行后,内存使用不断增加。解决方案:
class MemoryEfficientMiddleware:
    """
    Selenium middleware that recycles WebDriver instances through a small
    pool to bound memory use on long-running crawls.
    """
    def __init__(self):
        self.driver_pool = []    # idle, reusable drivers
        self.max_pool_size = 5   # drivers beyond this are quit immediately
    def process_request(self, request, spider):
        """
        Fetch `request.url` with a pooled driver and return an HtmlResponse.

        On error the driver is destroyed (not pooled, since it may be
        wedged) and the request is returned for rescheduling.
        """
        if request.meta.get('use_selenium'):
            driver = self._get_driver()
            if driver:
                try:
                    driver.get(request.url)
                    content = driver.page_source
                    response = HtmlResponse(
                        url=request.url,
                        body=content.encode('utf-8'),
                        encoding='utf-8',
                        request=request
                    )
                    self._return_driver(driver)
                    return response
                except Exception as e:
                    spider.logger.error(f"Memory efficient error: {e}")
                    self._dispose_driver(driver)  # never pool a failed driver
                    return request
    def _get_driver(self):
        """Pop an idle driver from the pool, or create a fresh headless Chrome."""
        if self.driver_pool:
            return self.driver_pool.pop()
        from selenium import webdriver
        from selenium.webdriver.chrome.options import Options
        options = Options()
        options.add_argument('--headless')
        options.add_argument('--no-sandbox')
        options.add_argument('--disable-dev-shm-usage')
        return webdriver.Chrome(options=options)
    def _return_driver(self, driver):
        """Pool the driver for reuse, or quit it if the pool is already full."""
        if len(self.driver_pool) < self.max_pool_size:
            self.driver_pool.append(driver)
        else:
            self._dispose_driver(driver)
    def _dispose_driver(self, driver):
        """Quit the driver, swallowing errors from already-dead browsers."""
        try:
            driver.quit()
        except Exception:
            # Bug fix: was a bare `except:`; narrowed so SystemExit /
            # KeyboardInterrupt still propagate.
            pass
    def spider_closed(self, spider):
        """Bug fix: quit every pooled driver on shutdown (they used to leak)."""
        while self.driver_pool:
            self._dispose_driver(self.driver_pool.pop())#问题3: 反爬虫检测
现象:被网站识别为自动化工具。解决方案:
class AntiDetectionMiddleware:
    """
    Lowers the automation fingerprint of Selenium requests by pacing them
    with a randomized delay (placeholder for further evasion measures).
    """
    def process_request(self, request, spider):
        """Sleep 1-3 s before each Selenium request; otherwise do nothing."""
        if not request.meta.get('use_selenium'):
            return None
        import random
        import time
        # 1. Randomized inter-request delay.
        time.sleep(random.uniform(1, 3))
        # 2. TODO: set a realistic User-Agent.
        # 3. TODO: simulate human behaviour (scrolling, mouse movement).
        return None#问题4: 性能瓶颈
现象:处理速度慢,成为爬虫的整体瓶颈。解决方案:
class HighPerformanceMiddleware:
    """
    Response-cache middleware sketch: serves repeated URLs from an
    in-memory cache instead of re-rendering them in a browser.
    """
    def __init__(self):
        # url-hash -> cached response body. NOTE(review): nothing in this
        # block ever inserts into the cache, and max_cache_size is not
        # enforced — population/eviction logic still needs to be added
        # (e.g. in a process_response hook).
        self.cache = {}
        self.max_cache_size = 100
    def process_request(self, request, spider):
        """
        Return a cached HtmlResponse for a previously seen URL, else None
        so the request proceeds through the normal download path.
        """
        # Check the cache. hash() of a str is salted per process
        # (PYTHONHASHSEED), so keys are only stable within one run —
        # acceptable for an in-process cache.
        url_hash = hash(request.url)
        if url_hash in self.cache:
            cached_response = self.cache[url_hash]
            return HtmlResponse(
                url=request.url,
                body=cached_response,
                encoding='utf-8',
                request=request
            )
        # Handle the request normally here...
        # ...and cache the result (not yet implemented, see NOTE above).
        return None#最佳实践建议
#选择合适的工具
- Selenium: 适合复杂交互,有大量现成的解决方案
- Playwright: 适合现代化Web应用,性能更好
- 优先级: 静态内容 → API接口 → Selenium/Playwright
#性能优化
- 资源复用: 重用浏览器实例和页面
- 并发控制: 限制同时运行的浏览器数量
- 缓存策略: 对相同URL进行缓存
- 错误处理: 完善的异常处理和重试机制
💡 核心要点: Selenium和Playwright是处理JavaScript渲染内容的强大工具,但也是性能瓶颈。合理使用缓存、连接池和错误处理机制,可以显著提升爬虫的整体性能。
#SEO优化建议
为了提高这篇Selenium/Playwright集成教程在搜索引擎中的排名,以下是几个关键的SEO优化建议:
#标题优化
- 主标题: 包含核心关键词"Selenium", "Playwright", "JavaScript渲染", "浏览器自动化"
- 二级标题: 每个章节标题都包含相关的长尾关键词
- H1-H6层次结构: 保持正确的标题层级,便于搜索引擎理解内容结构
#内容优化
- 关键词密度: 在内容中自然地融入关键词如"Scrapy", "Selenium", "Playwright", "JavaScript渲染", "动态页面", "浏览器自动化", "反检测", "爬虫优化"等
- 元描述: 在文章开头的元数据中包含吸引人的描述
- 内部链接: 链接到其他相关教程,如Downloader Middleware等
- 外部权威链接: 引用官方文档和权威资源
#技术SEO
- 页面加载速度: 优化代码块和图片加载
- 移动端适配: 确保在移动设备上良好显示
- 结构化数据: 使用适当的HTML标签和语义化元素
#用户体验优化
- 内容可读性: 使用清晰的段落结构和代码示例
- 互动元素: 提供实际可运行的代码示例
- 更新频率: 定期更新内容以保持时效性
🔗 相关教程推荐
- Downloader Middleware - 中间件基础
- 反爬对抗实战 - 反爬策略
- 数据去重与增量更新 - 数据处理
- 自动限速AutoThrottle - 请求控制
- 代理IP池集成 - 代理管理
🏷️ 标签云: Scrapy Selenium Playwright JavaScript渲染 动态页面 浏览器自动化 反检测 爬虫优化 Web自动化 前端爬虫

