Scrapyd与ScrapydWeb - 分布式爬虫部署与监控平台详解

📂 所属阶段:第六阶段 — 运维与监控(工程化篇)
🔗 相关章节:Scrapy-Redis分布式架构 · Docker容器化爬虫 · 抓取监控看板

目录

  • Scrapyd与ScrapydWeb概述
  • Scrapyd安装与配置
  • ScrapydWeb安装与配置
  • 项目部署与管理
  • 远程控制与API接口
  • 日志管理与监控
  • 生产环境部署
  • 性能优化与调优
  • 故障排查与维护
  • 最佳实践总结

Scrapyd和ScrapydWeb是Scrapy生态系统中重要的部署和监控工具,它们共同构成了完整的爬虫运维解决方案。

Scrapyd简介

Scrapyd是一个守护进程,用于运行Scrapy爬虫服务。它提供了HTTP API,允许你部署爬虫项目并控制它们的执行。

核心特性

  • HTTP API接口
  • 多项目支持
  • 并发爬虫管理
  • 日志管理
  • 进程监控
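
这些功能都通过HTTP API暴露。下面用curl演示两个最常用的操作(示例假设Scrapyd运行在本机默认的6800端口,myproject、myspider为演示用的占位名称):

# 调度一个爬虫任务,成功时返回jobid
curl http://localhost:6800/schedule.json -d project=myproject -d spider=myspider

# 取消一个排队中或正在运行的任务
curl http://localhost:6800/cancel.json -d project=myproject -d job=JOBID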

ScrapydWeb简介

ScrapydWeb是Scrapyd的Web UI界面,提供直观的图形化管理界面,方便运维人员进行爬虫管理。

核心特性

  • 可视化管理界面
  • 多节点集群管理
  • 实时日志查看
  • 统计数据展示
  • 任务调度功能

Scrapyd安装与配置

安装Scrapyd

# 安装Scrapyd
pip install scrapyd

# 安装Scrapyd客户端工具
pip install scrapyd-client

配置文件详解

# scrapyd.conf - Scrapyd配置文件
[scrapyd]
# 监听地址,默认为127.0.0.1,需要远程访问时可改为0.0.0.0
bind_address = 0.0.0.0
# 监听端口
http_port = 6800
# 最大并发进程数,0表示按 CPU核心数 × max_proc_per_cpu 自动计算
max_proc = 0
# 每个CPU核心允许的最大并发进程数
max_proc_per_cpu = 4
# 任务队列轮询间隔(秒)
poll_interval = 5.0
# 每个爬虫保留的已完成作业日志数量
jobs_to_keep = 5
# 内存中保留的已完成作业记录数(供listjobs.json查询)
finished_to_keep = 100
# egg包存放目录
eggs_dir = eggs
# 日志目录
logs_dir = logs
# 数据库文件目录
dbs_dir = dbs
# 抓取结果(items)存放目录,留空表示不保存
items_dir = items
# 调试模式
debug = off

注意:Scrapyd只读取[scrapyd]配置段,并不支持像[myproject]这样的按项目配置;如需限制单个项目的并发或速率,应在该项目的Scrapy设置(如CONCURRENT_REQUESTS、DOWNLOAD_DELAY)中控制。

启动Scrapyd服务

# 方式1:直接启动
scrapyd

# 方式2:指定配置文件启动
scrapyd -c /path/to/scrapyd.conf

# 方式3:后台启动(Linux)
scrapyd &
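
启动后可以先确认服务是否正常(假设使用默认端口6800):

# 通过daemonstatus接口检查服务状态
curl http://localhost:6800/daemonstatus.json
# 正常时返回类似 {"status": "ok", "running": 0, "pending": 0, "finished": 0, ...}
# 也可以直接用浏览器访问 http://localhost:6800 查看Scrapyd自带的简易页面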

服务管理脚本

#!/bin/bash
# scrapyd-service.sh - Scrapyd服务管理脚本

SCRAPYD_PID_FILE="/var/run/scrapyd.pid"
SCRAPYD_LOG_FILE="/var/log/scrapyd.log"

start() {
    echo "Starting Scrapyd..."
    if [ -f "$SCRAPYD_PID_FILE" ]; then
        echo "Scrapyd is already running."
        exit 1
    fi
    
    nohup scrapyd -c /etc/scrapyd/scrapyd.conf > "$SCRAPYD_LOG_FILE" 2>&1 &
    echo $! > "$SCRAPYD_PID_FILE"
    echo "Scrapyd started with PID $(cat $SCRAPYD_PID_FILE)"
}

stop() {
    echo "Stopping Scrapyd..."
    if [ ! -f "$SCRAPYD_PID_FILE" ]; then
        echo "Scrapyd is not running."
        exit 1
    fi
    
    PID=$(cat "$SCRAPYD_PID_FILE")
    kill $PID
    rm -f "$SCRAPYD_PID_FILE"
    echo "Scrapyd stopped."
}

restart() {
    stop
    sleep 2
    start
}

case "$1" in
    start)
        start
        ;;
    stop)
        stop
        ;;
    restart)
        restart
        ;;
    *)
        echo "Usage: $0 {start|stop|restart}"
        exit 1
        ;;
esac

ScrapydWeb安装与配置

安装ScrapydWeb

# 安装ScrapydWeb
pip install scrapydweb

# 或者从源码安装
git clone https://github.com/my8100/scrapydweb.git
cd scrapydweb
pip install -r requirements.txt

配置ScrapydWeb

# config.py - ScrapydWeb配置文件
import os

# Flask配置
SECRET_KEY = 'your-secret-key-here'
PERMANENT_SESSION_LIFETIME = 3600  # 1小时

# ScrapydWeb配置
SCRAPYDWEB_BIND = '0.0.0.0'
SCRAPYDWEB_PORT = 5000
ENABLE_AUTHENTICATION = True
USERNAME = 'admin'
PASSWORD = 'your-password-here'

# Scrapyd节点配置
SCRAPYD_SERVERS = [
    'localhost:6800',  # 本地节点
    '192.168.1.100:6800',  # 远程节点1
    '192.168.1.101:6800',  # 远程节点2
]

# 日志配置
LOG_LEVEL = 'INFO'
ENABLE_LOGPARSER = True
ENABLE_EMAIL_ALERT = True

# 数据库配置
DATABASE_PATH = os.path.join(os.getcwd(), 'data.db')

# 其他配置
REFRESH_PAGES_INTERVAL = 30
SHOW_SCRAPYD_ITEMS = True
CUSTOMIZED_BULKSELECT_TAGS = ['production', 'staging', 'development']

启动ScrapydWeb服务

# 方式1:直接启动
scrapydweb

# 方式2:指定配置文件启动
scrapydweb -c /path/to/config.py

# 方式3:后台启动
nohup scrapydweb > scrapydweb.log 2>&1 &

项目部署与管理

项目打包配置

# scrapy.cfg - Scrapy项目配置
[settings]
default = myproject.settings

[deploy]
# 目标Scrapyd服务器
url = http://localhost:6800/
# 项目名称
project = myproject

[deploy:production]
url = http://prod-server:6800/
project = myproject
username = admin
password = secret

[deploy:staging]
url = http://staging-server:6800/
project = myproject
username = admin
password = secret
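
配置好部署目标后,可以用scrapyd-client提供的scrapyd-deploy命令打包并上传项目(在项目根目录执行,目标名对应上面scrapy.cfg中的[deploy:xxx]段):

# 列出scrapy.cfg中定义的所有部署目标
scrapyd-deploy -l

# 部署到默认目标
scrapyd-deploy -p myproject

# 部署到production目标并指定版本号
scrapyd-deploy production -p myproject --version 1.0.0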

部署脚本

#!/bin/bash
# deploy.sh - 自动化部署脚本

PROJECT_NAME="myproject"
DEPLOY_ENV="${1:-default}"  # 默认环境

echo "Deploying project: $PROJECT_NAME to environment: $DEPLOY_ENV"

# 检查项目配置
if [ ! -f "scrapy.cfg" ]; then
    echo "Error: scrapy.cfg not found!"
    exit 1
fi

# 打包并部署
echo "Packaging and deploying..."
scrapyd-deploy $DEPLOY_ENV -p $PROJECT_NAME

if [ $? -eq 0 ]; then
    echo "Deployment successful!"
    
    # 获取最新的版本号
    VERSION=$(curl -s "http://localhost:6800/listversions.json?project=$PROJECT_NAME" | python -c "import sys, json; print(json.load(sys.stdin)['versions'][-1])" 2>/dev/null)
    echo "Deployed version: $VERSION"
else
    echo "Deployment failed!"
    exit 1
fi

部署管理Python脚本

# deployment_manager.py - 部署管理脚本
import requests
import zipfile
import os
import subprocess
from pathlib import Path

class ScrapydDeploymentManager:
    """
    Scrapyd部署管理器
    """
    
    def __init__(self, scrapyd_url: str = "http://localhost:6800"):
        self.scrapyd_url = scrapyd_url.rstrip('/')
        self.session = requests.Session()
    
    def deploy_project(self, project_path: str, project_name: str, version: str = None) -> dict:
        """
        部署项目到Scrapyd
        
        Args:
            project_path: 项目路径
            project_name: 项目名称
            version: 版本号(可选)
        
        Returns:
            部署结果
        """
        if not version:
            import time
            version = f"v{int(time.time())}"
        
        # 创建egg文件
        egg_path = self._create_egg(project_path)
        
        # 上传部署
        with open(egg_path, 'rb') as f:
            files = {'egg': f}
            data = {
                'project': project_name,
                'version': version
            }
            
            response = self.session.post(
                f"{self.scrapyd_url}/addversion.json",
                files=files,
                data=data
            )
        
        # 清理临时文件
        os.remove(egg_path)
        
        if response.status_code == 200:
            result = response.json()
            print(f"Project deployed successfully: {project_name} v{version}")
            return result
        else:
            raise Exception(f"Deployment failed: {response.text}")
    
    def _create_egg(self, project_path: str) -> str:
        """
        创建egg文件
        """
        project_path = Path(project_path)
        old_cwd = os.getcwd()
        
        try:
            os.chdir(project_path)
            
            # 运行setup.py创建egg
            result = subprocess.run([
                'python', 'setup.py', 'bdist_egg'
            ], capture_output=True, text=True)
            
            if result.returncode != 0:
                raise Exception(f"Failed to create egg: {result.stderr}")
            
            # 获取生成的egg文件
            dist_dir = project_path / 'dist'
            egg_files = list(dist_dir.glob('*.egg'))
            
            if not egg_files:
                raise Exception("No egg file found")
            
            # 返回第一个egg文件的路径
            return str(egg_files[0])
            
        finally:
            os.chdir(old_cwd)
    
    def list_projects(self) -> dict:
        """
        列出所有项目
        """
        response = self.session.get(f"{self.scrapyd_url}/listprojects.json")
        return response.json()
    
    def list_versions(self, project_name: str) -> dict:
        """
        列出项目版本
        """
        response = self.session.get(
            f"{self.scrapyd_url}/listversions.json",
            params={'project': project_name}
        )
        return response.json()
    
    def delete_version(self, project_name: str, version: str) -> dict:
        """
        删除项目版本
        """
        response = self.session.post(
            f"{self.scrapyd_url}/delversion.json",
            data={
                'project': project_name,
                'version': version
            }
        )
        return response.json()
    
    def schedule_spider(self, project_name: str, spider_name: str, **kwargs) -> dict:
        """
        调度爬虫任务
        
        Args:
            project_name: 项目名称
            spider_name: 爬虫名称
            **kwargs: 爬虫参数
        
        Returns:
            任务ID
        """
        data = {
            'project': project_name,
            'spider': spider_name
        }
        
        # 添加爬虫参数:schedule.json会把project/spider等保留字段之外的参数直接作为spider参数传入
        for key, value in kwargs.items():
            data[key] = value
        
        response = self.session.post(
            f"{self.scrapyd_url}/schedule.json",
            data=data
        )
        
        if response.status_code == 200:
            result = response.json()
            print(f"Spider scheduled: {spider_name}, Job ID: {result.get('jobid')}")
            return result
        else:
            raise Exception(f"Scheduling failed: {response.text}")
    
    def cancel_job(self, project_name: str, job_id: str) -> dict:
        """
        取消任务
        """
        response = self.session.post(
            f"{self.scrapyd_url}/cancel.json",
            data={
                'project': project_name,
                'job': job_id
            }
        )
        return response.json()

# 使用示例
if __name__ == "__main__":
    manager = ScrapydDeploymentManager()
    
    # 部署项目
    # manager.deploy_project("./myproject", "myproject")
    
    # 调度爬虫
    # manager.schedule_spider("myproject", "myspider", start_url="https://example.com")

远程控制与API接口

HTTP API详解

# scrapyd_api_client.py - Scrapyd API客户端
import requests
import json
from typing import Dict, List, Optional

class ScrapydAPIClient:
    """
    Scrapyd API客户端
    """
    
    def __init__(self, base_url: str = "http://localhost:6800"):
        self.base_url = base_url.rstrip('/')
        self.session = requests.Session()
    
    def get_overview(self) -> dict:
        """
        获取概览信息
        """
        response = self.session.get(f"{self.base_url}/daemonstatus.json")
        return response.json()
    
    def list_projects(self) -> dict:
        """
        列出所有项目
        """
        response = self.session.get(f"{self.base_url}/listprojects.json")
        return response.json()
    
    def list_versions(self, project: str) -> dict:
        """
        列出项目版本
        """
        response = self.session.get(
            f"{self.base_url}/listversions.json",
            params={'project': project}
        )
        return response.json()
    
    def list_spiders(self, project: str, version: str = None) -> dict:
        """
        列出爬虫
        """
        params = {'project': project}
        if version:
            # Scrapyd的listspiders.json用_version参数指定项目版本
            params['_version'] = version
        
        response = self.session.get(
            f"{self.base_url}/listspiders.json",
            params=params
        )
        return response.json()
    
    def schedule_spider(self, project: str, spider: str, **kwargs) -> dict:
        """
        调度爬虫
        """
        data = {
            'project': project,
            'spider': spider
        }
        
        # 除保留字段外的参数会被Scrapyd直接作为spider参数传入
        for key, value in kwargs.items():
            data[key] = value
        
        response = self.session.post(
            f"{self.base_url}/schedule.json",
            data=data
        )
        return response.json()
    
    def cancel_job(self, project: str, job_id: str) -> dict:
        """
        取消任务
        """
        response = self.session.post(
            f"{self.base_url}/cancel.json",
            data={
                'project': project,
                'job': job_id
            }
        )
        return response.json()
    
    def list_jobs(self, project: str = None) -> dict:
        """
        列出任务
        """
        params = {}
        if project:
            params['project'] = project
        
        response = self.session.get(
            f"{self.base_url}/listjobs.json",
            params=params
        )
        return response.json()
    
    def get_log(self, project: str, spider: str, job_id: str) -> str:
        """
        获取日志
        """
        response = self.session.get(
            f"{self.base_url}/logs/{project}/{spider}/{job_id}.log"
        )
        return response.text
    
    def get_items(self, project: str, spider: str, job_id: str) -> str:
        """
        获取抓取的项目
        """
        response = self.session.get(
            f"{self.base_url}/items/{project}/{spider}/{job_id}.jl"
        )
        return response.text

# API使用示例
def example_usage():
    client = ScrapydAPIClient("http://localhost:6800")
    
    # 获取概览
    overview = client.get_overview()
    print("Daemon Status:", overview)
    
    # 列出项目
    projects = client.list_projects()
    print("Projects:", projects)
    
    if projects.get('projects'):
        project_name = projects['projects'][0]
        
        # 列出爬虫
        spiders = client.list_spiders(project_name)
        print("Spiders:", spiders)
        
        if spiders.get('spiders'):
            spider_name = spiders['spiders'][0]
            
            # 调度爬虫
            result = client.schedule_spider(
                project_name, 
                spider_name, 
                start_url="https://example.com"
            )
            print("Scheduled:", result)

if __name__ == "__main__":
    example_usage()

REST API封装

# scrapyd_rest_api.py - REST API封装
from flask import Flask, request, jsonify
import requests

app = Flask(__name__)

class ScrapydRESTAPI:
    """
    Scrapyd REST API封装
    """
    
    def __init__(self, scrapyd_url: str = "http://localhost:6800"):
        self.scrapyd_url = scrapyd_url.rstrip('/')
    
    def register_routes(self, app: Flask):
        """
        注册路由
        """
        @app.route('/api/projects', methods=['GET'])
        def get_projects():
            response = requests.get(f"{self.scrapyd_url}/listprojects.json")
            return jsonify(response.json())
        
        @app.route('/api/projects/<project>/spiders', methods=['GET'])
        def get_spiders(project):
            params = {'project': project}
            response = requests.get(
                f"{self.scrapyd_url}/listspiders.json",
                params=params
            )
            return jsonify(response.json())
        
        @app.route('/api/projects/<project>/schedule', methods=['POST'])
        def schedule_spider(project):
            data = request.get_json()
            spider = data.get('spider')
            args = data.get('args', {})
            
            post_data = {
                'project': project,
                'spider': spider
            }
            
            # 额外参数直接作为spider参数传给Scrapyd
            for key, value in args.items():
                post_data[key] = value
            
            response = requests.post(
                f"{self.scrapyd_url}/schedule.json",
                data=post_data
            )
            return jsonify(response.json())
        
        @app.route('/api/projects/<project>/jobs', methods=['GET'])
        def get_jobs(project):
            params = {'project': project}
            response = requests.get(
                f"{self.scrapyd_url}/listjobs.json",
                params=params
            )
            return jsonify(response.json())

# 使用示例
if __name__ == "__main__":
    rest_api = ScrapydRESTAPI()
    rest_api.register_routes(app)
    app.run(host='0.0.0.0', port=5001, debug=True)
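
封装服务启动后(监听5001端口),即可用更符合REST风格的路径访问Scrapyd,例如:

# 列出所有项目
curl http://localhost:5001/api/projects

# 调度爬虫,参数通过JSON传入
curl -X POST http://localhost:5001/api/projects/myproject/schedule \
     -H "Content-Type: application/json" \
     -d '{"spider": "myspider", "args": {"start_url": "https://example.com"}}'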

日志管理与监控

日志配置优化

# logging.conf - 日志配置文件
[loggers]
keys=root,scrapy,scrapyd

[handlers]
keys=consoleHandler,fileHandler,errorHandler

[formatters]
keys=simpleFormatter,detailedFormatter

[logger_root]
level=INFO
handlers=consoleHandler,fileHandler

[logger_scrapy]
level=DEBUG
handlers=consoleHandler,fileHandler
qualname=scrapy
propagate=0

[logger_scrapyd]
level=INFO
handlers=consoleHandler,fileHandler
qualname=scrapyd
propagate=0

[handler_consoleHandler]
class=StreamHandler
level=INFO
formatter=simpleFormatter
args=(sys.stdout,)

[handler_fileHandler]
class=FileHandler
level=DEBUG
formatter=detailedFormatter
args=('scrapyd.log',)

[handler_errorHandler]
class=FileHandler
level=ERROR
formatter=detailedFormatter
args=('scrapyd_error.log',)

[formatter_simpleFormatter]
format=%(asctime)s - %(name)s - %(levelname)s - %(message)s

[formatter_detailedFormatter]
format=%(asctime)s - %(name)s - %(levelname)s - %(funcName)s:%(lineno)d - %(message)s

日志分析工具

# log_analyzer.py - 日志分析工具
import re
import json
from datetime import datetime
from collections import defaultdict, Counter
from pathlib import Path

class ScrapydLogAnalyzer:
    """
    Scrapyd日志分析器
    """
    
    def __init__(self, log_directory: str):
        self.log_directory = Path(log_directory)
        self.patterns = {
            'info': r'(\d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2}) \[(.*?)\] INFO: (.*)',
            'error': r'(\d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2}) \[(.*?)\] ERROR: (.*)',
            'warning': r'(\d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2}) \[(.*?)\] WARNING: (.*)',
            'critical': r'(\d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2}) \[(.*?)\] CRITICAL: (.*)',
        }
    
    def analyze_logs(self, project_name: str, spider_name: str, days: int = 7) -> dict:
        """
        分析日志
        
        Args:
            project_name: 项目名称
            spider_name: 爬虫名称
            days: 分析天数
        
        Returns:
            分析结果
        """
        log_dir = self.log_directory / project_name / spider_name
        if not log_dir.exists():
            return {"error": f"Log directory not found: {log_dir}"}
        
        results = {
            'total_lines': 0,
            'errors': [],
            'warnings': [],
            'infos': [],
            'criticals': [],
            'stats': {
                'error_count': 0,
                'warning_count': 0,
                'info_count': 0,
                'critical_count': 0,
                'level_distribution': Counter(),
                'hourly_activity': Counter(),
                'most_common_errors': Counter(),
                'spider_performance': {}
            }
        }
        
        # 读取最近几天的日志文件
        log_files = list(log_dir.glob('*.log'))
        for log_file in log_files:
            if self._is_recent_file(log_file, days):
                self._parse_log_file(log_file, results)
        
        # 生成统计信息
        self._generate_statistics(results)
        
        return results
    
    def _is_recent_file(self, file_path: Path, days: int) -> bool:
        """
        检查文件是否在指定天数内
        """
        import time
        file_time = file_path.stat().st_mtime
        return (time.time() - file_time) / (24 * 3600) <= days
    
    def _parse_log_file(self, log_file: Path, results: dict):
        """
        解析单个日志文件
        """
        with open(log_file, 'r', encoding='utf-8', errors='ignore') as f:
            for line_num, line in enumerate(f, 1):
                results['total_lines'] += 1
                
                # 尝试匹配各种日志级别
                for level, pattern in self.patterns.items():
                    match = re.match(pattern, line.strip())
                    if match:
                        timestamp, component, message = match.groups()
                        
                        # 添加到对应级别的列表(results是字典,按键取出对应列表)
                        results[level + 's'].append({
                            'timestamp': timestamp,
                            'component': component,
                            'message': message,
                            'line_number': line_num,
                            'file': str(log_file)
                        })
                        
                        # 更新统计
                        results['stats'][f'{level}_count'] += 1
                        results['stats']['level_distribution'][level] += 1
                        
                        # 提取小时信息
                        hour = timestamp.split()[1].split(':')[0]
                        results['stats']['hourly_activity'][hour] += 1
                        
                        # 记录错误信息
                        if level == 'error':
                            results['stats']['most_common_errors'][message] += 1
                        
                        break
    
    def _generate_statistics(self, results: dict):
        """
        生成统计信息
        """
        stats = results['stats']
        
        # 计算错误率
        total_messages = sum(stats['level_distribution'].values())
        if total_messages > 0:
            stats['error_rate'] = stats['error_count'] / total_messages
            stats['warning_rate'] = stats['warning_count'] / total_messages
        
        # 获取最常见的错误
        stats['top_errors'] = stats['most_common_errors'].most_common(10)
        
        # 按小时分析活跃度
        stats['peak_hours'] = sorted(
            stats['hourly_activity'].items(), 
            key=lambda x: x[1], 
            reverse=True
        )[:5]
    
    def export_report(self, analysis_result: dict, output_file: str):
        """
        导出分析报告
        """
        with open(output_file, 'w', encoding='utf-8') as f:
            json.dump(analysis_result, f, indent=2, ensure_ascii=False)

# 使用示例
def analyze_example():
    analyzer = ScrapydLogAnalyzer('./logs')
    result = analyzer.analyze_logs('myproject', 'myspider', days=7)
    
    print(f"Total lines analyzed: {result['total_lines']}")
    print(f"Errors: {result['stats']['error_count']}")
    print(f"Warnings: {result['stats']['warning_count']}")
    print(f"Top errors: {result['stats']['top_errors'][:3]}")

if __name__ == "__main__":
    analyze_example()

实时监控脚本

# real_time_monitor.py - 实时监控脚本
import time
import requests
from datetime import datetime
import json
from typing import Dict, Any

class RealTimeMonitor:
    """
    实时监控器
    """
    
    def __init__(self, scrapyd_url: str = "http://localhost:6800"):
        self.scrapyd_url = scrapyd_url.rstrip('/')
        self.last_status = {}
    
    def monitor_loop(self, interval: int = 30):
        """
        监控循环
        """
        print(f"Starting real-time monitoring at {datetime.now()}")
        print(f"Monitoring Scrapyd at: {self.scrapyd_url}")
        
        while True:
            try:
                current_status = self.get_current_status()
                self.analyze_status_change(current_status)
                
                # 保存当前状态
                self.last_status = current_status.copy()
                
                # 等待下次检查
                time.sleep(interval)
                
            except KeyboardInterrupt:
                print("\nMonitoring stopped by user.")
                break
            except Exception as e:
                print(f"Error during monitoring: {e}")
                time.sleep(interval)
    
    def get_current_status(self) -> Dict[str, Any]:
        """
        获取当前状态
        """
        status = {}
        
        try:
            # 获取守护进程状态
            response = requests.get(f"{self.scrapyd_url}/daemonstatus.json")
            status['daemon'] = response.json()
        except Exception as e:
            status['daemon'] = {'error': str(e)}
        
        try:
            # 获取所有任务
            response = requests.get(f"{self.scrapyd_url}/listjobs.json")
            status['jobs'] = response.json()
        except Exception as e:
            status['jobs'] = {'error': str(e)}
        
        return status
    
    def analyze_status_change(self, current_status: Dict[str, Any]):
        """
        分析状态变化
        """
        timestamp = datetime.now().strftime('%Y-%m-%d %H:%M:%S')
        
        # 检查守护进程状态
        daemon_status = current_status.get('daemon', {})
        if 'error' not in daemon_status:
            running = daemon_status.get('running', 0)
            print(f"[{timestamp}] Daemon: {running} running jobs")
        
        # 检查任务状态变化
        jobs_status = current_status.get('jobs', {})
        if 'error' not in jobs_status:
            pending = len(jobs_status.get('pending', []))
            running = len(jobs_status.get('running', []))
            finished = len(jobs_status.get('finished', []))
            
            print(f"  Jobs - Pending: {pending}, Running: {running}, Finished: {finished}")
            
            # 检查是否有状态变化
            if self.last_status:
                self._check_job_changes(jobs_status)
    
    def _check_job_changes(self, current_jobs: Dict[str, Any]):
        """
        检查任务变化
        """
        last_jobs = self.last_status.get('jobs', {})
        
        # 检查新开始的任务
        current_running_ids = {job['id'] for job in current_jobs.get('running', [])}
        last_running_ids = {job['id'] for job in last_jobs.get('running', [])}
        
        new_started = current_running_ids - last_running_ids
        if new_started:
            print(f"  🚀 New jobs started: {list(new_started)}")
        
        # 检查完成的任务
        current_finished_ids = {job['id'] for job in current_jobs.get('finished', [])}
        last_finished_ids = {job['id'] for job in last_jobs.get('finished', [])}
        
        new_finished = current_finished_ids - last_finished_ids
        if new_finished:
            print(f"  ✅ Jobs finished: {list(new_finished)}")

# 使用示例
if __name__ == "__main__":
    monitor = RealTimeMonitor("http://localhost:6800")
    monitor.monitor_loop(interval=10)  # 每10秒检查一次

生产环境部署

Docker部署方案

# Dockerfile - Scrapyd生产环境镜像
FROM python:3.9-slim

# 安装系统依赖
RUN apt-get update && apt-get install -y \
    gcc \
    g++ \
    libxml2-dev \
    libxslt1-dev \
    libffi-dev \
    libssl-dev \
    wget \
    curl \
    && rm -rf /var/lib/apt/lists/*

# 设置工作目录
WORKDIR /app

# 安装Python依赖
COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt

# 安装Scrapyd
RUN pip install scrapyd

# 创建配置目录
RUN mkdir -p /etc/scrapyd /var/log/scrapyd /var/lib/scrapyd

# 复制配置文件
COPY scrapyd.conf /etc/scrapyd/scrapyd.conf

# 创建用户
RUN groupadd -r scrapyd && useradd -r -g scrapyd scrapyd
RUN chown -R scrapyd:scrapyd /var/log/scrapyd /var/lib/scrapyd /etc/scrapyd

# 暴露端口
EXPOSE 6800

# 启动命令
CMD ["scrapyd", "-c", "/etc/scrapyd/scrapyd.conf"]

# docker-compose.yml - 多服务部署
version: '3.8'

services:
  scrapyd:
    build: .
    container_name: scrapyd-server
    ports:
      - "6800:6800"
    volumes:
      - ./logs:/var/log/scrapyd
      - ./projects:/var/lib/scrapyd
    environment:
      - PYTHONPATH=/app
    restart: unless-stopped
    networks:
      - scrapyd-net

  scrapydweb:
    image: my8100/scrapydweb:latest
    container_name: scrapydweb-ui
    ports:
      - "5000:5000"
    environment:
      - SCRAPYDWEB_BIND=0.0.0.0
      - SCRAPYDWEB_PORT=5000
      # 在Compose网络中通过服务名访问scrapyd容器,而不是localhost
      - SCRAPYD_SERVERS=scrapyd:6800
      - ENABLE_AUTHENTICATION=True
      - USERNAME=admin
      - PASSWORD=your_secure_password
    volumes:
      - ./data:/app/data
      - ./logs:/app/logs
    depends_on:
      - scrapyd
    restart: unless-stopped
    networks:
      - scrapyd-net

networks:
  scrapyd-net:
    driver: bridge
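
上述Compose文件的典型用法如下(假设Dockerfile、scrapyd.conf与docker-compose.yml位于同一目录):

# 构建镜像并后台启动全部服务
docker compose up -d --build

# 查看服务状态与实时日志
docker compose ps
docker compose logs -f scrapyd

# 停止并移除容器
docker compose down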

Nginx反向代理配置

# /etc/nginx/sites-available/scrapyd
upstream scrapyd_backend {
    server localhost:6800;
    keepalive 32;
}

server {
    listen 80;
    server_name scrapyd.yourdomain.com;
    
    # 基本安全设置
    add_header X-Frame-Options DENY;
    add_header X-Content-Type-Options nosniff;
    add_header X-XSS-Protection "1; mode=block";
    
    # 限制请求大小
    client_max_body_size 100M;
    
    # 代理设置
    location / {
        proxy_pass http://scrapyd_backend;
        proxy_set_header Host $host;
        proxy_set_header X-Real-IP $remote_addr;
        proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for;
        proxy_set_header X-Forwarded-Proto $scheme;
        
        # 连接和超时设置
        proxy_connect_timeout 30s;
        proxy_send_timeout 120s;
        proxy_read_timeout 120s;
        
        # 缓冲设置
        proxy_buffering off;
        proxy_request_buffering off;
    }
    
    # 健康检查
    location /health {
        access_log off;
        return 200 "healthy\n";
        add_header Content-Type text/plain;
    }
}

# /etc/nginx/sites-available/scrapydweb
upstream scrapydweb_backend {
    server localhost:5000;
    keepalive 32;
}

server {
    listen 80;
    server_name scrapydweb.yourdomain.com;
    
    # SSL重定向(如果使用HTTPS)
    # return 301 https://$server_name$request_uri;
    
    # 基本安全设置
    add_header X-Frame-Options DENY;
    add_header X-Content-Type-Options nosniff;
    add_header X-XSS-Protection "1; mode=block";
    
    # 限制请求大小
    client_max_body_size 10M;
    
    # 代理设置
    location / {
        proxy_pass http://scrapydweb_backend;
        proxy_set_header Host $host;
        proxy_set_header X-Real-IP $remote_addr;
        proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for;
        proxy_set_header X-Forwarded-Proto $scheme;
        
        # 连接和超时设置
        proxy_connect_timeout 30s;
        proxy_send_timeout 300s;
        proxy_read_timeout 300s;
        
        # 缓冲设置
        proxy_buffering off;
        proxy_request_buffering off;
    }
    
    # 静态文件缓存
    location ~* \.(css|js|png|jpg|jpeg|gif|ico|svg)$ {
        proxy_pass http://scrapydweb_backend;
        expires 1y;
        add_header Cache-Control "public, immutable";
    }
}
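
配置文件就绪后,按Debian/Ubuntu的目录惯例启用站点并重载Nginx(其他发行版的路径可能不同):

# 启用站点配置
sudo ln -s /etc/nginx/sites-available/scrapyd /etc/nginx/sites-enabled/
sudo ln -s /etc/nginx/sites-available/scrapydweb /etc/nginx/sites-enabled/

# 检查语法并重载
sudo nginx -t
sudo systemctl reload nginx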

系统服务配置

# /etc/systemd/system/scrapyd.service - Scrapyd系统服务
[Unit]
Description=Scrapyd Service
After=network.target

[Service]
Type=simple
User=scrapyd
Group=scrapyd
WorkingDirectory=/var/lib/scrapyd
ExecStart=/usr/local/bin/scrapyd -c /etc/scrapyd/scrapyd.conf
Restart=always
RestartSec=10

# 环境变量
Environment=PYTHONPATH=/var/lib/scrapyd
Environment=SCRAPY_SETTINGS_MODULE=myproject.settings

# 资源限制
LimitNOFILE=65536
LimitNPROC=32768

[Install]
WantedBy=multi-user.target

# /etc/systemd/system/scrapydweb.service - ScrapydWeb系统服务
[Unit]
Description=ScrapydWeb Service
After=network.target

[Service]
Type=simple
User=www-data
Group=www-data
WorkingDirectory=/opt/scrapydweb
ExecStart=/usr/local/bin/scrapydweb -c /opt/scrapydweb/config.py
Restart=always
RestartSec=10

# 环境变量
Environment=FLASK_APP=scrapydweb
Environment=FLASK_ENV=production

# 资源限制
LimitNOFILE=65536
LimitNPROC=32768

[Install]
WantedBy=multi-user.target
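
单元文件放到/etc/systemd/system/后,通过systemctl管理这两个服务:

# 重新加载systemd配置并设置开机自启
sudo systemctl daemon-reload
sudo systemctl enable scrapyd scrapydweb

# 启动服务并查看运行状态
sudo systemctl start scrapyd scrapydweb
sudo systemctl status scrapyd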

性能优化与调优

性能监控指标

# performance_monitor.py - 性能监控
import psutil
import time
import threading
from collections import deque
from typing import Dict, List

class ScrapydPerformanceMonitor:
    """
    Scrapyd性能监控器
    """
    
    def __init__(self):
        self.metrics = {
            'cpu_percent': deque(maxlen=100),
            'memory_percent': deque(maxlen=100),
            'disk_io': deque(maxlen=100),
            'network_io': deque(maxlen=100),
            'process_count': deque(maxlen=100),
            'thread_count': deque(maxlen=100),
        }
        self.running = False
        self.monitor_thread = None
    
    def start_monitoring(self):
        """
        开始监控
        """
        self.running = True
        self.monitor_thread = threading.Thread(target=self._monitor_loop)
        self.monitor_thread.daemon = True
        self.monitor_thread.start()
    
    def stop_monitoring(self):
        """
        停止监控
        """
        self.running = False
        if self.monitor_thread:
            self.monitor_thread.join()
    
    def _monitor_loop(self):
        """
        监控循环
        """
        process = psutil.Process()
        
        while self.running:
            try:
                # CPU使用率
                cpu_percent = process.cpu_percent()
                self.metrics['cpu_percent'].append(cpu_percent)
                
                # 内存使用率
                memory_info = process.memory_info()
                memory_percent = process.memory_percent()
                self.metrics['memory_percent'].append(memory_percent)
                
                # 磁盘IO
                disk_io = psutil.disk_io_counters()
                if disk_io:
                    self.metrics['disk_io'].append({
                        'read_bytes': disk_io.read_bytes,
                        'write_bytes': disk_io.write_bytes
                    })
                
                # 网络IO
                net_io = psutil.net_io_counters()
                if net_io:
                    self.metrics['network_io'].append({
                        'bytes_sent': net_io.bytes_sent,
                        'bytes_recv': net_io.bytes_recv
                    })
                
                # 进程和线程数
                num_threads = process.num_threads()
                self.metrics['thread_count'].append(num_threads)
                
                # 等待下次采样
                time.sleep(1)
                
            except Exception as e:
                print(f"Error in performance monitoring: {e}")
                time.sleep(1)
    
    def get_current_metrics(self) -> Dict:
        """
        获取当前指标
        """
        if not self.metrics['cpu_percent']:
            return {}
        
        return {
            'cpu_percent': self.metrics['cpu_percent'][-1],
            'memory_percent': self.metrics['memory_percent'][-1],
            'avg_cpu_percent': sum(self.metrics['cpu_percent']) / len(self.metrics['cpu_percent']),
            'avg_memory_percent': sum(self.metrics['memory_percent']) / len(self.metrics['memory_percent']),
            'current_threads': self.metrics['thread_count'][-1] if self.metrics['thread_count'] else 0,
            'peak_cpu': max(self.metrics['cpu_percent']) if self.metrics['cpu_percent'] else 0,
            'peak_memory': max(self.metrics['memory_percent']) if self.metrics['memory_percent'] else 0,
        }
    
    def get_historical_metrics(self, minutes: int = 10) -> Dict:
        """
        获取历史指标(受deque的maxlen限制,最多只保留最近100个采样点)
        """
        samples = minutes * 60  # 每秒采样一次
        
        result = {}
        for key, values in self.metrics.items():
            recent_values = list(values)[-samples:] if len(values) >= samples else list(values)
            # disk_io/network_io中存放的是字典,这里只对数值型指标做统计
            numeric_values = [v for v in recent_values if isinstance(v, (int, float))]
            if numeric_values:
                result[key] = {
                    'current': numeric_values[-1],
                    'average': sum(numeric_values) / len(numeric_values),
                    'min': min(numeric_values),
                    'max': max(numeric_values),
                    'trend': self._calculate_trend(numeric_values)
                }
        
        return result
    
    def _calculate_trend(self, values: List[float]) -> str:
        """
        计算趋势
        """
        if len(values) < 2:
            return 'stable'
        
        recent_avg = sum(values[-5:]) / min(5, len(values))
        older_avg = sum(values[:5]) / min(5, len(values))
        
        if recent_avg > older_avg * 1.2:
            return 'increasing'
        elif recent_avg < older_avg * 0.8:
            return 'decreasing'
        else:
            return 'stable'

# 使用示例
def performance_example():
    monitor = ScrapydPerformanceMonitor()
    monitor.start_monitoring()
    
    try:
        time.sleep(10)  # 监控10秒
        
        current_metrics = monitor.get_current_metrics()
        print("Current Metrics:", current_metrics)
        
        historical_metrics = monitor.get_historical_metrics(minutes=1)
        print("Historical Metrics:", historical_metrics)
        
    finally:
        monitor.stop_monitoring()

配置优化建议

# optimized_scrapyd.conf - 优化的Scrapyd配置
[scrapyd]
# 网络配置
bind_address = 0.0.0.0
http_port = 6800

# 性能优化
max_proc = 8          # 根据CPU核心数调整,0表示自动计算
max_proc_per_cpu = 2  # 每个CPU核心的最大并发进程数
poll_interval = 2.0   # 更频繁地检查任务队列

# 日志与作业记录
jobs_to_keep = 10       # 每个爬虫保留的作业日志数
finished_to_keep = 100  # 内存中保留的已完成作业记录数
debug = off

# 资源管理
eggs_dir = /var/lib/scrapyd/eggs
logs_dir = /var/log/scrapyd
dbs_dir = /var/lib/scrapyd/dbs
items_dir = /var/lib/scrapyd/items

# 内存优化
# Scrapyd本身不限制单个爬虫的内存,可在各项目的Scrapy设置中
# 通过MEMUSAGE_ENABLED / MEMUSAGE_LIMIT_MB等参数进行控制

# 注意:Scrapyd只读取[scrapyd]配置段,不支持按项目分别设置并发;
# 高负载或数据密集型项目的并发与限速应在其各自的Scrapy设置
# (CONCURRENT_REQUESTS、DOWNLOAD_DELAY等)中单独调整。

故障排查与维护

常见问题诊断

# troubleshooter.py - 故障诊断工具
import os
import psutil
import requests
from pathlib import Path
from typing import Dict, List

class ScrapydTroubleshooter:
    """
    Scrapyd故障诊断工具
    """
    
    def __init__(self, scrapyd_url: str = "http://localhost:6800"):
        self.scrapyd_url = scrapyd_url.rstrip('/')
    
    def diagnose_system(self) -> Dict:
        """
        系统诊断
        """
        diagnosis = {
            'system_health': {},
            'scrapyd_status': {},
            'resource_usage': {},
            'recommendations': []
        }
        
        # 系统健康检查
        diagnosis['system_health'] = self._check_system_health()
        
        # Scrapyd状态检查
        diagnosis['scrapyd_status'] = self._check_scrapyd_status()
        
        # 资源使用检查
        diagnosis['resource_usage'] = self._check_resource_usage()
        
        # 生成建议
        diagnosis['recommendations'] = self._generate_recommendations(diagnosis)
        
        return diagnosis
    
    def _check_system_health(self) -> Dict:
        """
        检查系统健康状况
        """
        health = {
            'cpu_percent': psutil.cpu_percent(interval=1),
            'memory_percent': psutil.virtual_memory().percent,
            'disk_percent': psutil.disk_usage('/').percent,
            'load_average': os.getloadavg() if hasattr(os, 'getloadavg') else None,
            'processes': len(psutil.pids()),
            'uptime': time.time() - psutil.boot_time()
        }
        
        # 检查关键指标
        issues = []
        if health['cpu_percent'] > 80:
            issues.append(f"High CPU usage: {health['cpu_percent']}%")
        if health['memory_percent'] > 85:
            issues.append(f"High memory usage: {health['memory_percent']}%")
        if health['disk_percent'] > 90:
            issues.append(f"High disk usage: {health['disk_percent']}%")
        
        health['issues'] = issues
        health['overall_status'] = 'WARNING' if issues else 'OK'
        
        return health
    
    def _check_scrapyd_status(self) -> Dict:
        """
        检查Scrapyd状态
        """
        status = {}
        
        try:
            # 测试连接
            response = requests.get(f"{self.scrapyd_url}/daemonstatus.json", timeout=10)
            if response.status_code == 200:
                status['connected'] = True
                status['response_time'] = response.elapsed.total_seconds()
                status['daemon_info'] = response.json()
            else:
                status['connected'] = False
                status['error'] = f"HTTP {response.status_code}"
        except requests.exceptions.RequestException as e:
            status['connected'] = False
            status['error'] = str(e)
        
        return status
    
    def _check_resource_usage(self) -> Dict:
        """
        检查资源使用情况
        """
        usage = {
            'logs_size': self._get_directory_size('/var/log/scrapyd'),
            'eggs_size': self._get_directory_size('/var/lib/scrapyd/eggs'),
            'dbs_size': self._get_directory_size('/var/lib/scrapyd/dbs'),
            'processes': self._get_scrapyd_processes()
        }
        
        return usage
    
    def _get_directory_size(self, path: str) -> int:
        """
        获取目录大小
        """
        try:
            path = Path(path)
            if not path.exists():
                return 0
            
            total_size = 0
            for dirpath, dirnames, filenames in os.walk(path):
                for filename in filenames:
                    filepath = os.path.join(dirpath, filename)
                    try:
                        total_size += os.path.getsize(filepath)
                    except OSError:
                        continue
            return total_size
        except Exception:
            return 0
    
    def _get_scrapyd_processes(self) -> List[Dict]:
        """
        获取Scrapyd相关进程
        """
        processes = []
        for proc in psutil.process_iter(['pid', 'name', 'cpu_percent', 'memory_percent']):
            try:
                if 'scrapy' in proc.info['name'].lower() or 'scrapyd' in proc.info['name'].lower():
                    processes.append(proc.info)
            except (psutil.NoSuchProcess, psutil.AccessDenied):
                continue
        return processes
    
    def _generate_recommendations(self, diagnosis: Dict) -> List[str]:
        """
        生成建议
        """
        recommendations = []
        
        # 系统资源建议
        if diagnosis['system_health']['cpu_percent'] > 80:
            recommendations.append("Consider reducing concurrent crawlers or upgrading hardware")
        if diagnosis['system_health']['memory_percent'] > 85:
            recommendations.append("Increase system memory or reduce crawler memory limits")
        if diagnosis['system_health']['disk_percent'] > 90:
            recommendations.append("Clean up old logs and data files")
        
        # Scrapyd配置建议
        if not diagnosis['scrapyd_status']['connected']:
            recommendations.append("Check Scrapyd service status and network connectivity")
        
        # 资源使用建议
        if diagnosis['resource_usage']['logs_size'] > 10 * 1024 * 1024 * 1024:  # 10GB
            recommendations.append("Logs directory is large, consider cleaning old logs")
        
        if not recommendations:
            recommendations.append("System appears to be running normally")
        
        return recommendations

# 使用示例
def troubleshoot_example():
    troubleshooter = ScrapydTroubleshooter()
    diagnosis = troubleshooter.diagnose_system()
    
    print("System Health:", diagnosis['system_health'])
    print("Scrapyd Status:", diagnosis['scrapyd_status'])
    print("Resource Usage:", diagnosis['resource_usage'])
    print("Recommendations:", diagnosis['recommendations'])

if __name__ == "__main__":
    troubleshoot_example()

自动化维护脚本

#!/bin/bash
# maintenance.sh - 自动化维护脚本

# 配置变量
LOG_DIR="/var/log/scrapyd"
EGGS_DIR="/var/lib/scrapyd/eggs"
BACKUP_DIR="/backup/scrapyd"
RETENTION_DAYS=30

# 日志轮转
rotate_logs() {
    echo "Rotating logs..."
    
    # 压缩旧日志
    find $LOG_DIR -name "*.log" -mtime +7 -exec gzip {} \;
    
    # 删除超过30天的压缩日志
    find $LOG_DIR -name "*.log.gz" -mtime +$RETENTION_DAYS -delete
    
    echo "Log rotation completed."
}

# 清理旧版本
cleanup_old_versions() {
    echo "Cleaning up old versions..."
    
    # 获取所有项目
    PROJECTS=$(curl -s "http://localhost:6800/listprojects.json" | python -c "import sys, json; [print(p) for p in json.load(sys.stdin).get('projects', [])]")
    
    for project in $PROJECTS; do
        # 获取项目版本
        VERSIONS=$(curl -s "http://localhost:6800/listversions.json?project=$project" | python -c "import sys, json; [print(v) for v in json.load(sys.stdin).get('versions', [])]")
        
        # 保留最新的3个版本(listversions返回的版本按从旧到新排序),删除其余的
        OLD_VERSIONS=$(echo "$VERSIONS" | head -n -3)
        for version in $OLD_VERSIONS; do
            echo "Deleting old version $project:$version"
            curl -X POST "http://localhost:6800/delversion.json" -d "project=$project&version=$version"
        done
    done
    
    echo "Version cleanup completed."
}

# 备份配置
backup_config() {
    echo "Backing up configuration..."
    
    TIMESTAMP=$(date +%Y%m%d_%H%M%S)
    BACKUP_FILE="$BACKUP_DIR/scrapyd_backup_$TIMESTAMP.tar.gz"
    
    mkdir -p $BACKUP_DIR
    
    tar -czf $BACKUP_FILE \
        /etc/scrapyd/scrapyd.conf \
        /var/lib/scrapyd/eggs \
        /var/lib/scrapyd/dbs \
        --exclude="*.tmp" \
        --exclude="*.log"
    
    echo "Backup created: $BACKUP_FILE"
    
    # 删除7天前的备份
    find $BACKUP_DIR -name "scrapyd_backup_*.tar.gz" -mtime +7 -delete
}

# 检查服务状态
check_service_status() {
    echo "Checking service status..."
    
    # 检查Scrapyd进程
    if pgrep -f scrapyd > /dev/null; then
        echo "✓ Scrapyd is running"
    else
        echo "✗ Scrapyd is not running"
        # 尝试重启
        systemctl restart scrapyd
        sleep 5
        if pgrep -f scrapyd > /dev/null; then
            echo "✓ Scrapyd restarted successfully"
        else
            echo "✗ Failed to restart Scrapyd"
        fi
    fi
    
    # 检查ScrapydWeb进程
    if pgrep -f scrapydweb > /dev/null; then
        echo "✓ ScrapydWeb is running"
    else
        echo "✗ ScrapydWeb is not running"
        systemctl restart scrapydweb
        sleep 5
        if pgrep -f scrapydweb > /dev/null; then
            echo "✓ ScrapydWeb restarted successfully"
        else
            echo "✗ Failed to restart ScrapydWeb"
        fi
    fi
}

# 主维护函数
main() {
    echo "Starting maintenance tasks at $(date)"
    
    check_service_status
    rotate_logs
    cleanup_old_versions
    backup_config
    
    echo "Maintenance tasks completed at $(date)"
}

# 执行主函数
main
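
维护脚本可以通过crontab安排为定时任务,例如每天凌晨3点执行一次(假设脚本保存为/opt/scripts/maintenance.sh并已赋予执行权限):

# 执行 crontab -e,加入以下内容
0 3 * * * /opt/scripts/maintenance.sh >> /var/log/scrapyd_maintenance.log 2>&1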

最佳实践总结

部署最佳实践

  1. 环境隔离

    # 使用虚拟环境
    python -m venv scrapyd_env
    source scrapyd_env/bin/activate
    pip install scrapyd scrapyd-client
  2. 配置管理

    # 使用不同的配置文件管理不同环境
    [deploy:development]
    url = http://dev-server:6800/
    project = myproject
    
    [deploy:staging]
    url = http://staging-server:6800/
    project = myproject
    
    [deploy:production]
    url = http://prod-server:6800/
    project = myproject
  3. 安全性

    • 使用防火墙限制访问
    • 配置认证和授权
    • 定期更新依赖包

监控最佳实践

  1. 关键指标监控

    • 运行中的爬虫数量
    • 任务成功率
    • 错误率趋势
    • 资源使用情况
  2. 告警设置

    • 爬虫长时间无响应(检测脚本示例见本节末尾)
    • 错误率超过阈值
    • 资源使用过高
  3. 日志管理

    • 结构化日志格式
    • 日志轮转策略
    • 集中日志分析
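
针对上面"告警设置"中提到的"爬虫长时间无响应",下面给出一个最简的检测脚本示意(假设Scrapyd运行在本机6800端口,项目名与超时阈值仅为演示;实际告警可再接入邮件、钉钉等通知渠道):

#!/bin/bash
# long_running_check.sh - 检测运行时间过长的任务
PROJECT="myproject"
MAX_SECONDS=21600  # 超过6小时视为疑似无响应

curl -s "http://localhost:6800/listjobs.json?project=$PROJECT" | python -c "
import sys, json, datetime
data = json.load(sys.stdin)
now = datetime.datetime.now()
for job in data.get('running', []):
    # listjobs.json返回的start_time形如 '2024-01-01 12:00:00.123456'
    start = datetime.datetime.strptime(job['start_time'].split('.')[0], '%Y-%m-%d %H:%M:%S')
    elapsed = (now - start).total_seconds()
    if elapsed > $MAX_SECONDS:
        print('ALERT: job %s (%s) 已运行 %d 秒' % (job['id'], job['spider'], elapsed))
"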

性能优化最佳实践

  1. 资源配置

    # 根据硬件资源合理配置
    max_proc = [CPU核心数]
    max_proc_per_cpu = 2-4
  2. 任务调度

    • 避免同时启动大量爬虫
    • 使用队列管理系统
    • 实现优雅的任务取消
  3. 资源回收

    • 定期清理旧版本
    • 及时释放不再使用的资源
    • 实现内存泄漏检测

💡 核心要点: Scrapyd和ScrapydWeb是构建生产级爬虫运维体系的重要工具。通过合理的配置、完善的监控和规范的运维流程,可以实现爬虫的自动化部署和稳定运行。
