Quick Start and Hands-On Practice with Python Distributed Processes (with Complete Code)

When CPU-intensive work such as batch data processing or sharded model training has to span machines, Python's GIL (global interpreter lock) prevents threads from running Python bytecode in parallel, and ordinary local multiprocessing cannot use the computing power of other machines. In that situation, the standard-library module multiprocessing.managers lets us build a lightweight distributed system quickly.

This article starts from how to choose between processes and threads, implements distributed computing with a Master‑Worker architecture step by step, and closes with suggestions for making it production-ready.


1. First, clarify: when should you use distributed processes?

People who are new to concurrency and parallelism are often unsure how to choose among threads, processes, coroutines, and distributed processes. Let's first spell out the core limitations in the Python context:

| Solution | Core limitations and advantages | Applicable scenarios |
|---|---|---|
| Multi-threading | Limited by the GIL: only one thread executes Python bytecode at a time. Creation and switching overhead is minimal, and sharing memory is simple | I/O-intensive work (crawlers, API gateways, database reads and writes) |
| Local multi-process | Each process has its own interpreter and GIL, so it can use multiple cores. It cannot span machines, and memory is not shared between processes | CPU-intensive tasks on a single machine |
| Distributed multi-process | Keeps the multi-core advantage of local multi-process and can scale out and in across machines. It requires network communication, and task serialization adds overhead | CPU-intensive batch tasks across multiple machines |

To put it simply: if your calculations cannot be handled by one machine, or you need to use the idle cores of different machines, distributed multi-processing is the first choice.
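The thread-versus-process difference for CPU-bound work can be observed with a small experiment. The sketch below is only an illustration: the function names `count_primes` and `run_with` and the workload size are made up for this example, and the timings depend on your machine and on running a standard CPython build with the GIL enabled.

```python
import time
from concurrent.futures import ProcessPoolExecutor, ThreadPoolExecutor


def count_primes(limit):
    """CPU-bound work: count the primes below `limit` by trial division."""
    count = 0
    for n in range(2, limit):
        if all(n % d for d in range(2, int(n ** 0.5) + 1)):
            count += 1
    return count


def run_with(executor_cls, limits):
    """Run count_primes over `limits` with the given executor class.

    Returns (results, elapsed_seconds).
    """
    start = time.perf_counter()
    with executor_cls(max_workers=len(limits)) as pool:
        results = list(pool.map(count_primes, limits))
    return results, time.perf_counter() - start


if __name__ == "__main__":
    limits = [60_000] * 4  # arbitrary workload size, chosen for illustration
    for cls in (ThreadPoolExecutor, ProcessPoolExecutor):
        results, seconds = run_with(cls, limits)
        print(f"{cls.__name__}: {seconds:.2f}s, results={results}")
```

On a multi-core machine the process pool would normally finish noticeably faster than the thread pool, because each process has its own interpreter and GIL.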


2. Lightweight distributed core architecture

multiprocessing.managers has a very simple design and uses the classic Master‑Worker pattern:

Core components

  1. Master node
  • Creates two core queues: a Task Queue (holds pending tasks) and a Result Queue (receives completed results)
  • Exposes the queues over the LAN or public network through BaseManager
  • Is responsible for distributing tasks and collecting results
  2. Worker node
  • Connects to the Master's network queues through BaseManager
  • Loops: takes a task from the Task Queue, processes it, and puts the result into the Result Queue
  • Any number of Workers can be started or stopped at any time without modifying the Master code

⚡ The advantage of this design: Workers can be added or stopped at will while the Master stays unchanged, which suits scenarios that scale out and in dynamically.


3. Complete code walkthrough

We use "compute the squares of a batch of random numbers" as the simulated task (each computation sleeps for 1 second to simulate real work) to demonstrate the whole deployment process.

1. Master node code (task_master.py)

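import queue
import random
from multiprocessing.managers import BaseManager

# The real queues live at module level so that the manager's server process
# can find them even when it is launched with the "spawn" start method
# (the default on Windows and macOS). The server handles each connection in
# a thread inside one process, so a plain queue.Queue is sufficient.
task_queue = queue.Queue()    # random numbers waiting to be squared
result_queue = queue.Queue()  # finished results

def get_task_queue():
    return task_queue

def get_result_queue():
    return result_queue

# Custom manager class (must inherit from BaseManager)
class QueueManager(BaseManager):
    pass

def main():
    # 1. Register the queues as network-callable interfaces.
    # Module-level functions are used instead of lambdas: a lambda cannot be
    # pickled, and start() has to pickle the registry under "spawn".
    QueueManager.register('get_task_queue', callable=get_task_queue)
    QueueManager.register('get_result_queue', callable=get_result_queue)

    # 2. Start the network manager
    # address: ('0.0.0.0', port) listens on that port on all network interfaces
    # authkey: byte-string key; Master and Worker must use exactly the same
    #          value, otherwise the connection is rejected
    manager = QueueManager(
        address=('0.0.0.0', 56789),
        authkey=b'python_distributed_test_202x'
    )
    manager.start()
    print("✅ Master started, listening on port 56789")

    try:
        # 3. Get proxy objects for the queues (used like a local Queue)
        proxy_task = manager.get_task_queue()
        proxy_result = manager.get_result_queue()

        # 4. Dispatch the simulated tasks: 10 random numbers in 0..10000
        print("\n📦 Dispatching tasks...")
        for _ in range(10):
            n = random.randint(0, 10000)
            print(f"→ Queued task: square {n}")
            proxy_task.put(n)

        # 5. Block until Workers return results (30-second timeout per
        #    result so the Master never waits forever)
        print("\n⏳ Waiting for Worker results...")
        for _ in range(10):
            res = proxy_result.get(timeout=30)
            print(f"← Received result: {res}")

    except queue.Empty:
        print("\n⚠️  No result within 30 seconds; is any Worker running?")
    except KeyboardInterrupt:
        print("\n⚠️  Master interrupted manually")
    finally:
        # 6. Clean up: shut down the network manager
        manager.shutdown()
        print("\n👋 Master exited cleanly")

if __name__ == "__main__":
    main()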

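2. Worker node code (task_worker.py)

import queue
import sys
import time
from multiprocessing import AuthenticationError
from multiprocessing.managers import BaseManager

# Custom manager class; it only needs to register the same names as the Master
class QueueManager(BaseManager):
    pass

def main():
    # 1. Register the Master interfaces to call (names only, no implementation)
    QueueManager.register('get_task_queue')
    QueueManager.register('get_result_queue')

    # 2. Take the Master's IP from the command line (default 127.0.0.1,
    #    convenient for single-machine testing)
    server_ip = sys.argv[1] if len(sys.argv) > 1 else '127.0.0.1'
    print(f"🔗 Connecting to Master: {server_ip}:56789...")

    try:
        # 3. Connect to the Master's network manager
        manager = QueueManager(
            address=(server_ip, 56789),
            authkey=b'python_distributed_test_202x'
        )
        manager.connect()
        print("✅ Connected to Master")

        # 4. Get proxy objects for the queues
        proxy_task = manager.get_task_queue()
        proxy_result = manager.get_result_queue()

        # 5. Take tasks in a loop; exit after 10 seconds without a new task
        print("\n🏃 Worker running... press Ctrl+C to stop")
        while True:
            try:
                n = proxy_task.get(timeout=10)
            except queue.Empty:
                print("\n⚠️  No new task for 10 seconds, exiting")
                break
            print(f"→ Got task: square {n}")
            # Simulate 1 second of real work
            time.sleep(1)
            res = f"{n} * {n} = {n*n}"
            print(f"← Finished task: {res}")
            proxy_result.put(res)

    except ConnectionRefusedError:
        print("❌ Connection refused: check that the Master is running and the IP/port are correct")
    except AuthenticationError:
        print("❌ Authentication failed: the authkey does not match the Master's")
    except (EOFError, ConnectionError) as e:
        print(f"\n⚠️  Lost connection to Master: {e!r}")
    except KeyboardInterrupt:
        print("\n⚠️  Worker interrupted manually")
    finally:
        print("\n👋 Worker exited cleanly")

if __name__ == "__main__":
    main()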

4. Rapid deployment and testing

  1. Open terminal 1 and start Master:

    python task_master.py
  2. Open terminals 2, 3, ... (simulating multiple Workers) and run the Worker (it connects to 127.0.0.1 by default):

    # Terminal 2
    python task_worker.py
    # Terminal 3
    python task_worker.py
  3. Watch the terminal output: the two Workers compete for tasks, and the results reach the Master in the order they are completed.
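If you would rather check the queue round trip without opening several terminals, the flow can also be exercised inside one process. The following is a minimal sketch under stated assumptions, not part of the scripts above: `ServerManager`, `ClientManager`, `run_smoke_test`, and `AUTHKEY` are hypothetical names, the manager is served from a background thread via `get_server().serve_forever()` instead of `start()`, and port 0 asks the OS for any free port.

```python
import queue
import threading
from multiprocessing.managers import BaseManager

AUTHKEY = b"smoke_test_key"  # hypothetical key, used only by this sketch


class ServerManager(BaseManager):
    """Master side: owns the real queues."""


class ClientManager(BaseManager):
    """Worker side: only knows the registered names."""


task_queue = queue.Queue()
result_queue = queue.Queue()
# Lambdas are acceptable here because no server process is spawned.
ServerManager.register("get_task_queue", callable=lambda: task_queue)
ServerManager.register("get_result_queue", callable=lambda: result_queue)
ClientManager.register("get_task_queue")
ClientManager.register("get_result_queue")


def worker(address):
    """Connect like a remote Worker and square numbers until idle for 1 s."""
    manager = ClientManager(address=address, authkey=AUTHKEY)
    manager.connect()
    tasks = manager.get_task_queue()
    results = manager.get_result_queue()
    while True:
        try:
            n = tasks.get(timeout=1)
        except queue.Empty:
            return
        results.put((n, n * n))


def run_smoke_test(numbers, worker_count=2):
    """Serve the queues on localhost, run Workers in threads, return sorted results."""
    manager = ServerManager(address=("127.0.0.1", 0), authkey=AUTHKEY)
    server = manager.get_server()
    threading.Thread(target=server.serve_forever, daemon=True).start()

    # The server thread shares this process, so the queue can be filled directly.
    for n in numbers:
        task_queue.put(n)

    threads = [
        threading.Thread(target=worker, args=(server.address,))
        for _ in range(worker_count)
    ]
    for t in threads:
        t.start()
    for t in threads:
        t.join()
    return sorted(result_queue.get(timeout=5) for _ in numbers)


if __name__ == "__main__":
    print(run_smoke_test([2, 3, 4]))
```

Because the Workers here are threads, this only verifies the connection, authentication, and queue plumbing; it says nothing about parallel speed-up.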

Cross-machine deployment

  1. Copy task_worker.py to the other machines.
  2. When running the Worker on another machine, pass in the Master's intranet IP:
    # Assuming the Master's intranet IP is 192.168.1.100
    python task_worker.py 192.168.1.100
  3. ⚠️ Remember to open port 56789 in the firewall on the Master machine (opening the single port is safer than turning the firewall off).
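Before starting a Worker on another machine, it can help to confirm that the Master's port is reachable at all. A minimal sketch using only the standard library; the function name `port_is_open` is made up for this example, and the IP in the usage lines is the assumed address from the step above.

```python
import socket


def port_is_open(host, port, timeout=3.0):
    """Return True if a TCP connection to (host, port) succeeds within `timeout`."""
    try:
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:
        # Covers refused connections, timeouts, and unreachable hosts.
        return False


if __name__ == "__main__":
    # 192.168.1.100 is the assumed Master IP from the deployment step.
    print(port_is_open("192.168.1.100", 56789))
```

A `False` result usually points to a firewall rule, a wrong IP, or a Master that is not running; a `True` result only proves the port accepts TCP connections, not that the authkey matches.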

5. Suggestions for improving the production environment

The code above is only a minimal working demo and is not ready for production as-is. At least the following improvements are needed:

1. Security

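  • Don't hardcode keys: read authkey from an environment variable or a configuration file instead
  • Encrypted transport: multiprocessing.managers does not provide TLS. authkey only authenticates the connection, and the traffic itself is unencrypted pickle data, so across untrusted networks run the connection inside a VPN, an SSH tunnel, or a TLS proxy. Anyone who holds the authkey can send pickled objects to the other side, so treat the key as a real secret
  • IP whitelist: use iptables, or a check in code, so that only specified IPs can connect to the Master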
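The first point can be sketched as follows. This is an illustration only: the environment variable name `DIST_AUTHKEY` and the helper `load_authkey` are assumptions made for this example, not something the demo scripts define.

```python
import os


def load_authkey(env_var="DIST_AUTHKEY"):
    """Read the shared key from an environment variable and return it as bytes.

    Raises RuntimeError when the variable is missing or empty, so that a
    misconfigured node fails fast instead of falling back to a default key.
    """
    value = os.environ.get(env_var)
    if not value:
        raise RuntimeError(f"environment variable {env_var} is not set")
    return value.encode("utf-8")
```

Both Master and Worker would then pass `authkey=load_authkey()` to `QueueManager(...)` in place of the hardcoded byte string.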

2. Fault tolerance mechanism

  • Task Retry: The Master maintains the task status, and tasks that have not received results will be automatically retransmitted after timeout.
  • Heartbeat Detection: The Worker sends heartbeats to the Master regularly. If no heartbeat is received within the timeout, the Worker will be marked offline and tasks will be reassigned.
  • Dead letter queue: Tasks that have failed to process more than 3 times will be placed in a special queue for manual inspection.
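The retry and dead-letter ideas above can be sketched as a small bookkeeping class on the Master side. This is one possible design under stated assumptions, not a definitive implementation: `TaskTracker` and its method names are hypothetical, every task is assumed to carry a unique `task_id` (the demo's bare integers do not), and heartbeat detection is left out.

```python
import time


class TaskTracker:
    """Tracks in-flight tasks so the Master can retry or dead-letter them."""

    def __init__(self, timeout=30.0, max_attempts=3, clock=time.monotonic):
        self.timeout = timeout
        self.max_attempts = max_attempts
        self.clock = clock
        self.in_flight = {}     # task_id -> (payload, attempts, deadline)
        self.dead_letters = []  # (task_id, payload) pairs that were given up on

    def dispatch(self, task_id, payload, attempts=1):
        """Record that a task was sent; the caller puts it on the task queue."""
        deadline = self.clock() + self.timeout
        self.in_flight[task_id] = (payload, attempts, deadline)

    def complete(self, task_id):
        """Call when a result arrives. Returns False for unknown or duplicate results."""
        return self.in_flight.pop(task_id, None) is not None

    def collect_expired(self):
        """Return [(task_id, payload)] to resend; exhausted tasks go to dead_letters."""
        now = self.clock()
        retry = []
        for task_id, (payload, attempts, deadline) in list(self.in_flight.items()):
            if now < deadline:
                continue
            if attempts >= self.max_attempts:
                del self.in_flight[task_id]
                self.dead_letters.append((task_id, payload))
            else:
                self.dispatch(task_id, payload, attempts + 1)
                retry.append((task_id, payload))
        return retry
```

The Master would call `collect_expired()` periodically while waiting for results and put each returned task back on the task queue. A slow Worker may still deliver a result after its task was resent, which is why `complete()` reports duplicates instead of raising.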

3. Performance optimization

  • Batch tasks: put/take a batch of tasks at one time to reduce network round-trip overhead
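  • Efficient serialization: managers serialize with pickle by default, with average performance. BaseManager's serializer parameter only accepts 'pickle' or 'xmlrpclib', so to use MessagePack / Protocol Buffers you encode each payload to bytes yourself before putting it on the queue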
  • Task sharding: If the task is too heavy, split it into small tasks on the Master side in advance and then distribute them
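Batching can be sketched with a small helper that groups tasks before they are put on the queue, so that one `put`/`get` round trip carries several tasks. The names `chunked` and `process_batch` are made up for this example, and the batch size is something you would tune for your own workload.

```python
from itertools import islice


def chunked(items, size):
    """Yield lists of at most `size` items from `items`, preserving order."""
    if size < 1:
        raise ValueError("size must be at least 1")
    iterator = iter(items)
    while True:
        batch = list(islice(iterator, size))
        if not batch:
            return
        yield batch


def process_batch(batch):
    """Worker-side counterpart: square every number in one batch."""
    return [(n, n * n) for n in batch]
```

On the Master, `for batch in chunked(numbers, 100): proxy_task.put(batch)` would replace the per-number `put`, and the Worker would send back `process_batch(batch)` as a single result.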

4. Modern alternatives

If you don’t want to reinvent the wheel, mature tools are recommended for production environments:

  • Celery + Redis/RabbitMQ: The most commonly used distributed task queue, supporting task scheduling, retry, and priority
  • Dask: Specialized in processing large-scale data calculations, the interface is similar to Pandas/NumPy, and the learning cost is low
  • Ray: A high-performance distributed computing framework that supports complex scenarios such as machine learning and reinforcement learning.

Summary

multiprocessing.managers is a lightweight distributed solution built into Python. It is well suited to quickly validating cross-machine computing needs and runs without installing any third-party libraries. For large-scale production environments, however, mature tools such as Celery and Dask are still the recommended choice.

I hope this article can help you get started with Python distributed processes quickly! If you have any questions, please feel free to discuss them in the comment area.