抓取监控看板 - 爬虫系统实时监控与告警详解

📂 所属阶段:第六阶段 — 运维与监控(工程化篇)
🔗 相关章节:Scrapyd与ScrapydWeb · Docker容器化爬虫 · Scrapy-Redis分布式架构

目录

监控系统概述

爬虫监控系统是保障爬虫稳定运行的重要组成部分,通过实时监控、数据分析和告警机制,能够及时发现并解决爬虫运行中的问题。

监控的重要性

1. 可靠性保障

  • 实时监控爬虫状态
  • 及时发现异常情况
  • 确保数据抓取质量

2. 性能优化

  • 识别性能瓶颈
  • 优化资源使用
  • 提升抓取效率

3. 成本控制

  • 监控资源消耗
  • 优化资源配置
  • 降低运营成本

4. 业务连续性

  • 保障数据供应
  • 快速响应故障
  • 确保业务正常

监控架构设计

监控系统架构:

┌─────────────────┐    ┌─────────────────┐    ┌─────────────────┐
│   数据源        │────│   指标收集      │────│   数据存储      │
│  (爬虫应用)     │    │  (Prometheus)   │    │  (时序数据库)   │
└─────────────────┘    └─────────────────┘    └─────────────────┘


┌─────────────────┐    ┌─────────────────┐    ┌─────────────────┐
│   日志收集      │────│   数据处理      │────│   告警系统      │
│  (ELK Stack)    │    │  (Grafana)      │    │  (AlertManager) │
└─────────────────┘    └─────────────────┘    └─────────────────┘

关键监控指标

爬虫运行指标

  • 抓取成功率
  • 请求失败率
  • 并发请求数
  • 响应时间分布

系统资源指标

  • CPU使用率
  • 内存使用率
  • 网络I/O
  • 磁盘I/O

业务指标

  • 抓取数据量
  • 数据质量评分
  • 任务完成率
  • 错误恢复率

指标收集与存储

Prometheus指标收集

# metrics_collector.py - Prometheus指标收集器
from prometheus_client import start_http_server, Counter, Gauge, Histogram, Summary
import time
import requests
from typing import Dict, Any
import logging

# Metric definitions (module level, so every collector instance shares them)
# Counters - monotonically increasing totals
SPIDER_REQUEST_COUNT = Counter(
    'spider_requests_total',
    'Total number of spider requests',
    ['spider_name', 'status_code']
)

SPIDER_ERROR_COUNT = Counter(
    'spider_errors_total',
    'Total number of spider errors',
    ['spider_name', 'error_type']
)

# Gauges - current point-in-time values
SPIDER_ACTIVE_JOBS = Gauge(
    'spider_active_jobs',
    'Number of active spider jobs',
    ['spider_name']
)

SPIDER_CURRENT_SPEED = Gauge(
    'spider_current_speed',
    'Current crawling speed (requests/second)',
    ['spider_name']
)

# Histogram - request latency distribution with explicit buckets
SPIDER_RESPONSE_TIME = Histogram(
    'spider_response_time_seconds',
    'Spider response time distribution',
    ['spider_name'],
    buckets=[0.1, 0.5, 1.0, 2.0, 5.0, 10.0, float('inf')]
)

# Summary - request latency quantile statistics
SPIDER_REQUEST_LATENCY = Summary(
    'spider_request_latency_seconds',
    'Spider request latency summary',
    ['spider_name']
)

class SpiderMetricsCollector:
    """Expose spider runtime metrics for Prometheus scraping."""

    def __init__(self, port: int = 8000):
        # Port the Prometheus client HTTP endpoint listens on.
        self.port = port
        self.logger = logging.getLogger(__name__)

    def start_server(self):
        """Start the HTTP endpoint that Prometheus scrapes."""
        start_http_server(self.port)
        self.logger.info(f"Metrics server started on port {self.port}")

    def record_request(self, spider_name: str, status_code: int, response_time: float):
        """Record one completed request: bump the counter, observe the latency."""
        SPIDER_REQUEST_COUNT.labels(spider_name=spider_name, status_code=status_code).inc()
        # The same latency value feeds both the histogram and the summary.
        for latency_metric in (SPIDER_RESPONSE_TIME, SPIDER_REQUEST_LATENCY):
            latency_metric.labels(spider_name=spider_name).observe(response_time)

    def record_error(self, spider_name: str, error_type: str):
        """Count one error, labelled by spider and error type."""
        SPIDER_ERROR_COUNT.labels(spider_name=spider_name, error_type=error_type).inc()

    def update_active_jobs(self, spider_name: str, count: int):
        """Set the current number of active jobs for a spider."""
        SPIDER_ACTIVE_JOBS.labels(spider_name=spider_name).set(count)

    def update_speed(self, spider_name: str, speed: float):
        """Set the current crawl speed (requests/second) for a spider."""
        SPIDER_CURRENT_SPEED.labels(spider_name=spider_name).set(speed)

# Usage example: start the exporter and feed it synthetic data
if __name__ == "__main__":
    collector = SpiderMetricsCollector(port=8000)
    collector.start_server()

    # Simulate one successful request and a speed update every second.
    while True:
        collector.record_request('example_spider', 200, 0.5)
        collector.update_speed('example_spider', 10.5)
        time.sleep(1)

自定义指标收集器

# custom_collector.py - 自定义指标收集器
from prometheus_client.core import CollectorRegistry, GaugeMetricFamily
from prometheus_client import CollectorRegistry, Gauge
import requests
import json
from datetime import datetime

class ScrapydMetricsCollector:
    """
    Prometheus custom collector that scrapes job counts from a Scrapyd daemon.

    Register via ``REGISTRY.register(ScrapydMetricsCollector())``; Prometheus
    calls :meth:`collect` on every scrape.
    """

    def __init__(self, scrapyd_url: str = "http://localhost:6800", timeout: float = 5.0):
        self.scrapyd_url = scrapyd_url.rstrip('/')
        # Fix: without a timeout an unresponsive Scrapyd blocks every
        # registry scrape indefinitely.
        self.timeout = timeout

    def collect(self):
        """
        Yield Scrapyd metrics; on any failure, log and yield nothing so a
        broken Scrapyd never breaks the scrape of other collectors.
        """
        try:
            # Daemon status: {"status": ..., "running": N, ...}
            status_response = requests.get(
                f"{self.scrapyd_url}/daemonstatus.json",
                timeout=self.timeout,
            )
            if status_response.status_code == 200:
                status_data = status_response.json()

                # Number of jobs currently running in the daemon
                yield GaugeMetricFamily(
                    'scrapyd_running_jobs',
                    'Number of running jobs in Scrapyd',
                    value=status_data.get('running', 0)
                )

        except Exception as e:
            print(f"Error collecting Scrapyd metrics: {e}")

class RedisMetricsCollector:
    """Prometheus custom collector exposing basic Redis server statistics."""

    # (metric name, help text, INFO field) triples exported on each scrape.
    _METRICS = (
        ('redis_memory_used_bytes', 'Redis memory used in bytes', 'used_memory'),
        ('redis_connected_clients', 'Number of connected clients', 'connected_clients'),
        ('redis_total_commands_processed', 'Total number of commands processed',
         'total_commands_processed'),
    )

    def __init__(self, redis_url: str = "redis://localhost:6379"):
        self.redis_url = redis_url

    def collect(self):
        """Yield gauges built from the server's INFO output; yield nothing on error."""
        try:
            import redis
            info = redis.from_url(self.redis_url).info()

            for metric_name, help_text, info_field in self._METRICS:
                yield GaugeMetricFamily(
                    metric_name,
                    help_text,
                    value=info.get(info_field, 0)
                )

        except Exception as e:
            print(f"Error collecting Redis metrics: {e}")

# 注册自定义收集器
from prometheus_client import REGISTRY

def register_custom_collectors():
    """Attach the Scrapyd and Redis collectors to the default registry."""
    for custom_collector in (ScrapydMetricsCollector(), RedisMetricsCollector()):
        REGISTRY.register(custom_collector)

指标存储配置

# prometheus.yml - Prometheus configuration file
global:
  scrape_interval: 15s
  evaluation_interval: 15s

rule_files:
  - "alert_rules.yml"

scrape_configs:
  # Scrapy spider metrics (the SpiderMetricsCollector exporter on port 8000)
  - job_name: 'scrapy-spiders'
    static_configs:
      - targets: ['localhost:8000']
    scrape_interval: 5s
    scrape_timeout: 4s

  # Scrapyd service
  - job_name: 'scrapyd'
    static_configs:
      - targets: ['localhost:6800']
    metrics_path: /metrics
    scrape_interval: 10s

  # Redis monitoring
  - job_name: 'redis'
    static_configs:
      - targets: ['localhost:9121']  # Redis Exporter
    scrape_interval: 10s

  # Host/system metrics
  - job_name: 'node-exporter'
    static_configs:
      - targets: ['localhost:9100']
    scrape_interval: 15s

  # Docker container monitoring
  - job_name: 'docker'
    static_configs:
      - targets: ['localhost:8080']
    scrape_interval: 10s

