Why are microservices/asynchronous tasks using RabbitMQ?

RabbitMQ is an open source message broker/queue server based on the AMQP protocol. It can help us solve the headaches of microservice decoupling, asynchronous processing of background tasks, and multi-system broadcast notifications.

Its core advantages are clear enough:

  • ✅ Reliable: supports message/queue persistence and release confirmation
  • ✅ Flexible: 4 types of commonly used switches cope with various routing scenarios
  • ✅ Easy to use: intuitive web management interface
  • ✅ Full ecosystem: multi-language client, Pythonpikavery mature

1. Complete local installation configuration in 1 minute

Using Docker is the fastest solution and does not require the tedious steps of manually installing Erlang and configuring environment variables.

1.1 Docker single container startup

# 拉取带管理界面的3.x稳定版镜像
docker pull rabbitmq:3-management

# 后台启动,映射端口、设置默认用户名密码
docker run -d \
  --name my-rabbitmq \
  -p 5672:5672  `# AMQP核心通信端口` \
  -p 15672:15672  `# Web管理界面端口` \
  -e RABBITMQ_DEFAULT_USER=dev_user \
  -e RABBITMQ_DEFAULT_PASS=dev_pass123 \
  rabbitmq:3-management

1.2 Authentication and Access

Wait about 10 seconds after startup and visit **http://localhost:15672**,用上面的`dev_user/dev_pass123`Log in and you will see the management interface~

If you want to see the container status or logs, use these two commands:

# 查看运行状态
docker ps | grep my-rabbitmq

# 查看实时日志
docker logs -f my-rabbitmq

2. Understand the core concepts in 3 minutes

You don’t need to memorize too much, just remember the following 7 most commonly used ones. It will be more intuitive to list them in a table:

ConceptOne sentence explanation
ProducerProgram that sends messages
ConsumerProgram to receive messages
Exchange (switch)The "transfer station" of messages, responsible for assigning messages to queues according to rules
Queue (queue)"Temporary warehouse" of messages, stored waiting for consumers to pick them up
Binding"Association rules" for switches and queues
Routing Key"Specific matching conditions" of binding rules
Virtual HostLogically "independent workspace", different projects can be isolated

3. Python integration practice (most commonly used scenarios)

3.1 Preparation

Install Python firstpikaClient:

pip install pika

3.2 Scenario 1: The simplest "point-to-point" direct connection queue

Suitable for background task processing, such as sending emails and generating reports.

Producer code

import pika
import json
import logging

# 配置日志
logging.basicConfig(level=logging.INFO)

def send_email_task(to: str, subject: str, content: str):
    # 1. 建立连接
    credentials = pika.PlainCredentials('dev_user', 'dev_pass123')
    connection = pika.BlockingConnection(pika.ConnectionParameters(
        host='localhost', credentials=credentials
    ))
    channel = connection.channel()

    # 2. 声明持久化队列(重启RabbitMQ消息/队列还在)
    channel.queue_declare(queue='email_tasks', durable=True)

    # 3. 组装消息
    task = {
        "to": to,
        "subject": subject,
        "content": content
    }

    # 4. 发送持久化消息
    channel.basic_publish(
        exchange='',  # 空字符串是「默认直连交换机」
        routing_key='email_tasks',  # 直连模式直接用队列名当路由键
        body=json.dumps(task),
        properties=pika.BasicProperties(
            delivery_mode=2  # 消息持久化
        )
    )

    logging.info(f"邮件任务已发送:{task}")
    connection.close()

# 测试发送
send_email_task("test@example.com", "测试邮件", "这是RabbitMQ发送的邮件!")

Consumer code

import pika
import json
import logging
import time

logging.basicConfig(level=logging.INFO)

def process_email_task(ch, method, properties, body):
    try:
        # 解析消息
        task = json.loads(body.decode('utf-8'))
        logging.info(f"正在处理邮件任务:{task}")

        # 模拟发送邮件的耗时操作
        time.sleep(2)

        # 手动确认消息(必须!不然重启消费者会重新收到)
        ch.basic_ack(delivery_tag=method.delivery_tag)
        logging.info("邮件任务处理完成!")

    except Exception as e:
        logging.error(f"处理邮件任务失败:{e}")
        # 拒绝消息并重新排队(如果是临时错误),或者不重新排队(永久错误)
        ch.basic_nack(delivery_tag=method.delivery_tag, requeue=True)

