When Crawlers Meet RabbitMQ: Efficient, Decoupled Distributed Crawling in Practice

A single Python script using requests + beautifulsoup4 is fine for crawling a small website. Once you need to crawl tens of thousands of pages of dynamic data and mixed content across multiple domains, problems appear: a single thread is too slow, task allocation across threads or processes becomes chaotic, duplicate crawling is hard to control, and a crashed node leaves its tasks stranded.

This is where a message queue (MQ) becomes the scheduler that rescues a distributed crawler. It completely decouples task production from consumption: you just throw URLs into the queue, and the crawling nodes fetch tasks, do the work, and report results on their own. MQ also brings advanced capabilities such as task priority, persistence, and redelivery of unacknowledged tasks. Among the many mature MQs, RabbitMQ is a common first choice for distributed crawling from entry level to production, thanks to its multi-protocol support, intuitive web management interface, and simple Python client (pika).

This article starts with rapid deployment and builds, step by step, a message queue architecture for distributed crawlers that is runnable, reliable, and priority-aware.


1. Quick overview of core features: choosing the right features for your crawler

RabbitMQ has many features, but for crawlers the following five matter most for efficiency and stability:

| Feature | Effect on crawlers |
| --- | --- |
| Message persistence | If the producer exits, RabbitMQ restarts, or a consumer goes down, queued tasks are not lost (given durable queues and persistent messages), and crawling resumes after recovery. |
| Publisher confirms / manual acknowledgement | The producer can wait for the broker's receipt after publishing; the consumer must manually acknowledge a task after crawling it, and only then does RabbitMQ delete it. |
| Fair dispatch (QoS) | Prevents fast nodes from sitting idle while slow nodes are overloaded: each consumer prefetches only 1 task at a time and receives the next one only after acknowledging it. |
| Priority queue | Crawl hot news first and historical archives later; tasks are delivered by priority, so important data arrives first. |
| Built-in web management | Open a browser to see queue lengths, connection counts, and message delivery status, so you can troubleshoot without typing commands. |

2. Set up the local environment in 3 minutes

2.1 Start RabbitMQ with one command (good news for Docker users)

Docker is the easiest option for local development: there is no need to install Erlang separately, and the management interface is enabled out of the box:

# 5672: AMQP protocol port (producers and consumers connect here)
# 15672: web management interface port
docker run -d --name local-rabbitmq \
  -p 5672:5672 \
  -p 15672:15672 \
  rabbitmq:3-management

After startup, open http://localhost:15672. The default username and password are both guest (by default this account only works from localhost), and after logging in you will see a clean console.

2.2 Install Python client

Use the officially recommended lightweight library pika to connect to RabbitMQ, and install the basic crawler library:

pip install pika requests beautifulsoup4

3. Basic architecture walkthrough: producers send URLs and consumers crawl them

The basic architecture has only two roles:

  • Task Producer: generates the URLs to crawl (or wrapped task objects) and throws them into the "to-crawl task queue";
  • Crawler Consumer: takes tasks from the "to-crawl task queue", manually acknowledges each one after crawling completes, and then fetches the next.

3.1 Producer code: just throw URLs into the queue

import pika
import json

def init_mq_producer():
    """Initialize the producer connection and declare the durable to-crawl queue."""
    # Open a blocking connection (BlockingConnection is enough for local testing;
    # SelectConnection is recommended for production)
    connection = pika.BlockingConnection(pika.ConnectionParameters(host="localhost"))
    channel = connection.channel()

    # Declare the queue: durable=True makes the queue survive a RabbitMQ restart
    channel.queue_declare(queue="scrape_url_queue", durable=True)
    return connection, channel

def send_scrape_task(channel, url: str, extra_params: dict = None):
    """Send one crawl task (serialized as JSON so complex parameters can be passed along)."""
    task = {"url": url, "params": extra_params or {}}
    # delivery_mode=2 asks the broker to persist the message to disk
    channel.basic_publish(
        exchange="",  # default direct exchange: routing_key matches the queue name directly
        routing_key="scrape_url_queue",
        body=json.dumps(task, ensure_ascii=False),
        properties=pika.BasicProperties(delivery_mode=2),
    )
    print(f"✅ Producer sent task: {url}")

if __name__ == "__main__":
    # Test: throw in 3 URLs from Baidu news subdomains
    conn, ch = init_mq_producer()
    test_urls = [
        "https://news.baidu.com/",
        "https://finance.baidu.com/",
        "https://tech.baidu.com/",
    ]
    for u in test_urls:
        send_scrape_task(ch, u)
    conn.close()

3.2 Consumer code: Confirm after completion

import pika
import json
import requests
from bs4 import BeautifulSoup

def init_mq_consumer():
    """Initialize the consumer connection and enable fair dispatch."""
    connection = pika.BlockingConnection(pika.ConnectionParameters(host="localhost"))
    channel = connection.channel()

    # Declaring is idempotent, but the arguments must match the producer's declaration;
    # a mismatch makes the broker close the channel with PRECONDITION_FAILED
    channel.queue_declare(queue="scrape_url_queue", durable=True)

    # Fair dispatch: each consumer prefetches only 1 task and gets the next after acknowledging it
    channel.basic_qos(prefetch_count=1)
    return connection, channel

def scrape_page(task: dict):
    """Simulate a real crawl: fetch the page title and return a result dict."""
    try:
        resp = requests.get(task["url"], headers={"User-Agent": "Mozilla/5.0"}, timeout=5)
        resp.raise_for_status()  # raise on 4xx/5xx responses
        soup = BeautifulSoup(resp.text, "html.parser")
        return {"url": task["url"], "title": soup.title.string.strip(), "status": "success"}
    except Exception as e:
        # Also covers pages without a <title>, where soup.title is None
        return {"url": task["url"], "error": str(e), "status": "failed"}

def task_callback(ch, method, properties, body):
    """Callback: deserialize -> crawl -> manually acknowledge -> wait for the next task."""
    task = json.loads(body.decode("utf-8"))
    print(f"🔄 Consumer is processing: {task['url']}")

    result = scrape_page(task)
    print(f"📄 Result: {result}")

    # Manual acknowledgement: tell RabbitMQ the task is done and can be removed from the queue
    ch.basic_ack(delivery_tag=method.delivery_tag)

if __name__ == "__main__":
    conn, ch = init_mq_consumer()
    # Register the callback and start consuming
    ch.basic_consume(
        queue="scrape_url_queue",
        on_message_callback=task_callback,
        auto_ack=False,  # auto-ack must be off, or RabbitMQ deletes the task as soon as it is delivered
    )
    print("⏳ Consumer waiting for tasks... press CTRL+C to exit")
    try:
        ch.start_consuming()
    except KeyboardInterrupt:
        print("\n👋 Consumer stopped")
        conn.close()

4. Advanced optimization for crawlers

The basic setup is up and running, but production projects usually also need priority control, failed-task handling, and a way to report crawl results (for example, writing the title to a database or sending it to another queue).

4.1 Priority Queue: crawl the hot list first, then the history

Give each task a priority from 1 to 10 (the larger the number, the higher the priority) so that hot data can jump the queue.

Producer Adjustments:

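import json
import pika

connection = pika.BlockingConnection(pika.ConnectionParameters(host="localhost"))
ch = connection.channel()

# Set the maximum priority when declaring the queue
ch.queue_declare(
    queue="priority_scrape_queue",
    durable=True,
    arguments={"x-max-priority": 10},  # max priority 10; setting it too high hurts performance
)

# Attach the priority property when sending a task
def send_priority_task(channel, url, priority):
    channel.basic_publish(
        exchange="",
        routing_key="priority_scrape_queue",
        body=json.dumps({"url": url}),
        properties=pika.BasicProperties(delivery_mode=2, priority=priority),
    )

# Example: send the historical archive first (priority 1), then the hot-list news (priority 10)
send_priority_task(ch, "https://news.baidu.com/history/20240101", 1)
send_priority_task(ch, "https://news.baidu.com/", 10)
# When both messages are waiting in the queue, the consumer receives the priority-10 task first
connection.close()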

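The consumer's processing logic does not need to change. It only has to consume from priority_scrape_queue and declare it with the same arguments as the producer (including x-max-priority); the priority mechanism then takes effect automatically. Note that x-max-priority cannot be added to a queue that already exists, which is why this example uses a new queue name.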
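
To keep priorities consistent across producers, the mapping from URL to priority can live in one small function. The sketch below is only an illustration: the rule table, the default values, and the name pick_priority are assumptions rather than part of RabbitMQ or pika, and the path prefixes should be adapted to the sites you actually crawl.

```python
from urllib.parse import urlparse

MAX_PRIORITY = 10        # must match the queue's x-max-priority
DEFAULT_PRIORITY = 5     # hypothetical default for ordinary pages
HOMEPAGE_PRIORITY = 10   # hypothetical: front pages carry the hot list

# Hypothetical rules: (path prefix, priority). First match wins.
PRIORITY_RULES = [
    ("/history/", 1),
    ("/archive/", 1),
]


def pick_priority(url: str) -> int:
    """Return a priority in the range 1..MAX_PRIORITY for the given URL."""
    path = urlparse(url).path or "/"
    priority = DEFAULT_PRIORITY
    if path == "/":
        priority = HOMEPAGE_PRIORITY
    else:
        for prefix, rule_priority in PRIORITY_RULES:
            if path.startswith(prefix):
                priority = rule_priority
                break
    # Clamp so a bad rule can never exceed what the queue supports.
    return max(1, min(priority, MAX_PRIORITY))
```

A producer could then call send_priority_task(ch, url, pick_priority(url)) instead of hard-coding the number at each call site.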

4.2 Dead Letter Queue (DLQ): Give failed tasks a home

Tasks that still fail after multiple retries (for example, because of network timeouts or page structure changes) should not stay stuck in the main queue. You can use RabbitMQ's dead-lettering mechanism to store them in a separate dead letter queue for manual inspection, and then decide whether to redeliver them.

A simplified implementation outline:

  1. When declaring the main queue, specify the x-dead-letter-exchange and x-dead-letter-routing-key arguments;
  2. Create a separate "dead letter queue" and bind it to that exchange with that routing key;
  3. When the consumer fails to process a task, call ch.basic_nack(delivery_tag=..., requeue=False) to reject it, and RabbitMQ automatically routes it to the dead letter queue.
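
Steps 1 and 2 above can be sketched as a single declaration function. The exchange, queue, and routing-key names below are assumptions chosen for illustration, and the main queue gets a new name because the dead-letter arguments cannot be added to the already declared scrape_url_queue. The function takes any pika channel, so it contains no connection code of its own.

```python
DLX_EXCHANGE = "scrape_dlx"          # hypothetical dead letter exchange name
DLQ_QUEUE = "scrape_dead_letters"    # hypothetical dead letter queue name
DLQ_ROUTING_KEY = "dead"             # hypothetical routing key
MAIN_QUEUE = "scrape_url_queue_dlx"  # hypothetical main queue with dead-lettering


def declare_queues_with_dlq(channel) -> None:
    """Declare the dead letter exchange and queue, then the main queue pointing at them."""
    # Step 2: an independent exchange and queue that collect rejected tasks.
    channel.exchange_declare(
        exchange=DLX_EXCHANGE, exchange_type="direct", durable=True
    )
    channel.queue_declare(queue=DLQ_QUEUE, durable=True)
    channel.queue_bind(
        queue=DLQ_QUEUE, exchange=DLX_EXCHANGE, routing_key=DLQ_ROUTING_KEY
    )
    # Step 1: the main queue names the exchange and routing key to use
    # whenever one of its messages is rejected with requeue=False.
    channel.queue_declare(
        queue=MAIN_QUEUE,
        durable=True,
        arguments={
            "x-dead-letter-exchange": DLX_EXCHANGE,
            "x-dead-letter-routing-key": DLQ_ROUTING_KEY,
        },
    )
```

Both the producer and the consumer would call declare_queues_with_dlq(channel) right after opening their channel, so that the declarations always match.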

This way, the normal retry mechanism (for example, pushing a task back onto the queue with requeue=True) does not affect where permanently failed tasks end up.
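
One possible way to separate retries from dead-lettering is sketched below. Because requeue=True does not record how many times a task has been redelivered, this sketch swaps in a different technique: it keeps a retries counter in the task body and republishes the task until a limit is reached. The names settle_task, MAX_RETRIES, the retries field, and the queue name scrape_url_queue_dlx are assumptions made for illustration.

```python
import json

MAIN_QUEUE = "scrape_url_queue_dlx"  # hypothetical main queue declared with dead-letter arguments
MAX_RETRIES = 3                      # hypothetical retry limit


def settle_task(ch, method, properties, task: dict, succeeded: bool) -> str:
    """Ack, retry, or dead-letter one delivery; returns the action taken."""
    if succeeded:
        ch.basic_ack(delivery_tag=method.delivery_tag)
        return "acked"

    retries = task.get("retries", 0)
    if retries < MAX_RETRIES:
        # Republish a copy with the counter bumped, reusing the received
        # properties so delivery_mode (persistence) is preserved.
        retry_task = {**task, "retries": retries + 1}
        ch.basic_publish(
            exchange="",
            routing_key=MAIN_QUEUE,
            body=json.dumps(retry_task, ensure_ascii=False),
            properties=properties,
        )
        ch.basic_ack(delivery_tag=method.delivery_tag)
        return "retried"

    # Retries exhausted: reject without requeue so the broker dead-letters it.
    ch.basic_nack(delivery_tag=method.delivery_tag, requeue=False)
    return "dead-lettered"
```

In the task_callback from section 3.2, the unconditional basic_ack could be replaced with settle_task(ch, method, properties, task, result["status"] == "success"). Republishing and acknowledging are two separate steps, so a crash between them can duplicate a task; deduplication on the consumer or storage side covers that case.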


5. Performance and stability tips

  1. Use connections and channels sensibly: each consumer process should ideally hold a single TCP connection and open lightweight channels on it. pika is not thread-safe, so do not share a connection or channel across threads; give each thread its own connection instead.
  2. Enable heartbeats: in production, servers and network devices often drop connections that stay idle for a long time. Adding heartbeat=60 to ConnectionParameters helps keep the connection from being killed by mistake.
  3. Avoid message backlog: check queue lengths in the management interface regularly and, if needed, add consumers or run consumers in multiple processes.
  4. Task deduplication: if needed, introduce Redis or a Bloom filter on the producer side to keep duplicate URLs out of the queue.
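
For the deduplication tip, a minimal in-memory Bloom filter shows the idea. It is a sketch using only the standard library: the class, its default sizes, and the helper filter_new_urls are assumptions, and the filter lives in a single producer process. Several producers would need a shared store such as Redis instead.

```python
import hashlib


class BloomFilter:
    """Probabilistic set: no false negatives, a small chance of false positives."""

    def __init__(self, size_bits: int = 1 << 20, num_hashes: int = 5):
        self.size_bits = size_bits
        self.num_hashes = num_hashes
        self.bits = bytearray((size_bits + 7) // 8)

    def _positions(self, item: str):
        # Double hashing: derive num_hashes bit positions from one SHA-256 digest.
        digest = hashlib.sha256(item.encode("utf-8")).digest()
        h1 = int.from_bytes(digest[:8], "big")
        h2 = int.from_bytes(digest[8:16], "big") | 1
        for i in range(self.num_hashes):
            yield (h1 + i * h2) % self.size_bits

    def add(self, item: str) -> None:
        for pos in self._positions(item):
            self.bits[pos // 8] |= 1 << (pos % 8)

    def __contains__(self, item: str) -> bool:
        return all(
            self.bits[pos // 8] & (1 << (pos % 8)) for pos in self._positions(item)
        )


def filter_new_urls(urls, seen: BloomFilter) -> list:
    """Return URLs not seen before, in order, and mark them as seen."""
    fresh = []
    for url in urls:
        if url not in seen:
            seen.add(url)
            fresh.append(url)
    return fresh
```

The producer would pass its candidate URLs through filter_new_urls before calling send_scrape_task. A false positive means a new URL is occasionally skipped, which is the trade-off a Bloom filter makes for its small memory footprint.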

6. Monitoring and Management

Open http://localhost:15672 in a browser and make good use of the following pages:

  • Queues: view each queue's current message count, consumer count, and delivery/acknowledgement/rejection statistics;
  • Connections / Channels: check the currently active connections and channels to troubleshoot abnormal disconnections quickly;
  • Overview: keep track of global message traffic and node health.
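
The same numbers shown on the Queues page are also available from the management plugin's HTTP API, which is convenient for scripted backlog checks. The sketch below uses only the standard library; the function names are assumptions, and the guest/guest credentials and localhost address match the local Docker setup from section 2.1.

```python
import base64
import json
from urllib.parse import quote
from urllib.request import Request, urlopen


def queue_api_url(queue: str, vhost: str = "/", base: str = "http://localhost:15672") -> str:
    """Build the management API URL for one queue (the default vhost "/" becomes %2F)."""
    return f"{base}/api/queues/{quote(vhost, safe='')}/{quote(queue, safe='')}"


def summarize_queue(payload: dict) -> dict:
    """Pick the fields that matter for spotting a backlog."""
    return {
        "ready": payload.get("messages_ready", 0),
        "unacked": payload.get("messages_unacknowledged", 0),
        "consumers": payload.get("consumers", 0),
    }


def fetch_queue_stats(queue: str, user: str = "guest", password: str = "guest") -> dict:
    """Query the management API with HTTP basic auth and summarize the result."""
    token = base64.b64encode(f"{user}:{password}".encode("utf-8")).decode("ascii")
    request = Request(queue_api_url(queue), headers={"Authorization": f"Basic {token}"})
    with urlopen(request, timeout=5) as response:
        return summarize_queue(json.load(response))
```

Calling fetch_queue_stats("scrape_url_queue") against the running container returns a small dict that a cron job or alerting script could compare against a threshold.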

7. Further reading

  1. RabbitMQ official tutorials (Python edition)
  2. Pika guide to production-grade connections
  3. Example project integrating Scrapy with RabbitMQ

Through this article, you have built from scratch a distributed crawler foundation that is highly decoupled, persistent, and priority-aware. In real projects you can extend it with a "deduplication queue" and a "data cleaning queue", or wrap tasks in classes and serialize them with pickle (note: only use pickle in trusted environments). When crawlers meet RabbitMQ, you can say goodbye to the pain of manual scheduling and let your crawler cluster truly "breathe on its own".