alerting:
  alertmanagers:
    - static_configs:
        - targets:
          - localhost:9093

数据可视化

Grafana仪表板配置

{
  "dashboard": {
    "id": null,
    "title": "Scrapy爬虫监控仪表板",
    "tags": ["scrapy", "spider", "monitoring"],
    "style": "dark",
    "timezone": "browser",
    "panels": [
      {
        "id": 1,
        "title": "爬虫状态概览",
        "type": "stat",
        "gridPos": {"h": 8, "w": 6, "x": 0, "y": 0},
        "targets": [
          {
            "expr": "sum(spider_requests_total)",
            "legendFormat": "总请求数"
          }
        ],
        "fieldConfig": {
          "defaults": {
            "color": {
              "mode": "thresholds"
            },
            "thresholds": {
              "steps": [
                {"color": "green", "value": null},
                {"color": "red", "value": 80}
              ]
            }
          }
        }
      },
      {
        "id": 2,
        "title": "爬虫活跃任务",
        "type": "graph",
        "gridPos": {"h": 8, "w": 12, "x": 6, "y": 0},
        "targets": [
          {
            "expr": "spider_active_jobs",
            "legendFormat": "{{spider_name}}"
          }
        ]
      },
      {
        "id": 3,
        "title": "响应时间分布",
        "type": "histogram",
        "gridPos": {"h": 8, "w": 6, "x": 18, "y": 0},
        "targets": [
          {
            "expr": "rate(spider_response_time_seconds_bucket[5m])",
            "legendFormat": "{{le}}"
          }
        ]
      },
      {
        "id": 4,
        "title": "错误率监控",
        "type": "graph",
        "gridPos": {"h": 8, "w": 12, "x": 0, "y": 8},
        "targets": [
          {
            "expr": "rate(spider_errors_total[5m])",
            "legendFormat": "{{spider_name}} - {{error_type}}"
          }
        ]
      },
      {
        "id": 5,
        "title": "抓取速度",
        "type": "singlestat",
        "gridPos": {"h": 8, "w": 6, "x": 12, "y": 8},
        "targets": [
          {
            "expr": "avg_over_time(spider_current_speed[1m])",
            "legendFormat": "平均速度"
          }
        ]
      },
      {
        "id": 6,
        "title": "资源使用情况",
        "type": "graph",
        "gridPos": {"h": 8, "w": 6, "x": 18, "y": 8},
        "targets": [
          {
            "expr": "rate(process_cpu_seconds_total[5m])",
            "legendFormat": "CPU使用率"
          },
          {
            "expr": "process_resident_memory_bytes",
            "legendFormat": "内存使用"
          }
        ]
      }
    ],
    "time": {
      "from": "now-1h",
      "to": "now"
    },
    "refresh": "5s"
  }
}

自定义可视化组件

# visualization.py - 自定义可视化组件
import matplotlib.pyplot as plt
import pandas as pd
import numpy as np
from datetime import datetime, timedelta
import plotly.graph_objects as go
from plotly.subplots import make_subplots
import plotly.express as px

class SpiderVisualization:
    """
    Spider data visualization helpers.

    Each ``create_*`` method builds a Plotly figure, registers it in
    ``self.figures`` (so :meth:`export_dashboard` can write it out) and
    returns it.
    """

    def __init__(self):
        # name -> plotly Figure; populated by the create_* methods and
        # consumed by export_dashboard().
        self.figures = {}

    def create_performance_dashboard(self, data: pd.DataFrame):
        """
        Build a 2x2 performance dashboard.

        ``data`` must provide 'timestamp', 'success_rate', 'response_time',
        'error_rate' and 'concurrency' columns.
        """
        fig = make_subplots(
            rows=2, cols=2,
            subplot_titles=('请求成功率', '响应时间', '错误率', '并发数'),
            specs=[[{"secondary_y": False}, {"secondary_y": False}],
                   [{"secondary_y": False}, {"secondary_y": False}]]
        )

        # Success rate (top-left)
        fig.add_trace(
            go.Scatter(x=data['timestamp'], y=data['success_rate'],
                      name='成功率', line=dict(color='green')),
            row=1, col=1
        )

        # Response time (top-right)
        fig.add_trace(
            go.Scatter(x=data['timestamp'], y=data['response_time'],
                      name='响应时间', line=dict(color='blue')),
            row=1, col=2
        )

        # Error rate (bottom-left)
        fig.add_trace(
            go.Scatter(x=data['timestamp'], y=data['error_rate'],
                      name='错误率', line=dict(color='red')),
            row=2, col=1
        )

        # Concurrency (bottom-right)
        fig.add_trace(
            go.Scatter(x=data['timestamp'], y=data['concurrency'],
                      name='并发数', line=dict(color='orange')),
            row=2, col=2
        )

        fig.update_layout(height=600, showlegend=True,
                         title_text="爬虫性能监控仪表板")

        # Fix: register the figure so export_dashboard() can export it —
        # previously self.figures was never populated and exports were empty.
        self.figures['performance'] = fig
        return fig

    def create_error_analysis(self, error_data: pd.DataFrame):
        """Build a stacked bar chart of parse/network/timeout error counts."""
        fig = go.Figure(data=[
            go.Bar(name='解析错误', x=error_data['timestamp'], y=error_data['parse_errors']),
            go.Bar(name='网络错误', x=error_data['timestamp'], y=error_data['network_errors']),
            go.Bar(name='超时错误', x=error_data['timestamp'], y=error_data['timeout_errors'])
        ])

        fig.update_layout(
            barmode='stack',
            title='错误类型分析',
            xaxis_title='时间',
            yaxis_title='错误数量'
        )

        # Fix: register for export (see create_performance_dashboard).
        self.figures['error_analysis'] = fig
        return fig

    def create_resource_utilization(self, resource_data: pd.DataFrame):
        """Build a dual-axis chart: CPU on the left axis, memory on the right."""
        fig = make_subplots(specs=[[{"secondary_y": True}]])

        fig.add_trace(
            go.Scatter(x=resource_data['timestamp'], y=resource_data['cpu_percent'],
                      name="CPU使用率", line=dict(color='red')),
            secondary_y=False,
        )

        fig.add_trace(
            go.Scatter(x=resource_data['timestamp'], y=resource_data['memory_percent'],
                      name="内存使用率", line=dict(color='blue')),
            secondary_y=True,
        )

        fig.update_xaxes(title_text="时间")
        fig.update_yaxes(title_text="CPU使用率 (%)", secondary_y=False)
        fig.update_yaxes(title_text="内存使用率 (%)", secondary_y=True)
        fig.update_layout(title_text="资源利用率监控")

        # Fix: register for export (see create_performance_dashboard).
        self.figures['resource_utilization'] = fig
        return fig

    def export_dashboard(self, dashboard_name: str = "spider_dashboard.html"):
        """Write every registered figure to its own HTML file."""
        for name, fig in self.figures.items():
            fig.write_html(f"{dashboard_name}_{name}.html")

# Usage example
def create_sample_visualization():
    """Generate synthetic metrics and display the performance dashboard."""
    n_points = 100
    sample_data = pd.DataFrame({
        'timestamp': pd.date_range(start='2024-01-01', periods=n_points, freq='5min'),
        'success_rate': np.random.normal(0.95, 0.05, n_points),
        'response_time': np.random.exponential(0.5, n_points),
        'error_rate': np.random.exponential(0.02, n_points),
        'concurrency': np.random.randint(1, 20, n_points),
    })

    SpiderVisualization().create_performance_dashboard(sample_data).show()

实时数据推送

# real_time_push.py - 实时数据推送
import asyncio
import websockets
import json
import time
from datetime import datetime
import random

