Scrapyrt实战指南 - 将爬虫转换为HTTP API服务

📂 所属阶段:第五阶段 — 战力升级(分布式与进阶篇)
🔗 相关章节:Scrapy-Redis分布式架构 · Docker容器化爬虫 · 抓取监控看板

Scrapyrt简介

Scrapyrt(ScrapyRT,Scrapy Realtime)是Scrapy生态中的一个扩展服务,它可以把你的Scrapy爬虫暴露为HTTP API,允许通过HTTP请求实时触发爬虫任务并同步获得抓取结果。这种架构模式特别适合按需爬取、微服务集成,以及需要通过API网关统一接入的场景。

Scrapyrt的核心优势

"""
Scrapyrt主要优势:

1. 服务化架构:
   - 将爬虫转换为HTTP服务
   - 支持RESTful API调用
   - 易于与其他系统集成

2. 按需爬取:
   - 实时触发爬虫任务
   - 避免持续运行的资源消耗
   - 支持动态参数传递

3. 弹性扩展:
   - 无状态设计便于水平扩展
   - 支持负载均衡部署
   - 与容器化技术完美结合

4. 开发友好:
   - 标准HTTP协议通信
   - JSON格式数据交换
   - 丰富的调试和监控功能
"""

适用场景分析

"""
Scrapyrt适用场景:

1. 微服务架构:
   - 作为数据采集微服务
   - 与API网关集成
   - 支持服务发现和负载均衡

2. 按需爬取:
   - 用户触发的即时爬取
   - 特定时间段的批量任务
   - 事件驱动的爬取需求

3. API集成:
   - 与Web应用集成
   - 第三方系统数据获取
   - 实时数据同步服务
"""

安装与配置

环境准备

# 确保Python环境已安装
python --version  # 需要Python 3.6+

# 创建虚拟环境(推荐)
python -m venv scrapyrt_env
source scrapyrt_env/bin/activate  # Linux/Mac
# 或
scrapyrt_env\Scripts\activate  # Windows

# 安装Scrapyrt
pip install scrapyrt

# 验证安装
scrapyrt --help

依赖检查

# check_dependencies.py
import sys
try:
    import scrapy
    import scrapyrt
    import twisted  # Scrapy与Scrapyrt都构建在Twisted之上
    print("✅ All dependencies installed successfully")
except ImportError as e:
    print(f"❌ Missing dependency: {e}")
    sys.exit(1)

基础配置

Scrapyrt没有独立的INI配置文件:服务级配置通过一个Python设置模块覆盖默认值,启动时用 -S 参数加载;并发数、下载延迟、抓取深度等属于Scrapy层面的设置,仍然写在Scrapy项目的settings.py中。下面是一个示意性的设置模块(TIMEOUT_LIMIT、DEBUG、LOG_DIR、RESOURCES是常用的可覆盖项,具体以所装版本的默认设置为准):

# scrapyrt_settings.py - 通过 scrapyrt -S scrapyrt_settings 加载
# 单次爬取允许运行的最长时间(秒)
TIMEOUT_LIMIT = 300

# 是否启用调试模式
DEBUG = False

# 日志目录
LOG_DIR = 'logs'

# 注册的资源(端点),默认只有crawl.json
RESOURCES = {
    'crawl.json': 'scrapyrt.resources.CrawlResource',
}

基本使用方法

启动Scrapyrt服务

# 方法1:在Scrapy项目根目录(含scrapy.cfg的目录)下直接启动,默认监听9080端口
scrapyrt

# 方法2:指定端口(本文示例统一使用6023)
scrapyrt -p 6023

# 方法3:指定绑定地址(-i 用于指定监听IP,而不是项目路径)
scrapyrt -p 6023 -i 0.0.0.0

# 方法4:加载自定义设置模块
scrapyrt -p 6023 -S scrapyrt_settings

# 方法5:后台运行
nohup scrapyrt -p 6023 > scrapyrt.log 2>&1 &

创建示例爬虫

# example_spider.py - 示例爬虫
from datetime import datetime

import scrapy
from scrapy import Request

class ExampleSpider(scrapy.Spider):
    name = 'example'
    
    def start_requests(self):
        # 通过crawl_args等方式传入url参数时,按需生成起始请求
        urls = getattr(self, 'url', None)
        if urls:
            if isinstance(urls, str):
                urls = [urls]
            for url in urls:
                yield Request(url, callback=self.parse)
    
    def parse(self, response):
        # 提取页面信息
        yield {
            'url': response.url,
            'title': response.css('title::text').get(),
            'status': response.status,
            'timestamp': datetime.utcnow().isoformat()
        }

验证服务启动

# 检查服务是否启动:Scrapyrt默认只注册/crawl.json端点,访问根路径会返回404
# 不带参数访问crawl.json,若返回类似下面的JSON错误,说明服务已正常监听
curl "http://localhost:6023/crawl.json"

# 预期返回(错误提示的具体文案随版本可能略有差异)
{
  "status": "error",
  "code": 400,
  "message": "Missing required parameter: spider_name"
}

HTTP API接口详解

主要API端点

"""
Scrapyrt提供的主要API端点:

1. /crawl.json - 同步爬取端点
   - GET请求:触发爬虫任务
   - 参数:spider_name, url, max_requests等
   
2. /crawl.json/request - 异步爬取端点
   - POST请求:发送更复杂的爬虫请求
   - 支持自定义Request对象
   
3. /stats.json - 统计信息端点
   - GET请求:获取服务运行状态
   - 返回性能指标和统计信息
"""

GET方式爬取 (/crawl.json)

# 基本GET请求
curl "http://localhost:6023/crawl.json?spider_name=example&url=https://example.com"

# 带参数的请求
curl "http://localhost:6023/crawl.json?spider_name=example&url=https://example.com&max_requests=5&start_requests=true"

# 返回结果示例
{
  "status": "ok",
  "items": [
    {
      "url": "https://example.com",
      "title": "Example Domain",
      "status": 200,
      "timestamp": "2024-01-15T10:30:00.123456"
    }
  ],
  "stats": {
    "start_time": "2024-01-15 10:30:00",
    "finish_time": "2024-01-15 10:30:05",
    "item_scraped_count": 1,
    "response_received_count": 1,
    "finish_reason": "finished"
  }
}

POST方式爬取 (/crawl.json)

# POST请求示例(POST与GET使用同一个/crawl.json端点)
curl -X POST http://localhost:6023/crawl.json \
  -H "Content-Type: application/json" \
  -d '{
    "spider_name": "example",
    "request": {
      "url": "https://example.com",
      "method": "GET",
      "headers": {
        "User-Agent": "Custom Bot 1.0"
      },
      "meta": {
        "custom_param": "value"
      }
    }
  }'

爬取统计信息

Scrapyrt默认不提供独立的/stats.json统计端点,每次/crawl.json调用返回的stats字段就是本次爬取的统计数据(见上文GET示例的返回结果);如果需要服务级的状态接口,可以通过RESOURCES设置注册自定义资源,或在前置网关中自行聚合。

# 从一次爬取结果中提取统计信息
curl -s "http://localhost:6023/crawl.json?spider_name=example&url=https://example.com" \
  | python -c "import sys, json; print(json.load(sys.stdin)['stats'])"

Python客户端调用

基础调用示例

# scrapyrt_client.py - Scrapyrt Python客户端
import requests
import json
from typing import Dict, List, Optional

class ScrapyrtClient:
    """
    Scrapyrt客户端类
    """
    
    def __init__(self, base_url: str = "http://localhost:6023"):
        self.base_url = base_url.rstrip('/')
        self.session = requests.Session()
    
    def crawl_sync(self, spider_name: str, url: str, **params) -> Dict:
        """
        同步爬取
        """
        endpoint = f"{self.base_url}/crawl.json"
        params.update({
            'spider_name': spider_name,
            'url': url
        })
        
        response = self.session.get(endpoint, params=params)
        response.raise_for_status()
        
        return response.json()
    
    def crawl_async(self, spider_name: str, request_data: Dict) -> Dict:
        """
        POST方式爬取:携带自定义request参数(调用仍会阻塞到本次爬取结束)
        """
        endpoint = f"{self.base_url}/crawl.json"
        payload = {
            'spider_name': spider_name,
            'request': request_data
        }
        
        response = self.session.post(
            endpoint,
            json=payload,
            headers={'Content-Type': 'application/json'}
        )
        response.raise_for_status()
        
        return response.json()
    
    def get_stats(self) -> Dict:
        """
        获取统计信息(假设已通过RESOURCES注册了自定义的/stats.json资源;Scrapyrt默认不提供该端点)
        """
        endpoint = f"{self.base_url}/stats.json"
        response = self.session.get(endpoint)
        response.raise_for_status()
        
        return response.json()

# 使用示例
client = ScrapyrtClient()

# 同步调用
result = client.crawl_sync(
    spider_name='example',
    url='https://httpbin.org/get',
    max_requests=1
)
print(json.dumps(result, indent=2, ensure_ascii=False))

高级调用示例

# advanced_client.py - 高级客户端功能
import asyncio
import json
import time
from typing import Dict, List

import aiohttp

class AsyncScrapyrtClient:
    """
    异步Scrapyrt客户端
    """
    
    def __init__(self, base_url: str = "http://localhost:6023"):
        self.base_url = base_url.rstrip('/')
        self.session = None
    
    async def __aenter__(self):
        self.session = aiohttp.ClientSession()
        return self
    
    async def __aexit__(self, exc_type, exc_val, exc_tb):
        if self.session:
            await self.session.close()
    
    async def crawl_batch(self, requests_list: List[Dict]) -> List[Dict]:
        """
        批量爬取
        """
        tasks = []
        for req_data in requests_list:
            task = self._single_crawl(req_data)
            tasks.append(task)
        
        results = await asyncio.gather(*tasks, return_exceptions=True)
        return results
    
    async def _single_crawl(self, req_data: Dict) -> Dict:
        """
        单次爬取
        """
        try:
            start_time = time.time()
            async with self.session.get(
                f"{self.base_url}/crawl.json",
                params=req_data
            ) as response:
                result = await response.json()
                result['request_time'] = time.time() - start_time
                return result
        except Exception as e:
            return {'error': str(e), 'request_data': req_data}

# 批量调用示例
async def batch_example():
    async with AsyncScrapyrtClient() as client:
        requests_list = [
            {'spider_name': 'example', 'url': 'https://httpbin.org/get', 'max_requests': 1},
            {'spider_name': 'example', 'url': 'https://httpbin.org/user-agent', 'max_requests': 1},
            {'spider_name': 'example', 'url': 'https://httpbin.org/headers', 'max_requests': 1}
        ]
        
        results = await client.crawl_batch(requests_list)
        for result in results:
            print(json.dumps(result, indent=2, ensure_ascii=False))

# 运行示例
# asyncio.run(batch_example())

错误处理和重试机制

# error_handling.py - 错误处理和重试
import time
from functools import wraps
from typing import Dict

import requests

from scrapyrt_client import ScrapyrtClient  # 前文定义的基础客户端

def retry_on_failure(max_retries=3, delay=1, backoff=2):
    """
    重试装饰器
    """
    def decorator(func):
        @wraps(func)
        def wrapper(*args, **kwargs):
            retries = 0
            current_delay = delay
            
            while retries < max_retries:
                try:
                    return func(*args, **kwargs)
                except requests.RequestException as e:
                    retries += 1
                    if retries >= max_retries:
                        raise e
                    
                    time.sleep(current_delay)
                    current_delay *= backoff
            
            return None
        return wrapper
    return decorator

class RobustScrapyrtClient(ScrapyrtClient):
    """
    具备重试机制的客户端
    """
    
    @retry_on_failure(max_retries=3, delay=1, backoff=2)
    def crawl_with_retry(self, spider_name: str, url: str, **params) -> Dict:
        """
        带重试的爬取方法
        """
        return self.crawl_sync(spider_name, url, **params)
    
    def crawl_with_timeout(self, spider_name: str, url: str, timeout: int = 300, **params) -> Dict:
        """
        带超时控制的爬取方法
        """
        endpoint = f"{self.base_url}/crawl.json"
        params.update({
            'spider_name': spider_name,
            'url': url
        })
        
        response = self.session.get(endpoint, params=params, timeout=timeout)
        response.raise_for_status()
        
        return response.json()

# 使用示例
robust_client = RobustScrapyrtClient()
try:
    result = robust_client.crawl_with_retry(
        spider_name='example',
        url='https://httpbin.org/delay/2',
        max_requests=1
    )
    print("✅ Crawling successful:", result.get('status'))
except Exception as e:
    print(f"❌ Crawling failed after retries: {e}")

高级配置选项

服务配置优化

# advanced_config.py - 高级配置选项
# 这是一个自定义的配置容器:port/bind_address对应scrapyrt的启动参数,
# 其余大多是Scrapy层面的设置,最终需映射到项目settings.py或启动时的覆盖项
import json
from dataclasses import dataclass
from typing import Optional

@dataclass
class ScrapyrtConfig:
    """
    Scrapyrt配置类
    """
    port: int = 6023
    bind_address: str = '0.0.0.0'
    project_path: str = './'
    max_concurrent_requests: int = 10
    timeout: int = 300
    log_level: str = 'INFO'
    debug: bool = False
    max_depth: int = 3
    download_delay: float = 1.0
    concurrent_items: int = 100
    memusage_limit_mb: int = 1024
    memusage_warning_mb: int = 512
    retry_times: int = 3
    retry_http_codes: Optional[list] = None
    
    def __post_init__(self):
        if self.retry_http_codes is None:
            self.retry_http_codes = [500, 502, 503, 504, 408, 429]
    
    def to_dict(self) -> dict:
        """
        转换为字典格式
        """
        return {
            'port': self.port,
            'bind_address': self.bind_address,
            'project_path': self.project_path,
            'max_concurrent_requests': self.max_concurrent_requests,
            'timeout': self.timeout,
            'log_level': self.log_level,
            'debug': self.debug,
            'max_depth': self.max_depth,
            'download_delay': self.download_delay,
            'concurrent_items': self.concurrent_items,
            'memusage_limit_mb': self.memusage_limit_mb,
            'memusage_warning_mb': self.memusage_warning_mb,
            'retry_times': self.retry_times,
            'retry_http_codes': self.retry_http_codes
        }

# 配置示例
config = ScrapyrtConfig(
    port=8080,
    max_concurrent_requests=20,
    timeout=600,
    log_level='DEBUG',
    memusage_limit_mb=2048
)

print("Scrapyrt配置:", json.dumps(config.to_dict(), indent=2, ensure_ascii=False))

性能调优配置

# performance_config.py - 性能调优配置
PERFORMANCE_CONFIG = {
    # 网络性能优化
    'CONCURRENT_REQUESTS': 32,
    'CONCURRENT_REQUESTS_PER_DOMAIN': 8,
    'DOWNLOAD_DELAY': 0.5,
    'RANDOMIZE_DOWNLOAD_DELAY': True,  # 布尔开关:在DOWNLOAD_DELAY基础上做随机抖动
    
    # 内存优化
    'MEMUSAGE_LIMIT_MB': 2048,
    'MEMUSAGE_WARNING_MB': 1024,
    'CLOSESPIDER_TIMEOUT': 300,
    
    # 下载器优化
    'DOWNLOAD_TIMEOUT': 60,
    'DOWNLOAD_MAXSIZE': 1024*1024*100,  # 100MB
    'DOWNLOAD_WARNSIZE': 1024*1024*32,  # 32MB
    
    # 并发优化
    'REACTOR_THREADPOOL_MAXSIZE': 20,
    'DNSCACHE_ENABLED': True,
    'DNSCACHE_SIZE': 10000,
    
    # 缓存优化
    'HTTPCACHE_ENABLED': False,  # Scrapyrt通常不需要缓存
    'REDIRECT_ENABLED': True,
    'COOKIES_ENABLED': True,
    
    # 重试优化
    'RETRY_TIMES': 3,
    'RETRY_HTTP_CODES': [500, 502, 503, 504, 408, 429, 403],
    
    # 日志优化
    'LOG_LEVEL': 'INFO',
    'LOG_SHORT_NAMES': True
}

# 环境特定配置
ENV_CONFIGS = {
    'development': {
        'LOG_LEVEL': 'DEBUG',
        'CONCURRENT_REQUESTS': 8,
        'DOWNLOAD_DELAY': 1.0
    },
    'production': {
        'LOG_LEVEL': 'WARNING',
        'CONCURRENT_REQUESTS': 32,
        'DOWNLOAD_DELAY': 0.5,
        'MEMUSAGE_LIMIT_MB': 4096
    },
    'testing': {
        'LOG_LEVEL': 'ERROR',
        'CONCURRENT_REQUESTS': 4,
        'DOWNLOAD_DELAY': 0.1
    }
}
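
上面这些键都是标准的Scrapy设置,在Scrapyrt场景下最终要写进Scrapy项目的settings.py或具体爬虫的custom_settings才会生效。下面是一个按环境变量选择并合并配置的简单示意(CRAWL_ENV这个环境变量名与apply_config.py文件名均为示例假设):

# apply_config.py - 按环境合并并应用性能配置的示意
import os

import scrapy

from performance_config import PERFORMANCE_CONFIG, ENV_CONFIGS

def build_settings(env: str = None) -> dict:
    # 根据环境变量选择环境配置,并叠加到基础性能配置上
    env = env or os.environ.get('CRAWL_ENV', 'development')
    settings = dict(PERFORMANCE_CONFIG)
    settings.update(ENV_CONFIGS.get(env, {}))
    return settings

class TunedSpider(scrapy.Spider):
    """Scrapy运行该爬虫时会应用custom_settings中的设置"""
    name = 'example_tuned'
    custom_settings = build_settings()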

性能优化策略

并发控制优化

# concurrency_optimization.py - 并发控制优化
import asyncio
import time
from asyncio import Semaphore
from typing import List, Dict

from advanced_client import AsyncScrapyrtClient  # 前文定义的异步客户端

class ConcurrencyController:
    """
    并发控制器
    """
    
    def __init__(self, max_concurrent: int = 10):
        self.semaphore = Semaphore(max_concurrent)
        self.active_tasks = 0
        self.max_concurrent = max_concurrent
    
    async def execute_with_limit(self, func, *args, **kwargs):
        """
        限制并发执行
        """
        async with self.semaphore:
            self.active_tasks += 1
            try:
                result = await func(*args, **kwargs)
                return result
            finally:
                self.active_tasks -= 1

class OptimizedScrapyrtClient(AsyncScrapyrtClient):
    """
    优化的Scrapyrt客户端
    """
    
    def __init__(self, base_url: str = "http://localhost:6023", max_concurrent: int = 5):
        super().__init__(base_url)
        self.concurrency_controller = ConcurrencyController(max_concurrent)
    
    async def optimized_batch_crawl(self, requests_list: List[Dict], batch_size: int = 10) -> List[Dict]:
        """
        优化的批量爬取,支持批处理
        """
        results = []
        
        # 分批处理
        for i in range(0, len(requests_list), batch_size):
            batch = requests_list[i:i + batch_size]
            batch_results = await self._process_batch(batch)
            results.extend(batch_results)
            
            # 批次间短暂延迟,避免过载
            if i + batch_size < len(requests_list):
                await asyncio.sleep(0.1)
        
        return results
    
    async def _process_batch(self, batch: List[Dict]) -> List[Dict]:
        """
        处理单个批次
        """
        tasks = []
        for req_data in batch:
            task = self.concurrency_controller.execute_with_limit(
                self._single_crawl, req_data
            )
            tasks.append(task)
        
        return await asyncio.gather(*tasks, return_exceptions=True)

# 性能监控
class PerformanceMonitor:
    """
    性能监控器
    """
    
    def __init__(self):
        self.requests_count = 0
        self.items_count = 0
        self.errors_count = 0
        self.total_time = 0
        self.start_time = time.time()
    
    def record_request(self, items_count: int = 0, error: bool = False):
        """
        记录请求统计
        """
        self.requests_count += 1
        self.items_count += items_count
        if error:
            self.errors_count += 1
    
    def get_stats(self) -> Dict:
        """
        获取性能统计
        """
        elapsed_time = time.time() - self.start_time
        return {
            'requests_per_second': self.requests_count / elapsed_time if elapsed_time > 0 else 0,
            'items_per_second': self.items_count / elapsed_time if elapsed_time > 0 else 0,
            'error_rate': self.errors_count / self.requests_count if self.requests_count > 0 else 0,
            'total_time': elapsed_time,
            'total_requests': self.requests_count,
            'total_items': self.items_count,
            'total_errors': self.errors_count
        }
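
下面把OptimizedScrapyrtClient和PerformanceMonitor串起来,给出一个批量爬取并统计吞吐与错误率的使用示意(目标URL与并发数仅作演示,假设Scrapyrt服务与example爬虫已就绪):

# 使用示例:批量爬取 + 性能统计
async def monitored_batch_example():
    monitor = PerformanceMonitor()
    requests_list = [
        {'spider_name': 'example', 'url': 'https://httpbin.org/get', 'max_requests': 1}
        for _ in range(20)
    ]
    
    async with OptimizedScrapyrtClient(max_concurrent=5) as client:
        results = await client.optimized_batch_crawl(requests_list, batch_size=10)
    
    for result in results:
        if isinstance(result, Exception) or 'error' in result:
            monitor.record_request(error=True)
        else:
            monitor.record_request(items_count=len(result.get('items', [])))
    
    print(monitor.get_stats())

# asyncio.run(monitored_batch_example())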

内存优化策略

# memory_optimization.py - 内存优化策略
import gc
from typing import Dict
from weakref import WeakValueDictionary

import psutil

class MemoryOptimizer:
    """
    内存优化器
    """
    
    def __init__(self, max_memory_mb: int = 1024):
        self.max_memory_mb = max_memory_mb
        self.object_cache = WeakValueDictionary()
        self.collection_threshold = 1000  # 每处理1000个对象进行一次垃圾回收
    
    def monitor_memory(self) -> Dict:
        """
        监控内存使用情况
        """
        process = psutil.Process()
        memory_info = process.memory_info()
        
        return {
            'rss': memory_info.rss / 1024 / 1024,  # MB
            'vms': memory_info.vms / 1024 / 1024,  # MB
            'percent': process.memory_percent()
        }
    
    def optimize_memory_usage(self, force_gc: bool = False):
        """
        优化内存使用
        """
        memory_info = self.monitor_memory()
        
        if memory_info['rss'] > self.max_memory_mb or force_gc:
            # 执行垃圾回收
            collected = gc.collect()
            print(f"Garbage collected: {collected} objects")
            
            # 清理缓存
            self.object_cache.clear()
    
    def cache_object(self, key: str, obj):
        """
        缓存对象(使用弱引用避免内存泄漏)
        """
        self.object_cache[key] = obj
    
    def get_cached_object(self, key: str):
        """
        获取缓存对象
        """
        return self.object_cache.get(key)

class OptimizedResponseHandler:
    """
    优化的响应处理器
    """
    
    def __init__(self):
        self.memory_optimizer = MemoryOptimizer(max_memory_mb=512)
    
    def process_large_response(self, response_data: Dict, max_items: int = 1000):
        """
        处理大响应数据
        """
        if 'items' in response_data:
            items = response_data['items']
            
            # 限制返回的项目数量
            if len(items) > max_items:
                response_data['items'] = items[:max_items]
                response_data['truncated'] = True
                response_data['original_count'] = len(items)
            
            # 限制单个项目大小
            for item in response_data['items']:
                self._limit_item_size(item)
        
        return response_data
    
    def _limit_item_size(self, item: Dict, max_string_length: int = 10000):
        """
        限制项目大小
        """
        for key, value in item.items():
            if isinstance(value, str) and len(value) > max_string_length:
                item[key] = value[:max_string_length] + "...[TRUNCATED]"
    
    def cleanup_response(self, response_data: Dict):
        """
        清理响应数据
        """
        # 移除不必要的元数据
        cleanup_keys = ['_response_time', '_process_info']
        for key in cleanup_keys:
            response_data.pop(key, None)
        
        # 优化内存使用
        self.memory_optimizer.optimize_memory_usage()
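
下面给出一个简单的使用示意:对一次同步爬取的结果先做条目数与字段长度裁剪,再清理并触发内存优化(ScrapyrtClient来自前文scrapyrt_client.py,假设Scrapyrt服务与example爬虫可用):

# 使用示例:裁剪并清理一次爬取结果
from scrapyrt_client import ScrapyrtClient  # 前文定义的基础客户端

client = ScrapyrtClient()
handler = OptimizedResponseHandler()

raw_result = client.crawl_sync('example', 'https://httpbin.org/get', max_requests=1)
trimmed = handler.process_large_response(raw_result, max_items=500)
handler.cleanup_response(trimmed)
print('truncated:', trimmed.get('truncated', False), 'items:', len(trimmed.get('items', [])))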

安全性配置

访问控制中间件

# security_config.py - 安全性配置
import hmac
import hashlib
import time
from functools import wraps
from typing import List

class SecurityConfig:
    """
    安全配置类
    """
    
    def __init__(self):
        self.api_keys = {}  # API密钥存储
        self.rate_limits = {}  # 速率限制
        self.whitelist_ips = []  # IP白名单
        self.blacklist_ips = []  # IP黑名单
        self.request_timeout = 300  # 请求超时
        self.max_request_size = 1024 * 1024  # 最大请求大小1MB
    
    def add_api_key(self, api_key: str, permissions: List[str] = None):
        """
        添加API密钥
        """
        if permissions is None:
            permissions = ['read', 'write']
        
        self.api_keys[api_key] = {
            'permissions': permissions,
            'created_at': time.time(),
            'last_used': None
        }
    
    def verify_api_key(self, api_key: str, required_permission: str = 'read') -> bool:
        """
        验证API密钥
        """
        if api_key not in self.api_keys:
            return False
        
        key_info = self.api_keys[api_key]
        if required_permission not in key_info['permissions']:
            return False
        
        # 更新最后使用时间
        key_info['last_used'] = time.time()
        return True
    
    def verify_signature(self, data: str, signature: str, secret_key: str) -> bool:
        """
        验证请求签名
        """
        expected_signature = hmac.new(
            secret_key.encode(),
            data.encode(),
            hashlib.sha256
        ).hexdigest()
        
        return hmac.compare_digest(signature, expected_signature)
    
    def check_rate_limit(self, identifier: str, max_requests: int = 100, window: int = 3600) -> bool:
        """
        检查速率限制
        """
        current_time = time.time()
        
        if identifier not in self.rate_limits:
            self.rate_limits[identifier] = []
        
        # 清理过期的请求记录
        self.rate_limits[identifier] = [
            req_time for req_time in self.rate_limits[identifier]
            if current_time - req_time < window
        ]
        
        # 检查是否超过限制
        if len(self.rate_limits[identifier]) >= max_requests:
            return False
        
        # 记录当前请求
        self.rate_limits[identifier].append(current_time)
        return True

# 装饰器实现认证
def require_auth(security_config: SecurityConfig, permission: str = 'read'):
    """
    认证装饰器
    """
    def decorator(func):
        @wraps(func)
        def wrapper(*args, **kwargs):
            # 从请求中获取API密钥:优先取关键字参数,其次取第一个位置参数(假定为请求字典)
            api_key = kwargs.get('api_key')
            if not api_key and args and isinstance(args[0], dict):
                api_key = args[0].get('api_key')
            
            if not api_key or not security_config.verify_api_key(api_key, permission):
                raise PermissionError("Invalid or insufficient API key")
            
            return func(*args, **kwargs)
        return wrapper
    return decorator

# 使用示例
security = SecurityConfig()
security.add_api_key('your-secret-api-key', ['read', 'write'])
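
Scrapyrt本身没有内置认证,上面的SecurityConfig通常放在它前面的一层轻量网关或代理中使用。下面是把密钥校验、IP名单和速率限制串起来再转发请求的最小示意(crawl_gateway函数名为示例假设,ScrapyrtClient来自前文scrapyrt_client.py):

# 最小网关示意:认证、限流通过后再把请求转发给Scrapyrt
from scrapyrt_client import ScrapyrtClient  # 前文定义的基础客户端

def crawl_gateway(api_key: str, client_ip: str, spider_name: str, url: str, **params):
    # IP黑白名单检查
    if security.blacklist_ips and client_ip in security.blacklist_ips:
        raise PermissionError("IP blocked")
    if security.whitelist_ips and client_ip not in security.whitelist_ips:
        raise PermissionError("IP not allowed")
    
    # API密钥与速率限制
    if not security.verify_api_key(api_key, required_permission='read'):
        raise PermissionError("Invalid API key")
    if not security.check_rate_limit(api_key, max_requests=100, window=3600):
        raise RuntimeError("Rate limit exceeded")
    
    # 校验通过,转发给Scrapyrt
    return ScrapyrtClient().crawl_sync(spider_name, url, **params)

# result = crawl_gateway('your-secret-api-key', '127.0.0.1', 'example', 'https://example.com')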

SSL/TLS配置

Scrapyrt本身不处理TLS,生产环境通常在它前面的反向代理(如下文Docker Compose中的Nginx)上终止HTTPS;下面的配置清单整理了证书路径、协议版本、密码套件与安全响应头,供代理层配置时参考。

# ssl_config.py - SSL/TLS配置(应用于反向代理层)
SSL_CONFIG = {
    # SSL证书路径
    'ssl_cert_file': '/path/to/certificate.crt',
    'ssl_key_file': '/path/to/private.key',
    
    # SSL协议版本
    'ssl_protocol': 'TLSv1.2',
    
    # 密码套件
    'ssl_ciphers': [
        'ECDHE-RSA-AES256-GCM-SHA384',
        'ECDHE-RSA-AES128-GCM-SHA256',
        'ECDHE-RSA-AES256-SHA384',
        'ECDHE-RSA-AES128-SHA256'
    ],
    
    # 安全头
    'security_headers': {
        'Strict-Transport-Security': 'max-age=31536000; includeSubDomains',
        'X-Content-Type-Options': 'nosniff',
        'X-Frame-Options': 'DENY',
        'X-XSS-Protection': '1; mode=block'
    }
}

部署与运维

Docker部署方案

# Dockerfile.scrapyrt - Scrapyrt Docker镜像
FROM python:3.9-slim

WORKDIR /app

# 安装系统依赖
RUN apt-get update && apt-get install -y \
    curl \
    gcc \
    g++ \
    libxml2-dev \
    libxslt1-dev \
    libffi-dev \
    libssl-dev \
    libpq-dev \
    libsqlite3-dev \
    libreadline-dev \
    libbz2-dev \
    libncursesw5-dev \
    libexpat1-dev \
    libjpeg-dev \
    libpng-dev \
    libfreetype6-dev \
    zlib1g-dev \
    && rm -rf /var/lib/apt/lists/*

# 复制项目文件
COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt

# 安装Scrapyrt
RUN pip install scrapyrt

# 复制应用代码
COPY . .

# 创建非root用户
RUN groupadd -r appuser && useradd -r -g appuser appuser
RUN chown -R appuser:appuser /app
USER appuser

# 暴露端口
EXPOSE 6023

# 启动命令(-i 指定监听地址;WORKDIR /app 即Scrapy项目根目录,需包含scrapy.cfg)
CMD ["scrapyrt", "-p", "6023", "-i", "0.0.0.0"]

# 健康检查:根路径默认返回404,这里探测crawl.json端点是否返回JSON响应
HEALTHCHECK --interval=30s --timeout=10s --start-period=5s --retries=3 \
    CMD curl -s http://localhost:6023/crawl.json | grep -q '"status"' || exit 1

Docker Compose配置

# docker-compose-scrapyrt.yml
version: '3.8'

services:
  scrapyrt:
    build:
      context: .
      dockerfile: Dockerfile.scrapyrt
    ports:
      - "6023:6023"
    environment:
      - PYTHONUNBUFFERED=1
      - SCRAPY_SETTINGS_MODULE=myproject.settings
    volumes:
      - ./logs:/app/logs
      - ./config:/app/config
    restart: unless-stopped
    depends_on:
      - redis
    networks:
      - crawler-network
  
  redis:
    image: redis:6.2-alpine
    ports:
      - "6379:6379"
    volumes:
      - redis-data:/data
    command: redis-server --appendonly yes
    networks:
      - crawler-network

  nginx:
    image: nginx:alpine
    ports:
      - "80:80"
      - "443:443"
    volumes:
      - ./nginx.conf:/etc/nginx/nginx.conf
      - ./ssl:/etc/nginx/ssl
    depends_on:
      - scrapyrt
    networks:
      - crawler-network

volumes:
  redis-data:

networks:
  crawler-network:
    driver: bridge

系统服务配置

# scrapyrt.service - systemd服务配置
[Unit]
Description=Scrapyrt Service
After=network.target

[Service]
Type=simple
User=scrapyrt
Group=scrapyrt
# Scrapyrt需在Scrapy项目根目录(含scrapy.cfg)下启动
WorkingDirectory=/opt/scrapyrt/project
ExecStart=/opt/scrapyrt/venv/bin/scrapyrt -p 6023 -i 0.0.0.0
Restart=always
RestartSec=10
StandardOutput=journal
StandardError=journal
SyslogIdentifier=scrapyrt

[Install]
WantedBy=multi-user.target

监控与日志

日志配置

# logging_config.py - 日志配置
import logging
import logging.handlers
import json
from datetime import datetime

LOGGING_CONFIG = {
    'version': 1,
    'disable_existing_loggers': False,
    'formatters': {
        'standard': {
            'format': '%(asctime)s [%(levelname)s] %(name)s: %(message)s'
        },
        'json': {
            # 需要额外安装python-json-logger:pip install python-json-logger
            '()': 'pythonjsonlogger.jsonlogger.JsonFormatter',
            'format': '%(asctime)s %(name)s %(levelname)s %(message)s'
        }
    },
    'handlers': {
        'console': {
            'level': 'INFO',
            'class': 'logging.StreamHandler',
            'formatter': 'standard'
        },
        'file': {
            'level': 'DEBUG',
            'class': 'logging.handlers.RotatingFileHandler',
            'filename': 'scrapyrt.log',
            'maxBytes': 1024*1024*10,  # 10MB
            'backupCount': 5,
            'formatter': 'standard'
        },
        'error_file': {
            'level': 'ERROR',
            'class': 'logging.handlers.RotatingFileHandler',
            'filename': 'scrapyrt_error.log',
            'maxBytes': 1024*1024*10,  # 10MB
            'backupCount': 5,
            'formatter': 'json'
        }
    },
    'loggers': {
        'scrapyrt': {
            'handlers': ['console', 'file', 'error_file'],
            'level': 'DEBUG',
            'propagate': False
        },
        'twisted': {
            'handlers': ['file'],
            'level': 'WARNING',
            'propagate': False
        }
    }
}

def setup_logging():
    """
    设置日志配置
    """
    import logging.config
    logging.config.dictConfig(LOGGING_CONFIG)

# 使用示例
setup_logging()
logger = logging.getLogger('scrapyrt')
logger.info("Scrapyrt service started")

监控指标收集

# monitoring.py - 监控指标收集
import time
import psutil
from prometheus_client import Counter, Gauge, Histogram, start_http_server
from functools import wraps

# Prometheus指标定义
REQUEST_COUNT = Counter('scrapyrt_requests_total', 'Total requests processed', ['endpoint', 'method'])
ERROR_COUNT = Counter('scrapyrt_errors_total', 'Total errors', ['error_type'])
RESPONSE_TIME = Histogram('scrapyrt_response_time_seconds', 'Response time in seconds')
ACTIVE_REQUESTS = Gauge('scrapyrt_active_requests', 'Number of active requests')

class MetricsCollector:
    """
    指标收集器
    """
    
    def __init__(self, port: int = 8000):
        self.port = port
        start_http_server(port)
        self.start_time = time.time()
    
    def collect_system_metrics(self):
        """
        收集系统指标
        """
        return {
            'cpu_percent': psutil.cpu_percent(),
            'memory_percent': psutil.virtual_memory().percent,
            'disk_usage': psutil.disk_usage('/').percent,
            'uptime': time.time() - self.start_time
        }
    
    def instrument_endpoint(self, endpoint: str, method: str = 'GET'):
        """
        为端点添加监控装饰器
        """
        def decorator(func):
            @wraps(func)
            def wrapper(*args, **kwargs):
                start_time = time.time()
                ACTIVE_REQUESTS.inc()
                
                try:
                    result = func(*args, **kwargs)
                    REQUEST_COUNT.labels(endpoint=endpoint, method=method).inc()
                    return result
                except Exception as e:
                    ERROR_COUNT.labels(error_type=type(e).__name__).inc()
                    raise
                finally:
                    RESPONSE_TIME.observe(time.time() - start_time)
                    ACTIVE_REQUESTS.dec()
            
            return wrapper
        return decorator

# 使用示例
metrics = MetricsCollector(port=8000)

@metrics.instrument_endpoint('/crawl.json', 'GET')
def monitored_crawl():
    """
    受监控的爬取方法
    """
    time.sleep(0.1)  # 模拟处理时间
    return {"status": "ok"}

# 获取监控数据
print("System metrics:", metrics.collect_system_metrics())

常见问题与解决方案

问题1: 服务启动失败

现象: Scrapyrt服务无法启动,报错"Address already in use"
解决方案:

# 检查端口占用
netstat -tulpn | grep 6023

# 或使用lsof
lsof -i :6023

# 终止占用进程
kill -9 <PID>

# 或使用不同端口启动
scrapyrt -p 6024

问题2: 爬虫超时

现象: 爬虫请求长时间无响应
解决方案:

# 提高Scrapyrt层面的单次爬取时限(写入 -S 指定的设置模块)
TIMEOUT_LIMIT = 600  # 最长允许运行10分钟

# 在Scrapy项目settings.py或爬虫custom_settings中调整下载超时
CUSTOM_SETTINGS = {
    'DOWNLOAD_TIMEOUT': 120  # 单个请求的下载超时(秒)
}

问题3: 内存溢出

现象: 服务运行一段时间后内存占用过高
解决方案:

# 设置内存限制
MEMORY_CONFIG = {
    'MEMUSAGE_LIMIT_MB': 2048,
    'MEMUSAGE_WARNING_MB': 1024,
    'CLOSESPIDER_TIMEOUT': 300
}

# 优化响应处理
def process_large_response(response):
    # 限制返回的数据量
    items = response.get('items', [])[:1000]  # 只返回前1000个项目
    return {'items': items, 'truncated': len(response.get('items', [])) > 1000}

问题4: 并发限制

现象: 高并发请求时服务响应缓慢或失败
解决方案:

# 调整并发参数(均为Scrapy设置,写入settings.py或custom_settings)
CONCURRENCY_CONFIG = {
    'CONCURRENT_REQUESTS': 32,
    'CONCURRENT_REQUESTS_PER_DOMAIN': 8,
    'REACTOR_THREADPOOL_MAXSIZE': 20
}
# Scrapyrt本身没有全局并发上限的设置项,服务级限流建议放在客户端或前置网关实现

最佳实践总结

部署最佳实践

  1. 容器化部署:

    • 使用Docker进行标准化部署
    • 通过Docker Compose管理多服务
    • 实现环境隔离和快速扩展
  2. 配置管理:

    • 使用配置文件管理环境变量
    • 实现配置的版本控制
    • 支持多环境配置切换
  3. 监控告警:

    • 集成Prometheus等监控系统
    • 设置关键指标告警
    • 实现日志集中管理

安全最佳实践

  1. 访问控制:

    • 实施API密钥认证
    • 配置IP白名单/黑名单
    • 启用请求签名验证
  2. 数据保护:

    • 使用HTTPS加密传输
    • 实施速率限制
    • 验证输入参数合法性
  3. 权限管理:

    • 最小权限原则
    • 定期轮换密钥
    • 记录访问日志

性能最佳实践

  1. 资源优化:

    • 合理设置并发数
    • 优化内存使用
    • 实现连接池复用(见本节末尾的示例)
  2. 缓存策略:

    • 适当使用响应缓存
    • 实现数据预热
    • 避免重复计算
  3. 扩展性:

    • 无状态设计
    • 支持水平扩展
    • 实现负载均衡
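
以上面"实现连接池复用"为例,requests自带的HTTPAdapter即可为前文的ScrapyrtClient配置连接池,减少反复建立TCP连接的开销。下面是一个示意(pool_maxsize等数值仅作演示):

# pooled_client.py - 为ScrapyrtClient配置连接池的示意
from requests.adapters import HTTPAdapter

from scrapyrt_client import ScrapyrtClient  # 前文定义的基础客户端

class PooledScrapyrtClient(ScrapyrtClient):
    def __init__(self, base_url: str = "http://localhost:6023",
                 pool_connections: int = 10, pool_maxsize: int = 50):
        super().__init__(base_url)
        adapter = HTTPAdapter(pool_connections=pool_connections,
                              pool_maxsize=pool_maxsize,
                              max_retries=3)
        # 让指向Scrapyrt服务的HTTP/HTTPS请求复用底层TCP连接
        self.session.mount('http://', adapter)
        self.session.mount('https://', adapter)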

💡 核心要点: Scrapyrt将传统爬虫转换为HTTP API服务,实现了服务化架构。通过合理配置和优化,可以构建高可用、高性能的实时爬取服务。

🏷️ 标签云: Scrapyrt HTTP API 实时爬虫 微服务 按需爬取 API服务 爬虫架构 容器化部署 监控系统 安全性配置