#Scrapyd与ScrapydWeb - 分布式爬虫部署与监控平台详解
📂 所属阶段:第六阶段 — 运维与监控(工程化篇)
🔗 相关章节:Scrapy-Redis分布式架构 · Docker容器化爬虫 · 抓取监控看板
#目录
- Scrapyd与ScrapydWeb概述
- Scrapyd安装与配置
- ScrapydWeb安装与配置
- 项目部署与管理
- 远程控制与API接口
- 日志管理与监控
- 生产环境部署
- 性能优化与调优
- 故障排查与维护
- 最佳实践总结
#Scrapyd与ScrapydWeb概述
Scrapyd和ScrapydWeb是Scrapy生态系统中重要的部署和监控工具,它们共同构成了完整的爬虫运维解决方案。
#Scrapyd简介
Scrapyd是一个守护进程,用于运行Scrapy爬虫服务。它提供了HTTP API,允许你部署爬虫项目并控制它们的执行。
核心特性:
- HTTP API接口
- 多项目支持
- 并发爬虫管理
- 日志管理
- 进程监控
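例如,服务启动后可以直接通过HTTP接口查看守护进程状态(示意,假设Scrapyd运行在本机6800端口):
# 查看Scrapyd守护进程状态
curl http://localhost:6800/daemonstatus.json
# 典型响应:{"node_name": "...", "status": "ok", "pending": 0, "running": 0, "finished": 0}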
#ScrapydWeb简介
ScrapydWeb是Scrapyd的Web UI界面,提供直观的图形化管理界面,方便运维人员进行爬虫管理。
核心特性:
- 可视化管理界面
- 多节点集群管理
- 实时日志查看
- 统计数据展示
- 任务调度功能
#Scrapyd安装与配置
#安装Scrapyd
# 安装Scrapyd
pip install scrapyd
# 安装Scrapyd客户端工具
pip install scrapyd-client
#配置文件详解
# scrapyd.conf - Scrapyd配置文件
[scrapyd]
# 监听地址,默认为127.0.0.1;设为0.0.0.0可监听所有网卡
bind_address = 0.0.0.0
# HTTP监听端口
http_port = 6800
# 最大并发进程数,0表示不限制(按 CPU核心数 × max_proc_per_cpu 计算)
max_proc = 0
# 每个CPU核心允许的最大并发进程数(默认4)
max_proc_per_cpu = 4
# 任务队列轮询间隔(秒)
poll_interval = 5.0
# 每个爬虫保留的已完成任务日志数量
jobs_to_keep = 5
# 启动器在内存中保留的已完成任务记录数
finished_to_keep = 100
# egg包存放目录
eggs_dir = eggs
# 日志目录
logs_dir = logs
# 任务队列数据库目录
dbs_dir = dbs
# 抓取item存放目录(留空可禁用item存储)
items_dir = items
# 调试模式
debug = off
# 注意:Scrapyd的配置只有全局的[scrapyd]一节,不支持按项目
# 单独设置并发数;并发控制需通过max_proc/max_proc_per_cpu
# 全局调整,或在各项目的Scrapy settings中限制
#启动Scrapyd服务
# 方式1:直接启动
scrapyd
# 方式2:指定配置文件启动
scrapyd -c /path/to/scrapyd.conf
# 方式3:后台启动(Linux)
scrapyd &
#服务管理脚本
#!/bin/bash
# scrapyd-service.sh - Scrapyd服务管理脚本
SCRAPYD_PID_FILE="/var/run/scrapyd.pid"
SCRAPYD_LOG_FILE="/var/log/scrapyd.log"
start() {
echo "Starting Scrapyd..."
if [ -f "$SCRAPYD_PID_FILE" ]; then
echo "Scrapyd is already running."
exit 1
fi
nohup scrapyd -c /etc/scrapyd/scrapyd.conf > "$SCRAPYD_LOG_FILE" 2>&1 &
echo $! > "$SCRAPYD_PID_FILE"
echo "Scrapyd started with PID $(cat $SCRAPYD_PID_FILE)"
}
stop() {
echo "Stopping Scrapyd..."
if [ ! -f "$SCRAPYD_PID_FILE" ]; then
echo "Scrapyd is not running."
exit 1
fi
PID=$(cat "$SCRAPYD_PID_FILE")
kill $PID
rm -f "$SCRAPYD_PID_FILE"
echo "Scrapyd stopped."
}
restart() {
stop
sleep 2
start
}
case "$1" in
start)
start
;;
stop)
stop
;;
restart)
restart
;;
*)
echo "Usage: $0 {start|stop|restart}"
exit 1
;;
esac
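脚本保存为scrapyd-service.sh后赋予执行权限即可使用(示意):
chmod +x scrapyd-service.sh
./scrapyd-service.sh start
./scrapyd-service.sh restart
#ScrapydWeb安装与配置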
#安装ScrapydWeb
# 安装ScrapydWeb
pip install scrapydweb
# 或者从源码安装
git clone https://github.com/my8100/scrapydweb.git
cd scrapydweb
pip install -r requirements.txt
#配置ScrapydWeb
# scrapydweb_settings_v10.py - ScrapydWeb配置文件
# 注意:首次运行scrapydweb会在当前目录生成默认配置文件,
# 可用配置项随版本变化,具体键名以生成的文件为准
import os
# Flask配置
SECRET_KEY = 'your-secret-key-here'
PERMANENT_SESSION_LIFETIME = 3600 # 1小时
# ScrapydWeb配置
SCRAPYDWEB_BIND = '0.0.0.0'
SCRAPYDWEB_PORT = 5000
# 访问认证
ENABLE_AUTH = True
USERNAME = 'admin'
PASSWORD = 'your-password-here'
# Scrapyd节点配置
SCRAPYD_SERVERS = [
'localhost:6800', # 本地节点
'192.168.1.100:6800', # 远程节点1
'192.168.1.101:6800', # 远程节点2
]
# 日志配置(配合logparser使用)
LOG_LEVEL = 'INFO'
ENABLE_LOGPARSER = True
# 邮件告警(需按版本补全SMTP相关配置)
ENABLE_EMAIL_ALERT = True
# 数据库配置
DATABASE_PATH = os.path.join(os.getcwd(), 'data.db')
# 其他配置
REFRESH_PAGES_INTERVAL = 30
SHOW_SCRAPYD_ITEMS = True
CUSTOMIZED_BULKSELECT_TAGS = ['production', 'staging', 'development']
#启动ScrapydWeb服务
# 方式1:直接启动(首次运行会在当前目录生成 scrapydweb_settings_vN.py)
scrapydweb
# 方式2:编辑生成的配置文件后,在同一目录再次启动即生效
scrapydweb
# 方式3:后台启动
nohup scrapydweb > scrapydweb.log 2>&1 &
#项目部署与管理
#项目打包配置
# scrapy.cfg - Scrapy项目配置
[settings]
default = myproject.settings
[deploy]
# 目标Scrapyd服务器
url = http://localhost:6800/
# 项目名称
project = myproject
[deploy:production]
url = http://prod-server:6800/
project = myproject
username = admin
password = secret
[deploy:staging]
url = http://staging-server:6800/
project = myproject
username = admin
password = secret
#部署脚本
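在使用下面的自动化脚本之前,也可以先手动执行scrapyd-deploy验证部署目标(示意):
# 部署到默认目标([deploy]小节)
scrapyd-deploy -p myproject
# 部署到 production 目标
scrapyd-deploy production -p myproject
# 列出所有可用的部署目标
scrapyd-deploy -l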
#!/bin/bash
# deploy.sh - 自动化部署脚本
PROJECT_NAME="myproject"
DEPLOY_ENV="${1:-default}" # 默认环境
echo "Deploying project: $PROJECT_NAME to environment: $DEPLOY_ENV"
# 检查项目配置
if [ ! -f "scrapy.cfg" ]; then
echo "Error: scrapy.cfg not found!"
exit 1
fi
# 打包并部署
echo "Packaging and deploying..."
if [ "$DEPLOY_ENV" = "default" ]; then
scrapyd-deploy -p $PROJECT_NAME # 未命名目标使用[deploy]小节
else
scrapyd-deploy $DEPLOY_ENV -p $PROJECT_NAME
fi
if [ $? -eq 0 ]; then
echo "Deployment successful!"
# 获取最新的版本号
VERSION=$(curl -s "http://localhost:6800/listversions.json?project=$PROJECT_NAME" | python -c "import sys, json; print(json.load(sys.stdin)['versions'][-1])" 2>/dev/null)
echo "Deployed version: $VERSION"
else
echo "Deployment failed!"
exit 1
fi
#部署管理Python脚本
# deployment_manager.py - 部署管理脚本
import requests
import zipfile
import os
import subprocess
from pathlib import Path
class ScrapydDeploymentManager:
"""
Scrapyd部署管理器
"""
def __init__(self, scrapyd_url: str = "http://localhost:6800"):
self.scrapyd_url = scrapyd_url.rstrip('/')
self.session = requests.Session()
def deploy_project(self, project_path: str, project_name: str, version: str = None) -> dict:
"""
部署项目到Scrapyd
Args:
project_path: 项目路径
project_name: 项目名称
version: 版本号(可选)
Returns:
部署结果
"""
if not version:
import time
version = f"v{int(time.time())}"
# 创建egg文件
egg_path = self._create_egg(project_path)
# 上传部署
with open(egg_path, 'rb') as f:
files = {'egg': f}
data = {
'project': project_name,
'version': version
}
response = self.session.post(
f"{self.scrapyd_url}/addversion.json",
files=files,
data=data
)
# 清理临时文件
os.remove(egg_path)
if response.status_code == 200:
result = response.json()
print(f"Project deployed successfully: {project_name} v{version}")
return result
else:
raise Exception(f"Deployment failed: {response.text}")
def _create_egg(self, project_path: str) -> str:
"""
创建egg文件(要求项目根目录存在setup.py,
首次运行scrapyd-deploy时会自动生成一个)
"""
project_path = Path(project_path)
old_cwd = os.getcwd()
try:
os.chdir(project_path)
# 运行setup.py创建egg
result = subprocess.run([
'python', 'setup.py', 'bdist_egg'
], capture_output=True, text=True)
if result.returncode != 0:
raise Exception(f"Failed to create egg: {result.stderr}")
# 获取生成的egg文件
dist_dir = project_path / 'dist'
egg_files = list(dist_dir.glob('*.egg'))
if not egg_files:
raise Exception("No egg file found")
# 返回第一个egg文件的路径
return str(egg_files[0])
finally:
os.chdir(old_cwd)
def list_projects(self) -> dict:
"""
列出所有项目
"""
response = self.session.get(f"{self.scrapyd_url}/listprojects.json")
return response.json()
def list_versions(self, project_name: str) -> dict:
"""
列出项目版本
"""
response = self.session.get(
f"{self.scrapyd_url}/listversions.json",
params={'project': project_name}
)
return response.json()
def delete_version(self, project_name: str, version: str) -> dict:
"""
删除项目版本
"""
response = self.session.post(
f"{self.scrapyd_url}/delversion.json",
data={
'project': project_name,
'version': version
}
)
return response.json()
def schedule_spider(self, project_name: str, spider_name: str, **kwargs) -> dict:
"""
调度爬虫任务
Args:
project_name: 项目名称
spider_name: 爬虫名称
**kwargs: 爬虫参数
Returns:
任务ID
"""
data = {
'project': project_name,
'spider': spider_name
}
# 添加爬虫参数:schedule.json会把额外的POST参数原样作为spider参数传入,
# 注意避开project/spider/setting等保留键
for key, value in kwargs.items():
data[key] = value
response = self.session.post(
f"{self.scrapyd_url}/schedule.json",
data=data
)
if response.status_code == 200:
result = response.json()
print(f"Spider scheduled: {spider_name}, Job ID: {result.get('jobid')}")
return result
else:
raise Exception(f"Scheduling failed: {response.text}")
def cancel_job(self, project_name: str, job_id: str) -> dict:
"""
取消任务
"""
response = self.session.post(
f"{self.scrapyd_url}/cancel.json",
data={
'project': project_name,
'job': job_id
}
)
return response.json()
# 使用示例
if __name__ == "__main__":
manager = ScrapydDeploymentManager()
# 部署项目
# manager.deploy_project("./myproject", "myproject")
# 调度爬虫
# manager.schedule_spider("myproject", "myspider", start_url="https://example.com")
#远程控制与API接口
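Scrapyd的HTTP API均返回JSON,常用端点包括:daemonstatus.json(服务状态)、addversion.json(上传部署)、schedule.json(调度爬虫)、cancel.json(取消任务)、listprojects.json、listversions.json、listspiders.json、listjobs.json、delversion.json、delproject.json。也可以直接用curl调用,例如调度一个爬虫(示意):
curl http://localhost:6800/schedule.json -d project=myproject -d spider=myspider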
#HTTP API详解
# scrapyd_api_client.py - Scrapyd API客户端
import requests
import json
from typing import Dict, List, Optional
class ScrapydAPIClient:
"""
Scrapyd API客户端
"""
def __init__(self, base_url: str = "http://localhost:6800"):
self.base_url = base_url.rstrip('/')
self.session = requests.Session()
def get_overview(self) -> dict:
"""
获取概览信息
"""
response = self.session.get(f"{self.base_url}/daemonstatus.json")
return response.json()
def list_projects(self) -> dict:
"""
列出所有项目
"""
response = self.session.get(f"{self.base_url}/listprojects.json")
return response.json()
def list_versions(self, project: str) -> dict:
"""
列出项目版本
"""
response = self.session.get(
f"{self.base_url}/listversions.json",
params={'project': project}
)
return response.json()
def list_spiders(self, project: str, version: str = None) -> dict:
"""
列出爬虫
"""
params = {'project': project}
if version:
params['version'] = version
response = self.session.get(
f"{self.base_url}/listspiders.json",
params=params
)
return response.json()
def schedule_spider(self, project: str, spider: str, **kwargs) -> dict:
"""
调度爬虫
"""
data = {
'project': project,
'spider': spider
}
# 额外的POST参数会被Scrapyd原样作为spider参数传入
for key, value in kwargs.items():
data[key] = value
response = self.session.post(
f"{self.base_url}/schedule.json",
data=data
)
return response.json()
def cancel_job(self, project: str, job_id: str) -> dict:
"""
取消任务
"""
response = self.session.post(
f"{self.base_url}/cancel.json",
data={
'project': project,
'job': job_id
}
)
return response.json()
def list_jobs(self, project: str = None) -> dict:
"""
列出任务
"""
params = {}
if project:
params['project'] = project
response = self.session.get(
f"{self.base_url}/listjobs.json",
params=params
)
return response.json()
def get_log(self, project: str, spider: str, job_id: str) -> str:
"""
获取日志
"""
response = self.session.get(
f"{self.base_url}/logs/{project}/{spider}/{job_id}.log"
)
return response.text
def get_items(self, project: str, spider: str, job_id: str) -> str:
"""
获取抓取的项目
"""
response = self.session.get(
f"{self.base_url}/items/{project}/{spider}/{job_id}.jl"
)
return response.text
# API使用示例
def example_usage():
client = ScrapydAPIClient("http://localhost:6800")
# 获取概览
overview = client.get_overview()
print("Daemon Status:", overview)
# 列出项目
projects = client.list_projects()
print("Projects:", projects)
if projects.get('projects'):
project_name = projects['projects'][0]
# 列出爬虫
spiders = client.list_spiders(project_name)
print("Spiders:", spiders)
if spiders.get('spiders'):
spider_name = spiders['spiders'][0]
# 调度爬虫
result = client.schedule_spider(
project_name,
spider_name,
start_url="https://example.com"
)
print("Scheduled:", result)
if __name__ == "__main__":
example_usage()
#REST API封装
# scrapyd_rest_api.py - REST API封装
from flask import Flask, request, jsonify
import requests
app = Flask(__name__)
class ScrapydRESTAPI:
"""
Scrapyd REST API封装
"""
def __init__(self, scrapyd_url: str = "http://localhost:6800"):
self.scrapyd_url = scrapyd_url.rstrip('/')
def register_routes(self, app: Flask):
"""
注册路由
"""
@app.route('/api/projects', methods=['GET'])
def get_projects():
response = requests.get(f"{self.scrapyd_url}/listprojects.json")
return jsonify(response.json())
@app.route('/api/projects/<project>/spiders', methods=['GET'])
def get_spiders(project):
params = {'project': project}
response = requests.get(
f"{self.scrapyd_url}/listspiders.json",
params=params
)
return jsonify(response.json())
@app.route('/api/projects/<project>/schedule', methods=['POST'])
def schedule_spider(project):
data = request.get_json()
spider = data.get('spider')
args = data.get('args', {})
post_data = {
'project': project,
'spider': spider
}
# 额外参数直接作为spider参数传递
for key, value in args.items():
post_data[key] = value
response = requests.post(
f"{self.scrapyd_url}/schedule.json",
data=post_data
)
return jsonify(response.json())
@app.route('/api/projects/<project>/jobs', methods=['GET'])
def get_jobs(project):
params = {'project': project}
response = requests.get(
f"{self.scrapyd_url}/listjobs.json",
params=params
)
return jsonify(response.json())
# 使用示例
if __name__ == "__main__":
rest_api = ScrapydRESTAPI()
rest_api.register_routes(app)
app.run(host='0.0.0.0', port=5001, debug=True)
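启动后即可通过统一的REST风格接口访问(示意,假设封装服务监听5001端口):
# 列出项目
curl http://localhost:5001/api/projects
# 调度爬虫
curl -X POST http://localhost:5001/api/projects/myproject/schedule -H "Content-Type: application/json" -d '{"spider": "myspider", "args": {"start_url": "https://example.com"}}'
#日志管理与监控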
#日志配置优化
# logging.conf - 日志配置文件
[loggers]
keys=root,scrapy,scrapyd
[handlers]
keys=consoleHandler,fileHandler,errorHandler
[formatters]
keys=simpleFormatter,detailedFormatter
[logger_root]
level=INFO
handlers=consoleHandler,fileHandler
[logger_scrapy]
level=DEBUG
handlers=consoleHandler,fileHandler
qualname=scrapy
propagate=0
[logger_scrapyd]
level=INFO
handlers=consoleHandler,fileHandler
qualname=scrapyd
propagate=0
[handler_consoleHandler]
class=StreamHandler
level=INFO
formatter=simpleFormatter
args=(sys.stdout,)
[handler_fileHandler]
class=FileHandler
level=DEBUG
formatter=detailedFormatter
args=('scrapyd.log',)
[handler_errorHandler]
class=FileHandler
level=ERROR
formatter=detailedFormatter
args=('scrapyd_error.log',)
[formatter_simpleFormatter]
format=%(asctime)s - %(name)s - %(levelname)s - %(message)s
[formatter_detailedFormatter]
format=%(asctime)s - %(name)s - %(levelname)s - %(funcName)s:%(lineno)d - %(message)s
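上面的INI格式配置可以用标准库logging.config加载(最小示意):
# load_logging.py - 加载logging.conf
import logging.config
logging.config.fileConfig('logging.conf')
logging.getLogger('scrapyd').info('logging configured')
#日志分析工具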
# log_analyzer.py - 日志分析工具
import re
import json
from datetime import datetime
from collections import defaultdict, Counter
from pathlib import Path
class ScrapydLogAnalyzer:
"""
Scrapyd日志分析器
"""
def __init__(self, log_directory: str):
self.log_directory = Path(log_directory)
self.patterns = {
'info': r'(\d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2}) \[(.*?)\] INFO: (.*)',
'error': r'(\d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2}) \[(.*?)\] ERROR: (.*)',
'warning': r'(\d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2}) \[(.*?)\] WARNING: (.*)',
'critical': r'(\d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2}) \[(.*?)\] CRITICAL: (.*)',
}
def analyze_logs(self, project_name: str, spider_name: str, days: int = 7) -> dict:
"""
分析日志
Args:
project_name: 项目名称
spider_name: 爬虫名称
days: 分析天数
Returns:
分析结果
"""
log_dir = self.log_directory / project_name / spider_name
if not log_dir.exists():
return {"error": f"Log directory not found: {log_dir}"}
results = {
'total_lines': 0,
'errors': [],
'warnings': [],
'infos': [],
'criticals': [],
'stats': {
'error_count': 0,
'warning_count': 0,
'info_count': 0,
'critical_count': 0,
'level_distribution': Counter(),
'hourly_activity': Counter(),
'most_common_errors': Counter(),
'spider_performance': {}
}
}
# 读取最近几天的日志文件
log_files = list(log_dir.glob('*.log'))
for log_file in log_files:
if self._is_recent_file(log_file, days):
self._parse_log_file(log_file, results)
# 生成统计信息
self._generate_statistics(results)
return results
def _is_recent_file(self, file_path: Path, days: int) -> bool:
"""
检查文件是否在指定天数内
"""
import time
file_time = file_path.stat().st_mtime
return (time.time() - file_time) / (24 * 3600) <= days
def _parse_log_file(self, log_file: Path, results: dict):
"""
解析单个日志文件
"""
with open(log_file, 'r', encoding='utf-8', errors='ignore') as f:
for line_num, line in enumerate(f, 1):
results['total_lines'] += 1
# 尝试匹配各种日志级别
for level, pattern in self.patterns.items():
match = re.match(pattern, line.strip())
if match:
timestamp, component, message = match.groups()
# 添加到对应级别的列表
results[level + 's'].append({
'timestamp': timestamp,
'component': component,
'message': message,
'line_number': line_num,
'file': str(log_file)
})
# 更新统计
results['stats'][f'{level}_count'] += 1
results['stats']['level_distribution'][level] += 1
# 提取小时信息
hour = timestamp.split()[1].split(':')[0]
results['stats']['hourly_activity'][hour] += 1
# 记录错误信息
if level == 'error':
results['stats']['most_common_errors'][message] += 1
break
def _generate_statistics(self, results: dict):
"""
生成统计信息
"""
stats = results['stats']
# 计算错误率
total_messages = sum(stats['level_distribution'].values())
if total_messages > 0:
stats['error_rate'] = stats['error_count'] / total_messages
stats['warning_rate'] = stats['warning_count'] / total_messages
# 获取最常见的错误
stats['top_errors'] = stats['most_common_errors'].most_common(10)
# 按小时分析活跃度
stats['peak_hours'] = sorted(
stats['hourly_activity'].items(),
key=lambda x: x[1],
reverse=True
)[:5]
def export_report(self, analysis_result: dict, output_file: str):
"""
导出分析报告
"""
with open(output_file, 'w', encoding='utf-8') as f:
json.dump(analysis_result, f, indent=2, ensure_ascii=False)
# 使用示例
def analyze_example():
analyzer = ScrapydLogAnalyzer('./logs')
result = analyzer.analyze_logs('myproject', 'myspider', days=7)
print(f"Total lines analyzed: {result['total_lines']}")
print(f"Errors: {result['stats']['error_count']}")
print(f"Warnings: {result['stats']['warning_count']}")
print(f"Top errors: {result['stats']['top_errors'][:3]}")
if __name__ == "__main__":
analyze_example()
#实时监控脚本
# real_time_monitor.py - 实时监控脚本
import time
import requests
from datetime import datetime
import json
from typing import Dict, Any
class RealTimeMonitor:
"""
实时监控器
"""
def __init__(self, scrapyd_url: str = "http://localhost:6800"):
self.scrapyd_url = scrapyd_url.rstrip('/')
self.last_status = {}
def monitor_loop(self, interval: int = 30):
"""
监控循环
"""
print(f"Starting real-time monitoring at {datetime.now()}")
print(f"Monitoring Scrapyd at: {self.scrapyd_url}")
while True:
try:
current_status = self.get_current_status()
self.analyze_status_change(current_status)
# 保存当前状态
self.last_status = current_status.copy()
# 等待下次检查
time.sleep(interval)
except KeyboardInterrupt:
print("\nMonitoring stopped by user.")
break
except Exception as e:
print(f"Error during monitoring: {e}")
time.sleep(interval)
def get_current_status(self) -> Dict[str, Any]:
"""
获取当前状态
"""
status = {}
try:
# 获取守护进程状态
response = requests.get(f"{self.scrapyd_url}/daemonstatus.json")
status['daemon'] = response.json()
except Exception as e:
status['daemon'] = {'error': str(e)}
try:
# 获取所有任务
response = requests.get(f"{self.scrapyd_url}/listjobs.json")
status['jobs'] = response.json()
except Exception as e:
status['jobs'] = {'error': str(e)}
return status
def analyze_status_change(self, current_status: Dict[str, Any]):
"""
分析状态变化
"""
timestamp = datetime.now().strftime('%Y-%m-%d %H:%M:%S')
# 检查守护进程状态
daemon_status = current_status.get('daemon', {})
if 'error' not in daemon_status:
running = daemon_status.get('running', 0)
print(f"[{timestamp}] Daemon: {running} running jobs")
# 检查任务状态变化
jobs_status = current_status.get('jobs', {})
if 'error' not in jobs_status:
pending = len(jobs_status.get('pending', []))
running = len(jobs_status.get('running', []))
finished = len(jobs_status.get('finished', []))
print(f" Jobs - Pending: {pending}, Running: {running}, Finished: {finished}")
# 检查是否有状态变化
if self.last_status:
self._check_job_changes(jobs_status)
def _check_job_changes(self, current_jobs: Dict[str, Any]):
"""
检查任务变化
"""
last_jobs = self.last_status.get('jobs', {})
# 检查新开始的任务
current_running_ids = {job['id'] for job in current_jobs.get('running', [])}
last_running_ids = {job['id'] for job in last_jobs.get('running', [])}
new_started = current_running_ids - last_running_ids
if new_started:
print(f" 🚀 New jobs started: {list(new_started)}")
# 检查完成的任务
current_finished_ids = {job['id'] for job in current_jobs.get('finished', [])}
last_finished_ids = {job['id'] for job in last_jobs.get('finished', [])}
new_finished = current_finished_ids - last_finished_ids
if new_finished:
print(f" ✅ Jobs finished: {list(new_finished)}")
# 使用示例
if __name__ == "__main__":
monitor = RealTimeMonitor("http://localhost:6800")
monitor.monitor_loop(interval=10) # 每10秒检查一次
#生产环境部署
#Docker部署方案
# Dockerfile - Scrapyd生产环境镜像
FROM python:3.9-slim
# 安装系统依赖
RUN apt-get update && apt-get install -y \
gcc \
g++ \
libxml2-dev \
libxslt1-dev \
libffi-dev \
libssl-dev \
wget \
curl \
&& rm -rf /var/lib/apt/lists/*
# 设置工作目录
WORKDIR /app
# 安装Python依赖
COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt
# 安装Scrapyd
RUN pip install scrapyd
# 创建配置目录
RUN mkdir -p /etc/scrapyd /var/log/scrapyd /var/lib/scrapyd
# 复制配置文件
COPY scrapyd.conf /etc/scrapyd/scrapyd.conf
# 创建用户
RUN groupadd -r scrapyd && useradd -r -g scrapyd scrapyd
RUN chown -R scrapyd:scrapyd /var/log/scrapyd /var/lib/scrapyd /etc/scrapyd
# 暴露端口
EXPOSE 6800
# 启动命令
CMD ["scrapyd", "-c", "/etc/scrapyd/scrapyd.conf"]# docker-compose.yml - 多服务部署
version: '3.8'
services:
scrapyd:
build: .
container_name: scrapyd-server
ports:
- "6800:6800"
volumes:
- ./logs:/var/log/scrapyd
- ./projects:/var/lib/scrapyd
environment:
- PYTHONPATH=/app
restart: unless-stopped
networks:
- scrapyd-net
scrapydweb:
image: my8100/scrapydweb:latest
container_name: scrapydweb-ui
ports:
- "5000:5000"
environment:
# 环境变量键名以镜像文档为准
- SCRAPYDWEB_BIND=0.0.0.0
- SCRAPYDWEB_PORT=5000
# 容器间通过compose服务名访问Scrapyd,而非localhost
- SCRAPYD_SERVERS=scrapyd:6800
- ENABLE_AUTH=True
- USERNAME=admin
- PASSWORD=your_secure_password
volumes:
- ./data:/app/data
- ./logs:/app/logs
depends_on:
- scrapyd
restart: unless-stopped
networks:
- scrapyd-net
networks:
scrapyd-net:
driver: bridge
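构建并启动整套服务(示意):
# 后台启动并跟踪Scrapyd日志
docker compose up -d --build
docker compose logs -f scrapyd
#Nginx反向代理配置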
# /etc/nginx/sites-available/scrapyd
upstream scrapyd_backend {
server localhost:6800;
keepalive 32;
}
server {
listen 80;
server_name scrapyd.yourdomain.com;
# 基本安全设置
add_header X-Frame-Options DENY;
add_header X-Content-Type-Options nosniff;
add_header X-XSS-Protection "1; mode=block";
# 限制请求大小
client_max_body_size 100M;
# 代理设置
location / {
proxy_pass http://scrapyd_backend;
proxy_set_header Host $host;
proxy_set_header X-Real-IP $remote_addr;
proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for;
proxy_set_header X-Forwarded-Proto $scheme;
# 连接和超时设置
proxy_connect_timeout 30s;
proxy_send_timeout 120s;
proxy_read_timeout 120s;
# 缓冲设置
proxy_buffering off;
proxy_request_buffering off;
}
# 健康检查
location /health {
access_log off;
return 200 "healthy\n";
add_header Content-Type text/plain;
}
}
# /etc/nginx/sites-available/scrapydweb
upstream scrapydweb_backend {
server localhost:5000;
keepalive 32;
}
server {
listen 80;
server_name scrapydweb.yourdomain.com;
# SSL重定向(如果使用HTTPS)
# return 301 https://$server_name$request_uri;
# 基本安全设置
add_header X-Frame-Options DENY;
add_header X-Content-Type-Options nosniff;
add_header X-XSS-Protection "1; mode=block";
# 限制请求大小
client_max_body_size 10M;
# 代理设置
location / {
proxy_pass http://scrapydweb_backend;
proxy_set_header Host $host;
proxy_set_header X-Real-IP $remote_addr;
proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for;
proxy_set_header X-Forwarded-Proto $scheme;
# 连接和超时设置
proxy_connect_timeout 30s;
proxy_send_timeout 300s;
proxy_read_timeout 300s;
# 缓冲设置
proxy_buffering off;
proxy_request_buffering off;
}
# 静态文件缓存
location ~* \.(css|js|png|jpg|jpeg|gif|ico|svg)$ {
proxy_pass http://scrapydweb_backend;
expires 1y;
add_header Cache-Control "public, immutable";
}
}
#系统服务配置
# /etc/systemd/system/scrapyd.service - Scrapyd系统服务
[Unit]
Description=Scrapyd Service
After=network.target
[Service]
Type=simple
User=scrapyd
Group=scrapyd
WorkingDirectory=/var/lib/scrapyd
ExecStart=/usr/local/bin/scrapyd -c /etc/scrapyd/scrapyd.conf
Restart=always
RestartSec=10
# 环境变量
Environment=PYTHONPATH=/var/lib/scrapyd
Environment=SCRAPY_SETTINGS_MODULE=myproject.settings
# 资源限制
LimitNOFILE=65536
LimitNPROC=32768
[Install]
WantedBy=multi-user.target
# /etc/systemd/system/scrapydweb.service - ScrapydWeb系统服务
[Unit]
Description=ScrapydWeb Service
After=network.target
[Service]
Type=simple
User=www-data
Group=www-data
WorkingDirectory=/opt/scrapydweb
ExecStart=/usr/local/bin/scrapydweb -c /opt/scrapydweb/config.py
Restart=always
RestartSec=10
# 环境变量
Environment=FLASK_APP=scrapydweb
Environment=FLASK_ENV=production
# 资源限制
LimitNOFILE=65536
LimitNPROC=32768
[Install]
WantedBy=multi-user.target
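写好unit文件后,重新加载systemd并设置开机自启(示意):
sudo systemctl daemon-reload
sudo systemctl enable --now scrapyd scrapydweb
sudo systemctl status scrapyd
#性能优化与调优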
#性能监控指标
# performance_monitor.py - 性能监控
import psutil
import time
import threading
from collections import deque
from typing import Dict, List
class ScrapydPerformanceMonitor:
"""
Scrapyd性能监控器
"""
def __init__(self):
self.metrics = {
'cpu_percent': deque(maxlen=100),
'memory_percent': deque(maxlen=100),
'disk_io': deque(maxlen=100),
'network_io': deque(maxlen=100),
'process_count': deque(maxlen=100),
'thread_count': deque(maxlen=100),
}
self.running = False
self.monitor_thread = None
def start_monitoring(self):
"""
开始监控
"""
self.running = True
self.monitor_thread = threading.Thread(target=self._monitor_loop)
self.monitor_thread.daemon = True
self.monitor_thread.start()
def stop_monitoring(self):
"""
停止监控
"""
self.running = False
if self.monitor_thread:
self.monitor_thread.join()
def _monitor_loop(self):
"""
监控循环
"""
process = psutil.Process()
while self.running:
try:
# CPU使用率
cpu_percent = process.cpu_percent()
self.metrics['cpu_percent'].append(cpu_percent)
# 内存使用率
memory_info = process.memory_info()
memory_percent = process.memory_percent()
self.metrics['memory_percent'].append(memory_percent)
# 磁盘IO
disk_io = psutil.disk_io_counters()
if disk_io:
self.metrics['disk_io'].append({
'read_bytes': disk_io.read_bytes,
'write_bytes': disk_io.write_bytes
})
# 网络IO
net_io = psutil.net_io_counters()
if net_io:
self.metrics['network_io'].append({
'bytes_sent': net_io.bytes_sent,
'bytes_recv': net_io.bytes_recv
})
# 进程和线程数
num_threads = process.num_threads()
self.metrics['thread_count'].append(num_threads)
# 等待下次采样
time.sleep(1)
except Exception as e:
print(f"Error in performance monitoring: {e}")
time.sleep(1)
def get_current_metrics(self) -> Dict:
"""
获取当前指标
"""
if not self.metrics['cpu_percent']:
return {}
return {
'cpu_percent': self.metrics['cpu_percent'][-1],
'memory_percent': self.metrics['memory_percent'][-1],
'avg_cpu_percent': sum(self.metrics['cpu_percent']) / len(self.metrics['cpu_percent']),
'avg_memory_percent': sum(self.metrics['memory_percent']) / len(self.metrics['memory_percent']),
'current_threads': self.metrics['thread_count'][-1] if self.metrics['thread_count'] else 0,
'peak_cpu': max(self.metrics['cpu_percent']) if self.metrics['cpu_percent'] else 0,
'peak_memory': max(self.metrics['memory_percent']) if self.metrics['memory_percent'] else 0,
}
def get_historical_metrics(self, minutes: int = 10) -> Dict:
"""
获取历史指标
"""
samples = minutes * 60 # 假设每秒采样一次
result = {}
for key, values in self.metrics.items():
recent_values = list(values)[-samples:] if len(values) >= samples else list(values)
# disk_io/network_io中存的是dict,只对数值型指标做统计
if recent_values and isinstance(recent_values[0], (int, float)):
result[key] = {
'current': recent_values[-1],
'average': sum(recent_values) / len(recent_values),
'min': min(recent_values),
'max': max(recent_values),
'trend': self._calculate_trend(recent_values)
}
return result
def _calculate_trend(self, values: List[float]) -> str:
"""
计算趋势
"""
if len(values) < 2:
return 'stable'
recent_avg = sum(values[-5:]) / min(5, len(values))
older_avg = sum(values[:5]) / min(5, len(values))
if recent_avg > older_avg * 1.2:
return 'increasing'
elif recent_avg < older_avg * 0.8:
return 'decreasing'
else:
return 'stable'
# 使用示例
def performance_example():
monitor = ScrapydPerformanceMonitor()
monitor.start_monitoring()
try:
time.sleep(10) # 监控10秒
current_metrics = monitor.get_current_metrics()
print("Current Metrics:", current_metrics)
historical_metrics = monitor.get_historical_metrics(minutes=1)
print("Historical Metrics:", historical_metrics)
finally:
monitor.stop_monitoring()
#配置优化建议
# optimized_scrapyd.conf - 优化的Scrapyd配置
[scrapyd]
# 网络配置
bind_address = 0.0.0.0
http_port = 6800
# 性能优化
max_proc = 8 # 按CPU核心数调整,0表示不限制
max_proc_per_cpu = 2 # 每个CPU核心的并发进程数
poll_interval = 2.0 # 更频繁地检查任务队列
# 日志与任务记录
jobs_to_keep = 10 # 每个爬虫保留的任务日志数
finished_to_keep = 100 # 内存中保留的已完成任务记录数
debug = off
# 资源目录
eggs_dir = /var/lib/scrapyd/eggs
logs_dir = /var/log/scrapyd
dbs_dir = /var/lib/scrapyd/dbs
items_dir = /var/lib/scrapyd/items
# 内存优化:Scrapyd本身不限制单个爬虫的内存,
# 需在各项目的Scrapy settings(如MEMUSAGE_LIMIT_MB、
# CONCURRENT_REQUESTS)中控制
# 注意:Scrapyd配置只有全局的[scrapyd]一节,不支持按项目
# 单独设置并发;高负载与数据密集型项目若需不同配额,
# 可拆分到不同的Scrapyd实例运行
#故障排查与维护
#常见问题诊断
# troubleshooter.py - 故障诊断工具
import os
import time
import psutil
import requests
from pathlib import Path
from typing import Dict, List
class ScrapydTroubleshooter:
"""
Scrapyd故障诊断工具
"""
def __init__(self, scrapyd_url: str = "http://localhost:6800"):
self.scrapyd_url = scrapyd_url.rstrip('/')
def diagnose_system(self) -> Dict:
"""
系统诊断
"""
diagnosis = {
'system_health': {},
'scrapyd_status': {},
'resource_usage': {},
'recommendations': []
}
# 系统健康检查
diagnosis['system_health'] = self._check_system_health()
# Scrapyd状态检查
diagnosis['scrapyd_status'] = self._check_scrapyd_status()
# 资源使用检查
diagnosis['resource_usage'] = self._check_resource_usage()
# 生成建议
diagnosis['recommendations'] = self._generate_recommendations(diagnosis)
return diagnosis
def _check_system_health(self) -> Dict:
"""
检查系统健康状况
"""
health = {
'cpu_percent': psutil.cpu_percent(interval=1),
'memory_percent': psutil.virtual_memory().percent,
'disk_percent': psutil.disk_usage('/').percent,
'load_average': os.getloadavg() if hasattr(os, 'getloadavg') else None,
'processes': len(psutil.pids()),
'uptime': time.time() - psutil.boot_time()
}
# 检查关键指标
issues = []
if health['cpu_percent'] > 80:
issues.append(f"High CPU usage: {health['cpu_percent']}%")
if health['memory_percent'] > 85:
issues.append(f"High memory usage: {health['memory_percent']}%")
if health['disk_percent'] > 90:
issues.append(f"High disk usage: {health['disk_percent']}%")
health['issues'] = issues
health['overall_status'] = 'WARNING' if issues else 'OK'
return health
def _check_scrapyd_status(self) -> Dict:
"""
检查Scrapyd状态
"""
status = {}
try:
# 测试连接
response = requests.get(f"{self.scrapyd_url}/daemonstatus.json", timeout=10)
if response.status_code == 200:
status['connected'] = True
status['response_time'] = response.elapsed.total_seconds()
status['daemon_info'] = response.json()
else:
status['connected'] = False
status['error'] = f"HTTP {response.status_code}"
except requests.exceptions.RequestException as e:
status['connected'] = False
status['error'] = str(e)
return status
def _check_resource_usage(self) -> Dict:
"""
检查资源使用情况
"""
usage = {
'logs_size': self._get_directory_size('/var/log/scrapyd'),
'eggs_size': self._get_directory_size('/var/lib/scrapyd/eggs'),
'dbs_size': self._get_directory_size('/var/lib/scrapyd/dbs'),
'processes': self._get_scrapyd_processes()
}
return usage
def _get_directory_size(self, path: str) -> int:
"""
获取目录大小
"""
try:
path = Path(path)
if not path.exists():
return 0
total_size = 0
for dirpath, dirnames, filenames in os.walk(path):
for filename in filenames:
filepath = os.path.join(dirpath, filename)
try:
total_size += os.path.getsize(filepath)
except OSError:
continue
return total_size
except Exception:
return 0
def _get_scrapyd_processes(self) -> List[Dict]:
"""
获取Scrapyd相关进程
"""
processes = []
for proc in psutil.process_iter(['pid', 'name', 'cpu_percent', 'memory_percent']):
try:
if 'scrapy' in proc.info['name'].lower() or 'scrapyd' in proc.info['name'].lower():
processes.append(proc.info)
except (psutil.NoSuchProcess, psutil.AccessDenied):
continue
return processes
def _generate_recommendations(self, diagnosis: Dict) -> List[str]:
"""
生成建议
"""
recommendations = []
# 系统资源建议
if diagnosis['system_health']['cpu_percent'] > 80:
recommendations.append("Consider reducing concurrent crawlers or upgrading hardware")
if diagnosis['system_health']['memory_percent'] > 85:
recommendations.append("Increase system memory or reduce crawler memory limits")
if diagnosis['system_health']['disk_percent'] > 90:
recommendations.append("Clean up old logs and data files")
# Scrapyd配置建议
if not diagnosis['scrapyd_status']['connected']:
recommendations.append("Check Scrapyd service status and network connectivity")
# 资源使用建议
if diagnosis['resource_usage']['logs_size'] > 10 * 1024 * 1024 * 1024: # 10GB
recommendations.append("Logs directory is large, consider cleaning old logs")
if not recommendations:
recommendations.append("System appears to be running normally")
return recommendations
# 使用示例
def troubleshoot_example():
troubleshooter = ScrapydTroubleshooter()
diagnosis = troubleshooter.diagnose_system()
print("System Health:", diagnosis['system_health'])
print("Scrapyd Status:", diagnosis['scrapyd_status'])
print("Resource Usage:", diagnosis['resource_usage'])
print("Recommendations:", diagnosis['recommendations'])
if __name__ == "__main__":
troubleshoot_example()
#自动化维护脚本
#!/bin/bash
# maintenance.sh - 自动化维护脚本
# 配置变量
LOG_DIR="/var/log/scrapyd"
EGGS_DIR="/var/lib/scrapyd/eggs"
BACKUP_DIR="/backup/scrapyd"
RETENTION_DAYS=30
# 日志轮转
rotate_logs() {
echo "Rotating logs..."
# 压缩旧日志
find $LOG_DIR -name "*.log" -mtime +7 -exec gzip {} \;
# 删除超过30天的压缩日志
find $LOG_DIR -name "*.log.gz" -mtime +$RETENTION_DAYS -delete
echo "Log rotation completed."
}
# 清理旧版本
cleanup_old_versions() {
echo "Cleaning up old versions..."
# 获取所有项目
PROJECTS=$(curl -s "http://localhost:6800/listprojects.json" | python -c "import sys, json; [print(p) for p in json.load(sys.stdin).get('projects', [])]")
for project in $PROJECTS; do
# 获取项目版本
VERSIONS=$(curl -s "http://localhost:6800/listversions.json?project=$project" | python -c "import sys, json; [print(v) for v in json.load(sys.stdin).get('versions', [])]")
# 保留最新的3个版本,删除其余的
# (listversions.json返回的列表中最新版本在末尾)
OLD_VERSIONS=$(echo "$VERSIONS" | head -n -3)
for version in $OLD_VERSIONS; do
echo "Deleting old version $project:$version"
curl -X POST "http://localhost:6800/delversion.json" -d "project=$project&version=$version"
done
done
echo "Version cleanup completed."
}
# 备份配置
backup_config() {
echo "Backing up configuration..."
TIMESTAMP=$(date +%Y%m%d_%H%M%S)
BACKUP_FILE="$BACKUP_DIR/scrapyd_backup_$TIMESTAMP.tar.gz"
mkdir -p $BACKUP_DIR
tar -czf $BACKUP_FILE \
/etc/scrapyd/scrapyd.conf \
/var/lib/scrapyd/eggs \
/var/lib/scrapyd/dbs \
--exclude="*.tmp" \
--exclude="*.log"
echo "Backup created: $BACKUP_FILE"
# 删除7天前的备份
find $BACKUP_DIR -name "scrapyd_backup_*.tar.gz" -mtime +7 -delete
}
# 检查服务状态
check_service_status() {
echo "Checking service status..."
# 检查Scrapyd进程
if pgrep -f scrapyd > /dev/null; then
echo "✓ Scrapyd is running"
else
echo "✗ Scrapyd is not running"
# 尝试重启
systemctl restart scrapyd
sleep 5
if pgrep -f scrapyd > /dev/null; then
echo "✓ Scrapyd restarted successfully"
else
echo "✗ Failed to restart Scrapyd"
fi
fi
# 检查ScrapydWeb进程
if pgrep -f scrapydweb > /dev/null; then
echo "✓ ScrapydWeb is running"
else
echo "✗ ScrapydWeb is not running"
systemctl restart scrapydweb
sleep 5
if pgrep -f scrapydweb > /dev/null; then
echo "✓ ScrapydWeb restarted successfully"
else
echo "✗ Failed to restart ScrapydWeb"
fi
fi
}
# 主维护函数
main() {
echo "Starting maintenance tasks at $(date)"
check_service_status
rotate_logs
cleanup_old_versions
backup_config
echo "Maintenance tasks completed at $(date)"
}
# 执行主函数
main
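可以通过cron定期执行该脚本(示意,脚本路径为示例):
# 每天凌晨3点执行维护任务
0 3 * * * /opt/scripts/maintenance.sh >> /var/log/scrapyd_maintenance.log 2>&1
#最佳实践总结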
#部署最佳实践
1. 环境隔离:
# 使用虚拟环境
python -m venv scrapyd_env
source scrapyd_env/bin/activate
pip install scrapyd scrapyd-client
2. 配置管理:
# 使用不同的deploy目标管理不同环境
[deploy:development]
url = http://dev-server:6800/
project = myproject
[deploy:staging]
url = http://staging-server:6800/
project = myproject
[deploy:production]
url = http://prod-server:6800/
project = myproject
3. 安全性:
- 使用防火墙限制访问
- 配置认证和授权
- 定期更新依赖包
#监控最佳实践
1. 关键指标监控:
- 运行中的爬虫数量
- 任务成功率
- 错误率趋势
- 资源使用情况
2. 告警设置:
- 爬虫长时间无响应
- 错误率超过阈值
- 资源使用过高
3. 日志管理:
- 结构化日志格式
- 日志轮转策略
- 集中日志分析
#性能优化最佳实践
1. 资源配置:
# 根据硬件资源合理配置
max_proc = 8 # 按CPU核心数调整
max_proc_per_cpu = 2
2. 任务调度:
- 避免同时启动大量爬虫
- 使用队列管理系统
- 实现优雅的任务取消
3. 资源回收:
- 定期清理旧版本
- 及时释放不再使用的资源
- 实现内存泄漏检测
💡 核心要点: Scrapyd和ScrapydWeb是构建生产级爬虫运维体系的重要工具。通过合理的配置、完善的监控和规范的运维流程,可以实现爬虫的自动化部署和稳定运行。
🔗 相关教程推荐
- Scrapy-Redis分布式架构 - 分布式爬虫架构
- Docker容器化爬虫 - 容器化部署方案
- 抓取监控看板 - 监控系统建设
🏷️ 标签云: Scrapyd ScrapydWeb 爬虫部署 爬虫监控 运维管理 自动化部署 分布式爬虫 系统运维 性能优化 故障排查