class RealTimeMetricsPusher:
    """
    Push (simulated) spider and system metrics to WebSocket clients once a second.
    """

    def __init__(self, host: str = "localhost", port: int = 8765):
        self.host = host
        self.port = port
        self.clients = set()  # currently connected websocket handles

    async def register_client(self, websocket):
        """Track one client connection for its whole lifetime."""
        self.clients.add(websocket)
        try:
            await websocket.wait_closed()
        finally:
            # discard(): safe even if the handle was already removed elsewhere.
            self.clients.discard(websocket)

    async def broadcast_metrics(self):
        """Build a metrics snapshot every second and fan it out to all clients."""
        while True:
            if self.clients:
                metrics = {
                    'timestamp': datetime.now().isoformat(),
                    'spiders': {
                        'example_spider': {
                            'active_requests': random.randint(0, 50),
                            'responses_per_second': random.uniform(0, 100),
                            'errors_per_minute': random.randint(0, 10),
                            'response_time_avg': random.uniform(0.1, 2.0)
                        }
                    },
                    'system': {
                        'cpu_percent': random.uniform(0, 100),
                        'memory_percent': random.uniform(0, 100),
                        'disk_usage': random.uniform(0, 100)
                    }
                }

                message = json.dumps(metrics)
                # Fix: asyncio.wait() no longer accepts bare coroutines
                # (deprecated in 3.8, removed in 3.11) — use gather instead.
                # list(...) snapshots the set because a client disconnecting
                # mutates self.clients while we await; return_exceptions=True
                # keeps one dead connection from aborting the other sends.
                await asyncio.gather(
                    *(client.send(message) for client in list(self.clients)),
                    return_exceptions=True,
                )

            await asyncio.sleep(1)  # push once per second

    async def start_server(self):
        """Start the WebSocket server plus the broadcast loop; runs forever."""
        print(f"Starting WebSocket server on {self.host}:{self.port}")

        # Background task producing the periodic snapshots
        asyncio.create_task(self.broadcast_metrics())

        # Serve until cancelled
        async with websockets.serve(self.register_client, self.host, self.port):
            await asyncio.Future()  # run forever

# Entry point: start the real-time push server (Ctrl+C to stop)
if __name__ == "__main__":
    pusher = RealTimeMetricsPusher()
    asyncio.run(pusher.start_server())

告警系统

AlertManager配置

# alertmanager.yml - AlertManager configuration
# NOTE(review): the SMTP password is a plain-text placeholder here; load
# credentials from a secrets mechanism in production.
global:
  smtp_smarthost: 'smtp.gmail.com:587'
  smtp_from: 'alert@example.com'
  smtp_auth_username: 'alert@example.com'
  smtp_auth_password: 'password'

# Routing tree: group alerts by name + spider, escalate critical ones faster.
route:
  group_by: ['alertname', 'spider_name']
  group_wait: 30s
  group_interval: 5m
  repeat_interval: 1h
  receiver: 'default-receiver'
  routes:
    # Critical alerts: shorter waits, more frequent repeats, on-call receiver
    - match:
        severity: critical
      receiver: 'critical-receiver'
      group_wait: 10s
      group_interval: 1m
      repeat_interval: 15m

    # Error-rate alerts go to the dedicated error team
    - match:
        alertname: HighErrorRate
      receiver: 'error-team'
      group_interval: 30s

receivers:
  - name: 'default-receiver'
    email_configs:
    - to: 'ops@example.com'
      send_resolved: true

  - name: 'critical-receiver'
    email_configs:
    - to: 'oncall@example.com'
      send_resolved: true
    webhook_configs:
    - url: 'http://slack-webhook:3000/alert'

  - name: 'error-team'
    email_configs:
    - to: 'errors@example.com'
      send_resolved: true

# Suppress warning-level alerts while a matching critical alert is firing.
inhibit_rules:
  - source_match:
      severity: 'critical'
    target_match:
      severity: 'warning'
    equal: ['alertname', 'spider_name']

告警规则配置

# alert_rules.yml - Alerting rules
# Fix: metric names must match the exporter definitions
# (spider_requests_total, spider_errors_total, spider_response_time_seconds);
# the previous "spiders_*" prefix matched nothing, so these alerts never fired.
groups:
  - name: spider_alerts
    rules:
      # High error rate
      - alert: HighErrorRate
        expr: rate(spider_errors_total[5m]) > 10
        for: 2m
        labels:
          severity: warning
        annotations:
          summary: "爬虫错误率过高"
          description: "爬虫 {{ $labels.spider_name }} 的错误率在过去5分钟内超过10次/分钟"

      # Slow responses (95th percentile)
      - alert: HighResponseTime
        expr: histogram_quantile(0.95, rate(spider_response_time_seconds_bucket[5m])) > 5
        for: 3m
        labels:
          severity: warning
        annotations:
          summary: "爬虫响应时间过长"
          description: "爬虫 {{ $labels.spider_name }} 的95%响应时间超过5秒"

      # No activity at all
      - alert: NoActivity
        expr: increase(spider_requests_total[10m]) == 0
        for: 5m
        labels:
          severity: critical
        annotations:
          summary: "爬虫无活动"
          description: "爬虫 {{ $labels.spider_name }} 在过去10分钟内没有发送任何请求"

      # Excessive resource usage
      - alert: HighCpuUsage
        expr: rate(process_cpu_seconds_total[5m]) > 0.8
        for: 2m
        labels:
          severity: warning
        annotations:
          summary: "CPU使用率过高"
          description: "爬虫进程CPU使用率超过80%"

      - alert: HighMemoryUsage
        expr: process_resident_memory_bytes / (1024*1024) > 1024
        for: 2m
        labels:
          severity: warning
        annotations:
          summary: "内存使用过高"
          description: "爬虫进程内存使用超过1GB"

      # Spider process down (scrape target unreachable)
      - alert: SpiderCrashed
        expr: up{job="scrapy-spiders"} == 0
        for: 1m
        labels:
          severity: critical
        annotations:
          summary: "爬虫进程崩溃"
          description: "爬虫进程 {{ $labels.instance }} 已停止响应"

      # Data quality degradation
      - alert: DataQualityLow
        expr: avg_over_time(spider_data_quality_score[10m]) < 0.8
        for: 5m
        labels:
          severity: warning
        annotations:
          summary: "数据质量下降"
          description: "爬虫 {{ $labels.spider_name }} 的数据质量分数低于0.8"

      # Abnormal concurrency
      - alert: ConcurrentRequestsAnomaly
        expr: spider_concurrent_requests > 50
        for: 1m
        labels:
          severity: warning
        annotations:
          summary: "并发请求数异常"
          description: "爬虫 {{ $labels.spider_name }} 的并发请求数超过50"

自定义告警处理器

# alert_handler.py - 自定义告警处理器
import smtplib
import json
import requests
from email.mime.text import MIMEText
from email.mime.multipart import MIMEMultipart
from datetime import datetime
import logging
from typing import Dict, List

