Python Multithreaded Programming Guide

Concurrent programming is a common way to improve the performance of Python applications. Multithreading is particularly popular in I/O-bound scenarios (such as crawlers, database operations, and network requests) because threads are cheap to create, share data easily, and are simple to use. This article walks through the core points of Python multithreading: basic usage, common pitfalls, the limits imposed by the GIL, and best practices.


1. Thread basics

Python provides multithreading support through the standard library module threading, which wraps the operating system's native thread interface (Win32 threads on Windows, pthreads on Linux/macOS). Every process has a main thread (MainThread) by default, and from it we can create additional child threads to run tasks concurrently.

1.1 Two ways to create threads

The most common approach is to pass a function to threading.Thread, which is concise and intuitive. If you need finer-grained control (for example, a reusable thread class), subclass threading.Thread instead.

Function-based creation (recommended for beginners)

import threading
import time

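def worker():
    """Work done by the child thread: pretend to be busy (sleep simulates real I/O)."""
    print(f'✅ Child thread {threading.current_thread().name} started...')
    time.sleep(1)
    print(f'❌ Child thread {threading.current_thread().name} finished')

# Create the thread object: target is the function to run, name is a label that helps debugging
t = threading.Thread(target=worker, name='TestWorker')
t.start()  # Start the thread (this only makes it runnable; the scheduler decides when it actually runs)
t.join()   # join() blocks the main thread until the child thread finishes, so the line below runs last
print(f'🏁 Main thread {threading.current_thread().name} finished')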

Subclass-based creation (suitable for complex reuse)

import threading
import time

class CustomWorker(threading.Thread):
    def __init__(self, name):
        super().__init__(name=name)  # The parent class initializer must be called first

    # Override run(); this is the method the child thread executes once started
    def run(self):
        print(f'✅ Custom child thread {self.name} started...')
        time.sleep(1)
        print(f'❌ Custom child thread {self.name} finished')

t = CustomWorker(name='CustomTest')
t.start()
t.join()
print('🏁 Main thread finished')

1.2 Commonly used thread attributes/methods

The threading module also provides several module-level helper functions that are handy for debugging and managing threads:

  • threading.current_thread(): Get the currently executing thread instance
  • threading.active_count(): Returns the current total number of active threads (including the main thread)
  • threading.enumerate(): Returns the current list of all active threads
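
As a small illustration of these three helpers, the sketch below starts a few threads and inspects them while they are still alive. The `release` event and the `Demo-N` thread names are hypothetical, chosen only for this example.

```python
import threading

# Hypothetical helper: keeps the workers alive until we have inspected them
release = threading.Event()

def worker():
    release.wait()  # block until the main thread calls release.set()

threads = [threading.Thread(target=worker, name=f"Demo-{i}") for i in range(3)]
for t in threads:
    t.start()

# Inspect the threads while the workers are still blocked
current_name = threading.current_thread().name
active_total = threading.active_count()
active_names = [t.name for t in threading.enumerate()]

print(f"current thread: {current_name}")
print(f"active threads: {active_total}")
print(f"thread names:   {active_names}")

# Let the workers finish and wait for them
release.set()
for t in threads:
    t.join()
```

The exact count depends on the environment (some tools start helper threads of their own), but it includes at least the three workers plus the thread that started them.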

2. Thread synchronization and locking

Threads come with one major advantage that is also a major pitfall: they share the memory space of the same process. If multiple threads modify the same global variable at the same time, a race condition occurs and the result becomes unpredictable.

2.1 Race condition example

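For example, in the following accumulator, 10 threads each increment a counter 100,000 times. The expected result is 1,000,000, but the actual result can vary from run to run and is typically less than 1,000,000. How often updates are lost depends on the interpreter version and on timing; the code is unsafe even when a particular run happens to produce the right total: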

import threading

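counter = 0  # Shared global variable

def unsafe_increment():
    global counter
    for _ in range(100000):
        # Under the hood this single line is three steps: read counter, add 1, write counter back
        # A thread switch can happen between any of those steps!
        counter += 1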

threads = []
for _ in range(10):
    t = threading.Thread(target=unsafe_increment)
    threads.append(t)
    t.start()

for t in threads:
    t.join()

print(f"❌ Unsafe accumulator final value: {counter}")  # Run it several times; the result may fall short of 1000000

2.2 Using Lock (a mutex) to fix the race condition

threading.Lock is the simplest synchronization primitive. Its rule: only one thread can hold the lock at a time, and other threads must wait until it is released.

Managing the lock with a with statement is recommended: it acquires and releases the lock automatically, which avoids deadlocks caused by forgetting to release it:

import threading

counter = 0
lock = threading.Lock()  # Create a global mutex

def safe_increment():
    global counter
    for _ in range(100000):
        # with lock acquires the lock automatically and releases it when the block ends
        with lock:
            counter += 1

threads = []
for _ in range(10):
    t = threading.Thread(target=safe_increment)
    threads.append(t)
    t.start()

for t in threads:
    t.join()

print(f"✅ Safe accumulator final value: {counter}")  # Always 1000000, no matter how many times it runs
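
To make clear what the with statement saves you from writing, here is a rough sketch of the explicit form using acquire() and release(). The function name `increment_with_explicit_lock` and the loop counts are illustrative only.

```python
import threading

counter = 0
lock = threading.Lock()

def increment_with_explicit_lock(times):
    """Same effect as `with lock:`, written out by hand."""
    global counter
    for _ in range(times):
        lock.acquire()      # block until the lock is available
        try:
            counter += 1
        finally:
            lock.release()  # runs even if the protected code raises

threads = [
    threading.Thread(target=increment_with_explicit_lock, args=(10_000,))
    for _ in range(4)
]
for t in threads:
    t.start()
for t in threads:
    t.join()

print(f"final value: {counter}")
```

Leaving out the try/finally is the classic mistake: if the protected code raises, the lock is never released and every other thread waits forever.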

2.3 Other commonly used synchronization primitives

In addition to the basic Lock, several more advanced primitives cover more complex scenarios:

  • threading.RLock: Reentrant lock; the same thread can acquire it multiple times (suitable for recursive calls)
  • threading.Condition: Condition variable, used together with a lock for "wait/notify" communication between threads
  • threading.Semaphore: Semaphore; limits the number of threads accessing a resource at the same time (for example, capping the number of database connections)
  • threading.Event: Event object, used for simple "on/off" signaling between threads
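
As one example from this list, the sketch below uses a Semaphore to cap how many threads run a section at once. The limit of 2, the `limited_task` function, and the bookkeeping variables `running` and `peak` are assumptions made for the demonstration; a real program would guard an actual resource such as a connection pool.

```python
import threading
import time

MAX_CONCURRENT = 2                      # illustrative limit
slots = threading.Semaphore(MAX_CONCURRENT)
state_lock = threading.Lock()           # protects the two counters below
running = 0                             # threads currently inside the limited section
peak = 0                                # highest value `running` ever reached

def limited_task():
    global running, peak
    with slots:                         # at most MAX_CONCURRENT threads get past here
        with state_lock:
            running += 1
            peak = max(peak, running)
        time.sleep(0.05)                # simulate using the limited resource
        with state_lock:
            running -= 1

threads = [threading.Thread(target=limited_task) for _ in range(6)]
for t in threads:
    t.start()
for t in threads:
    t.join()

print(f"peak concurrency: {peak} (limit {MAX_CONCURRENT})")
```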

3. The unavoidable GIL (Global Interpreter Lock)

Many beginners wonder: "Why doesn't multithreading speed up CPU-bound tasks (such as scientific computing and image processing) in Python, and why does it sometimes even slow them down?" The answer lies in the GIL (Global Interpreter Lock) of the CPython interpreter.

3.1 The essence of GIL

The GIL is an implementation detail of the CPython interpreter (other interpreters such as Jython and IronPython do not have one). Its rule: at any moment only one thread executes Python bytecode, and the other threads must wait for the GIL to be released.

The GIL is released mainly in two situations:

  1. When a thread performs a blocking I/O operation (such as time.sleep(), network requests, or reading and writing files)
  2. Since Python 3.2, when the running thread has held the GIL for longer than the switch interval (5 milliseconds by default) and another thread is waiting, the interpreter asks it to release the GIL, regardless of whether any I/O occurs
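
The switch interval mentioned in the second point can be read and changed through the sys module. The snippet below is only a sketch; the value 0.01 is an arbitrary illustration, not a recommendation.

```python
import sys

# Read the current switch interval (in seconds)
interval = sys.getswitchinterval()
print(f"switch interval: {interval * 1000:.1f} ms")

# Change it temporarily; 0.01 s is an arbitrary value for illustration
sys.setswitchinterval(0.01)
changed = sys.getswitchinterval()
print(f"after change:    {changed * 1000:.1f} ms")

# Restore the original value so the rest of the program is unaffected
sys.setswitchinterval(interval)
```

Tuning this value is rarely needed in practice; it mainly helps when experimenting with how thread switching affects a workload.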

3.2 Impact of GIL

  • I/O-bound tasks: multithreading still helps, because the GIL is released while a thread waits on I/O, so other threads can run instead of the CPU sitting idle.
  • CPU-bound tasks: multithreading cannot make effective use of multiple cores. Only one thread runs bytecode at a time, and the forced switching adds overhead.
  • Computation in C extensions: if the computation lives in a C/C++ extension, the extension can release the GIL explicitly, and in that case multiple threads can run on multiple cores.

3.3 Solutions for dealing with GIL restrictions

  1. Use multiple processes: the multiprocessing module gives each process its own interpreter and its own GIL, bypassing the limitation entirely (suited to CPU-bound work)
  2. Asynchronous programming: asyncio provides single-threaded cooperative concurrency, with no thread-switching overhead (suited to highly concurrent I/O-bound work)
  3. Use another interpreter: Jython (Java-based) and IronPython (.NET-based) have no GIL, but their ecosystems are less complete than CPython's
  4. C extensions/Cython: implement the core computation in C or Cython and release the GIL there
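
To give a feel for the asyncio option, here is a minimal sketch in which five simulated requests wait concurrently on a single thread. The coroutine `fake_request` and the 0.1-second delay are stand-ins for real network calls.

```python
import asyncio
import time

async def fake_request(n):
    """Stand-in for a network call: awaiting hands control back to the event loop."""
    await asyncio.sleep(0.1)
    return n * n

async def main():
    # gather() runs the coroutines concurrently and returns results in argument order
    return await asyncio.gather(*(fake_request(i) for i in range(5)))

start = time.perf_counter()
results = asyncio.run(main())
elapsed = time.perf_counter() - start

print(f"results: {results}")
print(f"elapsed: {elapsed:.2f}s")
```

Because the waits overlap, the total time should be close to a single 0.1-second delay rather than five of them, even though only one thread is involved.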

4. Thread pool and best practices

Creating and managing a large number of threads by hand is tedious (controlling the thread count, reclaiming resources, handling exceptions). Since Python 3.2, concurrent.futures.ThreadPoolExecutor handles these chores automatically.

4.1 Basic usage

from concurrent.futures import ThreadPoolExecutor
import time

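def square_task(n):
    """Simulate a computation task that involves I/O."""
    print(f"Processing task {n}...")
    time.sleep(1)  # Simulate I/O
    return n * n

# The with statement shuts the thread pool down automatically
with ThreadPoolExecutor(max_workers=4) as executor:  # For I/O-bound work, roughly 2x the CPU core count is a common starting point for max_workers
    # Submit 10 tasks and collect the returned Future objects
    futures = [executor.submit(square_task, i) for i in range(10)]
    # Collect the results of all tasks (in submission order)
    results = [f.result() for f in futures]

print(f"🏁 All task results: {results}")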

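Getting results as they complete (not in submission order)

If you don't need the results in submission order, use concurrent.futures.as_completed(), which yields each Future as soon as its task finishes. (executor.map() is a more compact alternative to submit(), but it returns results in input order.)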

from concurrent.futures import ThreadPoolExecutor, as_completed
import time

def square_task(n):
    time.sleep(n % 3)  # Give the tasks different run times
    return n * n

with ThreadPoolExecutor(max_workers=4) as executor:
    futures = {executor.submit(square_task, i): i for i in range(10)}
    # as_completed() yields each Future as soon as its task finishes
    for future in as_completed(futures):
        n = futures[future]
        try:
            result = future.result()
            print(f"Task {n} finished, result: {result}")
        except Exception as e:
            print(f"Task {n} failed: {e}")
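
For comparison, executor.map() yields results in input order even when later inputs finish first. The sketch below makes the earlier inputs deliberately slower; the delays are arbitrary values chosen for the demonstration.

```python
from concurrent.futures import ThreadPoolExecutor
import time

def square_task(n):
    time.sleep((3 - n) * 0.05)  # earlier inputs sleep longer, so they finish last
    return n * n

with ThreadPoolExecutor(max_workers=4) as executor:
    # map() returns an iterator that yields results in the order of the inputs
    ordered = list(executor.map(square_task, range(4)))

print(ordered)  # [0, 1, 4, 9]
```

Note that an exception raised inside a task surfaces when its result is retrieved from the iterator, so wrap the iteration in try/except if tasks can fail.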

4.2 Best practices for multi-threaded programming

  1. Avoid sharing global variables where possible: when state must be shared, protect it with a lock, and keep the locked region as small as possible (holding a lock around an entire loop effectively serializes the threads and wastes the concurrency)
  2. Prefer thread-safe data structures: for example queue.Queue (a thread-safe queue suited to communication between threads)
  3. Use thread-local storage: each thread keeps its own copy of the data, so nothing is shared (see the next section)
  4. Handle thread exceptions properly: an exception in a child thread is not propagated to the main thread by default. Catch it inside the task function, or retrieve it through the Future's result()/exception() methods
  5. Don't overuse multithreading: with only a few tasks, or with purely CPU-bound work, multithreading may be slower
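
As an illustration of the second point, here is a minimal producer/consumer sketch built on queue.Queue. The `STOP` sentinel, the `consumer` function, and the "double each item" work are hypothetical choices for this example.

```python
import queue
import threading

task_queue = queue.Queue()       # thread-safe; no explicit lock needed for put/get
results = []
results_lock = threading.Lock()  # protects the shared results list
STOP = None                      # hypothetical sentinel that tells a consumer to exit

def consumer():
    while True:
        item = task_queue.get()  # blocks until an item is available
        try:
            if item is STOP:
                return
            with results_lock:
                results.append(item * 2)
        finally:
            task_queue.task_done()  # mark the item as processed, even on exit

workers = [threading.Thread(target=consumer) for _ in range(3)]
for w in workers:
    w.start()

for i in range(10):              # the main thread acts as the producer
    task_queue.put(i)
for _ in workers:                # one sentinel per consumer
    task_queue.put(STOP)

task_queue.join()                # wait until every queued item has been processed
for w in workers:
    w.join()

print(sorted(results))
```

The queue handles all the locking for handing items between threads, so the only explicit lock left is the one around the shared results list.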

5. Thread local storage

If each thread needs to keep its own temporary data (such as a database connection or a user session), use threading.local() to create a thread-local storage object. Every thread can then read and write its attributes independently.

import threading

# Create a global thread-local storage object
local_db = threading.local()

def init_db_connection():
    """Initialize a separate (simulated) database connection for the current thread."""
    local_db.conn = f"DBConn-{threading.current_thread().name}"

def use_db():
    """Use the current thread's database connection."""
    print(f"{threading.current_thread().name} using connection: {local_db.conn}")

def worker():
    init_db_connection()
    use_db()

threads = [
    threading.Thread(target=worker, name=f"Worker-{i}")
    for i in range(3)
]

for t in threads:
    t.start()
for t in threads:
    t.join()

6. Multi-threading vs. multi-process selection guide

| Feature | Multi-threading | Multi-process |
| --- | --- | --- |
| Memory sharing | Shared (convenient but prone to race conditions) | Not shared (independent process space) |
| Creation/switching overhead | Small (only a few resources such as the stack) | Large (independent interpreter and memory space) |
| Communication cost | Low (read and write shared memory directly, with locking) | High (requires IPC: pipes, queues, etc.) |
| GIL impact | Restricted (still useful for I/O-bound work) | Unrestricted (suitable for CPU-bound work) |
| Applicable scenarios | Crawlers, network requests, database operations | Scientific computing, image processing, video encoding |

7. Summary of modern Python concurrent programming

  1. I/O-bound, low concurrency: use ThreadPoolExecutor directly
  2. I/O-bound, high concurrency: prefer asyncio (single-threaded and cooperative, no thread-switching overhead)
  3. CPU-bound: use multiprocessing or ProcessPoolExecutor
  4. Complex scenarios: for example, when distributed execution is required, use a task queue such as Celery

Newer Python versions also provide more powerful concurrency tools (such as asyncio.TaskGroup, added in Python 3.11, and more flexible timeout control), so developers can choose the model that fits their needs.