def start_consumer():
    credentials = pika.PlainCredentials('dev_user', 'dev_pass123')
    connection = pika.BlockingConnection(pika.ConnectionParameters(
        host='localhost', credentials=credentials
    ))
    channel = connection.channel()

    # 再次声明队列(必须!因为不知道生产者是否先启动)
    channel.queue_declare(queue='email_tasks', durable=True)

    # 设置QoS(一次只取1条消息,处理完再取下一条,避免单消费者负载过高)
    channel.basic_qos(prefetch_count=1)

    # 绑定消费者回调函数
    channel.basic_consume(queue='email_tasks', on_message_callback=process_email_task)

    logging.info("邮件任务消费者已启动,等待任务...")
    channel.start_consuming()

# 启动消费者
if __name__ == "__main__":
    start_consumer()

3.3 Scenario 2: "Publish-Subscribe" sector broadcast

Suitable for system notifications, such as sending emails, text messages, and site messages at the same time after the user places an order.

Producer (Order System) Code

import pika
import json
import logging
from datetime import datetime

logging.basicConfig(level=logging.INFO)

def publish_order_created(order_id: str, user_id: str, total: float):
    credentials = pika.PlainCredentials('dev_user', 'dev_pass123')
    connection = pika.BlockingConnection(pika.ConnectionParameters(
        host='localhost', credentials=credentials
    ))
    channel = connection.channel()

    # 声明扇形交换机(fanout,忽略路由键,直接广播给所有绑定的队列)
    channel.exchange_declare(exchange='order_events', exchange_type='fanout', durable=True)

    # 组装事件
    event = {
        "order_id": order_id,
        "user_id": user_id,
        "total": total,
        "created_at": datetime.now().isoformat()
    }

    # 发布事件
    channel.basic_publish(
        exchange='order_events',
        routing_key='',  # 扇形模式不需要
        body=json.dumps(event),
        properties=pika.BasicProperties(delivery_mode=2)
    )

    logging.info(f"订单创建事件已发布:{event}")
    connection.close()

# 测试发布
publish_order_created("ORD-123456", "USER-789", 99.9)

Consumer 1 (email notification system)

Just bind the queue toorder_eventsAll order events are received on the sector switch.

# 前面的连接、日志配置和场景1一样,只改关键部分
def start_email_notification_consumer():
    # ... 连接和声明交换机的代码 ...
    # 创建临时排他队列(消费者断开后自动删除)
    result = channel.queue_declare(queue='', exclusive=True)
    temp_queue_name = result.method.queue

    # 绑定临时队列到扇形交换机
    channel.queue_bind(exchange='order_events', queue=temp_queue_name)

    # ... 回调函数和启动消费的代码 ...

Consumer 2 (SMS Notification System)

It is almost the same as consumer 1, except that the callback function is changed to the logic of processing SMS messages.


4. Lightweight retry and dead letter mechanism (necessary for production)

Background tasks will inevitably fail (for example, the mail server temporarily hangs). We cannot directly lose the message, nor can we retry infinitely.

You can use the lightweight solution of "retry queue + TTL + dead letter queue":

  1. Message failed → sent to retry queue
  2. The retry queue has a TTL (for example, it expires after 5 seconds)
  3. Automatically send to "Dead Letter Exchange" after expiration
  4. Bind the dead letter switch back to the main queue (continue to retry) or the dead letter queue (give up, manual processing)

5. Key Best Practices

  1. Persistence must be used on demand: Persisting both messages and queues will reduce performance, and temporary tasks can be unnecessary.
  2. Message must be confirmed manually: to avoid message loss when the consumer hangs up
  3. Set QoS prefetch_count: generally 1-10, adjusted according to consumer processing capabilities
  4. No need for default user guest: Create a dedicated user in the production environment and only give necessary permissions
  5. Don’t send too big messages: It’s best to keep a single message within 10MB.

Summarize

Getting started with RabbitMQ is very simple, Docker single container + PythonpikaIt can handle most common scenarios. Master the core concepts, direct connection/sector queue, lightweight retry mechanism, and cooperate with management interface monitoring to build a reliable asynchronous communication system~