class AlertHandler:
    """
    Dispatch Prometheus/AlertManager alerts to email, Slack and (optionally) SMS.

    Expected ``config`` layout::

        {
            'email': {'smtp_server', 'smtp_port', 'username', 'password',
                      'from', 'recipients'},
            'slack': {'webhook_url'},
            'sms_config': {...}   # optional; presence enables the SMS hook
        }
    """

    def __init__(self, config: Dict):
        self.config = config
        self.logger = logging.getLogger(__name__)

    def handle_prometheus_alert(self, alert_data: Dict):
        """Route each alert in an AlertManager webhook payload by its status."""
        for alert in alert_data.get('alerts', []):
            status = alert.get('status', 'firing')
            annotations = alert.get('annotations', {})

            if status == 'firing':
                self.logger.warning(f"Alert fired: {annotations.get('summary', 'Unknown alert')}")
                self.send_notification(alert)
            elif status == 'resolved':
                self.logger.info(f"Alert resolved: {annotations.get('summary', 'Unknown alert')}")
                self.send_resolution_notification(alert)

    def send_notification(self, alert: Dict):
        """Send a firing alert through channels matching its severity."""
        severity = alert.get('labels', {}).get('severity', 'warning')

        if severity == 'critical':
            self.send_critical_notification(alert)
        else:
            self.send_warning_notification(alert)

    def send_resolution_notification(self, alert: Dict):
        """
        Notify recipients that a previously firing alert has resolved.

        Fix: this method was called by handle_prometheus_alert but never
        defined, so the first resolved alert raised AttributeError.
        """
        self.send_email_notification(alert, critical=False)

    def send_critical_notification(self, alert: Dict):
        """Fan a critical alert out to email, Slack and (if configured) SMS."""
        self.send_email_notification(alert, critical=True)
        self.send_slack_notification(alert)

        if 'sms_config' in self.config:
            self.send_sms_notification(alert)

    def send_warning_notification(self, alert: Dict):
        """Send a non-critical alert by email only."""
        self.send_email_notification(alert, critical=False)

    def send_sms_notification(self, alert: Dict):
        """
        SMS hook — integrate a provider (e.g. Twilio) here.

        Fix: this method was referenced by send_critical_notification but
        never defined; it now logs instead of raising AttributeError.
        """
        self.logger.warning("SMS notification requested but no SMS provider is integrated")

    def send_email_notification(self, alert: Dict, critical: bool = False):
        """Email the formatted alert to the configured recipients via SMTP/STARTTLS."""
        try:
            config = self.config.get('email', {})
            annotations = alert.get('annotations', {})

            msg = MIMEMultipart()
            msg['From'] = config.get('from')
            msg['To'] = ', '.join(config.get('recipients', []))
            msg['Subject'] = f"{'[CRITICAL]' if critical else '[WARNING]'} {annotations.get('summary', 'No summary')}"

            msg.attach(MIMEText(self.format_alert_message(alert), 'plain'))

            # Context manager guarantees the connection is closed even when
            # starttls/login/send fails (the old quit() was skipped on error).
            with smtplib.SMTP(config.get('smtp_server'), config.get('smtp_port', 587)) as server:
                server.starttls()
                server.login(config.get('username'), config.get('password'))
                server.send_message(msg)

            self.logger.info("Email notification sent successfully")

        except Exception as e:
            self.logger.error(f"Failed to send email notification: {e}")

    def send_slack_notification(self, alert: Dict):
        """Post the alert to the configured Slack incoming webhook."""
        try:
            webhook_url = self.config.get('slack', {}).get('webhook_url')
            if not webhook_url:
                return

            labels = alert.get('labels', {})
            annotations = alert.get('annotations', {})
            payload = {
                "text": f"🚨 {annotations.get('summary', 'No summary')}",
                "attachments": [
                    {
                        "color": "danger" if labels.get('severity') == 'critical' else "warning",
                        "fields": [
                            {
                                "title": "Description",
                                "value": annotations.get('description', 'No description'),
                                "short": False
                            },
                            {
                                "title": "Severity",
                                "value": labels.get('severity', 'unknown'),
                                "short": True
                            },
                            {
                                "title": "Time",
                                "value": datetime.now().isoformat(),
                                "short": True
                            }
                        ]
                    }
                ]
            }

            # Timeout keeps a slow webhook from blocking alert processing.
            response = requests.post(webhook_url, json=payload, timeout=10)
            if response.status_code == 200:
                self.logger.info("Slack notification sent successfully")
            else:
                self.logger.error(f"Failed to send Slack notification: {response.status_code}")

        except Exception as e:
            self.logger.error(f"Failed to send Slack notification: {e}")

    def format_alert_message(self, alert: Dict) -> str:
        """Render the alert as the plain-text body used in notifications."""
        labels = alert.get('labels', {})
        annotations = alert.get('annotations', {})

        message = f"""
告警信息:
========
时间:{datetime.now().isoformat()}
摘要:{annotations.get('summary', 'No summary')}
描述:{annotations.get('description', 'No description')}
级别:{labels.get('severity', 'unknown')}
爬虫:{labels.get('spider_name', 'unknown')}
标签:{', '.join([f'{k}={v}' for k, v in labels.items()])}

请尽快检查并处理!
        """

        return message.strip()

# Webhook receiver: Flask app that accepts AlertManager webhook POSTs
from flask import Flask, request, jsonify

app = Flask(__name__)
# NOTE(review): credentials below are hard-coded placeholders for the example;
# in production, load them from environment variables or a secrets store.
alert_handler = AlertHandler({
    'email': {
        'smtp_server': 'smtp.gmail.com',
        'smtp_port': 587,
        'username': 'your-email@gmail.com',
        'password': 'your-app-password',
        'from': 'your-email@gmail.com',
        'recipients': ['ops@example.com']
    },
    'slack': {
        'webhook_url': 'https://hooks.slack.com/services/YOUR/SLACK/WEBHOOK'
    }
})

@app.route('/webhook/alert', methods=['POST'])
def handle_alert_webhook():
    """Receive an AlertManager webhook POST and hand it to the alert handler."""
    try:
        alert_handler.handle_prometheus_alert(request.json)
    except Exception as e:
        return jsonify({'status': 'error', 'message': str(e)}), 500
    return jsonify({'status': 'ok'})

# Entry point for the webhook receiver.
# NOTE(review): 0.0.0.0 binds all interfaces — restrict the host in production.
if __name__ == "__main__":
    app.run(host='0.0.0.0', port=5000)

日志监控与分析

ELK Stack配置

# docker-compose.elk.yml - ELK Stack configuration
version: '3.8'

services:
  # Search/storage backend; single-node mode with security disabled (dev setup)
  elasticsearch:
    image: docker.elastic.co/elasticsearch/elasticsearch:8.11.0
    container_name: elasticsearch
    environment:
      - discovery.type=single-node
      - "ES_JAVA_OPTS=-Xms512m -Xmx512m"
      - xpack.security.enabled=false
    ports:
      - "9200:9200"
    volumes:
      - es_data:/usr/share/elasticsearch/data
    networks:
      - elk-network

  # Log ingestion pipeline; reads the pipeline from ./logstash.conf
  logstash:
    image: docker.elastic.co/logstash/logstash:8.11.0
    container_name: logstash
    environment:
      - "LS_JAVA_OPTS=-Xms512m -Xmx512m"
    ports:
      - "5044:5044"
      - "5000:5000/tcp"
      - "5000:5000/udp"
      - "9600:9600"
    volumes:
      - ./logstash.conf:/usr/share/logstash/pipeline/logstash.conf
    depends_on:
      - elasticsearch
    networks:
      - elk-network

  # Web UI for querying the indexed logs
  kibana:
    image: docker.elastic.co/kibana/kibana:8.11.0
    container_name: kibana
    ports:
      - "5601:5601"
    environment:
      - ELASTICSEARCH_HOSTS=http://elasticsearch:9200
    depends_on:
      - elasticsearch
    networks:
      - elk-network

volumes:
  es_data:

networks:
  elk-network:
    driver: bridge

Logstash配置文件

# logstash.conf - Logstash pipeline configuration
input {
  beats {
    port => 5044
  }

  file {
    path => "/var/log/scrapy/*.log"
    start_position => "beginning"
    # Join traceback/continuation lines onto the preceding "[timestamp]" line
    codec => multiline {
      pattern => "^\[.*\]"
      negate => true
      what => "previous"
    }
  }
}

filter {
  # Parse Scrapy log lines.
  # Fix: grok inline named captures use "(?<name>pattern)" directly; the
  # previous "%(?<name>...)" had a stray "%" and could never match.
  grok {
    match => { "message" => "\[%{TIMESTAMP_ISO8601:timestamp}\] %{WORD:level}:\s*(?<spider_name>\w+)\s*(?<message_content>.*)" }
  }

  # Use the parsed timestamp as the event time
  date {
    match => [ "timestamp", "ISO8601" ]
  }

  # Tag events and stash the spider name in metadata
  mutate {
    add_field => { "[@metadata][spider]" => "%{spider_name}" }
    add_tag => [ "scrapy_log" ]
  }

  # Classify severity from the raw message content
  if [message] =~ /ERROR|CRITICAL/ {
    mutate {
      add_tag => [ "error" ]
      add_field => { "severity" => "high" }
    }
  } else if [message] =~ /WARNING/ {
    mutate {
      add_tag => [ "warning" ]
      add_field => { "severity" => "medium" }
    }
  } else {
    mutate {
      add_field => { "severity" => "low" }
    }
  }
}

output {
  elasticsearch {
    hosts => ["elasticsearch:9200"]
    index => "scrapy-logs-%{+YYYY.MM.dd}"
  }

  # Debug output; remove in production
  stdout {
    codec => rubydebug
  }
}

日志分析脚本

# log_analyzer.py - 日志分析脚本
import re
import json
import pandas as pd
from datetime import datetime, timedelta
from collections import defaultdict, Counter
import matplotlib.pyplot as plt
import seaborn as sns
from pathlib import Path

