Python multi-process programming guide

1. Process basics

1.1 Why use multiple processes?

Have you ever run a Python script over a large amount of data and watched the CPU sit below 10% utilization? Or seen a single thread stall while it waits on I/O? In cases like these, multiple processes are worth trying.

A process is the basic unit of resource allocation and scheduling in the operating system. Each process has its own memory space, file descriptors, and so on, so processes do not interfere with each other; they are less prone to the bugs that come from threads sharing global variables. Multiple processes can genuinely use the parallelism of a multi-core CPU.

To work around the limits that the interpreter's GIL (Global Interpreter Lock, explained later) places on CPU-bound tasks, Python provides full multi-process support. Common approaches include:

  • os.fork(): Unix/Linux/macOS only; low-level and not cross-platform
  • The multiprocessing module: Python's official cross-platform solution, with fine-grained control
  • concurrent.futures.ProcessPoolExecutor: a higher-level wrapper (Python 3.2+) with more concise code
  • The subprocess module: mainly used to run external commands

2. Unix-only low-level creation: os.fork()

Unix, Linux, and macOS provide the fork() system call. It creates a nearly complete copy of the current process (the parent); the copy is called the child process. The two can tell themselves apart only by the return value:

  • Returns the PID (process ID) of the child process in the parent process
  • Returns 0 in the child process

A minimal example:

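import os

# flush=True so buffered output is not copied into the child and printed twice
print(f"Parent process started! PID: {os.getpid()}", flush=True)
# The fork point
pid = os.fork()

if pid == 0:
    # Child process logic
    print(f"I am the child, PID: {os.getpid()}, my parent is: {os.getppid()}")
else:
    # Parent process logic
    print(f"I am the parent, PID: {os.getpid()}, just created child: {pid}")
    os.waitpid(pid, 0)  # wait for the child so it does not linger as a zombie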

⚠️ A few hard limitations:

  1. Windows does not support fork() at all: os.fork does not exist there, so calling it raises an AttributeError
  2. Copying the memory space can be expensive (copy-on-write reduces the cost, but fork is still less safe and less portable than the spawn start method)
  3. The child inherits the parent's entire resource state at the moment of the fork, but later modifications in the child are not synchronized back to the parent
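
The third point can be illustrated with a minimal, Unix-only sketch. The function name fork_and_modify and the value 42 are illustrative choices, not part of any API. The child changes its own copy of a variable and reports the new value through its exit status, while the parent's copy stays untouched:

```python
import os


def fork_and_modify() -> tuple[int, int]:
    """Fork once; return (parent's counter, value the child ended up with)."""
    counter = 0
    pid = os.fork()  # Unix-only
    if pid == 0:
        # Child: change the private copy, then exit at once,
        # reporting the new value through the exit status.
        counter += 42
        os._exit(counter)
    # Parent: wait for the child and decode its exit status.
    _, status = os.waitpid(pid, 0)
    child_value = os.WEXITSTATUS(status)
    return counter, child_value


if __name__ == "__main__":
    parent_value, child_value = fork_and_modify()
    print(f"child ended with counter={child_value}, "
          f"parent still has counter={parent_value}")
```

An exit status is only a convenient channel for this tiny demonstration (it holds a single small integer); real programs would pass data back with a Queue or Pipe.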

3. The official cross-platform first choice: the multiprocessing module

To unify multi-process code across platforms, Python provides the multiprocessing module. It works on all mainstream operating systems, and its API closely mirrors that of the threading module.

3.1 Controlling a single child process: the Process class

Just as threading.Thread creates a thread, the Process class creates a child process: target names the function the child runs, args/kwargs pass its arguments, start() launches it, and join() waits for it to finish.

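from multiprocessing import Process
import os

def print_child_info(name: str) -> None:
    print(f"🚀 Child process started | name: {name} | PID: {os.getpid()}")

# ⚠️ This guard is required on Windows! Without it the child re-imports
# the module and executes all top-level code again
if __name__ == '__main__':
    print(f"👨‍💼 Parent process started | PID: {os.getpid()}")
    p = Process(target=print_child_info, args=("test child",))
    print("⏳ About to start the child process...")
    p.start()  # actually launches the child process
    p.join()   # blocks the parent until the child finishes
    print("✅ Child process has finished")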
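
As a complement, here is a small sketch that passes keyword arguments through kwargs and inspects the child afterwards. The names worker and run_child and their parameters are made up for illustration; is_alive(), join(timeout=...), and exitcode are attributes of Process:

```python
from multiprocessing import Process
import sys
import time


def worker(label: str, delay: float = 0.0, exit_status: int = 0) -> None:
    time.sleep(delay)
    print(f"{label} finished after {delay}s")
    sys.exit(exit_status)  # an integer passed to sys.exit becomes the exit code


def run_child(exit_status: int) -> int:
    """Start one child with positional and keyword arguments; return its exit code."""
    p = Process(
        target=worker,
        args=("demo",),
        kwargs={"delay": 0.1, "exit_status": exit_status},
    )
    p.start()
    print(f"alive right after start(): {p.is_alive()}")
    p.join(timeout=5)  # wait at most 5 seconds
    return p.exitcode


if __name__ == "__main__":
    print(f"exit code: {run_child(0)}")
    print(f"exit code: {run_child(3)}")
```

Checking exitcode after join() is the simplest way for the parent to learn whether a single child succeeded.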

3.2 Batch management: the Pool process pool

If you need to run dozens or hundreds of small tasks, don't create a Process for each one: creating and destroying processes has real overhead. A process pool reuses worker processes that already exist, which greatly improves efficiency.

The context manager form, with Pool(...) (Python 3.3+), is recommended because it releases the pool's resources automatically:

from multiprocessing import Pool
import os
import time
import random

def simulate_task(task_id: int) -> str:
    start = time.time()
    print(f"🛠️  Task {task_id} started | worker PID: {os.getpid()}")
    time.sleep(random.uniform(0, 3))   # simulate slow work
    cost = round(time.time() - start, 2)
    print(f"✅ Task {task_id} finished | took: {cost}s")
    return f"result of task {task_id}"

if __name__ == '__main__':
    print(f"👨‍💼 Parent process started | PID: {os.getpid()}")
    with Pool(processes=4) as pool:   # at most 4 workers; defaults to the number of CPUs
        # Submit 5 tasks asynchronously with apply_async (non-blocking)
        task_results = [pool.apply_async(simulate_task, args=(i,)) for i in range(5)]
        print("⏳ All tasks submitted, waiting for completion...")
        # Collect every result (get() blocks here if a task has not finished yet)
        for res in task_results:
            print(f"📦 Got: {res.get()}")
    print("🏁 All tasks are done and the pool has been cleaned up")

📌 Two core Pool methods:

  • apply_async(func, args): non-blocking asynchronous submission; suited to batch tasks
  • apply(func, args): blocking synchronous submission; it waits for the current task to finish before the next one can be submitted, so it is rarely useful
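
For the common case of applying one function to many inputs, Pool also offers map, which is not covered above. The sketch below puts it next to the apply_async pattern. The helper names square, squares_with_apply_async, and squares_with_map are hypothetical:

```python
from multiprocessing import Pool


def square(n: int) -> int:
    return n * n


def squares_with_apply_async(pool, numbers) -> list[int]:
    # Submit everything first (non-blocking), then collect in submission order.
    pending = [pool.apply_async(square, args=(n,)) for n in numbers]
    return [p.get(timeout=10) for p in pending]


def squares_with_map(pool, numbers) -> list[int]:
    # map blocks until all results are ready and keeps the input order.
    return pool.map(square, numbers)


if __name__ == "__main__":
    with Pool(processes=2) as pool:
        print(squares_with_apply_async(pool, range(5)))
        print(squares_with_map(pool, range(5)))
```

Both helpers return the same list; map is shorter, while apply_async leaves room for a different function or timeout per task.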

4. Calling external commands: the subprocess module

If the task is not a Python function but a shell script, a system command, or a program written in another language, use the subprocess module. It replaces the older os.system and os.popen and is safer and more controllable.

4.1 Simple call (get return value and output)

subprocess.run() (Python 3.5+) is the recommended entry point. It supports capturing output, setting timeouts, and more (the capture_output and text parameters used here require Python 3.7+):

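import subprocess

# Run ls -l, capture its output, and decode it as text
result = subprocess.run(
    ["ls", "-l"],          # pass the arguments as a list to avoid shell-injection risks
    capture_output=True,
    text=True,
    check=True             # raise an exception if the command fails (non-zero return code)
)
print("📁 Contents of the current directory:")
print(result.stdout)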
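
The timeout and failure handling mentioned above can be sketched as follows. To stay portable, this example runs the current Python interpreter (sys.executable) instead of ls; the function name run_python_snippet and the messages it returns are illustrative only:

```python
import subprocess
import sys


def run_python_snippet(code: str, timeout: float = 5.0) -> str:
    """Run `code` in a fresh interpreter and describe what happened."""
    try:
        result = subprocess.run(
            [sys.executable, "-c", code],
            capture_output=True,
            text=True,
            check=True,        # non-zero exit -> CalledProcessError
            timeout=timeout,   # too slow -> child is killed, TimeoutExpired
        )
    except subprocess.CalledProcessError as exc:
        return f"failed with exit code {exc.returncode}"
    except subprocess.TimeoutExpired:
        return f"timed out after {timeout}s"
    return result.stdout.strip()


if __name__ == "__main__":
    print(run_python_snippet("print('hello')"))
    print(run_python_snippet("import sys; sys.exit(2)"))
    print(run_python_snippet("import time; time.sleep(10)", timeout=0.5))
```

Catching the two exceptions separately keeps "the command ran and failed" apart from "the command never finished".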

4.2 Interacting with external processes

If you need to send input to an external process and read its output, use the Popen class (it also works as a context manager):

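import subprocess
import sys

with subprocess.Popen(
    [sys.executable, "-i"],  # the current Python interpreter, in interactive mode
    stdin=subprocess.PIPE,   # lets us send input to the child
    stdout=subprocess.PIPE,  # receives the child's standard output
    stderr=subprocess.PIPE,  # receives the child's error output
    text=True,               # text mode (universal_newlines is just its older alias)
    bufsize=1                # line-buffered
) as proc:
    # Send commands, each terminated by a newline
    proc.stdin.write('print("👋 Greetings from the child process!")\n')
    proc.stdin.write('exit()\n')
    # Close stdin, then read all output and errors
    out, err = proc.communicate()
    print("📤 Child process output:")
    print(out.strip())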
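
communicate() returns the output only after the child exits. When output should be handled as it is produced, one option is to iterate over proc.stdout line by line. A minimal sketch, in which the child script CHILD_CODE and the function name stream_lines are invented for the demonstration:

```python
import subprocess
import sys

# A tiny child program that prints one line every 0.1 seconds.
CHILD_CODE = (
    "import time\n"
    "for i in range(3):\n"
    "    print(f'tick {i}', flush=True)\n"
    "    time.sleep(0.1)\n"
)


def stream_lines() -> list[str]:
    """Collect the child's output one line at a time, as it arrives."""
    lines = []
    with subprocess.Popen(
        [sys.executable, "-c", CHILD_CODE],
        stdout=subprocess.PIPE,
        text=True,
    ) as proc:
        for line in proc.stdout:  # yields a line once the child has flushed it
            lines.append(line.rstrip("\n"))
            print(f"got: {lines[-1]}")
    return lines


if __name__ == "__main__":
    stream_lines()
```

Only stdout is piped here. Reading one pipe in a loop while another pipe is also open can deadlock when the unread one fills up, which is the situation communicate() is designed to handle.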

5. Inter-process communication (IPC): Queue and Pipe

Because each process has its own memory space, processes cannot share global variables directly the way threads can; data has to travel through a dedicated IPC mechanism. The multiprocessing module has two commonly used ones built in:

5.1 Multiple producers, multiple consumers: Queue

Queue is the most commonly used IPC mechanism. It is both thread-safe and process-safe, so multiple processes can write to it and multiple processes can read from it.

from multiprocessing import Process, Queue
import os
import time
import random

def producer(q: Queue) -> None:
    print(f"🏭 Producer started | PID: {os.getpid()}")
    for item in ["apple", "banana", "orange"]:
        print(f"📤 Producer put: {item}")
        q.put(item)
        time.sleep(random.uniform(0.5, 1.5))

def consumer(q: Queue) -> None:
    print(f"🍽️  Consumer started | PID: {os.getpid()}")
    while True:
        item = q.get(True)   # block until the queue has data
        if item == "END":
            print("🍽️  End signal received, consumer exiting")
            break
        print(f"📥 Consumer got: {item}")

if __name__ == '__main__':
    q = Queue()
    p_prod = Process(target=producer, args=(q,))
    p_cons = Process(target=consumer, args=(q,))
    p_prod.start()
    p_cons.start()
    p_prod.join()
    # Put the end signal on the queue (one per consumer)
    q.put("END")
    p_cons.join()
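
The "one end signal per consumer" rule can be sketched with several consumers. Here None serves as the end marker, and the names SENTINEL, consumer, and run_pipeline are illustrative assumptions:

```python
from multiprocessing import Process, Queue

SENTINEL = None  # end marker chosen for this sketch


def consumer(tasks, results) -> None:
    """Double every number taken from `tasks` until the end marker arrives."""
    while True:
        item = tasks.get()
        if item is SENTINEL:
            break
        results.put(item * 2)


def run_pipeline(items: list[int], n_consumers: int = 2) -> list[int]:
    tasks, results = Queue(), Queue()
    workers = [Process(target=consumer, args=(tasks, results))
               for _ in range(n_consumers)]
    for w in workers:
        w.start()
    for item in items:
        tasks.put(item)
    for _ in workers:          # one end marker per consumer
        tasks.put(SENTINEL)
    # Drain the results before join(): a child that still has queued
    # data to flush may not exit, and join() would then hang.
    collected = [results.get(timeout=10) for _ in items]
    for w in workers:
        w.join()
    return sorted(collected)   # arrival order is not deterministic


if __name__ == "__main__":
    print(run_pipeline([1, 2, 3, 4, 5]))
```

Each consumer removes exactly one marker and stops, so with fewer markers than consumers some of them would block forever.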

5.2 One-to-one communication: Pipe

Pipe() creates a pipe that is bidirectional by default (or one-way with duplex=False). It is well suited to passing data quickly between two processes.

from multiprocessing import Process, Pipe

def send_msg(child_conn):
    msg = "👋 Hello, receiver! This message came through a Pipe"
    print(f"📤 Sender sends: {msg}")
    child_conn.send(msg)
    child_conn.close()

def recv_msg(parent_conn):
    msg = parent_conn.recv()
    print(f"📥 Receiver got: {msg}")
    parent_conn.close()

if __name__ == '__main__':
    # Create a duplex pipe; it returns two connection objects
    # (the parent uses parent_conn, the child uses child_conn)
    parent_conn, child_conn = Pipe()
    p = Process(target=send_msg, args=(child_conn,))
    p.start()
    recv_msg(parent_conn)
    p.join()
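
Since the default pipe is bidirectional, the same connection can carry requests one way and replies the other. A minimal request/reply sketch, where the names upper_service and ask and the use of None as a stop message are assumptions made for this example:

```python
from multiprocessing import Pipe, Process


def upper_service(conn) -> None:
    """Child: reply to each request with its upper-cased form; stop on None."""
    while True:
        request = conn.recv()
        if request is None:
            break
        conn.send(request.upper())
    conn.close()


def ask(words: list[str]) -> list[str]:
    parent_conn, child_conn = Pipe()   # duplex by default
    p = Process(target=upper_service, args=(child_conn,))
    p.start()
    child_conn.close()                 # the parent only uses its own end
    replies = []
    for word in words:
        parent_conn.send(word)         # request travels parent -> child
        replies.append(parent_conn.recv())  # reply travels child -> parent
    parent_conn.send(None)             # tell the child to stop
    p.join()
    parent_conn.close()
    return replies


if __name__ == "__main__":
    print(ask(["ping", "pong"]))
```

A Pipe connects exactly two endpoints; once more than two processes need to exchange data, a Queue is usually the simpler choice.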

6. Modern high-level wrapper: concurrent.futures.ProcessPoolExecutor

The concurrent.futures module, introduced in Python 3.2, unifies the multi-process and multi-thread APIs. The code is more concise, and it supports higher-level features such as batch submission with map, exception capture, and timeout control.

Here too, the context manager form is recommended so that resources are released automatically:

from concurrent.futures import ProcessPoolExecutor, as_completed
import os
import time
import random

def simple_task(task_id: int) -> str:
    start = time.time()
    print(f"🛠️  Task {task_id} | process: {os.getpid()}")
    time.sleep(random.uniform(0.2, 1.2))
    cost = round(time.time() - start, 2)
    return f"task {task_id} took {cost}s"

if __name__ == '__main__':
    print(f"👨‍💼 Main process | PID: {os.getpid()}")
    with ProcessPoolExecutor(max_workers=3) as executor:
        # Option 1: batch submission with map (results come back in input order)
        print("\n📋 Batch submission with map:")
        for res in executor.map(simple_task, range(3)):
            print(f"📦 Ordered result: {res}")

        # Option 2: submit + as_completed (whichever finishes first is returned first)
        print("\n🚀 Using submit + as_completed:")
        futures = [executor.submit(simple_task, i+3) for i in range(3)]
        for future in as_completed(futures):
            print(f"📦 Completed result: {future.result()}")
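
Timeout control can be sketched with future.result(timeout=...), which raises concurrent.futures.TimeoutError if the result is not ready in time. The helper names slow_square and result_or_default are hypothetical:

```python
from concurrent.futures import ProcessPoolExecutor
from concurrent.futures import TimeoutError as FutureTimeout
import time


def slow_square(n: int, delay: float) -> int:
    time.sleep(delay)
    return n * n


def result_or_default(future, timeout: float, default=None):
    """Wait up to `timeout` seconds for the result; otherwise return `default`."""
    try:
        return future.result(timeout=timeout)
    except FutureTimeout:
        return default


if __name__ == "__main__":
    with ProcessPoolExecutor(max_workers=2) as executor:
        fast = executor.submit(slow_square, 3, 0.0)
        slow = executor.submit(slow_square, 4, 1.0)
        print(result_or_default(fast, timeout=5.0))
        print(result_or_default(slow, timeout=0.2, default="still running"))
    # Leaving the with block still waits for the slow task to finish.
```

The timeout only limits how long the caller waits. It does not stop the worker, so the task keeps running in the pool.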

7. Best practices and pitfalls

7.1 Pitfall 1: always add if __name__ == '__main__'

On Windows (and on macOS since Python 3.8), child processes are started with the spawn method, which re-imports the current module in the child. Without this guard, the child would execute all top-level code again, including the code that starts processes. In the worst case that means endless process creation; in practice Python aborts with a RuntimeError. The safest habit is to put this guard in all multi-process code.
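
To see the re-import at work on any platform, the following sketch forces the spawn method through multiprocessing.get_context("spawn") and lets the child print the module name it was loaded under. The function names report_module_name and start_method_info are illustrative; run it as a script file, because spawn needs a file it can re-import:

```python
import multiprocessing as mp


def report_module_name() -> None:
    # In a spawned child this file has been imported again under a
    # different module name, so the guard at the bottom is false there.
    print(f"child sees __name__ = {__name__!r}")


def start_method_info() -> tuple[str, list[str]]:
    """Return the current start method and all methods this platform offers."""
    return mp.get_start_method(), mp.get_all_start_methods()


if __name__ == "__main__":
    current, available = start_method_info()
    print(f"start method in use: {current}; available: {available}")
    ctx = mp.get_context("spawn")   # force spawn, as on Windows
    child = ctx.Process(target=report_module_name)
    child.start()
    child.join()
```

In the child the name is typically '__mp_main__' rather than '__main__', which is why code under the guard runs only in the parent.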

7.2 Pitfall 2: the impact of the GIL

The GIL (Global Interpreter Lock) of CPython, the standard Python interpreter, ensures that only one thread executes Python bytecode at a time. Multi-threading therefore cannot use multiple cores for truly parallel computation, but it handles I/O-bound tasks well, because the GIL is released during I/O operations.

  • Good fit for multiple processes: CPU-bound tasks (data compression, image processing, mathematical calculations, etc.)
  • Good fit for multi-threading or asynchronous I/O: I/O-bound tasks (network requests, file reads and writes, etc.)
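
One way to observe the difference is to time the same CPU-bound job under a thread pool and a process pool. This is a rough sketch rather than a benchmark: count_primes is a deliberately naive workload invented for the demonstration, timed_run is a hypothetical helper, and the numbers depend entirely on the machine:

```python
from concurrent.futures import ProcessPoolExecutor, ThreadPoolExecutor
import time


def count_primes(limit: int) -> int:
    """Deliberately naive CPU-bound work: count the primes below `limit`."""
    count = 0
    for n in range(2, limit):
        if all(n % d for d in range(2, int(n ** 0.5) + 1)):
            count += 1
    return count


def timed_run(executor_cls, jobs: list[int], workers: int = 4) -> tuple[list[int], float]:
    """Run count_primes over `jobs` with the given executor class and time it."""
    start = time.perf_counter()
    with executor_cls(max_workers=workers) as executor:
        results = list(executor.map(count_primes, jobs))
    return results, time.perf_counter() - start


if __name__ == "__main__":
    jobs = [60_000] * 4
    for cls in (ThreadPoolExecutor, ProcessPoolExecutor):
        results, elapsed = timed_run(cls, jobs)
        print(f"{cls.__name__}: {elapsed:.2f}s, results={results}")
```

On a multi-core machine with a GIL-enabled interpreter, the process pool would be expected to finish noticeably faster for this kind of work. For very small jobs the cost of starting processes can outweigh the gain.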

7.3 Pitfall 3: sharing resources

Avoid sharing state directly where possible; pass messages through a Queue or Pipe to avoid deadlocks and data races. If a small amount of data must be shared, use multiprocessing.Value or Array, and protect every update with a lock:

from multiprocessing import Process, Value

def add_num(num: Value) -> None:
    for _ in range(10000):
        with num.get_lock():     # hold the lock while modifying the shared variable
            num.value += 1

if __name__ == '__main__':
    shared_num = Value('i', 0)   # create a shared int, initially 0
    p1 = Process(target=add_num, args=(shared_num,))
    p2 = Process(target=add_num, args=(shared_num,))
    p1.start()
    p2.start()
    p1.join()
    p2.join()
    print(f"🔢 Final value of the shared variable: {shared_num.value}")  # should be 20000
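
Array works the same way for a fixed-size sequence of values. A minimal sketch, in which the vote-counting scenario and the names tally and count_votes are made up for illustration:

```python
from multiprocessing import Array, Process


def tally(votes, choices: list[int]) -> None:
    for choice in choices:
        with votes.get_lock():   # one lock guards the whole array
            votes[choice] += 1


def count_votes() -> list[int]:
    votes = Array('i', 3)        # three shared ints, initialised to zero
    ballots = [[0, 1, 1, 2] * 250, [2, 2, 1, 0] * 250]
    workers = [Process(target=tally, args=(votes, b)) for b in ballots]
    for w in workers:
        w.start()
    for w in workers:
        w.join()
    return votes[:]              # copy the shared data into a plain list


if __name__ == "__main__":
    print(count_votes())
```

Without the lock, the two processes could read the same old value and overwrite each other's increment, so the totals could come out too low.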

7.4 Pitfall 4: child process exceptions are not propagated automatically

If an exception is raised inside a child process, the parent does not see it by default; after join() it only sees a non-zero exitcode.
With ProcessPoolExecutor's future.result() or the get() method of a Pool result, the child's exception is re-raised in the parent, so you can catch it and surface the error explicitly.
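
Both behaviours can be seen side by side in a short sketch. The function names divide, exitcode_of_failing_process, and describe are illustrative:

```python
from concurrent.futures import ProcessPoolExecutor
from multiprocessing import Process


def divide(a: int, b: int) -> float:
    return a / b


def exitcode_of_failing_process() -> int:
    """A plain Process: the parent only learns the exit code."""
    p = Process(target=divide, args=(1, 0))
    p.start()
    p.join()   # the child prints its traceback to stderr
    return p.exitcode


def describe(future) -> str:
    """An executor future: result() re-raises the child's exception here."""
    try:
        return f"ok: {future.result(timeout=10)}"
    except ZeroDivisionError as exc:
        return f"child raised {type(exc).__name__}"


if __name__ == "__main__":
    print(f"exit code: {exitcode_of_failing_process()}")
    with ProcessPoolExecutor(max_workers=1) as executor:
        print(describe(executor.submit(divide, 1, 2)))
        print(describe(executor.submit(divide, 1, 0)))
```

With a bare Process, the exception object itself never reaches the parent; sending it back would need an explicit channel such as a Queue.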


8. Quick summary

Implementation                         | Applicable scenarios                           | Advantages                                            | Disadvantages
---------------------------------------|------------------------------------------------|-------------------------------------------------------|---------------------------------------------
os.fork()                              | Unix-like low-level experiments                | Simple                                                | Not cross-platform and unsafe
multiprocessing.Process                | Fine-grained control of a single child process | Cross-platform, full-featured                         | Batch task management is tedious
multiprocessing.Pool                   | Batch CPU-bound tasks                          | Cross-platform, process reuse                         | API less concise than the high-level wrapper
concurrent.futures.ProcessPoolExecutor | Batch tasks in modern Python                   | Unified API, concise code, supports advanced features | Less fine-grained control than Process
subprocess                             | Calling external commands/programs             | Safe, controllable, full-featured                     | Not suited to running Python functions

Recommended choices for modern Python (≥3.5):

  1. Batch tasks: ProcessPoolExecutor
  2. Fine-grained control of individual child processes: Process
  3. Calling external commands: subprocess.run() / Popen

Used properly, multiple processes can unlock the full potential of multi-core CPUs and make your programs fly! 🚀