# Crawl Monitoring Dashboard: Real-Time Spider Monitoring and Alerting

📂 Stage: Phase 6 — Operations & Monitoring (Engineering)
🔗 Related chapters: Scrapyd and ScrapydWeb · Dockerized Crawlers · Scrapy-Redis Distributed Architecture
## Monitoring System Overview

A spider monitoring system is a key part of keeping crawlers running reliably. Through real-time monitoring, data analysis, and alerting, it lets you detect and resolve problems in a running crawl as they happen.

### Why Monitoring Matters

1. Reliability
   - Track spider status in real time
   - Detect anomalies early
   - Safeguard data-collection quality
2. Performance optimization
   - Identify bottlenecks
   - Optimize resource usage
   - Improve crawl throughput
3. Cost control
   - Track resource consumption
   - Right-size infrastructure
   - Reduce operating costs
4. Business continuity
   - Keep the data supply flowing
   - Respond to failures quickly
   - Keep downstream services healthy
## Monitoring Architecture

```text
Monitoring system architecture:

┌─────────────────┐    ┌─────────────────┐    ┌─────────────────┐
│   Data sources  │────│Metric collection│────│  Metric storage │
│  (spider apps)  │    │  (Prometheus)   │    │ (time-series DB)│
└─────────────────┘    └─────────────────┘    └─────────────────┘
        │
        ▼
┌─────────────────┐    ┌─────────────────┐    ┌─────────────────┐
│ Log collection  │────│  Visualization  │────│    Alerting     │
│   (ELK Stack)   │    │    (Grafana)    │    │ (AlertManager)  │
└─────────────────┘    └─────────────────┘    └─────────────────┘
```

## Key Monitoring Metrics
Spider runtime metrics (see the sketch after these lists):
- Crawl success rate
- Request failure rate
- Concurrent request count
- Response-time distribution

System resource metrics:
- CPU usage
- Memory usage
- Network I/O
- Disk I/O

Business metrics:
- Volume of data scraped
- Data-quality score
- Task completion rate
- Error recovery rate
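To make the first group concrete, here is a minimal sketch of deriving success and failure rates from Scrapy's built-in stats collector. The stat keys (`downloader/request_count`, `downloader/response_status_count/...`, `log_count/ERROR`) are the ones Scrapy's default stats collector emits; the helper function itself is illustrative, not part of any library.

```python
# A hedged sketch: deriving runtime metrics from a Scrapy stats dict.
# `stats` would come from crawler.stats.get_stats() at the end of a run.
def runtime_metrics(stats: dict) -> dict:
    total = stats.get('downloader/request_count', 0)
    # Count all 2xx responses as successes
    ok = sum(v for k, v in stats.items()
             if k.startswith('downloader/response_status_count/2'))
    return {
        'success_rate': ok / total if total else 0.0,
        'failure_rate': 1 - ok / total if total else 0.0,
        'errors': stats.get('log_count/ERROR', 0),
    }
```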
## Metric Collection and Storage

### Collecting Metrics with Prometheus
```python
# metrics_collector.py - Prometheus metrics collector
from prometheus_client import start_http_server, Counter, Gauge, Histogram, Summary
import time
import logging

# Metric definitions

# Counters - monotonically increasing totals
SPIDER_REQUEST_COUNT = Counter(
    'spider_requests_total',
    'Total number of spider requests',
    ['spider_name', 'status_code']
)

SPIDER_ERROR_COUNT = Counter(
    'spider_errors_total',
    'Total number of spider errors',
    ['spider_name', 'error_type']
)

# Gauges - current state
SPIDER_ACTIVE_JOBS = Gauge(
    'spider_active_jobs',
    'Number of active spider jobs',
    ['spider_name']
)

SPIDER_CURRENT_SPEED = Gauge(
    'spider_current_speed',
    'Current crawling speed (requests/second)',
    ['spider_name']
)

# Histogram - request latency distribution
SPIDER_RESPONSE_TIME = Histogram(
    'spider_response_time_seconds',
    'Spider response time distribution',
    ['spider_name'],
    buckets=[0.1, 0.5, 1.0, 2.0, 5.0, 10.0, float('inf')]
)

# Summary - request latency statistics
SPIDER_REQUEST_LATENCY = Summary(
    'spider_request_latency_seconds',
    'Spider request latency summary',
    ['spider_name']
)

class SpiderMetricsCollector:
    """Spider metrics collector."""

    def __init__(self, port: int = 8000):
        self.port = port
        self.logger = logging.getLogger(__name__)

    def start_server(self):
        """Start the metrics HTTP server."""
        start_http_server(self.port)
        self.logger.info(f"Metrics server started on port {self.port}")

    def record_request(self, spider_name: str, status_code: int, response_time: float):
        """Record request metrics."""
        SPIDER_REQUEST_COUNT.labels(spider_name=spider_name, status_code=status_code).inc()
        SPIDER_RESPONSE_TIME.labels(spider_name=spider_name).observe(response_time)
        SPIDER_REQUEST_LATENCY.labels(spider_name=spider_name).observe(response_time)

    def record_error(self, spider_name: str, error_type: str):
        """Record error metrics."""
        SPIDER_ERROR_COUNT.labels(spider_name=spider_name, error_type=error_type).inc()

    def update_active_jobs(self, spider_name: str, count: int):
        """Update the number of active jobs."""
        SPIDER_ACTIVE_JOBS.labels(spider_name=spider_name).set(count)

    def update_speed(self, spider_name: str, speed: float):
        """Update the crawl speed."""
        SPIDER_CURRENT_SPEED.labels(spider_name=spider_name).set(speed)

# Usage example
if __name__ == "__main__":
    collector = SpiderMetricsCollector(port=8000)
    collector.start_server()

    # Simulate metric collection
    while True:
        collector.record_request('example_spider', 200, 0.5)
        collector.update_speed('example_spider', 10.5)
        time.sleep(1)
```
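One way to feed these metrics from a real crawl is a small Scrapy extension that listens to spider signals. The sketch below is illustrative, not part of Scrapy or prometheus_client: it assumes the `metrics_collector.py` module above is importable, and the extension would be enabled through Scrapy's `EXTENSIONS` setting. The `response_received` and `spider_error` signals and the `download_latency` request meta key are standard Scrapy behavior.

```python
# metrics_extension.py - hypothetical Scrapy extension wiring the collector
# above into spider signals (a sketch, not a library-provided extension).
from scrapy import signals

class PrometheusStatsExtension:
    def __init__(self, collector):
        self.collector = collector

    @classmethod
    def from_crawler(cls, crawler):
        from metrics_collector import SpiderMetricsCollector  # assumed importable
        collector = SpiderMetricsCollector(port=8000)
        collector.start_server()
        ext = cls(collector)
        crawler.signals.connect(ext.response_received, signal=signals.response_received)
        crawler.signals.connect(ext.spider_error, signal=signals.spider_error)
        return ext

    def response_received(self, response, request, spider):
        # Scrapy's downloader attaches the measured latency to request.meta.
        latency = request.meta.get('download_latency', 0.0)
        self.collector.record_request(spider.name, response.status, latency)

    def spider_error(self, failure, response, spider):
        # failure.type is the exception class raised in the callback.
        self.collector.record_error(spider.name, failure.type.__name__)
```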
### Custom Metric Collectors

```python
# custom_collector.py - custom metric collectors
from prometheus_client.core import GaugeMetricFamily
from prometheus_client import REGISTRY
import requests

class ScrapydMetricsCollector:
    """Scrapyd metrics collector."""

    def __init__(self, scrapyd_url: str = "http://localhost:6800"):
        self.scrapyd_url = scrapyd_url.rstrip('/')

    def collect(self):
        """Collect Scrapyd metrics (invoked on every Prometheus scrape)."""
        try:
            # Query the daemon status endpoint
            status_response = requests.get(f"{self.scrapyd_url}/daemonstatus.json", timeout=5)
            if status_response.status_code == 200:
                status_data = status_response.json()
                # Number of jobs currently running in the daemon
                running_jobs = GaugeMetricFamily(
                    'scrapyd_running_jobs',
                    'Number of running jobs in Scrapyd',
                    value=status_data.get('running', 0)
                )
                yield running_jobs
        except Exception as e:
            print(f"Error collecting Scrapyd metrics: {e}")

class RedisMetricsCollector:
    """Redis metrics collector."""

    def __init__(self, redis_url: str = "redis://localhost:6379"):
        self.redis_url = redis_url

    def collect(self):
        """Collect Redis metrics."""
        try:
            import redis
            r = redis.from_url(self.redis_url)

            # Redis server info
            info = r.info()

            # Memory usage
            memory_used = GaugeMetricFamily(
                'redis_memory_used_bytes',
                'Redis memory used in bytes',
                value=info.get('used_memory', 0)
            )
            yield memory_used

            # Client connections
            connections = GaugeMetricFamily(
                'redis_connected_clients',
                'Number of connected clients',
                value=info.get('connected_clients', 0)
            )
            yield connections

            # Commands processed
            commands_processed = GaugeMetricFamily(
                'redis_total_commands_processed',
                'Total number of commands processed',
                value=info.get('total_commands_processed', 0)
            )
            yield commands_processed
        except Exception as e:
            print(f"Error collecting Redis metrics: {e}")

# Register the custom collectors
def register_custom_collectors():
    """Register the custom collectors with the global registry."""
    REGISTRY.register(ScrapydMetricsCollector())
    REGISTRY.register(RedisMetricsCollector())
```
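The collectors above only define how metrics are gathered; nothing serves them yet. A minimal wiring sketch, assuming `custom_collector.py` is importable and port 8001 is free:

```python
# serve_custom_metrics.py - sketch: expose the custom collectors over HTTP
import time
from prometheus_client import start_http_server

from custom_collector import register_custom_collectors  # module defined above

if __name__ == "__main__":
    register_custom_collectors()
    start_http_server(8001)  # Prometheus can now scrape localhost:8001/metrics
    while True:
        time.sleep(60)  # keep the process alive between scrapes
```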
### Metric Storage Configuration

```yaml
# prometheus.yml - Prometheus configuration
global:
  scrape_interval: 15s
  evaluation_interval: 15s

rule_files:
  - "alert_rules.yml"

scrape_configs:
  # Scrapy spider metrics
  - job_name: 'scrapy-spiders'
    static_configs:
      - targets: ['localhost:8000']
    scrape_interval: 5s
    scrape_timeout: 4s

  # Scrapyd service
  - job_name: 'scrapyd'
    static_configs:
      - targets: ['localhost:6800']
    metrics_path: /metrics
    scrape_interval: 10s

  # Redis monitoring
  - job_name: 'redis'
    static_configs:
      - targets: ['localhost:9121']  # Redis Exporter
    scrape_interval: 10s

  # Host metrics
  - job_name: 'node-exporter'
    static_configs:
      - targets: ['localhost:9100']
    scrape_interval: 15s

  # Docker monitoring
  - job_name: 'docker'
    static_configs:
      - targets: ['localhost:8080']
    scrape_interval: 10s

alerting:
  alertmanagers:
    - static_configs:
        - targets:
            - localhost:9093
```
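Once Prometheus is scraping these targets, it is worth sanity-checking that the series actually arrive. A small sketch against Prometheus's standard HTTP query API, assuming Prometheus itself listens on localhost:9090:

```python
# check_scrape.py - sketch: query Prometheus's HTTP API for a spider metric
import requests

resp = requests.get(
    "http://localhost:9090/api/v1/query",
    params={"query": "sum(rate(spider_requests_total[5m]))"},
    timeout=5,
)
# Each result carries the label set and the current [timestamp, value] pair.
for series in resp.json()["data"]["result"]:
    print(series["metric"], series["value"])
```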
## Data Visualization

### Grafana Dashboard Configuration
```json
{
  "dashboard": {
    "id": null,
    "title": "Scrapy Spider Monitoring Dashboard",
    "tags": ["scrapy", "spider", "monitoring"],
    "style": "dark",
    "timezone": "browser",
    "panels": [
      {
        "id": 1,
        "title": "Spider Status Overview",
        "type": "stat",
        "gridPos": {"h": 8, "w": 6, "x": 0, "y": 0},
        "targets": [
          {
            "expr": "sum(spider_requests_total)",
            "legendFormat": "Total requests"
          }
        ],
        "fieldConfig": {
          "defaults": {
            "color": {"mode": "thresholds"},
            "thresholds": {
              "steps": [
                {"color": "green", "value": null},
                {"color": "red", "value": 80}
              ]
            }
          }
        }
      },
      {
        "id": 2,
        "title": "Active Spider Jobs",
        "type": "graph",
        "gridPos": {"h": 8, "w": 12, "x": 6, "y": 0},
        "targets": [
          {
            "expr": "spider_active_jobs",
            "legendFormat": "{{spider_name}}"
          }
        ]
      },
      {
        "id": 3,
        "title": "Response Time Distribution",
        "type": "histogram",
        "gridPos": {"h": 8, "w": 6, "x": 18, "y": 0},
        "targets": [
          {
            "expr": "rate(spider_response_time_seconds_bucket[5m])",
            "legendFormat": "{{le}}"
          }
        ]
      },
      {
        "id": 4,
        "title": "Error Rate",
        "type": "graph",
        "gridPos": {"h": 8, "w": 12, "x": 0, "y": 8},
        "targets": [
          {
            "expr": "rate(spider_errors_total[5m])",
            "legendFormat": "{{spider_name}} - {{error_type}}"
          }
        ]
      },
      {
        "id": 5,
        "title": "Crawl Speed",
        "type": "singlestat",
        "gridPos": {"h": 8, "w": 6, "x": 12, "y": 8},
        "targets": [
          {
            "expr": "avg_over_time(spider_current_speed[1m])",
            "legendFormat": "Average speed"
          }
        ]
      },
      {
        "id": 6,
        "title": "Resource Usage",
        "type": "graph",
        "gridPos": {"h": 8, "w": 6, "x": 18, "y": 8},
        "targets": [
          {
            "expr": "rate(process_cpu_seconds_total[5m])",
            "legendFormat": "CPU usage"
          },
          {
            "expr": "process_resident_memory_bytes",
            "legendFormat": "Memory usage"
          }
        ]
      }
    ],
    "time": {"from": "now-1h", "to": "now"},
    "refresh": "5s"
  }
}
```
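Rather than pasting this JSON into the UI by hand, the dashboard can be provisioned through Grafana's dashboard HTTP API. A hedged sketch, assuming the JSON above is saved as `spider_dashboard.json` and `YOUR_API_KEY` is a placeholder for a token with editor rights:

```python
# import_dashboard.py - sketch: push the dashboard JSON into Grafana
import json
import requests

with open("spider_dashboard.json", encoding="utf-8") as f:
    payload = json.load(f)

resp = requests.post(
    "http://localhost:3000/api/dashboards/db",  # Grafana's create/update endpoint
    headers={"Authorization": "Bearer YOUR_API_KEY"},
    json={"dashboard": payload["dashboard"], "overwrite": True},
    timeout=10,
)
print(resp.status_code, resp.json())
```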
### Custom Visualization Components

```python
# visualization.py - custom visualization components
import pandas as pd
import numpy as np
import plotly.graph_objects as go
from plotly.subplots import make_subplots

class SpiderVisualization:
    """Spider data visualization helpers."""

    def __init__(self):
        self.figures = {}

    def create_performance_dashboard(self, data: pd.DataFrame):
        """Build the performance dashboard figure."""
        fig = make_subplots(
            rows=2, cols=2,
            subplot_titles=('Success rate', 'Response time', 'Error rate', 'Concurrency'),
            specs=[[{"secondary_y": False}, {"secondary_y": False}],
                   [{"secondary_y": False}, {"secondary_y": False}]]
        )

        # Success rate
        fig.add_trace(
            go.Scatter(x=data['timestamp'], y=data['success_rate'],
                       name='Success rate', line=dict(color='green')),
            row=1, col=1
        )

        # Response time
        fig.add_trace(
            go.Scatter(x=data['timestamp'], y=data['response_time'],
                       name='Response time', line=dict(color='blue')),
            row=1, col=2
        )

        # Error rate
        fig.add_trace(
            go.Scatter(x=data['timestamp'], y=data['error_rate'],
                       name='Error rate', line=dict(color='red')),
            row=2, col=1
        )

        # Concurrency
        fig.add_trace(
            go.Scatter(x=data['timestamp'], y=data['concurrency'],
                       name='Concurrency', line=dict(color='orange')),
            row=2, col=2
        )

        fig.update_layout(height=600, showlegend=True,
                          title_text="Spider Performance Dashboard")
        self.figures['performance'] = fig
        return fig

    def create_error_analysis(self, error_data: pd.DataFrame):
        """Build the error-analysis chart."""
        fig = go.Figure(data=[
            go.Bar(name='Parse errors', x=error_data['timestamp'], y=error_data['parse_errors']),
            go.Bar(name='Network errors', x=error_data['timestamp'], y=error_data['network_errors']),
            go.Bar(name='Timeout errors', x=error_data['timestamp'], y=error_data['timeout_errors'])
        ])
        fig.update_layout(
            barmode='stack',
            title='Error Type Analysis',
            xaxis_title='Time',
            yaxis_title='Error count'
        )
        self.figures['errors'] = fig
        return fig

    def create_resource_utilization(self, resource_data: pd.DataFrame):
        """Build the resource-utilization chart."""
        fig = make_subplots(specs=[[{"secondary_y": True}]])

        fig.add_trace(
            go.Scatter(x=resource_data['timestamp'], y=resource_data['cpu_percent'],
                       name="CPU usage", line=dict(color='red')),
            secondary_y=False,
        )
        fig.add_trace(
            go.Scatter(x=resource_data['timestamp'], y=resource_data['memory_percent'],
                       name="Memory usage", line=dict(color='blue')),
            secondary_y=True,
        )

        fig.update_xaxes(title_text="Time")
        fig.update_yaxes(title_text="CPU usage (%)", secondary_y=False)
        fig.update_yaxes(title_text="Memory usage (%)", secondary_y=True)
        fig.update_layout(title_text="Resource Utilization")
        self.figures['resources'] = fig
        return fig

    def export_dashboard(self, dashboard_name: str = "spider_dashboard"):
        """Export every generated figure as a standalone HTML file."""
        for name, fig in self.figures.items():
            fig.write_html(f"{dashboard_name}_{name}.html")

# Usage example
def create_sample_visualization():
    # Build sample data
    dates = pd.date_range(start='2024-01-01', periods=100, freq='5min')
    sample_data = pd.DataFrame({
        'timestamp': dates,
        'success_rate': np.random.normal(0.95, 0.05, 100),
        'response_time': np.random.exponential(0.5, 100),
        'error_rate': np.random.exponential(0.02, 100),
        'concurrency': np.random.randint(1, 20, 100)
    })

    viz = SpiderVisualization()
    perf_fig = viz.create_performance_dashboard(sample_data)
    perf_fig.show()
```
### Real-Time Data Push

```python
# real_time_push.py - real-time metrics push over WebSocket
import asyncio
import websockets
import json
from datetime import datetime
import random

class RealTimeMetricsPusher:
    """Pushes live metrics to connected WebSocket clients."""

    def __init__(self, host: str = "localhost", port: int = 8765):
        self.host = host
        self.port = port
        self.clients = set()

    async def register_client(self, websocket):
        """Track a client for broadcasts until it disconnects."""
        self.clients.add(websocket)
        try:
            await websocket.wait_closed()
        finally:
            self.clients.discard(websocket)

    async def broadcast_metrics(self):
        """Broadcast metrics to all connected clients once per second."""
        while True:
            if self.clients:
                metrics = {
                    'timestamp': datetime.now().isoformat(),
                    'spiders': {
                        'example_spider': {
                            'active_requests': random.randint(0, 50),
                            'responses_per_second': random.uniform(0, 100),
                            'errors_per_minute': random.randint(0, 10),
                            'response_time_avg': random.uniform(0.1, 2.0)
                        }
                    },
                    'system': {
                        'cpu_percent': random.uniform(0, 100),
                        'memory_percent': random.uniform(0, 100),
                        'disk_usage': random.uniform(0, 100)
                    }
                }

                # Push to every client; gather tolerates individual send failures
                await asyncio.gather(
                    *(client.send(json.dumps(metrics)) for client in self.clients),
                    return_exceptions=True
                )
            await asyncio.sleep(1)  # push once per second

    async def start_server(self):
        """Start the WebSocket server and the broadcast loop."""
        print(f"Starting WebSocket server on {self.host}:{self.port}")

        # Launch the metrics broadcast task
        asyncio.create_task(self.broadcast_metrics())

        # Serve until cancelled
        async with websockets.serve(self.register_client, self.host, self.port):
            await asyncio.Future()  # run forever

# Start the push server
if __name__ == "__main__":
    pusher = RealTimeMetricsPusher()
    asyncio.run(pusher.start_server())
```
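On the consuming side, any WebSocket client can subscribe to this feed. A minimal Python client sketch, assuming the `websockets` package and the server above running on localhost:8765:

```python
# ws_client.py - sketch: subscribe to the metrics feed and print samples
import asyncio
import json
import websockets

async def consume_metrics(uri: str = "ws://localhost:8765"):
    async with websockets.connect(uri) as ws:
        # Each message is one JSON snapshot pushed by the server
        async for raw in ws:
            metrics = json.loads(raw)
            print(metrics['timestamp'], metrics['system']['cpu_percent'])

if __name__ == "__main__":
    asyncio.run(consume_metrics())
```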
## Alerting

### AlertManager Configuration
```yaml
# alertmanager.yml - AlertManager configuration
global:
  smtp_smarthost: 'smtp.gmail.com:587'
  smtp_from: 'alert@example.com'
  smtp_auth_username: 'alert@example.com'
  smtp_auth_password: 'password'

route:
  group_by: ['alertname', 'spider_name']
  group_wait: 30s
  group_interval: 5m
  repeat_interval: 1h
  receiver: 'default-receiver'
  routes:
    - match:
        severity: critical
      receiver: 'critical-receiver'
      group_wait: 10s
      group_interval: 1m
      repeat_interval: 15m
    - match:
        alertname: HighErrorRate
      receiver: 'error-team'
      group_interval: 30s

receivers:
  - name: 'default-receiver'
    email_configs:
      - to: 'ops@example.com'
        send_resolved: true
  - name: 'critical-receiver'
    email_configs:
      - to: 'oncall@example.com'
        send_resolved: true
    webhook_configs:
      - url: 'http://slack-webhook:3000/alert'
  - name: 'error-team'
    email_configs:
      - to: 'errors@example.com'
        send_resolved: true

inhibit_rules:
  - source_match:
      severity: 'critical'
    target_match:
      severity: 'warning'
    equal: ['alertname', 'spider_name']
```
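Before trusting these routes in production, it helps to push a synthetic alert through them. A sketch against Alertmanager's v2 API, assuming it listens on localhost:9093:

```python
# fire_test_alert.py - sketch: post a synthetic alert to Alertmanager
import requests
from datetime import datetime, timedelta, timezone

now = datetime.now(timezone.utc)
alerts = [{
    "labels": {"alertname": "HighErrorRate", "severity": "warning",
               "spider_name": "example_spider"},
    "annotations": {"summary": "Synthetic test alert"},
    "startsAt": now.isoformat(),
    "endsAt": (now + timedelta(minutes=5)).isoformat(),
}]

resp = requests.post("http://localhost:9093/api/v2/alerts", json=alerts, timeout=5)
print(resp.status_code)  # 200 means the alert was accepted and will be routed
```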
### Alert Rules

```yaml
# alert_rules.yml - alerting rules
groups:
  - name: spider_alerts
    rules:
      # High error rate
      - alert: HighErrorRate
        expr: rate(spider_errors_total[5m]) > 10
        for: 2m
        labels:
          severity: warning
        annotations:
          summary: "Spider error rate too high"
          description: "Spider {{ $labels.spider_name }} has averaged more than 10 errors/second over the last 5 minutes"

      # Slow responses
      - alert: HighResponseTime
        expr: histogram_quantile(0.95, rate(spider_response_time_seconds_bucket[5m])) > 5
        for: 3m
        labels:
          severity: warning
        annotations:
          summary: "Spider response time too high"
          description: "Spider {{ $labels.spider_name }} has a 95th-percentile response time above 5 seconds"

      # No activity
      - alert: NoActivity
        expr: increase(spider_requests_total[10m]) == 0
        for: 5m
        labels:
          severity: critical
        annotations:
          summary: "Spider inactive"
          description: "Spider {{ $labels.spider_name }} has sent no requests in the last 10 minutes"

      # High resource usage
      - alert: HighCpuUsage
        expr: rate(process_cpu_seconds_total[5m]) > 0.8
        for: 2m
        labels:
          severity: warning
        annotations:
          summary: "CPU usage too high"
          description: "Spider process CPU usage is above 80%"

      - alert: HighMemoryUsage
        expr: process_resident_memory_bytes / (1024*1024) > 1024
        for: 2m
        labels:
          severity: warning
        annotations:
          summary: "Memory usage too high"
          description: "Spider process is using more than 1GB of memory"

      # Spider crash
      - alert: SpiderCrashed
        expr: up{job="scrapy-spiders"} == 0
        for: 1m
        labels:
          severity: critical
        annotations:
          summary: "Spider process down"
          description: "Spider process {{ $labels.instance }} has stopped responding"

      # Data quality degradation
      - alert: DataQualityLow
        expr: avg_over_time(spider_data_quality_score[10m]) < 0.8
        for: 5m
        labels:
          severity: warning
        annotations:
          summary: "Data quality degraded"
          description: "Spider {{ $labels.spider_name }} has a data-quality score below 0.8"

      # Abnormal concurrency
      - alert: ConcurrentRequestsAnomaly
        expr: spider_concurrent_requests > 50
        for: 1m
        labels:
          severity: warning
        annotations:
          summary: "Abnormal concurrent request count"
          description: "Spider {{ $labels.spider_name }} has more than 50 concurrent requests"
```
### Custom Alert Handler

```python
# alert_handler.py - custom alert handler
import smtplib
import requests
from email.mime.text import MIMEText
from email.mime.multipart import MIMEMultipart
from datetime import datetime
import logging
from typing import Dict

class AlertHandler:
    """Handles alerts forwarded by Alertmanager."""

    def __init__(self, config: Dict):
        self.config = config
        self.logger = logging.getLogger(__name__)

    def handle_prometheus_alert(self, alert_data: Dict):
        """Process a Prometheus/Alertmanager webhook payload."""
        alerts = alert_data.get('alerts', [])
        for alert in alerts:
            status = alert.get('status', 'firing')
            annotations = alert.get('annotations', {})

            if status == 'firing':
                self.logger.warning(f"Alert fired: {annotations.get('summary', 'Unknown alert')}")
                self.send_notification(alert)
            elif status == 'resolved':
                self.logger.info(f"Alert resolved: {annotations.get('summary', 'Unknown alert')}")
                self.send_resolution_notification(alert)

    def send_notification(self, alert: Dict):
        """Dispatch a notification based on severity."""
        labels = alert.get('labels', {})
        severity = labels.get('severity', 'warning')

        if severity == 'critical':
            self.send_critical_notification(alert)
        else:
            self.send_warning_notification(alert)

    def send_resolution_notification(self, alert: Dict):
        """Notify that an alert has been resolved (email only)."""
        self.send_email_notification(alert, critical=False)

    def send_critical_notification(self, alert: Dict):
        """Escalate a critical alert over every configured channel."""
        # Email
        self.send_email_notification(alert, critical=True)
        # Slack
        self.send_slack_notification(alert)
        # SMS, if configured
        if 'sms_config' in self.config:
            self.send_sms_notification(alert)

    def send_warning_notification(self, alert: Dict):
        """Send a warning-level notification."""
        self.send_email_notification(alert, critical=False)

    def send_sms_notification(self, alert: Dict):
        """SMS delivery hook; wire this up to your SMS provider's API."""
        self.logger.info("SMS notification requested but no provider is configured")

    def send_email_notification(self, alert: Dict, critical: bool = False):
        """Send an email notification."""
        try:
            config = self.config.get('email', {})
            msg = MIMEMultipart()
            msg['From'] = config.get('from')
            msg['To'] = ', '.join(config.get('recipients', []))
            msg['Subject'] = f"{'[CRITICAL]' if critical else '[WARNING]'} {alert['annotations']['summary']}"

            body = self.format_alert_message(alert)
            msg.attach(MIMEText(body, 'plain'))

            server = smtplib.SMTP(config.get('smtp_server'), config.get('smtp_port', 587))
            server.starttls()
            server.login(config.get('username'), config.get('password'))
            server.send_message(msg)
            server.quit()
            self.logger.info("Email notification sent successfully")
        except Exception as e:
            self.logger.error(f"Failed to send email notification: {e}")

    def send_slack_notification(self, alert: Dict):
        """Send a Slack notification via incoming webhook."""
        try:
            webhook_url = self.config.get('slack', {}).get('webhook_url')
            if not webhook_url:
                return

            payload = {
                "text": f"🚨 {alert['annotations']['summary']}",
                "attachments": [
                    {
                        "color": "danger" if alert['labels'].get('severity') == 'critical' else "warning",
                        "fields": [
                            {
                                "title": "Description",
                                "value": alert['annotations'].get('description', 'No description'),
                                "short": False
                            },
                            {
                                "title": "Severity",
                                "value": alert['labels'].get('severity', 'unknown'),
                                "short": True
                            },
                            {
                                "title": "Time",
                                "value": datetime.now().isoformat(),
                                "short": True
                            }
                        ]
                    }
                ]
            }

            response = requests.post(webhook_url, json=payload)
            if response.status_code == 200:
                self.logger.info("Slack notification sent successfully")
            else:
                self.logger.error(f"Failed to send Slack notification: {response.status_code}")
        except Exception as e:
            self.logger.error(f"Failed to send Slack notification: {e}")

    def format_alert_message(self, alert: Dict) -> str:
        """Format the alert as a plain-text message."""
        labels = alert.get('labels', {})
        annotations = alert.get('annotations', {})

        message = f"""
Alert details:
==============
Time: {datetime.now().isoformat()}
Summary: {annotations.get('summary', 'No summary')}
Description: {annotations.get('description', 'No description')}
Severity: {labels.get('severity', 'unknown')}
Spider: {labels.get('spider_name', 'unknown')}
Labels: {', '.join([f'{k}={v}' for k, v in labels.items()])}

Please investigate as soon as possible!
"""
        return message.strip()

# Webhook receiver
from flask import Flask, request, jsonify

app = Flask(__name__)
alert_handler = AlertHandler({
    'email': {
        'smtp_server': 'smtp.gmail.com',
        'smtp_port': 587,
        'username': 'your-email@gmail.com',
        'password': 'your-app-password',
        'from': 'your-email@gmail.com',
        'recipients': ['ops@example.com']
    },
    'slack': {
        'webhook_url': 'https://hooks.slack.com/services/YOUR/SLACK/WEBHOOK'
    }
})

@app.route('/webhook/alert', methods=['POST'])
def handle_alert_webhook():
    """Receive Alertmanager webhook POSTs."""
    try:
        alert_data = request.json
        alert_handler.handle_prometheus_alert(alert_data)
        return jsonify({'status': 'ok'})
    except Exception as e:
        return jsonify({'status': 'error', 'message': str(e)}), 500

if __name__ == "__main__":
    app.run(host='0.0.0.0', port=5000)
```
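A quick way to exercise the handler locally is to post a hand-built, Alertmanager-style payload at it. The payload shape below mirrors the `alerts` array Alertmanager sends to webhooks; the values themselves are made up for testing:

```python
# test_webhook.py - sketch: post a sample payload to the handler above
import requests

sample_payload = {
    "alerts": [
        {
            "status": "firing",
            "labels": {"alertname": "HighErrorRate", "severity": "warning",
                       "spider_name": "example_spider"},
            "annotations": {"summary": "Spider error rate too high",
                            "description": "Errors above threshold"},
        }
    ]
}

resp = requests.post("http://localhost:5000/webhook/alert", json=sample_payload, timeout=5)
print(resp.status_code, resp.json())
```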
## Log Monitoring and Analysis

### ELK Stack Setup
```yaml
# docker-compose.elk.yml - ELK Stack
version: '3.8'

services:
  elasticsearch:
    image: docker.elastic.co/elasticsearch/elasticsearch:8.11.0
    container_name: elasticsearch
    environment:
      - discovery.type=single-node
      - "ES_JAVA_OPTS=-Xms512m -Xmx512m"
      - xpack.security.enabled=false
    ports:
      - "9200:9200"
    volumes:
      - es_data:/usr/share/elasticsearch/data
    networks:
      - elk-network

  logstash:
    image: docker.elastic.co/logstash/logstash:8.11.0
    container_name: logstash
    environment:
      - "LS_JAVA_OPTS=-Xms512m -Xmx512m"
    ports:
      - "5044:5044"
      - "5000:5000/tcp"
      - "5000:5000/udp"
      - "9600:9600"
    volumes:
      - ./logstash.conf:/usr/share/logstash/pipeline/logstash.conf
    depends_on:
      - elasticsearch
    networks:
      - elk-network

  kibana:
    image: docker.elastic.co/kibana/kibana:8.11.0
    container_name: kibana
    ports:
      - "5601:5601"
    environment:
      - ELASTICSEARCH_HOSTS=http://elasticsearch:9200
    depends_on:
      - elasticsearch
    networks:
      - elk-network

volumes:
  es_data:

networks:
  elk-network:
    driver: bridge
```
### Logstash Configuration

```conf
# logstash.conf - Logstash pipeline
input {
  beats {
    port => 5044
  }
  file {
    path => "/var/log/scrapy/*.log"
    start_position => "beginning"
    codec => multiline {
      pattern => "^\[.*\]"
      negate => true
      what => "previous"
    }
  }
}

filter {
  # Parse Scrapy log lines
  grok {
    match => { "message" => "\[%{TIMESTAMP_ISO8601:timestamp}\] %{WORD:level}:\s*(?<spider_name>\w+)\s*(?<message_content>.*)" }
  }

  # Parse the timestamp
  date {
    match => [ "timestamp", "ISO8601" ]
  }

  # Add spider-related fields
  mutate {
    add_field => { "[@metadata][spider]" => "%{spider_name}" }
    add_tag => [ "scrapy_log" ]
  }

  # Classify log lines by severity
  if [message] =~ /ERROR|CRITICAL/ {
    mutate {
      add_tag => [ "error" ]
      add_field => { "severity" => "high" }
    }
  } else if [message] =~ /WARNING/ {
    mutate {
      add_tag => [ "warning" ]
      add_field => { "severity" => "medium" }
    }
  } else {
    mutate {
      add_field => { "severity" => "low" }
    }
  }
}

output {
  elasticsearch {
    hosts => ["elasticsearch:9200"]
    index => "scrapy-logs-%{+YYYY.MM.dd}"
  }
  stdout {
    codec => rubydebug
  }
}
```
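With the pipeline writing daily `scrapy-logs-*` indices, recent error entries can be pulled back out through Elasticsearch's search API. A sketch, assuming Elasticsearch on localhost:9200 with security disabled, matching the compose file above:

```python
# query_logs.py - sketch: fetch recent error-tagged log entries from Elasticsearch
import requests
from datetime import date

index = f"scrapy-logs-{date.today():%Y.%m.%d}"  # today's daily index
resp = requests.get(
    f"http://localhost:9200/{index}/_search",
    json={"query": {"match": {"tags": "error"}}, "size": 5},
    timeout=5,
)
for hit in resp.json().get("hits", {}).get("hits", []):
    print(hit["_source"].get("@timestamp"), hit["_source"].get("message"))
```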
### Log Analysis Script

```python
# log_analyzer.py - log analysis script
import re
import json
import time
import pandas as pd
from datetime import datetime
from collections import Counter
from pathlib import Path
from typing import Dict

class ScrapyLogAnalyzer:
    """Scrapy log analyzer."""

    def __init__(self, log_directory: str):
        self.log_directory = Path(log_directory)
        self.patterns = {
            'info': r'\[(\d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2})\] (\w+)\.INFO: (.*)',
            'error': r'\[(\d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2})\] (\w+)\.ERROR: (.*)',
            'warning': r'\[(\d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2})\] (\w+)\.WARNING: (.*)',
            'debug': r'\[(\d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2})\] (\w+)\.DEBUG: (.*)',
        }

    def parse_log_file(self, log_file: Path) -> pd.DataFrame:
        """Parse a single log file into a DataFrame."""
        entries = []
        with open(log_file, 'r', encoding='utf-8', errors='ignore') as f:
            for line_num, line in enumerate(f, 1):
                line = line.strip()
                if not line:
                    continue
                for level, pattern in self.patterns.items():
                    match = re.match(pattern, line)
                    if match:
                        timestamp_str, spider_name, message = match.groups()
                        try:
                            timestamp = datetime.strptime(timestamp_str, '%Y-%m-%d %H:%M:%S')
                        except ValueError:
                            continue
                        entries.append({
                            'timestamp': timestamp,
                            'level': level.upper(),
                            'spider': spider_name,
                            'message': message,
                            'file': str(log_file),
                            'line_number': line_num
                        })
                        break
        return pd.DataFrame(entries) if entries else pd.DataFrame()

    def analyze_logs(self, days: int = 7) -> Dict:
        """Analyze all recent log files."""
        log_files = list(self.log_directory.glob('*.log'))
        all_entries = []
        for log_file in log_files:
            if self._is_recent_file(log_file, days):
                df = self.parse_log_file(log_file)
                if not df.empty:
                    all_entries.append(df)

        if not all_entries:
            return {"error": "No log files found"}

        # Merge all log entries
        combined_df = pd.concat(all_entries, ignore_index=True)
        combined_df = combined_df.sort_values('timestamp')

        # Build the analysis result
        analysis = {
            'total_entries': len(combined_df),
            'time_range': {
                'start': combined_df['timestamp'].min().isoformat() if not combined_df.empty else None,
                'end': combined_df['timestamp'].max().isoformat() if not combined_df.empty else None
            },
            'level_distribution': combined_df['level'].value_counts().to_dict(),
            'spider_distribution': combined_df['spider'].value_counts().to_dict(),
            'hourly_activity': self._get_hourly_activity(combined_df),
            'error_summary': self._get_error_summary(combined_df),
            'performance_metrics': self._get_performance_metrics(combined_df),
            'trends': self._get_trends(combined_df)
        }
        return analysis

    def _is_recent_file(self, file_path: Path, days: int) -> bool:
        """Check whether a file was modified within the given number of days."""
        file_time = file_path.stat().st_mtime
        return (time.time() - file_time) / (24 * 3600) <= days

    def _get_hourly_activity(self, df: pd.DataFrame) -> Dict:
        """Aggregate activity per hour of day."""
        if df.empty:
            return {}
        df['hour'] = df['timestamp'].dt.hour
        hourly_counts = df.groupby(['hour', 'level']).size().unstack(fill_value=0)
        return hourly_counts.to_dict()

    def _get_error_summary(self, df: pd.DataFrame) -> Dict:
        """Summarize error entries."""
        error_df = df[df['level'].isin(['ERROR', 'CRITICAL'])]
        if error_df.empty:
            return {}

        # Bucket the most common error kinds by message content
        error_patterns = Counter()
        for msg in error_df['message']:
            if '404' in msg:
                error_patterns['404_NOT_FOUND'] += 1
            elif 'timeout' in msg.lower():
                error_patterns['TIMEOUT'] += 1
            elif 'connection' in msg.lower():
                error_patterns['CONNECTION_ERROR'] += 1
            elif 'parse' in msg.lower():
                error_patterns['PARSE_ERROR'] += 1
            else:
                error_patterns['OTHER_ERROR'] += 1

        return {
            'total_errors': len(error_df),
            'error_types': dict(error_patterns.most_common(10)),
            'spider_errors': error_df['spider'].value_counts().to_dict()
        }

    def _get_performance_metrics(self, df: pd.DataFrame) -> Dict:
        """Derive request-frequency metrics."""
        if df.empty:
            return {}

        # Sort by time (the per-spider gaps could feed latency analysis)
        df_sorted = df.sort_values('timestamp')
        df_sorted['time_diff'] = df_sorted.groupby('spider')['timestamp'].diff()

        # Entries per second, per spider
        second_counts = df_sorted.set_index('timestamp').groupby(
            [pd.Grouper(freq='s'), 'spider']
        ).size().reset_index(name='requests_per_second')

        return {
            'avg_requests_per_second': second_counts['requests_per_second'].mean(),
            'max_requests_per_second': second_counts['requests_per_second'].max(),
            'request_frequency_distribution': second_counts['requests_per_second'].describe().to_dict()
        }

    def _get_trends(self, df: pd.DataFrame) -> Dict:
        """Compute day-over-day trends."""
        if df.empty:
            return {}

        # Per-day counts by level
        df['date'] = df['timestamp'].dt.date
        daily_stats = df.groupby(['date', 'level']).size().unstack(fill_value=0)

        # Day-over-day growth rates
        if len(daily_stats) > 1:
            latest_day = daily_stats.iloc[-1]
            previous_day = daily_stats.iloc[-2]
            growth_rates = {}
            for level in daily_stats.columns:
                if previous_day[level] > 0:
                    growth_rates[level] = ((latest_day[level] - previous_day[level]) / previous_day[level]) * 100
                else:
                    growth_rates[level] = float('inf') if latest_day[level] > 0 else 0
            return {
                'daily_stats': daily_stats.to_dict(),
                'growth_rates': growth_rates
            }
        else:
            return {'daily_stats': daily_stats.to_dict()}

    def generate_report(self, analysis_result: Dict, output_file: str = None):
        """Render the analysis result as a Markdown report."""
        report = f"""
# Scrapy Log Analysis Report

## Overview
- Time range analyzed: {analysis_result['time_range']['start']} to {analysis_result['time_range']['end']}
- Total log entries: {analysis_result['total_entries']:,}

## Log Level Distribution
{json.dumps(analysis_result['level_distribution'], indent=2, ensure_ascii=False)}

## Spider Distribution
{json.dumps(analysis_result['spider_distribution'], indent=2, ensure_ascii=False)}

## Error Summary
- Total errors: {analysis_result['error_summary'].get('total_errors', 0)}
- Error types: {json.dumps(analysis_result['error_summary'].get('error_types', {}), indent=2, ensure_ascii=False)}

## Performance Metrics
{json.dumps(analysis_result['performance_metrics'], indent=2, ensure_ascii=False)}

## Trends
{json.dumps(analysis_result['trends'], indent=2, ensure_ascii=False)}
"""
        if output_file:
            with open(output_file, 'w', encoding='utf-8') as f:
                f.write(report)
        return report

# Usage example
def analyze_example():
    analyzer = ScrapyLogAnalyzer('./logs')
    result = analyzer.analyze_logs(days=7)
    print(f"Total entries: {result['total_entries']}")
    print(f"Level distribution: {result['level_distribution']}")
    print(f"Error summary: {result['error_summary']}")

if __name__ == "__main__":
    analyze_example()
```
## Performance Analysis and Optimization

### Performance Monitoring Metrics
```python
# performance_monitor.py - performance monitoring
import time
import psutil
import threading
import statistics
from collections import deque
from datetime import datetime
from typing import Dict, List

class PerformanceMonitor:
    """Samples process and system metrics once per second."""

    def __init__(self):
        self.metrics = {
            'cpu_percent': deque(maxlen=300),  # 5 minutes of 1s samples
            'memory_percent': deque(maxlen=300),
            'disk_io': deque(maxlen=300),
            'network_io': deque(maxlen=300),
            'process_count': deque(maxlen=300),
            'thread_count': deque(maxlen=300),
            'response_times': deque(maxlen=300),
        }
        self.running = False
        self.monitor_thread = None
        self.process = psutil.Process()

    def start_monitoring(self):
        """Start the sampling thread."""
        self.running = True
        self.monitor_thread = threading.Thread(target=self._monitor_loop)
        self.monitor_thread.daemon = True
        self.monitor_thread.start()

    def stop_monitoring(self):
        """Stop the sampling thread."""
        self.running = False
        if self.monitor_thread:
            self.monitor_thread.join()

    def _monitor_loop(self):
        """Sampling loop."""
        while self.running:
            try:
                # CPU usage
                cpu_percent = self.process.cpu_percent()
                self.metrics['cpu_percent'].append(cpu_percent)

                # Memory usage
                memory_percent = self.process.memory_percent()
                self.metrics['memory_percent'].append(memory_percent)

                # Disk I/O
                io_counters = self.process.io_counters()
                if io_counters:
                    self.metrics['disk_io'].append({
                        'read_bytes': io_counters.read_bytes,
                        'write_bytes': io_counters.write_bytes,
                        'read_count': io_counters.read_count,
                        'write_count': io_counters.write_count
                    })

                # Network I/O (system-wide)
                net_io = psutil.net_io_counters()
                if net_io:
                    self.metrics['network_io'].append({
                        'bytes_sent': net_io.bytes_sent,
                        'bytes_recv': net_io.bytes_recv
                    })

                # Process and thread counts
                self.metrics['process_count'].append(len(psutil.pids()))
                self.metrics['thread_count'].append(self.process.num_threads())

                # Wait for the next sample
                time.sleep(1)
            except Exception as e:
                print(f"Error in performance monitoring: {e}")
                time.sleep(1)

    def get_current_metrics(self) -> Dict:
        """Return the latest sample plus running aggregates."""
        if not self.metrics['cpu_percent']:
            return {}
        return {
            'timestamp': datetime.now().isoformat(),
            'cpu_percent': self.metrics['cpu_percent'][-1],
            'memory_percent': self.metrics['memory_percent'][-1] if self.metrics['memory_percent'] else 0,
            'avg_cpu_percent': statistics.mean(self.metrics['cpu_percent']),
            'avg_memory_percent': statistics.mean(self.metrics['memory_percent']) if self.metrics['memory_percent'] else 0,
            'current_threads': self.metrics['thread_count'][-1] if self.metrics['thread_count'] else 0,
            'peak_cpu': max(self.metrics['cpu_percent']),
            'peak_memory': max(self.metrics['memory_percent']) if self.metrics['memory_percent'] else 0,
        }

    def get_historical_metrics(self, minutes: int = 5) -> Dict:
        """Summarize the most recent N minutes of samples."""
        samples = minutes * 60  # one sample per second
        result = {}
        for key, values in self.metrics.items():
            recent_values = list(values)[-samples:]
            if not recent_values:
                continue
            if key in ('disk_io', 'network_io'):
                # Dict-valued samples: report count and trend only
                result[key] = {
                    'sample_count': len(recent_values),
                    'trend': self._calculate_complex_trend(recent_values)
                }
            else:
                result[key] = {
                    'current': recent_values[-1],
                    'average': statistics.mean(recent_values),
                    'min': min(recent_values),
                    'max': max(recent_values),
                    'std_dev': statistics.stdev(recent_values) if len(recent_values) > 1 else 0,
                    'trend': self._calculate_trend(recent_values)
                }
        return result

    def _calculate_trend(self, values: list) -> str:
        """Compare the recent average against the oldest samples."""
        if len(values) < 2:
            return 'stable'
        recent_avg = statistics.mean(values[-5:]) if len(values) >= 5 else statistics.mean(values)
        older_avg = statistics.mean(values[:5]) if len(values) >= 5 else statistics.mean(values)
        if recent_avg > older_avg * 1.2:
            return 'increasing'
        elif recent_avg < older_avg * 0.8:
            return 'decreasing'
        return 'stable'

    def _calculate_complex_trend(self, values: list) -> str:
        """Trend for dict-valued samples: compare sums of numeric fields."""
        if len(values) < 2:
            return 'stable'
        first_val, last_val = values[0], values[-1]
        if isinstance(first_val, dict) and isinstance(last_val, dict):
            numeric_fields = [k for k, v in first_val.items() if isinstance(v, (int, float))]
            if numeric_fields:
                first_sum = sum(first_val.get(k, 0) for k in numeric_fields)
                last_sum = sum(last_val.get(k, 0) for k in numeric_fields)
                if last_sum > first_sum * 1.2:
                    return 'increasing'
                elif last_sum < first_sum * 0.8:
                    return 'decreasing'
        return 'stable'

# Performance report generation
def generate_performance_report(monitor: PerformanceMonitor) -> Dict:
    """Assemble current and historical metrics into a report."""
    current_metrics = monitor.get_current_metrics()
    historical_metrics = monitor.get_historical_metrics(minutes=5)
    return {
        'report_time': datetime.now().isoformat(),
        'current_metrics': current_metrics,
        'historical_metrics': historical_metrics,
        'recommendations': generate_recommendations(current_metrics, historical_metrics)
    }

def generate_recommendations(current: Dict, historical: Dict) -> List[str]:
    """Turn the metrics into plain-language tuning suggestions."""
    recommendations = []

    # CPU
    if current.get('cpu_percent', 0) > 80:
        recommendations.append("CPU usage is very high (>80%); consider optimizing hot paths or adding capacity")
    if current.get('avg_cpu_percent', 0) > 70:
        recommendations.append("Average CPU usage is elevated; profiling is recommended")

    # Memory
    if current.get('memory_percent', 0) > 85:
        recommendations.append("Memory usage is very high (>85%); check for leaks")

    # Trends
    if historical.get('cpu_percent', {}).get('trend') == 'increasing':
        recommendations.append("CPU usage is trending upward; watch for performance regressions")
    if historical.get('memory_percent', {}).get('trend') == 'increasing':
        recommendations.append("Memory usage is trending upward; a leak is possible")

    if not recommendations:
        recommendations.append("System is healthy; metrics are stable")
    return recommendations
```
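A minimal way to exercise the monitor end to end, assuming the module above is saved as `performance_monitor.py`: sample for a few seconds, then print the generated report.

```python
# usage sketch: sample for ten seconds, then dump the report as JSON
import json
import time

from performance_monitor import PerformanceMonitor, generate_performance_report

monitor = PerformanceMonitor()
monitor.start_monitoring()
time.sleep(10)  # let some samples accumulate

report = generate_performance_report(monitor)
print(json.dumps(report, indent=2, default=str))
monitor.stop_monitoring()
```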
### Performance Optimization Tools

```python
# performance_optimizer.py - performance optimization tools
import cProfile
import pstats
import io
import time
import functools
from typing import Callable, Dict

class PerformanceOptimizer:
    """Profiling helpers."""

    @staticmethod
    def profile_function(func: Callable) -> Callable:
        """Decorator: profile a function with cProfile."""
        @functools.wraps(func)
        def wrapper(*args, **kwargs):
            pr = cProfile.Profile()
            pr.enable()
            result = func(*args, **kwargs)
            pr.disable()

            # Print the profiling results
            s = io.StringIO()
            ps = pstats.Stats(pr, stream=s)
            ps.sort_stats('cumulative')
            ps.print_stats(10)  # the ten most expensive functions
            print(f"\n=== Profile - {func.__name__} ===")
            print(s.getvalue())
            return result
        return wrapper

    @staticmethod
    def time_it(func: Callable) -> Callable:
        """Decorator: measure wall-clock execution time."""
        @functools.wraps(func)
        def wrapper(*args, **kwargs):
            start_time = time.time()
            result = func(*args, **kwargs)
            execution_time = time.time() - start_time
            print(f"{func.__name__} took {execution_time:.4f}s")
            return result
        return wrapper

    @staticmethod
    def memory_profiler(func: Callable) -> Callable:
        """Decorator: line-by-line memory profiling, if memory_profiler is installed."""
        try:
            from memory_profiler import profile
            return profile(func)
        except ImportError:
            print("memory_profiler is not installed; skipping memory analysis")
            return func

# Usage example
@PerformanceOptimizer.profile_function
@PerformanceOptimizer.time_it
def example_heavy_function(n: int) -> int:
    """Example CPU-bound function."""
    result = 0
    for i in range(n):
        result += i ** 2
    return result

# Spider-side optimization example
class OptimizedSpiderMixin:
    """Mixin adding a simple parse cache to a spider."""

    def __init__(self):
        self.response_cache = {}
        self.cache_hits = 0
        self.cache_misses = 0
        self.start_time = time.time()

    def optimized_parse(self, response):
        """Parse with a cache keyed on the response URL."""
        cache_key = hash(response.url)
        if cache_key in self.response_cache:
            self.cache_hits += 1
            return self.response_cache[cache_key]

        # Parse and cache the result
        self.cache_misses += 1
        parsed_data = self._parse_content(response)
        self.response_cache[cache_key] = parsed_data
        return parsed_data

    def _parse_content(self, response):
        """Actual parsing logic goes here."""
        pass

    def get_performance_stats(self) -> Dict:
        """Return runtime and cache statistics."""
        elapsed_time = time.time() - self.start_time
        return {
            'elapsed_time': elapsed_time,
            'cached_responses': len(self.response_cache),
            'cache_hit_rate': self._calculate_cache_hit_rate()
        }

    def _calculate_cache_hit_rate(self) -> float:
        """Cache hits as a fraction of all lookups."""
        total = self.cache_hits + self.cache_misses
        return self.cache_hits / total if total else 0.0

# Performance test suite
def performance_test_suite():
    """Run the performance test suite."""
    import unittest
    import tracemalloc

    class PerformanceTests(unittest.TestCase):
        def test_response_time(self):
            """The example function should finish within a second."""
            start_time = time.time()
            example_heavy_function(10000)
            execution_time = time.time() - start_time
            self.assertLess(execution_time, 1.0, "Function took longer than 1 second")

        def test_memory_usage(self):
            """Peak memory should stay under 100MB."""
            tracemalloc.start()
            example_heavy_function(10000)
            current, peak = tracemalloc.get_traced_memory()
            tracemalloc.stop()
            self.assertLess(peak / 1024 / 1024, 100, "Peak memory exceeded 100MB")

    # Run the tests
    suite = unittest.TestLoader().loadTestsFromTestCase(PerformanceTests)
    runner = unittest.TextTestRunner(verbosity=2)
    runner.run(suite)
```
## Dashboard Design

### Dashboard Components
```python
# dashboard_components.py - dashboard components
import dash
from dash import dcc, html, Input, Output
import plotly.graph_objs as go
import pandas as pd
import numpy as np
from datetime import datetime, timedelta
import redis

def create_dashboard_app():
    """Build the monitoring dashboard app."""
    app = dash.Dash(__name__, suppress_callback_exceptions=True)

    card_style = {'background-color': '#f9f9f9', 'padding': '20px', 'border-radius': '10px'}

    app.layout = html.Div([
        # Top banner
        html.Div([
            html.H1("Scrapy Spider Monitoring Dashboard", style={'textAlign': 'center'}),
            html.Div(id='live-update-text'),
        ], className='banner'),

        # Control panel
        html.Div([
            html.Div([
                dcc.Dropdown(
                    id='spider-selector',
                    options=[
                        {'label': 'All spiders', 'value': 'all'},
                        {'label': 'News spider', 'value': 'news_spider'},
                        {'label': 'E-commerce spider', 'value': 'ecommerce_spider'},
                        {'label': 'Social media spider', 'value': 'social_spider'}
                    ],
                    value='all'
                ),
            ], className='two columns'),
            html.Div([
                dcc.DatePickerRange(
                    id='date-picker-range',
                    start_date=datetime.today() - timedelta(days=7),
                    end_date=datetime.today(),
                ),
            ], className='four columns'),
        ], className='row'),

        # Live metric cards
        html.Div([
            html.Div([
                html.H3("Active jobs"),
                html.P(id='active-jobs', children="0"),
            ], className='three columns', style=card_style),
            html.Div([
                html.H3("Success rate"),
                html.P(id='success-rate', children="0%"),
            ], className='three columns', style=card_style),
            html.Div([
                html.H3("Error rate"),
                html.P(id='error-rate', children="0%"),
            ], className='three columns', style=card_style),
            html.Div([
                html.H3("Response time"),
                html.P(id='response-time', children="0ms"),
            ], className='three columns', style=card_style),
        ], className='row'),

        # Chart area
        html.Div([
            html.Div([dcc.Graph(id='requests-over-time')], className='six columns'),
            html.Div([dcc.Graph(id='error-distribution')], className='six columns'),
        ], className='row'),
        html.Div([
            html.Div([dcc.Graph(id='resource-usage')], className='six columns'),
            html.Div([dcc.Graph(id='spider-performance')], className='six columns'),
        ], className='row'),

        # Refresh interval
        dcc.Interval(
            id='interval-component',
            interval=10 * 1000,  # refresh every 10 seconds
            n_intervals=0
        )
    ])

    # Callbacks
    @app.callback(
        [Output('active-jobs', 'children'),
         Output('success-rate', 'children'),
         Output('error-rate', 'children'),
         Output('response-time', 'children')],
        [Input('interval-component', 'n_intervals')]
    )
    def update_metrics(n):
        # Pull live data from Redis (or another source)
        metrics = get_real_time_metrics()
        return [
            f"{metrics.get('active_jobs', 0)}",
            f"{metrics.get('success_rate', 0):.2f}%",
            f"{metrics.get('error_rate', 0):.2f}%",
            f"{metrics.get('avg_response_time', 0):.2f}s"
        ]

    @app.callback(
        Output('requests-over-time', 'figure'),
        [Input('interval-component', 'n_intervals'),
         Input('spider-selector', 'value')]
    )
    def update_requests_chart(n, spider_name):
        # Fetch request data
        data = get_requests_data(spider_name)
        fig = go.Figure(data=[
            go.Scatter(x=data['timestamp'], y=data['requests'],
                       mode='lines+markers', name='Requests')
        ])
        fig.update_layout(
            title='Requests over time',
            xaxis_title='Time',
            yaxis_title='Requests'
        )
        return fig

    @app.callback(
        Output('error-distribution', 'figure'),
        [Input('interval-component', 'n_intervals')]
    )
    def update_error_chart(n):
        # Fetch error distribution data
        error_data = get_error_distribution()
        fig = go.Figure(data=[
            go.Pie(labels=list(error_data.keys()), values=list(error_data.values()))
        ])
        fig.update_layout(title='Error type distribution')
        return fig

    return app

def get_real_time_metrics():
    """Fetch live metrics from Redis, falling back to mock data."""
    try:
        r = redis.Redis(host='localhost', port=6379, db=0)
        metrics = r.hgetall('spider:metrics')
        return {
            'active_jobs': int(metrics.get(b'active_jobs', 0)),
            'success_rate': float(metrics.get(b'success_rate', 0)),
            'error_rate': float(metrics.get(b'error_rate', 0)),
            'avg_response_time': float(metrics.get(b'avg_response_time', 0))
        }
    except Exception:
        # Mock data when Redis is unavailable
        return {
            'active_jobs': 15,
            'success_rate': 95.5,
            'error_rate': 2.1,
            'avg_response_time': 0.8
        }

def get_requests_data(spider_name: str = 'all'):
    """Return (mock) hourly request counts."""
    dates = pd.date_range(start='2024-01-01', periods=24, freq='h')
    requests = np.random.poisson(50, 24)  # Poisson-distributed request counts
    return pd.DataFrame({'timestamp': dates, 'requests': requests})

def get_error_distribution():
    """Return (mock) error distribution."""
    return {
        'Network errors': 45,
        'Parse errors': 30,
        'Timeout errors': 15,
        'Other errors': 10
    }

# Launch the app
if __name__ == '__main__':
    app = create_dashboard_app()
    app.run_server(debug=True, host='0.0.0.0', port=8050)
```
### Mobile-Friendly Dashboard

```html
<!DOCTYPE html>
<html>
<head>
  <title>Spider Monitoring Dashboard - Mobile</title>
  <meta name="viewport" content="width=device-width, initial-scale=1.0">
  <style>
    * {
      margin: 0;
      padding: 0;
      box-sizing: border-box;
    }
    body {
      font-family: Arial, sans-serif;
      background-color: #f5f5f5;
      padding: 10px;
    }
    .header {
      background-color: #2c3e50;
      color: white;
      padding: 15px;
      text-align: center;
      border-radius: 10px;
      margin-bottom: 15px;
    }
    .metrics-grid {
      display: grid;
      grid-template-columns: repeat(auto-fit, minmax(150px, 1fr));
      gap: 10px;
      margin-bottom: 15px;
    }
    .metric-card {
      background-color: white;
      padding: 15px;
      border-radius: 8px;
      text-align: center;
      box-shadow: 0 2px 4px rgba(0,0,0,0.1);
    }
    .metric-value {
      font-size: 1.5em;
      font-weight: bold;
      margin: 5px 0;
    }
    .metric-label {
      font-size: 0.8em;
      color: #666;
    }
    .chart-container {
      background-color: white;
      padding: 15px;
      border-radius: 8px;
      margin-bottom: 15px;
      box-shadow: 0 2px 4px rgba(0,0,0,0.1);
    }
    .status-indicator {
      display: inline-block;
      width: 12px;
      height: 12px;
      border-radius: 50%;
      margin-right: 5px;
    }
    .status-good { background-color: #27ae60; }
    .status-warning { background-color: #f39c12; }
    .status-critical { background-color: #e74c3c; }
    .refresh-btn {
      background-color: #3498db;
      color: white;
      border: none;
      padding: 10px 20px;
      border-radius: 5px;
      cursor: pointer;
      width: 100%;
      margin-top: 10px;
    }
    .refresh-btn:hover {
      background-color: #2980b9;
    }
  </style>
</head>
<body>
  <div class="header">
    <h1>📊 Spider Monitoring Dashboard</h1>
    <p id="update-time">Last updated: --:--:--</p>
  </div>

  <div class="metrics-grid">
    <div class="metric-card">
      <div class="metric-value" id="active-jobs">0</div>
      <div class="metric-label">Active jobs</div>
    </div>
    <div class="metric-card">
      <div class="metric-value" id="success-rate">0%</div>
      <div class="metric-label">Success rate</div>
    </div>
    <div class="metric-card">
      <div class="metric-value" id="error-rate">0%</div>
      <div class="metric-label">Error rate</div>
    </div>
    <div class="metric-card">
      <div class="metric-value" id="response-time">0ms</div>
      <div class="metric-label">Response time</div>
    </div>
  </div>

  <div class="chart-container">
    <h3>📈 Live Metrics</h3>
    <canvas id="realtime-chart" width="400" height="200"></canvas>
  </div>

  <div class="chart-container">
    <h3>⚠️ Error Distribution</h3>
    <canvas id="error-chart" width="400" height="200"></canvas>
  </div>

  <button class="refresh-btn" onclick="refreshData()">🔄 Refresh</button>

  <script src="https://cdn.jsdelivr.net/npm/chart.js"></script>
  <script>
    // Real-time line chart
    const realtimeCtx = document.getElementById('realtime-chart').getContext('2d');
    const realtimeChart = new Chart(realtimeCtx, {
      type: 'line',
      data: {
        labels: Array.from({length: 10}, (_, i) => `${i*10}s`),
        datasets: [{
          label: 'Requests',
          data: Array(10).fill(0),
          borderColor: '#3498db',
          tension: 0.1
        }]
      },
      options: {
        responsive: true,
        scales: {
          y: {
            beginAtZero: true
          }
        }
      }
    });

    // Error distribution chart
    const errorCtx = document.getElementById('error-chart').getContext('2d');
    const errorChart = new Chart(errorCtx, {
      type: 'doughnut',
      data: {
        labels: ['Network errors', 'Parse errors', 'Timeout errors', 'Other'],
        datasets: [{
          data: [45, 30, 15, 10],
          backgroundColor: ['#e74c3c', '#f39c12', '#3498db', '#95a5a6']
        }]
      },
      options: {
        responsive: true
      }
    });

    // Update the metric cards
    function updateDisplay(data) {
      document.getElementById('active-jobs').textContent = data.active_jobs || 0;
      document.getElementById('success-rate').textContent = (data.success_rate || 0).toFixed(1) + '%';
      document.getElementById('error-rate').textContent = (data.error_rate || 0).toFixed(1) + '%';
      document.getElementById('response-time').textContent = (data.avg_response_time || 0).toFixed(2) + 's';
      document.getElementById('update-time').textContent = 'Last updated: ' + new Date().toLocaleTimeString();
    }

    // Simulated data refresh
    function refreshData() {
      // Mock API call
      const mockData = {
        active_jobs: Math.floor(Math.random() * 50),
        success_rate: 90 + Math.random() * 9,
        error_rate: Math.random() * 5,
        avg_response_time: Math.random() * 2
      };
      updateDisplay(mockData);

      // Push a new point onto the line chart
      const newData = Math.floor(Math.random() * 100);
      realtimeChart.data.datasets[0].data.push(newData);
      realtimeChart.data.datasets[0].data.shift();
      realtimeChart.update();
    }

    // Refresh on a timer
    setInterval(refreshData, 10000);
    refreshData(); // initial load
  </script>
</body>
</html>
```
## Troubleshooting and Diagnostics

### Diagnostic Tools

```python
# diagnostic_tools.py - diagnostic tool collection
import os
import psutil
import requests
import socket
import time
import logging
from typing import Dict, List

class DiagnosticTools:
    """A collection of checks for diagnosing crawler problems."""

    def __init__(self):
        self.logger = logging.getLogger(__name__)

    def check_system_resources(self) -> Dict:
        """Snapshot host-level resource usage."""
        return {
            'cpu_percent': psutil.cpu_percent(interval=1),
            'memory_percent': psutil.virtual_memory().percent,
            'disk_percent': psutil.disk_usage('/').percent,
            'process_count': len(psutil.pids()),
            'load_average': getattr(os, 'getloadavg', lambda: (0, 0, 0))(),
            'network_connections': len(psutil.net_connections())
        }

    def check_network_connectivity(self, urls: List[str]) -> Dict:
        """Probe a list of URLs and record status and latency."""
        results = {}
        for url in urls:
            try:
                start_time = time.time()
                response = requests.get(url, timeout=10)
                end_time = time.time()
                results[url] = {
                    'status': 'success',
                    'status_code': response.status_code,
                    'response_time': round(end_time - start_time, 3),
                    'content_length': len(response.content)
                }
            except Exception as e:
                results[url] = {
                    'status': 'failed',
                    'error': str(e)
                }
        return results

    def check_port_availability(self, host: str, ports: List[int]) -> Dict:
        """Check whether TCP ports on a host accept connections."""
        results = {}
        for port in ports:
            try:
                sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
                sock.settimeout(3)
                # connect_ex returns 0 when the connection succeeds
                results[port] = 'open' if sock.connect_ex((host, port)) == 0 else 'closed'
                sock.close()
            except Exception as e:
                results[port] = f'error: {e}'
        return results
```