class ScrapyLogAnalyzer:
    """
    Scrapy日志分析器
    """
    
    def __init__(self, log_directory: str):
        self.log_directory = Path(log_directory)
        self.patterns = {
            'info': r'\[(\d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2})\] (\w+)\.INFO: (.*)',
            'error': r'\[(\d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2})\] (\w+)\.ERROR: (.*)',
            'warning': r'\[(\d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2})\] (\w+)\.WARNING: (.*)',
            'debug': r'\[(\d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2})\] (\w+)\.DEBUG: (.*)',
        }
    
    def parse_log_file(self, log_file: Path) -> pd.DataFrame:
        """
        解析单个日志文件
        """
        entries = []
        
        with open(log_file, 'r', encoding='utf-8', errors='ignore') as f:
            for line_num, line in enumerate(f, 1):
                line = line.strip()
                if not line:
                    continue
                
                for level, pattern in self.patterns.items():
                    match = re.match(pattern, line)
                    if match:
                        timestamp_str, spider_name, message = match.groups()
                        try:
                            timestamp = datetime.strptime(timestamp_str, '%Y-%m-%d %H:%M:%S')
                        except ValueError:
                            continue
                        
                        entries.append({
                            'timestamp': timestamp,
                            'level': level.upper(),
                            'spider': spider_name,
                            'message': message,
                            'file': str(log_file),
                            'line_number': line_num
                        })
                        break
        
        return pd.DataFrame(entries) if entries else pd.DataFrame()
    
    def analyze_logs(self, days: int = 7) -> Dict:
        """
        分析日志数据
        """
        log_files = list(self.log_directory.glob('*.log'))
        all_entries = []
        
        for log_file in log_files:
            if self._is_recent_file(log_file, days):
                df = self.parse_log_file(log_file)
                if not df.empty:
                    all_entries.append(df)
        
        if not all_entries:
            return {"error": "No log files found"}
        
        # 合并所有日志
        combined_df = pd.concat(all_entries, ignore_index=True)
        combined_df = combined_df.sort_values('timestamp')
        
        # 生成分析结果
        analysis = {
            'total_entries': len(combined_df),
            'time_range': {
                'start': combined_df['timestamp'].min().isoformat() if not combined_df.empty else None,
                'end': combined_df['timestamp'].max().isoformat() if not combined_df.empty else None
            },
            'level_distribution': combined_df['level'].value_counts().to_dict(),
            'spider_distribution': combined_df['spider'].value_counts().to_dict(),
            'hourly_activity': self._get_hourly_activity(combined_df),
            'error_summary': self._get_error_summary(combined_df),
            'performance_metrics': self._get_performance_metrics(combined_df),
            'trends': self._get_trends(combined_df)
        }
        
        return analysis
    
    def _is_recent_file(self, file_path: Path, days: int) -> bool:
        """
        检查文件是否在指定天数内
        """
        import time
        file_time = file_path.stat().st_mtime
        return (time.time() - file_time) / (24 * 3600) <= days
    
    def _get_hourly_activity(self, df: pd.DataFrame) -> Dict:
        """
        获取小时活动统计
        """
        if df.empty:
            return {}
        
        df['hour'] = df['timestamp'].dt.hour
        hourly_counts = df.groupby(['hour', 'level']).size().unstack(fill_value=0)
        return hourly_counts.to_dict()
    
    def _get_error_summary(self, df: pd.DataFrame) -> Dict:
        """
        获取错误摘要
        """
        error_df = df[df['level'].isin(['ERROR', 'CRITICAL'])]
        if error_df.empty:
            return {}
        
        # 最常见的错误
        error_patterns = Counter()
        for msg in error_df['message']:
            # 提取错误类型
            if '404' in msg:
                error_patterns['404_NOT_FOUND'] += 1
            elif 'timeout' in msg.lower():
                error_patterns['TIMEOUT'] += 1
            elif 'connection' in msg.lower():
                error_patterns['CONNECTION_ERROR'] += 1
            elif 'parse' in msg.lower():
                error_patterns['PARSE_ERROR'] += 1
            else:
                error_patterns['OTHER_ERROR'] += 1
        
        return {
            'total_errors': len(error_df),
            'error_types': dict(error_patterns.most_common(10)),
            'spider_errors': error_df['spider'].value_counts().to_dict()
        }
    
    def _get_performance_metrics(self, df: pd.DataFrame) -> Dict:
        """
        获取性能指标
        """
        if df.empty:
            return {}
        
        # 请求频率
        df_sorted = df.sort_values('timestamp')
        df_sorted['time_diff'] = df_sorted.groupby('spider')['timestamp'].diff()
        
        # 计算每秒请求数
        second_counts = df_sorted.set_index('timestamp').groupby(
            [pd.Grouper(freq='S'), 'spider']
        ).size().reset_index(name='requests_per_second')
        
        return {
            'avg_requests_per_second': second_counts['requests_per_second'].mean(),
            'max_requests_per_second': second_counts['requests_per_second'].max(),
            'request_frequency_distribution': second_counts['requests_per_second'].describe().to_dict()
        }
    
    def _get_trends(self, df: pd.DataFrame) -> Dict:
        """
        获取趋势分析
        """
        if df.empty:
            return {}
        
        # 按天统计
        df['date'] = df['timestamp'].dt.date
        daily_stats = df.groupby(['date', 'level']).size().unstack(fill_value=0)
        
        # 计算增长率
        if len(daily_stats) > 1:
            latest_day = daily_stats.iloc[-1]
            previous_day = daily_stats.iloc[-2]
            
            growth_rates = {}
            for level in daily_stats.columns:
                if previous_day[level] > 0:
                    growth_rates[level] = ((latest_day[level] - previous_day[level]) / previous_day[level]) * 100
                else:
                    growth_rates[level] = float('inf') if latest_day[level] > 0 else 0
            
            return {
                'daily_stats': daily_stats.to_dict(),
                'growth_rates': growth_rates
            }
        else:
            return {'daily_stats': daily_stats.to_dict()}
    
    def generate_report(self, analysis_result: Dict, output_file: str = None):
        """
        生成分析报告
        """
        report = f"""
# Scrapy日志分析报告

## 概述
- 分析时间范围: {analysis_result['time_range']['start']}{analysis_result['time_range']['end']}
- 总日志条目: {analysis_result['total_entries']:,}

## 日志级别分布
{json.dumps(analysis_result['level_distribution'], indent=2, ensure_ascii=False)}

## 爬虫分布
{json.dumps(analysis_result['spider_distribution'], indent=2, ensure_ascii=False)}

## 错误摘要
- 总错误数: {analysis_result['error_summary'].get('total_errors', 0)}
- 错误类型: {json.dumps(analysis_result['error_summary'].get('error_types', {}), indent=2, ensure_ascii=False)}

## 性能指标
{json.dumps(analysis_result['performance_metrics'], indent=2, ensure_ascii=False)}

## 趋势分析
{json.dumps(analysis_result['trends'], indent=2, ensure_ascii=False)}
        """
        
        if output_file:
            with open(output_file, 'w', encoding='utf-8') as f:
                f.write(report)
        
        return report

# 使用示例
def analyze_example():
    """
    Demo: analyze the last 7 days of logs in ``./logs`` and print a summary.

    Fix: ``analyze_logs`` returns ``{"error": ...}`` when no log files are
    found; the original then crashed with ``KeyError('total_entries')``.
    """
    analyzer = ScrapyLogAnalyzer('./logs')
    result = analyzer.analyze_logs(days=7)

    if 'error' in result:
        print(f"Analysis failed: {result['error']}")
        return

    print(f"Total entries: {result['total_entries']}")
    print(f"Level distribution: {result['level_distribution']}")
    print(f"Error summary: {result['error_summary']}")

if __name__ == "__main__":
    analyze_example()

性能分析与优化

性能监控指标

# performance_monitor.py - 性能监控
import time
import psutil
import threading
from collections import deque
import statistics
from datetime import datetime
import json

