# Scrapyrt in Practice: Turning Spiders into an HTTP API Service
📂 Stage: Stage 5 — Power-Up (Distributed and Advanced Topics)
🔗 Related chapters: Scrapy-Redis Distributed Architecture · Dockerized Crawlers · Crawl Monitoring Dashboard
## Table of Contents
- Introduction to Scrapyrt
- Installation and Configuration
- Basic Usage
- HTTP API Reference
- Python Client Usage
- Advanced Configuration Options
- Performance Optimization Strategies
- Security Configuration
- Deployment and Operations
- Monitoring and Logging
- Common Problems and Solutions
- Best Practices Summary

## Introduction to Scrapyrt
Scrapyrt (Scrapy Real-Time) is a Scrapy extension that exposes your Scrapy spiders as an HTTP API service, so crawl jobs can be triggered on demand by plain HTTP requests. This architecture is a good fit for on-demand crawling, microservice integration, and API-gateway scenarios.
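Before diving into the details, here is a minimal sketch of what "spider as an HTTP service" looks like from the caller's side. It assumes a Scrapyrt instance is already listening on localhost:6023 and that the project contains a spider named `example`; both are set up later in this guide.

```python
# A quick taste: trigger a crawl over HTTP and read the scraped items.
import requests

resp = requests.get(
    "http://localhost:6023/crawl.json",
    params={"spider_name": "example", "url": "https://example.com"},
    timeout=60,
)
resp.raise_for_status()
data = resp.json()
print(data["status"], len(data.get("items", [])))
```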
### Core Advantages of Scrapyrt
"""
Scrapyrt主要优势:
1. 服务化架构:
- 将爬虫转换为HTTP服务
- 支持RESTful API调用
- 易于与其他系统集成
2. 按需爬取:
- 实时触发爬虫任务
- 避免持续运行的资源消耗
- 支持动态参数传递
3. 弹性扩展:
- 无状态设计便于水平扩展
- 支持负载均衡部署
- 与容器化技术完美结合
4. 开发友好:
- 标准HTTP协议通信
- JSON格式数据交换
- 丰富的调试和监控功能
"""#适用场景分析
"""
Scrapyrt适用场景:
1. 微服务架构:
- 作为数据采集微服务
- 与API网关集成
- 支持服务发现和负载均衡
2. 按需爬取:
- 用户触发的即时爬取
- 特定时间段的批量任务
- 事件驱动的爬取需求
3. API集成:
- 与Web应用集成
- 第三方系统数据获取
- 实时数据同步服务
"""#安装与配置
### Environment Setup

```bash
# Make sure Python is installed
python --version          # Python 3.6+ is required

# Create a virtual environment (recommended)
python -m venv scrapyrt_env
source scrapyrt_env/bin/activate   # Linux/Mac
# or
scrapyrt_env\Scripts\activate      # Windows

# Install Scrapyrt
pip install scrapyrt

# Verify the installation
scrapyrt --help
```

### Dependency Check
```python
# check_dependencies.py
import sys

try:
    import scrapy
    import scrapyrt
    import treq
    import klein
    print("✅ All dependencies installed successfully")
except ImportError as e:
    print(f"❌ Missing dependency: {e}")
    sys.exit(1)
```

### Basic Configuration
```ini
# Scrapyrt configuration file - scrapyrt.conf
[scrapyrt]
# Service port
port = 6023
# Path to the Scrapy project
project_path = /path/to/your/scrapy/project
# Maximum number of concurrent requests
max_concurrent_requests = 10
# Request timeout (seconds)
timeout = 300
# Log level
log_level = INFO
# Enable debug mode
debug = false
# Maximum allowed crawl depth
max_depth = 3
# Download delay
download_delay = 1
```

## Basic Usage
### Starting the Scrapyrt Service

```bash
# Option 1: start directly from the Scrapy project root (where scrapy.cfg lives);
#           the rest of this guide assumes port 6023
scrapyrt -p 6023

# Option 2: pick a different port
scrapyrt -p 8080

# Option 3: bind to a specific address
scrapyrt -p 6023 -i 0.0.0.0

# Option 4: use a configuration file
scrapyrt -c /path/to/scrapyrt.conf

# Option 5: run in the background
nohup scrapyrt -p 6023 > scrapyrt.log 2>&1 &
```

### Creating an Example Spider
```python
# example_spider.py - example spider
from datetime import datetime, timezone

import scrapy
from scrapy import Request


class ExampleSpider(scrapy.Spider):
    name = 'example'

    def start_requests(self):
        # The url passed via the HTTP API is exposed as a spider argument
        urls = getattr(self, 'url', None)
        if urls:
            if isinstance(urls, str):
                urls = [urls]
            for url in urls:
                yield Request(url, callback=self.parse)

    def parse(self, response):
        # Extract basic page information
        yield {
            'url': response.url,
            'title': response.css('title::text').get(),
            'status': response.status,
            'timestamp': datetime.now(timezone.utc).isoformat()
        }
```

### Verifying the Service
```bash
# Check that the service is up
curl http://localhost:6023/
```

Expected response:

```json
{
    "message": "ScrapyRT is ready!",
    "version": "0.11.0",
    "endpoints": [
        "/crawl.json",
        "/crawl.json/request",
        "/stats.json"
    ]
}
```

## HTTP API Reference
### Main API Endpoints

The main endpoints exposed by Scrapyrt:

1. /crawl.json - synchronous crawl endpoint
   - GET request: triggers a spider run
   - parameters: spider_name, url, max_requests, etc.
2. /crawl.json/request - asynchronous crawl endpoint
   - POST request: sends a more complex crawl request
   - supports a custom Request object
3. /stats.json - statistics endpoint
   - GET request: returns the service's runtime status
   - reports performance metrics and statistics

### Synchronous Crawl API (/crawl.json)
```bash
# Basic GET request
curl "http://localhost:6023/crawl.json?spider_name=example&url=https://example.com"

# Request with extra parameters
curl "http://localhost:6023/crawl.json?spider_name=example&url=https://example.com&max_requests=5&start_requests=true"
```

Example response:

```json
{
    "status": "ok",
    "items": [
        {
            "url": "https://example.com",
            "title": "Example Domain",
            "status": 200,
            "timestamp": "2024-01-15T10:30:00.123456"
        }
    ],
    "stats": {
        "start_time": "2024-01-15 10:30:00",
        "finish_time": "2024-01-15 10:30:05",
        "item_scraped_count": 1,
        "response_received_count": 1,
        "finish_reason": "finished"
    }
}
```

### Asynchronous Crawl API (/crawl.json/request)
```bash
# Example POST request
curl -X POST http://localhost:6023/crawl.json/request \
  -H "Content-Type: application/json" \
  -d '{
    "spider_name": "example",
    "request": {
        "url": "https://example.com",
        "method": "GET",
        "headers": {
            "User-Agent": "Custom Bot 1.0"
        },
        "meta": {
            "custom_param": "value"
        }
    }
  }'
```

### Statistics API (/stats.json)
```bash
# Fetch service statistics
curl http://localhost:6023/stats.json
```

Example response:

```json
{
    "scrapy_version": "2.6.1",
    "scrapyrt_version": "0.11.0",
    "active_requests": 2,
    "total_requests": 150,
    "total_items": 89,
    "uptime": "2 hours 30 minutes",
    "memory_usage": "128 MB"
}
```

## Python Client Usage
### Basic Client Example

```python
# scrapyrt_client.py - a Python client for Scrapyrt
import json
from typing import Dict

import requests


class ScrapyrtClient:
    """Simple Scrapyrt client."""

    def __init__(self, base_url: str = "http://localhost:6023"):
        self.base_url = base_url.rstrip('/')
        self.session = requests.Session()

    def crawl_sync(self, spider_name: str, url: str, **params) -> Dict:
        """Synchronous crawl via GET /crawl.json."""
        endpoint = f"{self.base_url}/crawl.json"
        params.update({
            'spider_name': spider_name,
            'url': url
        })
        response = self.session.get(endpoint, params=params)
        response.raise_for_status()
        return response.json()

    def crawl_async(self, spider_name: str, request_data: Dict) -> Dict:
        """Asynchronous crawl via POST."""
        endpoint = f"{self.base_url}/crawl.json/request"
        payload = {
            'spider_name': spider_name,
            'request': request_data
        }
        response = self.session.post(
            endpoint,
            json=payload,
            headers={'Content-Type': 'application/json'}
        )
        response.raise_for_status()
        return response.json()

    def get_stats(self) -> Dict:
        """Fetch service statistics."""
        endpoint = f"{self.base_url}/stats.json"
        response = self.session.get(endpoint)
        response.raise_for_status()
        return response.json()


# Usage example
client = ScrapyrtClient()

# Synchronous call
result = client.crawl_sync(
    spider_name='example',
    url='https://httpbin.org/get',
    max_requests=1
)
print(json.dumps(result, indent=2, ensure_ascii=False))
```

### Advanced Client Example
```python
# advanced_client.py - advanced client features
import asyncio
import json
import time
from typing import Dict, List

import aiohttp


class AsyncScrapyrtClient:
    """Asynchronous Scrapyrt client."""

    def __init__(self, base_url: str = "http://localhost:6023"):
        self.base_url = base_url.rstrip('/')
        self.session = None

    async def __aenter__(self):
        self.session = aiohttp.ClientSession()
        return self

    async def __aexit__(self, exc_type, exc_val, exc_tb):
        if self.session:
            await self.session.close()

    async def crawl_batch(self, requests_list: List[Dict]) -> List[Dict]:
        """Crawl a batch of requests concurrently."""
        tasks = []
        for req_data in requests_list:
            task = self._single_crawl(req_data)
            tasks.append(task)
        results = await asyncio.gather(*tasks, return_exceptions=True)
        return results

    async def _single_crawl(self, req_data: Dict) -> Dict:
        """Run a single crawl request and record how long it took."""
        try:
            start_time = time.time()
            async with self.session.get(
                f"{self.base_url}/crawl.json",
                params=req_data
            ) as response:
                result = await response.json()
                result['request_time'] = time.time() - start_time
                return result
        except Exception as e:
            return {'error': str(e), 'request_data': req_data}


# Batch usage example
async def batch_example():
    async with AsyncScrapyrtClient() as client:
        requests_list = [
            {'spider_name': 'example', 'url': 'https://httpbin.org/get', 'max_requests': 1},
            {'spider_name': 'example', 'url': 'https://httpbin.org/user-agent', 'max_requests': 1},
            {'spider_name': 'example', 'url': 'https://httpbin.org/headers', 'max_requests': 1}
        ]
        results = await client.crawl_batch(requests_list)
        for result in results:
            print(json.dumps(result, indent=2, ensure_ascii=False))


# Run the example
# asyncio.run(batch_example())
```

### Error Handling and Retries
```python
# error_handling.py - error handling and retries
import time
from functools import wraps
from typing import Dict

import requests


def retry_on_failure(max_retries=3, delay=1, backoff=2):
    """Retry decorator with exponential backoff."""
    def decorator(func):
        @wraps(func)
        def wrapper(*args, **kwargs):
            retries = 0
            current_delay = delay
            while retries < max_retries:
                try:
                    return func(*args, **kwargs)
                except requests.RequestException as e:
                    retries += 1
                    if retries >= max_retries:
                        raise e
                    time.sleep(current_delay)
                    current_delay *= backoff
            return None
        return wrapper
    return decorator


class RobustScrapyrtClient(ScrapyrtClient):
    """Client with retry support (extends the ScrapyrtClient defined above)."""

    @retry_on_failure(max_retries=3, delay=1, backoff=2)
    def crawl_with_retry(self, spider_name: str, url: str, **params) -> Dict:
        """Crawl with automatic retries."""
        return self.crawl_sync(spider_name, url, **params)

    def crawl_with_timeout(self, spider_name: str, url: str, timeout: int = 300, **params) -> Dict:
        """Crawl with an explicit client-side timeout."""
        endpoint = f"{self.base_url}/crawl.json"
        params.update({
            'spider_name': spider_name,
            'url': url
        })
        response = self.session.get(endpoint, params=params, timeout=timeout)
        response.raise_for_status()
        return response.json()


# Usage example
robust_client = RobustScrapyrtClient()
try:
    result = robust_client.crawl_with_retry(
        spider_name='example',
        url='https://httpbin.org/delay/2',
        max_requests=1
    )
    print("✅ Crawling successful:", result.get('status'))
except Exception as e:
    print(f"❌ Crawling failed after retries: {e}")
```

## Advanced Configuration Options
### Service Configuration

```python
# advanced_config.py - advanced configuration options
import json
from dataclasses import dataclass
from typing import Optional


@dataclass
class ScrapyrtConfig:
    """Scrapyrt configuration container."""
    port: int = 6023
    bind_address: str = '0.0.0.0'
    project_path: str = './'
    max_concurrent_requests: int = 10
    timeout: int = 300
    log_level: str = 'INFO'
    debug: bool = False
    max_depth: int = 3
    download_delay: float = 1.0
    concurrent_items: int = 100
    memusage_limit_mb: int = 1024
    memusage_warning_mb: int = 512
    retry_times: int = 3
    retry_http_codes: Optional[list] = None

    def __post_init__(self):
        if self.retry_http_codes is None:
            self.retry_http_codes = [500, 502, 503, 504, 408, 429]

    def to_dict(self) -> dict:
        """Convert the configuration to a plain dict."""
        return {
            'port': self.port,
            'bind_address': self.bind_address,
            'project_path': self.project_path,
            'max_concurrent_requests': self.max_concurrent_requests,
            'timeout': self.timeout,
            'log_level': self.log_level,
            'debug': self.debug,
            'max_depth': self.max_depth,
            'download_delay': self.download_delay,
            'concurrent_items': self.concurrent_items,
            'memusage_limit_mb': self.memusage_limit_mb,
            'memusage_warning_mb': self.memusage_warning_mb,
            'retry_times': self.retry_times,
            'retry_http_codes': self.retry_http_codes
        }


# Configuration example
config = ScrapyrtConfig(
    port=8080,
    max_concurrent_requests=20,
    timeout=600,
    log_level='DEBUG',
    memusage_limit_mb=2048
)
print("Scrapyrt config:", json.dumps(config.to_dict(), indent=2, ensure_ascii=False))
```

### Performance Tuning Settings
```python
# performance_config.py - performance tuning settings
PERFORMANCE_CONFIG = {
    # Network throughput
    'CONCURRENT_REQUESTS': 32,
    'CONCURRENT_REQUESTS_PER_DOMAIN': 8,
    'DOWNLOAD_DELAY': 0.5,
    'RANDOMIZE_DOWNLOAD_DELAY': 0.1,
    # Memory
    'MEMUSAGE_LIMIT_MB': 2048,
    'MEMUSAGE_WARNING_MB': 1024,
    'CLOSESPIDER_TIMEOUT': 300,
    # Downloader
    'DOWNLOAD_TIMEOUT': 60,
    'DOWNLOAD_MAXSIZE': 1024 * 1024 * 100,   # 100MB
    'DOWNLOAD_WARNSIZE': 1024 * 1024 * 32,   # 32MB
    # Concurrency
    'REACTOR_THREADPOOL_MAXSIZE': 20,
    'DNSCACHE_ENABLED': True,
    'DNSCACHE_SIZE': 10000,
    # Caching
    'HTTPCACHE_ENABLED': False,  # Scrapyrt usually does not need the HTTP cache
    'REDIRECT_ENABLED': True,
    'COOKIES_ENABLED': True,
    # Retries
    'RETRY_TIMES': 3,
    'RETRY_HTTP_CODES': [500, 502, 503, 504, 408, 429, 403],
    # Logging
    'LOG_LEVEL': 'INFO',
    'LOG_SHORT_NAMES': True
}

# Environment-specific overrides
ENV_CONFIGS = {
    'development': {
        'LOG_LEVEL': 'DEBUG',
        'CONCURRENT_REQUESTS': 8,
        'DOWNLOAD_DELAY': 1.0
    },
    'production': {
        'LOG_LEVEL': 'WARNING',
        'CONCURRENT_REQUESTS': 32,
        'DOWNLOAD_DELAY': 0.5,
        'MEMUSAGE_LIMIT_MB': 4096
    },
    'testing': {
        'LOG_LEVEL': 'ERROR',
        'CONCURRENT_REQUESTS': 4,
        'DOWNLOAD_DELAY': 0.1
    }
}
```

## Performance Optimization Strategies
### Concurrency Control

```python
# concurrency_optimization.py - concurrency control
import asyncio
import time
from asyncio import Semaphore
from typing import Dict, List


class ConcurrencyController:
    """Limits how many coroutines run at the same time."""

    def __init__(self, max_concurrent: int = 10):
        self.semaphore = Semaphore(max_concurrent)
        self.active_tasks = 0
        self.max_concurrent = max_concurrent

    async def execute_with_limit(self, func, *args, **kwargs):
        """Run func while respecting the concurrency limit."""
        async with self.semaphore:
            self.active_tasks += 1
            try:
                result = await func(*args, **kwargs)
                return result
            finally:
                self.active_tasks -= 1


class OptimizedScrapyrtClient(AsyncScrapyrtClient):
    """Scrapyrt client with batching and a concurrency cap."""

    def __init__(self, base_url: str = "http://localhost:6023", max_concurrent: int = 5):
        super().__init__(base_url)
        self.concurrency_controller = ConcurrencyController(max_concurrent)

    async def optimized_batch_crawl(self, requests_list: List[Dict], batch_size: int = 10) -> List[Dict]:
        """Batched crawling with a short pause between batches."""
        results = []
        # Process in batches
        for i in range(0, len(requests_list), batch_size):
            batch = requests_list[i:i + batch_size]
            batch_results = await self._process_batch(batch)
            results.extend(batch_results)
            # Small delay between batches to avoid overloading the service
            if i + batch_size < len(requests_list):
                await asyncio.sleep(0.1)
        return results

    async def _process_batch(self, batch: List[Dict]) -> List[Dict]:
        """Process a single batch."""
        tasks = []
        for req_data in batch:
            task = self.concurrency_controller.execute_with_limit(
                self._single_crawl, req_data
            )
            tasks.append(task)
        return await asyncio.gather(*tasks, return_exceptions=True)


# Performance monitoring
class PerformanceMonitor:
    """Collects simple throughput and error statistics."""

    def __init__(self):
        self.requests_count = 0
        self.items_count = 0
        self.errors_count = 0
        self.total_time = 0
        self.start_time = time.time()

    def record_request(self, items_count: int = 0, error: bool = False):
        """Record one request and its outcome."""
        self.requests_count += 1
        self.items_count += items_count
        if error:
            self.errors_count += 1

    def get_stats(self) -> Dict:
        """Return throughput and error-rate statistics."""
        elapsed_time = time.time() - self.start_time
        return {
            'requests_per_second': self.requests_count / elapsed_time if elapsed_time > 0 else 0,
            'items_per_second': self.items_count / elapsed_time if elapsed_time > 0 else 0,
            'error_rate': self.errors_count / self.requests_count if self.requests_count > 0 else 0,
            'total_time': elapsed_time,
            'total_requests': self.requests_count,
            'total_items': self.items_count,
            'total_errors': self.errors_count
        }
```

### Memory Optimization
```python
# memory_optimization.py - memory optimization strategies
import gc
from typing import Dict
from weakref import WeakValueDictionary

import psutil


class MemoryOptimizer:
    """Keeps memory usage under a configurable limit."""

    def __init__(self, max_memory_mb: int = 1024):
        self.max_memory_mb = max_memory_mb
        self.object_cache = WeakValueDictionary()
        self.collection_threshold = 1000  # run garbage collection every 1000 processed objects

    def monitor_memory(self) -> Dict:
        """Report the current process memory usage."""
        process = psutil.Process()
        memory_info = process.memory_info()
        return {
            'rss': memory_info.rss / 1024 / 1024,   # MB
            'vms': memory_info.vms / 1024 / 1024,   # MB
            'percent': process.memory_percent()
        }

    def optimize_memory_usage(self, force_gc: bool = False):
        """Trigger garbage collection and clear caches when over the limit."""
        memory_info = self.monitor_memory()
        if memory_info['rss'] > self.max_memory_mb or force_gc:
            # Run garbage collection
            collected = gc.collect()
            print(f"Garbage collected: {collected} objects")
            # Clear the cache
            self.object_cache.clear()

    def cache_object(self, key: str, obj):
        """Cache an object (weak references help avoid memory leaks)."""
        self.object_cache[key] = obj

    def get_cached_object(self, key: str):
        """Fetch a cached object, if it is still alive."""
        return self.object_cache.get(key)


class OptimizedResponseHandler:
    """Trims large responses before they are returned to clients."""

    def __init__(self):
        self.memory_optimizer = MemoryOptimizer(max_memory_mb=512)

    def process_large_response(self, response_data: Dict, max_items: int = 1000):
        """Cap the number and size of returned items."""
        if 'items' in response_data:
            items = response_data['items']
            # Limit the number of returned items
            if len(items) > max_items:
                response_data['items'] = items[:max_items]
                response_data['truncated'] = True
                response_data['original_count'] = len(items)
            # Limit the size of each item
            for item in response_data['items']:
                self._limit_item_size(item)
        return response_data

    def _limit_item_size(self, item: Dict, max_string_length: int = 10000):
        """Truncate overly long string fields."""
        for key, value in item.items():
            if isinstance(value, str) and len(value) > max_string_length:
                item[key] = value[:max_string_length] + "...[TRUNCATED]"

    def cleanup_response(self, response_data: Dict):
        """Strip internal metadata and reclaim memory."""
        # Remove metadata that clients do not need
        cleanup_keys = ['_response_time', '_process_info']
        for key in cleanup_keys:
            response_data.pop(key, None)
        # Reclaim memory
        self.memory_optimizer.optimize_memory_usage()
```

## Security Configuration
### Access Control Middleware

```python
# security_config.py - security configuration
import hashlib
import hmac
import time
from functools import wraps
from typing import List


class SecurityConfig:
    """Holds API keys, rate limits and IP lists."""

    def __init__(self):
        self.api_keys = {}            # API key store
        self.rate_limits = {}         # rate-limit bookkeeping
        self.whitelist_ips = []       # IP whitelist
        self.blacklist_ips = []       # IP blacklist
        self.request_timeout = 300    # request timeout
        self.max_request_size = 1024 * 1024  # maximum request size: 1MB

    def add_api_key(self, api_key: str, permissions: List[str] = None):
        """Register an API key."""
        if permissions is None:
            permissions = ['read', 'write']
        self.api_keys[api_key] = {
            'permissions': permissions,
            'created_at': time.time(),
            'last_used': None
        }

    def verify_api_key(self, api_key: str, required_permission: str = 'read') -> bool:
        """Verify an API key and its permission."""
        if api_key not in self.api_keys:
            return False
        key_info = self.api_keys[api_key]
        if required_permission not in key_info['permissions']:
            return False
        # Record the last usage time
        key_info['last_used'] = time.time()
        return True

    def verify_signature(self, data: str, signature: str, secret_key: str) -> bool:
        """Verify an HMAC-SHA256 request signature."""
        expected_signature = hmac.new(
            secret_key.encode(),
            data.encode(),
            hashlib.sha256
        ).hexdigest()
        return hmac.compare_digest(signature, expected_signature)

    def check_rate_limit(self, identifier: str, max_requests: int = 100, window: int = 3600) -> bool:
        """Sliding-window rate limit per identifier."""
        current_time = time.time()
        if identifier not in self.rate_limits:
            self.rate_limits[identifier] = []
        # Drop expired request records
        self.rate_limits[identifier] = [
            req_time for req_time in self.rate_limits[identifier]
            if current_time - req_time < window
        ]
        # Reject if the limit has been reached
        if len(self.rate_limits[identifier]) >= max_requests:
            return False
        # Record the current request
        self.rate_limits[identifier].append(current_time)
        return True


# Authentication decorator
def require_auth(security_config: SecurityConfig, permission: str = 'read'):
    """Decorator that checks the API key before calling the handler."""
    def decorator(func):
        @wraps(func)
        def wrapper(*args, **kwargs):
            # Read the API key from the request arguments
            api_key = kwargs.get('api_key') or args[0].get('api_key')
            if not api_key or not security_config.verify_api_key(api_key, permission):
                raise PermissionError("Invalid or insufficient API key")
            return func(*args, **kwargs)
        return wrapper
    return decorator


# Usage example
security = SecurityConfig()
security.add_api_key('your-secret-api-key', ['read', 'write'])
```

### SSL/TLS Configuration
```python
# ssl_config.py - SSL/TLS configuration
SSL_CONFIG = {
    # Paths to the SSL certificate and private key
    'ssl_cert_file': '/path/to/certificate.crt',
    'ssl_key_file': '/path/to/private.key',
    # SSL protocol version
    'ssl_protocol': 'TLSv1.2',
    # Cipher suites
    'ssl_ciphers': [
        'ECDHE-RSA-AES256-GCM-SHA384',
        'ECDHE-RSA-AES128-GCM-SHA256',
        'ECDHE-RSA-AES256-SHA384',
        'ECDHE-RSA-AES128-SHA256'
    ],
    # Security headers
    'security_headers': {
        'Strict-Transport-Security': 'max-age=31536000; includeSubDomains',
        'X-Content-Type-Options': 'nosniff',
        'X-Frame-Options': 'DENY',
        'X-XSS-Protection': '1; mode=block'
    }
}
```

## Deployment and Operations
### Docker Deployment

```dockerfile
# Dockerfile.scrapyrt - Scrapyrt Docker image
FROM python:3.9-slim

WORKDIR /app

# Install system dependencies (curl is needed for the health check)
RUN apt-get update && apt-get install -y \
    curl \
    gcc \
    g++ \
    libxml2-dev \
    libxslt1-dev \
    libffi-dev \
    libssl-dev \
    libpq-dev \
    libsqlite3-dev \
    libreadline-dev \
    libbz2-dev \
    libncursesw5-dev \
    libexpat1-dev \
    libjpeg-dev \
    libpng-dev \
    libfreetype6-dev \
    zlib1g-dev \
    && rm -rf /var/lib/apt/lists/*

# Copy project dependencies
COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt

# Install Scrapyrt
RUN pip install scrapyrt

# Copy the application code
COPY . .

# Create a non-root user
RUN groupadd -r appuser && useradd -r -g appuser appuser
RUN chown -R appuser:appuser /app
USER appuser

# Expose the service port
EXPOSE 6023

# Health check
HEALTHCHECK --interval=30s --timeout=10s --start-period=5s --retries=3 \
    CMD curl -f http://localhost:6023/ || exit 1

# Start command (bind to 0.0.0.0 so the published port is reachable)
CMD ["scrapyrt", "-p", "6023", "-i", "0.0.0.0"]
```

### Docker Compose Configuration
```yaml
# docker-compose-scrapyrt.yml
version: '3.8'

services:
  scrapyrt:
    build:
      context: .
      dockerfile: Dockerfile.scrapyrt
    ports:
      - "6023:6023"
    environment:
      - PYTHONUNBUFFERED=1
      - SCRAPY_SETTINGS_MODULE=myproject.settings
    volumes:
      - ./logs:/app/logs
      - ./config:/app/config
    restart: unless-stopped
    depends_on:
      - redis
    networks:
      - crawler-network

  redis:
    image: redis:6.2-alpine
    ports:
      - "6379:6379"
    volumes:
      - redis-data:/data
    command: redis-server --appendonly yes
    networks:
      - crawler-network

  nginx:
    image: nginx:alpine
    ports:
      - "80:80"
      - "443:443"
    volumes:
      - ./nginx.conf:/etc/nginx/nginx.conf
      - ./ssl:/etc/nginx/ssl
    depends_on:
      - scrapyrt
    networks:
      - crawler-network

volumes:
  redis-data:

networks:
  crawler-network:
    driver: bridge
```

### systemd Service Configuration
```ini
# scrapyrt.service - systemd unit
[Unit]
Description=Scrapyrt Service
After=network.target

[Service]
Type=simple
User=scrapyrt
Group=scrapyrt
WorkingDirectory=/opt/scrapyrt/project
ExecStart=/opt/scrapyrt/venv/bin/scrapyrt -p 6023
Restart=always
RestartSec=10
StandardOutput=journal
StandardError=journal
SyslogIdentifier=scrapyrt

[Install]
WantedBy=multi-user.target
```

## Monitoring and Logging
### Logging Configuration

```python
# logging_config.py - logging configuration
import logging
import logging.config

LOGGING_CONFIG = {
    'version': 1,
    'disable_existing_loggers': False,
    'formatters': {
        'standard': {
            'format': '%(asctime)s [%(levelname)s] %(name)s: %(message)s'
        },
        'json': {
            '()': 'pythonjsonlogger.jsonlogger.JsonFormatter',
            'format': '%(asctime)s %(name)s %(levelname)s %(message)s'
        }
    },
    'handlers': {
        'console': {
            'level': 'INFO',
            'class': 'logging.StreamHandler',
            'formatter': 'standard'
        },
        'file': {
            'level': 'DEBUG',
            'class': 'logging.handlers.RotatingFileHandler',
            'filename': 'scrapyrt.log',
            'maxBytes': 1024 * 1024 * 10,  # 10MB
            'backupCount': 5,
            'formatter': 'standard'
        },
        'error_file': {
            'level': 'ERROR',
            'class': 'logging.handlers.RotatingFileHandler',
            'filename': 'scrapyrt_error.log',
            'maxBytes': 1024 * 1024 * 10,  # 10MB
            'backupCount': 5,
            'formatter': 'json'
        }
    },
    'loggers': {
        'scrapyrt': {
            'handlers': ['console', 'file', 'error_file'],
            'level': 'DEBUG',
            'propagate': False
        },
        'twisted': {
            'handlers': ['file'],
            'level': 'WARNING',
            'propagate': False
        }
    }
}


def setup_logging():
    """Apply the logging configuration."""
    logging.config.dictConfig(LOGGING_CONFIG)


# Usage example
setup_logging()
logger = logging.getLogger('scrapyrt')
logger.info("Scrapyrt service started")
```

### Metrics Collection
```python
# monitoring.py - metrics collection
import time
from functools import wraps

import psutil
from prometheus_client import Counter, Gauge, Histogram, start_http_server

# Prometheus metric definitions
REQUEST_COUNT = Counter('scrapyrt_requests_total', 'Total requests processed', ['endpoint', 'method'])
ERROR_COUNT = Counter('scrapyrt_errors_total', 'Total errors', ['error_type'])
RESPONSE_TIME = Histogram('scrapyrt_response_time_seconds', 'Response time in seconds')
ACTIVE_REQUESTS = Gauge('scrapyrt_active_requests', 'Number of active requests')


class MetricsCollector:
    """Exposes Prometheus metrics and basic system stats."""

    def __init__(self, port: int = 8000):
        self.port = port
        start_http_server(port)
        self.start_time = time.time()

    def collect_system_metrics(self):
        """Collect CPU, memory and disk usage."""
        return {
            'cpu_percent': psutil.cpu_percent(),
            'memory_percent': psutil.virtual_memory().percent,
            'disk_usage': psutil.disk_usage('/').percent,
            'uptime': time.time() - self.start_time
        }

    def instrument_endpoint(self, endpoint: str, method: str = 'GET'):
        """Decorator that records request count, errors and latency for an endpoint."""
        def decorator(func):
            @wraps(func)
            def wrapper(*args, **kwargs):
                start_time = time.time()
                ACTIVE_REQUESTS.inc()
                try:
                    result = func(*args, **kwargs)
                    REQUEST_COUNT.labels(endpoint=endpoint, method=method).inc()
                    return result
                except Exception as e:
                    ERROR_COUNT.labels(error_type=type(e).__name__).inc()
                    raise
                finally:
                    RESPONSE_TIME.observe(time.time() - start_time)
                    ACTIVE_REQUESTS.dec()
            return wrapper
        return decorator


# Usage example
metrics = MetricsCollector(port=8000)


@metrics.instrument_endpoint('/crawl.json', 'GET')
def monitored_crawl():
    """A crawl handler wrapped with metrics."""
    time.sleep(0.1)  # simulate some work
    return {"status": "ok"}


# Print system metrics
print("System metrics:", metrics.collect_system_metrics())
```

## Common Problems and Solutions
### Problem 1: The Service Fails to Start

Symptom: Scrapyrt refuses to start and reports "Address already in use".

Solution:

```bash
# Check which process holds the port
netstat -tulpn | grep 6023
# or use lsof
lsof -i :6023

# Kill the offending process
kill -9 <PID>

# or start on a different port
scrapyrt -p 6024
```

### Problem 2: Spider Timeouts
Symptom: crawl requests hang for a long time without responding.

Solution:

```python
# Increase the client-side timeouts
TIMEOUT_CONFIG = {
    'timeout': 600,            # 10 minutes for the whole request
    'download_timeout': 120,   # download timeout
    'connect_timeout': 30      # connection timeout
}

# Raise the download timeout in the spider / project settings
# (Scrapy's DOWNLOAD_TIMEOUT also covers connection setup)
CUSTOM_SETTINGS = {
    'DOWNLOAD_TIMEOUT': 120
}
```

### Problem 3: Excessive Memory Usage
Symptom: memory usage keeps growing after the service has been running for a while.

Solution:

```python
# Set memory limits
MEMORY_CONFIG = {
    'MEMUSAGE_LIMIT_MB': 2048,
    'MEMUSAGE_WARNING_MB': 1024,
    'CLOSESPIDER_TIMEOUT': 300
}

# Trim large responses
def process_large_response(response):
    # Only return the first 1000 items
    items = response.get('items', [])[:1000]
    return {'items': items, 'truncated': len(response.get('items', [])) > 1000}
```

### Problem 4: Concurrency Limits
Symptom: the service slows down or fails under high concurrency.

Solution:

```python
# Tune the concurrency parameters
CONCURRENCY_CONFIG = {
    'CONCURRENT_REQUESTS': 32,
    'CONCURRENT_REQUESTS_PER_DOMAIN': 8,
    'REACTOR_THREADPOOL_MAXSIZE': 20,
    'max_concurrent_requests': 10  # Scrapyrt-level concurrency limit
}
```

## Best Practices Summary
### Deployment Best Practices

1. Containerized deployment:
   - use Docker for standardized deployments
   - manage multiple services with Docker Compose
   - isolate environments and scale out quickly
2. Configuration management:
   - manage environment variables through configuration files
   - keep configuration under version control
   - support switching between environments (see the sketch below)
3. Monitoring and alerting:
   - integrate with a monitoring system such as Prometheus
   - alert on key metrics
   - centralize log collection
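As a concrete illustration of environment switching, here is a minimal sketch that merges the `PERFORMANCE_CONFIG` and `ENV_CONFIGS` dictionaries from the performance tuning section based on an environment variable. The `APP_ENV` variable name and the `performance_config` module name are assumptions, not Scrapyrt conventions.

```python
# config_loader.py - minimal sketch of environment-based config switching.
import os

# Hypothetical import of the dictionaries defined in performance_config.py above.
from performance_config import ENV_CONFIGS, PERFORMANCE_CONFIG


def load_settings() -> dict:
    """Merge the shared baseline with overrides for the current environment."""
    env = os.environ.get("APP_ENV", "development")   # assumed variable name
    settings = dict(PERFORMANCE_CONFIG)              # start from the shared baseline
    settings.update(ENV_CONFIGS.get(env, {}))        # apply environment-specific overrides
    return settings


if __name__ == "__main__":
    print(load_settings()["LOG_LEVEL"])
```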
### Security Best Practices

1. Access control:
   - enforce API-key authentication
   - configure IP whitelists/blacklists
   - enable request-signature verification
2. Data protection:
   - use HTTPS for transport encryption
   - apply rate limiting
   - validate input parameters (see the sketch below)
3. Permission management:
   - follow the principle of least privilege
   - rotate keys regularly
   - keep access logs
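For the input-validation point, one simple sketch is to check the target URL's scheme and host against an allowlist before forwarding a crawl request to Scrapyrt. The allowed domains below are placeholders.

```python
# url_validation.py - minimal input validation before forwarding a crawl request.
from urllib.parse import urlparse

ALLOWED_SCHEMES = {"http", "https"}
ALLOWED_DOMAINS = {"example.com", "httpbin.org"}  # placeholder allowlist


def is_allowed_url(url: str) -> bool:
    """Accept only http(s) URLs whose host is on the allowlist."""
    parsed = urlparse(url)
    if parsed.scheme not in ALLOWED_SCHEMES:
        return False
    host = (parsed.hostname or "").lower()
    return host in ALLOWED_DOMAINS or any(
        host.endswith("." + domain) for domain in ALLOWED_DOMAINS
    )


# Examples: reject anything that is not on the allowlist
assert is_allowed_url("https://httpbin.org/get")
assert not is_allowed_url("ftp://example.com/file")
```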
### Performance Best Practices

1. Resource usage:
   - choose concurrency limits deliberately
   - keep memory usage in check
   - reuse connections via a connection pool (see the sketch below)
2. Caching strategy:
   - cache responses where appropriate
   - pre-warm frequently requested data
   - avoid repeated computation
3. Scalability:
   - keep the service stateless
   - design for horizontal scaling
   - put a load balancer in front
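As a small illustration of connection-pool reuse, the `ScrapyrtClient` from the Python Client Usage section could mount an `HTTPAdapter` with a larger pool so repeated calls to Scrapyrt reuse TCP connections. The pool sizes here are illustrative, not recommendations.

```python
# pooled_client.py - sketch of connection-pool reuse for the ScrapyrtClient above.
import requests
from requests.adapters import HTTPAdapter


def build_pooled_session(pool_size: int = 20) -> requests.Session:
    """Create a Session that keeps up to pool_size connections to Scrapyrt alive."""
    session = requests.Session()
    adapter = HTTPAdapter(pool_connections=pool_size, pool_maxsize=pool_size)
    session.mount("http://", adapter)
    session.mount("https://", adapter)
    return session


# ScrapyrtClient is the class defined in the Python Client Usage section.
client = ScrapyrtClient()
client.session = build_pooled_session()
```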
💡 Key takeaway: Scrapyrt turns a traditional crawler into an HTTP API service, enabling a service-oriented architecture. With sensible configuration and tuning it can back a highly available, high-performance real-time crawling service.
🔗 Recommended related tutorials
- Scrapy-Redis Distributed Architecture - building distributed crawlers
- Dockerized Crawlers - containerized deployment
- Crawl Monitoring Dashboard - building a monitoring system
- Pipelines in Practice - data processing pipelines

