多进程

Python多进程编程指南

1. 进程基础

1.1 进程概念

进程是操作系统进行资源分配和调度的基本单位。每个进程都有自己独立的内存空间,使得多个进程可以同时运行而互不干扰。

Python提供了多种方式来实现多进程编程,包括:

  • os.fork() (Unix/Linux/macOS)
  • multiprocessing 模块 (跨平台)
  • concurrent.futures 模块 (Python 3.2+)
  • subprocess 模块 (运行外部命令)

2. 使用fork创建进程

Unix/Linux系统提供了fork()系统调用,它创建当前进程的一个副本(子进程)。

import os

print(f'Process ({os.getpid()}) start...')
pid = os.fork()  # 只在Unix/Linux/macOS上有效

if pid == 0:
    # 子进程
    print(f'I am child process ({os.getpid()}) and my parent is {os.getppid()}.')
else:
    # 父进程
    print(f'I ({os.getpid()}) just created a child process ({pid}).')

注意:Windows系统没有fork()调用,因此这段代码在Windows上无法运行。

3. 跨平台多进程编程

Python的multiprocessing模块提供了跨平台的多进程支持。

3.1 使用Process类

from multiprocessing import Process
import os

def run_proc(name):
    print(f'Run child process {name} ({os.getpid()})...')

if __name__ == '__main__':
    print(f'Parent process {os.getpid()}.')
    p = Process(target=run_proc, args=('test',))
    print('Child process will start.')
    p.start()
    p.join()  # 等待子进程结束
    print('Child process end.')

3.2 进程池(Pool)

当需要创建大量子进程时,使用进程池更高效:

from multiprocessing import Pool
import os, time, random

def long_time_task(name):
    print(f'Run task {name} ({os.getpid()})...')
    start = time.time()
    time.sleep(random.random() * 3)
    end = time.time()
    print(f'Task {name} runs {end - start:.2f} seconds.')

if __name__ == '__main__':
    print(f'Parent process {os.getpid()}.')
    with Pool(4) as p:  # 推荐使用上下文管理器
        for i in range(5):
            p.apply_async(long_time_task, args=(i,))
        print('Waiting for all subprocesses done...')
        p.close()  # 关闭进程池,不再接受新任务
        p.join()   # 等待所有子进程完成
    print('All subprocesses done.')

最佳实践

  1. 使用with语句管理进程池资源
  2. 默认池大小等于CPU核心数
  3. apply_async用于非阻塞调用,apply用于阻塞调用

4. 运行外部进程

subprocess模块可以运行外部命令并管理其输入/输出:

4.1 简单调用

import subprocess

# 运行简单命令
result = subprocess.run(['ls', '-l'], capture_output=True, text=True)
print(result.stdout)

4.2 复杂交互

import subprocess

# 与子进程交互
with subprocess.Popen(
    ['python3', '-i'], 
    stdin=subprocess.PIPE,
    stdout=subprocess.PIPE,
    stderr=subprocess.PIPE,
    text=True
) as proc:
    out, err = proc.communicate('print("Hello from subprocess")\n')
    print(out)

5. 进程间通信

5.1 使用Queue

from multiprocessing import Process, Queue
import os, time, random

def write(q):
    print(f'Process to write: {os.getpid()}')
    for value in ['A', 'B', 'C']:
        print(f'Put {value} to queue...')
        q.put(value)
        time.sleep(random.random())

def read(q):
    print(f'Process to read: {os.getpid()}')
    while True:
        value = q.get(True)
        print(f'Get {value} from queue.')

if __name__ == '__main__':
    q = Queue()
    pw = Process(target=write, args=(q,))
    pr = Process(target=read, args=(q,))
    
    pw.start()
    pr.start()
    
    pw.join()
    pr.terminate()  # 终止读进程

5.2 使用Pipe

from multiprocessing import Process, Pipe

def sender(conn):
    conn.send('Hello from sender')
    conn.close()

def receiver(conn):
    msg = conn.recv()
    print(f'Received: {msg}')
    conn.close()

if __name__ == '__main__':
    parent_conn, child_conn = Pipe()
    p = Process(target=sender, args=(child_conn,))
    p.start()
    receiver(parent_conn)
    p.join()

6. 现代多进程编程

Python 3.2+引入了concurrent.futures模块,提供了更高级的接口:

from concurrent.futures import ProcessPoolExecutor
import os

def task(name):
    print(f'Running task {name} in process {os.getpid()}')
    return name.upper()

if __name__ == '__main__':
    with ProcessPoolExecutor(max_workers=4) as executor:
        futures = [executor.submit(task, f'task-{i}') for i in range(5)]
        for future in futures:
            print(f'Result: {future.result()}')

7. 最佳实践与注意事项

  1. 平台兼容性

    • 在Unix-like系统上,multiprocessing使用fork()
    • 在Windows上,使用spawn启动方式
  2. if name == 'main'

    • 在Windows上必须使用这个保护,避免子进程重复执行代码
  3. 资源共享

    • 避免共享状态,使用消息传递(Queue/Pipe)
    • 如果需要共享数据,使用multiprocessing.ValueArray
  4. 性能考虑

    • 进程创建开销比线程大
    • 适合CPU密集型任务
    • 对于IO密集型任务,考虑多线程或异步IO
  5. 错误处理

    • 子进程中的异常不会自动传播到父进程
    • 使用Process.exitcode检查子进程退出状态

8. 总结

Python提供了丰富的多进程编程工具,从低级的os.fork()到高级的concurrent.futures。选择哪种方式取决于:

  • 平台要求(跨平台/Unix-only)
  • 任务类型(CPU密集型/IO密集型)
  • 代码复杂度需求

对于现代Python开发,推荐:

  1. 简单任务:multiprocessing.Pool
  2. 复杂任务:concurrent.futures.ProcessPoolExecutor
  3. 精细控制:multiprocessing.Process

通过合理使用多进程,可以充分利用多核CPU的计算能力,显著提高程序性能。