class PerformanceMonitor:
    """
    In-process performance monitor.

    A daemon thread samples CPU, memory, IO and thread statistics from the
    current process once per second into bounded deques (about 5 minutes of
    history at 1 Hz).

    Fixes vs. the original: ``dict`` builtin annotations (the ``Dict``
    annotation raised NameError — typing was never imported), a working
    numeric/dict branch in ``_calculate_complex_trend`` (the original
    coerced dict samples to 0 before checking ``isinstance(..., dict)``,
    so that branch was unreachable), and removal of an unused
    ``memory_info()`` call.
    """

    def __init__(self):
        # maxlen=300 keeps roughly 5 minutes of 1-second samples.
        self.metrics = {
            'cpu_percent': deque(maxlen=300),
            'memory_percent': deque(maxlen=300),
            'disk_io': deque(maxlen=300),
            'network_io': deque(maxlen=300),
            'process_count': deque(maxlen=300),
            'thread_count': deque(maxlen=300),
            'response_times': deque(maxlen=300),
        }
        self.running = False
        self.monitor_thread = None
        self.process = psutil.Process()

    def start_monitoring(self):
        """
        Start the background sampling thread (daemon, 1 Hz).
        """
        self.running = True
        self.monitor_thread = threading.Thread(target=self._monitor_loop)
        self.monitor_thread.daemon = True
        self.monitor_thread.start()

    def stop_monitoring(self):
        """
        Signal the sampling thread to stop and wait for it to exit.
        """
        self.running = False
        if self.monitor_thread:
            self.monitor_thread.join()

    def _monitor_loop(self):
        """
        Sampling loop: append one sample per metric each second until
        ``stop_monitoring`` clears ``self.running``.
        """
        while self.running:
            try:
                # CPU usage of this process.
                cpu_percent = self.process.cpu_percent()
                self.metrics['cpu_percent'].append(cpu_percent)

                # Memory usage as percent of total RAM.
                memory_percent = self.process.memory_percent()
                self.metrics['memory_percent'].append(memory_percent)

                # Disk IO counters (dict samples; see _calculate_complex_trend).
                io_counters = self.process.io_counters()
                if io_counters:
                    self.metrics['disk_io'].append({
                        'read_bytes': io_counters.read_bytes,
                        'write_bytes': io_counters.write_bytes,
                        'read_count': io_counters.read_count,
                        'write_count': io_counters.write_count
                    })

                # System-wide network IO counters.
                net_io = psutil.net_io_counters()
                if net_io:
                    self.metrics['network_io'].append({
                        'bytes_sent': net_io.bytes_sent,
                        'bytes_recv': net_io.bytes_recv
                    })

                # Thread count of this process.
                num_threads = self.process.num_threads()
                self.metrics['thread_count'].append(num_threads)

                # Wait for the next 1 Hz sample.
                time.sleep(1)

            except Exception as e:
                # Keep sampling even if one psutil call fails transiently.
                print(f"Error in performance monitoring: {e}")
                time.sleep(1)

    def get_current_metrics(self) -> dict:
        """
        Latest snapshot plus averages/peaks over the retained window.
        Returns ``{}`` before the first CPU sample arrives.
        """
        if not self.metrics['cpu_percent']:
            return {}

        return {
            'timestamp': datetime.now().isoformat(),
            'cpu_percent': self.metrics['cpu_percent'][-1] if self.metrics['cpu_percent'] else 0,
            'memory_percent': self.metrics['memory_percent'][-1] if self.metrics['memory_percent'] else 0,
            'avg_cpu_percent': statistics.mean(self.metrics['cpu_percent']) if self.metrics['cpu_percent'] else 0,
            'avg_memory_percent': statistics.mean(self.metrics['memory_percent']) if self.metrics['memory_percent'] else 0,
            'current_threads': self.metrics['thread_count'][-1] if self.metrics['thread_count'] else 0,
            'peak_cpu': max(self.metrics['cpu_percent']) if self.metrics['cpu_percent'] else 0,
            'peak_memory': max(self.metrics['memory_percent']) if self.metrics['memory_percent'] else 0,
        }

    def get_historical_metrics(self, minutes: int = 5) -> dict:
        """
        Summary statistics over the last *minutes* of samples
        (assuming one sample per second).
        """
        samples = minutes * 60

        result = {}
        for key, values in self.metrics.items():
            recent_values = list(values)[-samples:] if len(values) >= samples else list(values)
            if not recent_values:
                continue

            if key in ('disk_io', 'network_io'):
                # Dict-valued samples: only report count and trend.
                result[key] = {
                    'sample_count': len(recent_values),
                    'trend': self._calculate_complex_trend(recent_values)
                }
            else:
                result[key] = {
                    'current': recent_values[-1],
                    'average': statistics.mean(recent_values),
                    'min': min(recent_values),
                    'max': max(recent_values),
                    'std_dev': statistics.stdev(recent_values) if len(recent_values) > 1 else 0,
                    'trend': self._calculate_trend(recent_values)
                }

        return result

    def _calculate_trend(self, values: list) -> str:
        """
        Compare the mean of the last 5 samples against the first 5:
        >20% higher => 'increasing', >20% lower => 'decreasing'.
        """
        if len(values) < 2:
            return 'stable'

        recent_avg = statistics.mean(values[-5:]) if len(values) >= 5 else statistics.mean(values)
        older_avg = statistics.mean(values[:5]) if len(values) >= 5 else statistics.mean(values)

        if recent_avg > older_avg * 1.2:
            return 'increasing'
        elif recent_avg < older_avg * 0.8:
            return 'decreasing'
        else:
            return 'stable'

    def _calculate_complex_trend(self, values: list) -> str:
        """
        Trend for dict-valued samples (IO counter snapshots): sum the
        numeric fields of the first and last sample and compare with the
        same +/-20% thresholds as ``_calculate_trend``.
        """
        if len(values) < 2:
            return 'stable'

        first_val = values[0]
        last_val = values[-1]

        if isinstance(first_val, dict) and isinstance(last_val, dict):
            numeric_fields = [k for k, v in first_val.items() if isinstance(v, (int, float))]
            if numeric_fields:
                first_sum = sum(first_val.get(k, 0) for k in numeric_fields)
                last_sum = sum(last_val.get(k, 0) for k in numeric_fields)
                if last_sum > first_sum * 1.2:
                    return 'increasing'
                elif last_sum < first_sum * 0.8:
                    return 'decreasing'
        elif isinstance(first_val, (int, float)) and isinstance(last_val, (int, float)):
            if last_val > first_val * 1.2:
                return 'increasing'
            elif last_val < first_val * 0.8:
                return 'decreasing'

        return 'stable'

# 性能分析报告
def generate_performance_report(monitor: PerformanceMonitor):
    """
    Assemble a performance report from *monitor*: the latest snapshot,
    the 5-minute history, and derived optimization recommendations.
    """
    snapshot = monitor.get_current_metrics()
    history = monitor.get_historical_metrics(minutes=5)

    return {
        'report_time': datetime.now().isoformat(),
        'current_metrics': snapshot,
        'historical_metrics': history,
        'recommendations': generate_recommendations(snapshot, history),
    }

def generate_recommendations(current: dict, historical: dict) -> list:
    """
    Derive optimization advice from current and historical metrics.

    Fix: the original ``Dict``/``List`` annotations raised NameError at
    import time because ``typing`` was never imported in this module;
    builtin ``dict``/``list`` need no import.
    """
    recommendations = []

    # CPU advice: instantaneous spike vs. sustained load.
    if current.get('cpu_percent', 0) > 80:
        recommendations.append("CPU使用率过高 (>80%),考虑优化算法或增加资源")

    if current.get('avg_cpu_percent', 0) > 70:
        recommendations.append("平均CPU使用率偏高,建议进行性能优化")

    # Memory advice.
    if current.get('memory_percent', 0) > 85:
        recommendations.append("内存使用率过高 (>85%),可能存在内存泄漏")

    # Trend-based advice from historical summaries.
    if historical.get('cpu_percent', {}).get('trend') == 'increasing':
        recommendations.append("CPU使用率呈上升趋势,需关注性能退化")

    if historical.get('memory_percent', {}).get('trend') == 'increasing':
        recommendations.append("内存使用率呈上升趋势,可能存在内存泄漏")

    if not recommendations:
        recommendations.append("系统运行正常,性能指标稳定")

    return recommendations

性能优化工具

# performance_optimizer.py - 性能优化工具
import cProfile
import pstats
import io
import time
import functools
from typing import Callable, Any

class PerformanceOptimizer:
    """
    A collection of decorators for ad-hoc performance analysis:
    CPU profiling, wall-clock timing and (optional) memory profiling.
    """

    @staticmethod
    def profile_function(func: Callable) -> Callable:
        """
        Decorator: run *func* under cProfile and print the ten functions
        with the highest cumulative time.
        """
        @functools.wraps(func)
        def profiled(*args, **kwargs):
            profiler = cProfile.Profile()
            profiler.enable()
            result = func(*args, **kwargs)
            profiler.disable()

            # Render the stats into a string buffer, top-10 by cumulative time.
            buffer = io.StringIO()
            stats = pstats.Stats(profiler, stream=buffer)
            stats.sort_stats('cumulative')
            stats.print_stats(10)

            print(f"\n=== 性能分析结果 - {func.__name__} ===")
            print(buffer.getvalue())
            return result

        return profiled

    @staticmethod
    def time_it(func: Callable) -> Callable:
        """
        Decorator: print the wall-clock execution time of each call.
        """
        @functools.wraps(func)
        def timed(*args, **kwargs):
            start_time = time.time()
            result = func(*args, **kwargs)
            execution_time = time.time() - start_time
            print(f"{func.__name__} 执行时间: {execution_time:.4f}秒")
            return result

        return timed

    @staticmethod
    def memory_profiler(func: Callable) -> Callable:
        """
        Decorator: wrap *func* with memory_profiler's line-by-line memory
        report when the package is installed; otherwise a no-op.
        """
        try:
            from memory_profiler import profile
        except ImportError:
            print("memory_profiler 未安装,跳过内存分析")
            return func
        return profile(func)

# 使用示例
@PerformanceOptimizer.profile_function
@PerformanceOptimizer.time_it
def example_heavy_function(n: int) -> int:
    """
    Demo CPU-heavy workload: sum of squares of 0..n-1.
    """
    return sum(i ** 2 for i in range(n))

# 爬虫性能优化示例
class OptimizedSpiderMixin:
    """
    Mixin adding response-level caching and basic performance accounting
    to a spider.

    Fixes vs. the original: the ``Dict`` annotation raised NameError
    (typing was never imported); ``_calculate_cache_hit_rate`` was an
    unimplemented ``pass`` (returning None); the cache was keyed on
    ``hash(response.url)``, which can collide and, for str, varies
    between interpreter runs (PYTHONHASHSEED) — the URL itself is used now.
    """

    def __init__(self):
        self.request_cache = {}
        self.response_cache = {}
        # Hit/miss counters back the cache-hit-rate metric.
        self._cache_hits = 0
        self._cache_misses = 0
        self.start_time = time.time()

    def optimized_parse(self, response):
        """
        Parse *response*, serving repeated URLs from the in-memory cache.
        """
        cache_key = response.url
        if cache_key in self.response_cache:
            self._cache_hits += 1
            return self.response_cache[cache_key]

        self._cache_misses += 1
        parsed_data = self._parse_content(response)

        # Cache the parsed result for subsequent identical URLs.
        self.response_cache[cache_key] = parsed_data

        return parsed_data

    def _parse_content(self, response):
        """
        Actual parsing logic; concrete spiders are expected to override.
        """
        pass

    def get_performance_stats(self) -> dict:
        """
        Return elapsed run time, cache size and cache hit rate.
        """
        elapsed_time = time.time() - self.start_time
        return {
            'elapsed_time': elapsed_time,
            'cached_responses': len(self.response_cache),
            'cache_hit_rate': self._calculate_cache_hit_rate()
        }

    def _calculate_cache_hit_rate(self) -> float:
        """
        Hits / total lookups; 0.0 before any lookup has happened.
        """
        total = self._cache_hits + self._cache_misses
        return self._cache_hits / total if total else 0.0

# 性能测试工具
def performance_test_suite():
    """
    Run a small unittest-based performance suite against
    ``example_heavy_function``: a wall-clock budget and a peak-memory budget.
    """
    import unittest
    import time

    class PerformanceTests(unittest.TestCase):
        def test_response_time(self):
            """The call must finish within one second."""
            started = time.time()
            example_heavy_function(10000)
            elapsed = time.time() - started
            self.assertLess(elapsed, 1.0, "函数执行时间超过1秒")

        def test_memory_usage(self):
            """Peak traced allocations must stay below 100 MB."""
            import tracemalloc

            tracemalloc.start()
            example_heavy_function(10000)
            _, peak = tracemalloc.get_traced_memory()
            tracemalloc.stop()

            self.assertLess(peak / 1024 / 1024, 100, "峰值内存使用超过100MB")

    # Collect and run the suite with verbose output.
    suite = unittest.TestLoader().loadTestsFromTestCase(PerformanceTests)
    unittest.TextTestRunner(verbosity=2).run(suite)

监控面板设计

监控面板组件

# dashboard_components.py - 监控面板组件
import dash
from dash import dcc, html, Input, Output, State
import plotly.graph_objs as go
import pandas as pd
from datetime import datetime, timedelta
import redis
import json

def create_dashboard_app():
    """
    Build and return the Dash monitoring dashboard application.

    Layout: a banner, a control row (spider selector + date-range picker),
    four live metric cards, four chart panels, and a 10-second refresh
    timer. The callbacks below pull data through the module-level helpers
    get_real_time_metrics / get_requests_data / get_error_distribution.
    """
    app = dash.Dash(__name__, suppress_callback_exceptions=True)
    
    app.layout = html.Div([
        # Top banner
        html.Div([
            html.H1("Scrapy爬虫监控面板", style={'textAlign': 'center'}),
            html.Div(id='live-update-text'),
        ], className='banner'),
        
        # Control panel: spider selector and date-range picker
        html.Div([
            html.Div([
                dcc.Dropdown(
                    id='spider-selector',
                    options=[
                        {'label': '所有爬虫', 'value': 'all'},
                        {'label': '新闻爬虫', 'value': 'news_spider'},
                        {'label': '电商爬虫', 'value': 'ecommerce_spider'},
                        {'label': '社交媒体爬虫', 'value': 'social_spider'}
                    ],
                    value='all'
                ),
            ], className='two columns'),
            
            html.Div([
                dcc.DatePickerRange(
                    id='date-picker-range',
                    start_date=datetime.today() - timedelta(days=7),
                    end_date=datetime.today(),
                ),
            ], className='four columns'),
        ], className='row'),
        
        # Live metric cards (values filled by update_metrics)
        html.Div([
            html.Div([
                html.H3("活跃任务数"),
                html.P(id='active-jobs', children="0"),
            ], className='three columns', style={'background-color': '#f9f9f9', 'padding': '20px', 'border-radius': '10px'}),
            
            html.Div([
                html.H3("成功率"),
                html.P(id='success-rate', children="0%"),
            ], className='three columns', style={'background-color': '#f9f9f9', 'padding': '20px', 'border-radius': '10px'}),
            
            html.Div([
                html.H3("错误率"),
                html.P(id='error-rate', children="0%"),
            ], className='three columns', style={'background-color': '#f9f9f9', 'padding': '20px', 'border-radius': '10px'}),
            
            html.Div([
                html.H3("响应时间"),
                html.P(id='response-time', children="0ms"),
            ], className='three columns', style={'background-color': '#f9f9f9', 'padding': '20px', 'border-radius': '10px'}),
        ], className='row'),
        
        # Chart area (two rows of two charts)
        html.Div([
            html.Div([
                dcc.Graph(id='requests-over-time')
            ], className='six columns'),
            
            html.Div([
                dcc.Graph(id='error-distribution')
            ], className='six columns'),
        ], className='row'),
        
        html.Div([
            html.Div([
                dcc.Graph(id='resource-usage')
            ], className='six columns'),
            
            html.Div([
                dcc.Graph(id='spider-performance')
            ], className='six columns'),
        ], className='row'),
        
        # Refresh timer driving all the callbacks below
        dcc.Interval(
            id='interval-component',
            interval=10*1000,  # update every 10 seconds
            n_intervals=0
        )
    ])
    
    # Callbacks
    @app.callback(
        [Output('active-jobs', 'children'),
         Output('success-rate', 'children'),
         Output('error-rate', 'children'),
         Output('response-time', 'children')],
        [Input('interval-component', 'n_intervals')]
    )
    def update_metrics(n):
        # Pull live metrics from Redis (or its fallback sample data).
        metrics = get_real_time_metrics()
        
        return [
            f"{metrics.get('active_jobs', 0)}",
            f"{metrics.get('success_rate', 0):.2f}%",
            f"{metrics.get('error_rate', 0):.2f}%",
            f"{metrics.get('avg_response_time', 0):.2f}s"
        ]
    
    @app.callback(
        Output('requests-over-time', 'figure'),
        [Input('interval-component', 'n_intervals'),
         Input('spider-selector', 'value')]
    )
    def update_requests_chart(n, spider_name):
        # Fetch the request time series for the selected spider.
        data = get_requests_data(spider_name)
        
        fig = go.Figure(data=[
            go.Scatter(x=data['timestamp'], y=data['requests'], 
                      mode='lines+markers', name='请求数')
        ])
        
        fig.update_layout(
            title='请求数随时间变化',
            xaxis_title='时间',
            yaxis_title='请求数'
        )
        
        return fig
    
    @app.callback(
        Output('error-distribution', 'figure'),
        [Input('interval-component', 'n_intervals')]
    )
    def update_error_chart(n):
        # Fetch the error-type breakdown for the pie chart.
        error_data = get_error_distribution()
        
        fig = go.Figure(data=[
            go.Pie(labels=list(error_data.keys()), values=list(error_data.values()))
        ])
        
        fig.update_layout(title='错误类型分布')
        
        return fig
    
    return app

def get_real_time_metrics():
    """
    Fetch live spider metrics from the Redis hash ``spider:metrics``.

    Falls back to static sample data when Redis is unreachable (or the
    client fails for any reason) so the dashboard always renders.

    Fix: the original used a bare ``except:``, which also swallows
    SystemExit/KeyboardInterrupt; ``except Exception`` is broad enough
    for connection and parsing failures.
    """
    try:
        r = redis.Redis(host='localhost', port=6379, db=0)
        metrics = r.hgetall('spider:metrics')

        # hgetall returns bytes keys/values; coerce to numbers.
        return {
            'active_jobs': int(metrics.get(b'active_jobs', 0)),
            'success_rate': float(metrics.get(b'success_rate', 0)),
            'error_rate': float(metrics.get(b'error_rate', 0)),
            'avg_response_time': float(metrics.get(b'avg_response_time', 0))
        }
    except Exception:
        # Demo fallback data
        return {
            'active_jobs': 15,
            'success_rate': 95.5,
            'error_rate': 2.1,
            'avg_response_time': 0.8
        }

def get_requests_data(spider_name: str = 'all'):
    """
    Build a mock hourly request-count series for the dashboard charts.

    Returns a DataFrame with 'timestamp' (24 hourly points) and
    'requests' (Poisson-distributed counts, mean 50).
    """
    import numpy as np
    timestamps = pd.date_range(start='2024-01-01', periods=24, freq='H')
    counts = np.random.poisson(50, 24)  # Poisson-distributed mock request counts

    frame = pd.DataFrame({
        'timestamp': timestamps,
        'requests': counts,
    })
    return frame

def get_error_distribution():
    """
    Return a static error-category breakdown used as demo data for the
    dashboard's pie chart.
    """
    distribution = {
        '网络错误': 45,
        '解析错误': 30,
        '超时错误': 15,
        '其他错误': 10,
    }
    return distribution

# Entry point: build the dashboard and serve it on all interfaces.
if __name__ == '__main__':
    dashboard_app = create_dashboard_app()
    dashboard_app.run_server(debug=True, host='0.0.0.0', port=8050)

移动端友好的监控面板

<!DOCTYPE html>
<!-- Mobile-friendly spider monitoring dashboard: metric cards + two
     Chart.js charts, refreshed every 10 seconds with mock data. -->
<html>
<head>
    <title>爬虫监控面板 - 移动端</title>
    <meta name="viewport" content="width=device-width, initial-scale=1.0">
    <style>
        * {
            margin: 0;
            padding: 0;
            box-sizing: border-box;
        }
        
        body {
            font-family: Arial, sans-serif;
            background-color: #f5f5f5;
            padding: 10px;
        }
        
        .header {
            background-color: #2c3e50;
            color: white;
            padding: 15px;
            text-align: center;
            border-radius: 10px;
            margin-bottom: 15px;
        }
        
        /* Responsive card grid: as many 150px+ columns as fit */
        .metrics-grid {
            display: grid;
            grid-template-columns: repeat(auto-fit, minmax(150px, 1fr));
            gap: 10px;
            margin-bottom: 15px;
        }
        
        .metric-card {
            background-color: white;
            padding: 15px;
            border-radius: 8px;
            text-align: center;
            box-shadow: 0 2px 4px rgba(0,0,0,0.1);
        }
        
        .metric-value {
            font-size: 1.5em;
            font-weight: bold;
            margin: 5px 0;
        }
        
        .metric-label {
            font-size: 0.8em;
            color: #666;
        }
        
        .chart-container {
            background-color: white;
            padding: 15px;
            border-radius: 8px;
            margin-bottom: 15px;
            box-shadow: 0 2px 4px rgba(0,0,0,0.1);
        }
        
        .status-indicator {
            display: inline-block;
            width: 12px;
            height: 12px;
            border-radius: 50%;
            margin-right: 5px;
        }
        
        .status-good { background-color: #27ae60; }
        .status-warning { background-color: #f39c12; }
        .status-critical { background-color: #e74c3c; }
        
        .refresh-btn {
            background-color: #3498db;
            color: white;
            border: none;
            padding: 10px 20px;
            border-radius: 5px;
            cursor: pointer;
            width: 100%;
            margin-top: 10px;
        }
        
        .refresh-btn:hover {
            background-color: #2980b9;
        }
    </style>
</head>
<body>
    <div class="header">
        <h1>📊 爬虫监控面板</h1>
        <p id="update-time">最后更新: --:--:--</p>
    </div>
    
    <!-- Live metric cards; values filled by updateDisplay() -->
    <div class="metrics-grid">
        <div class="metric-card">
            <div class="metric-value" id="active-jobs">0</div>
            <div class="metric-label">活跃任务</div>
        </div>
        <div class="metric-card">
            <div class="metric-value" id="success-rate">0%</div>
            <div class="metric-label">成功率</div>
        </div>
        <div class="metric-card">
            <div class="metric-value" id="error-rate">0%</div>
            <div class="metric-label">错误率</div>
        </div>
        <div class="metric-card">
            <div class="metric-value" id="response-time">0ms</div>
            <div class="metric-label">响应时间</div>
        </div>
    </div>
    
    <div class="chart-container">
        <h3>📈 实时监控图表</h3>
        <canvas id="realtime-chart" width="400" height="200"></canvas>
    </div>
    
    <div class="chart-container">
        <h3>⚠️ 错误分布</h3>
        <canvas id="error-chart" width="400" height="200"></canvas>
    </div>
    
    <button class="refresh-btn" onclick="refreshData()">🔄 刷新数据</button>
    
    <script src="https://cdn.jsdelivr.net/npm/chart.js"></script>
    <script>
        // Realtime line chart: rolling window of the last 10 samples
        const realtimeCtx = document.getElementById('realtime-chart').getContext('2d');
        const realtimeChart = new Chart(realtimeCtx, {
            type: 'line',
            data: {
                labels: Array.from({length: 10}, (_, i) => `${i*10}s`),
                datasets: [{
                    label: '请求数',
                    data: Array(10).fill(0),
                    borderColor: '#3498db',
                    tension: 0.1
                }]
            },
            options: {
                responsive: true,
                scales: {
                    y: {
                        beginAtZero: true
                    }
                }
            }
        });
        
        // Error-distribution doughnut chart (static demo values)
        const errorCtx = document.getElementById('error-chart').getContext('2d');
        const errorChart = new Chart(errorCtx, {
            type: 'doughnut',
            data: {
                labels: ['网络错误', '解析错误', '超时错误', '其他'],
                datasets: [{
                    data: [45, 30, 15, 10],
                    backgroundColor: ['#e74c3c', '#f39c12', '#3498db', '#95a5a6']
                }]
            },
            options: {
                responsive: true
            }
        });
        
        // Push the latest metric values into the cards and the timestamp line
        function updateDisplay(data) {
            document.getElementById('active-jobs').textContent = data.active_jobs || 0;
            document.getElementById('success-rate').textContent = (data.success_rate || 0).toFixed(1) + '%';
            document.getElementById('error-rate').textContent = (data.error_rate || 0).toFixed(1) + '%';
            document.getElementById('response-time').textContent = (data.avg_response_time || 0).toFixed(2) + 's';
            
            // Update the "last updated" timestamp
            document.getElementById('update-time').textContent = '最后更新: ' + new Date().toLocaleTimeString();
        }
        
        // Refresh handler: generates mock data in place of a real API call
        function refreshData() {
            // Mock API response
            const mockData = {
                active_jobs: Math.floor(Math.random() * 50),
                success_rate: 90 + Math.random() * 9,
                error_rate: Math.random() * 5,
                avg_response_time: Math.random() * 2
            };
            
            updateDisplay(mockData);
            
            // Slide the rolling window: push a new sample, drop the oldest
            const newData = Math.floor(Math.random() * 100);
            realtimeChart.data.datasets[0].data.push(newData);
            realtimeChart.data.datasets[0].data.shift();
            realtimeChart.update();
        }
        
        // Auto-refresh every 10 seconds
        setInterval(refreshData, 10000);
        refreshData(); // initial load
    </script>
</body>
</html>

故障排查与诊断

故障诊断工具

# diagnostic_tools.py - 故障诊断工具
import psutil
import requests
import socket
import time
from datetime import datetime
import logging
from typing import Dict, List, Optional

class DiagnosticTools:
    """
    故障诊断工具集合
    """
    
    def __init__(self):
        # Module-scoped logger for diagnostic output.
        self.logger = logging.getLogger(__name__)
    
    def check_system_resources(self) -> Dict:
        """
        Snapshot overall system health: CPU, memory, disk usage, process
        count, load average and open network connections.

        Fix: ``os`` was referenced but never imported in this module,
        raising NameError on every call; a function-scope import keeps the
        method self-contained.
        """
        import os  # not imported at module level in this snippet

        return {
            'cpu_percent': psutil.cpu_percent(interval=1),
            'memory_percent': psutil.virtual_memory().percent,
            'disk_percent': psutil.disk_usage('/').percent,
            'process_count': len(psutil.pids()),
            # getloadavg is unavailable on Windows; fall back to zeros there.
            'load_average': getattr(os, 'getloadavg', lambda: (0, 0, 0))(),
            'network_connections': len(psutil.net_connections())
        }
    
    def check_network_connectivity(self, urls: List[str]) -> Dict:
        """
        Probe each URL with an HTTP GET (10 s timeout) and report, per URL,
        either status code / response time / body size, or the failure
        message.
        """
        report = {}

        for url in urls:
            try:
                started = time.time()
                response = requests.get(url, timeout=10)
                elapsed = round(time.time() - started, 3)
            except Exception as e:
                report[url] = {
                    'status': 'failed',
                    'error': str(e)
                }
            else:
                report[url] = {
                    'status': 'success',
                    'status_code': response.status_code,
                    'response_time': elapsed,
                    'content_length': len(response.content)
                }

        return report
    
    def check_port_availability(self, host: str, ports: List[int]) -> Dict:
        """
        检查端口可用性
        """
        results = {}
        
        for port in ports:
            try:
                sock = socket.socket(socket.AF