目 录CONTENT

文章目录

进程、线程、同步/异步 指南

~梓
2026-04-11 / 0 评论 / 0 点赞 / 4 阅读 / 0 字
温馨提示:
部分素材来自网络,若不小心影响到您的利益,请联系我们删除。

进程、线程、同步/异步 完全指南

一、进程(Process)

什么是进程?

进程是操作系统进行资源分配和调度的基本单位。每个进程都有自己独立的内存空间、文件描述符等资源。

import os
from multiprocessing import Process

# 每个进程有独立的 PID
print(f"主进程 PID: {os.getpid()}")

def worker(name):
    print(f"子进程 '{name}' PID: {os.getpid()}")
    # 子进程有自己的内存空间
    x = 100  # 这个变量只属于这个子进程

# 创建新进程
p = Process(target=worker, args=("Worker1",))
p.start()
p.join()

进程的特点

# 进程间内存隔离的演示
from multiprocessing import Process

shared_list = []  # 主进程的列表

def add_item():
    shared_list.append(1)  # 子进程会复制一份列表,不影响主进程
    print(f"子进程中的列表: {shared_list}")

p = Process(target=add_item)
p.start()
p.join()
print(f"主进程中的列表: {shared_list}")  # 仍然是 [],互不影响

# 输出:
# 子进程中的列表: [1]
# 主进程中的列表: []

关键特性:

  • ✅ 资源独立,一个进程崩溃不影响其他进程
  • ✅ 真正并行(多核 CPU 可同时运行多个进程)
  • ❌ 创建/销毁开销大
  • ❌ 进程间通信复杂(需要 IPC:管道、队列、共享内存等)
  • ❌ 切换开销大

二、线程(Thread)

什么是线程?

线程是操作系统进行调度的基本单位。一个进程内的多个线程共享进程的内存空间和资源。

import threading
import os

print(f"主线程 PID: {os.getpid()}, 线程ID: {threading.current_thread().name}")

def worker(name):
    print(f"子线程 '{name}' PID: {os.getpid()}, 线程ID: {threading.current_thread().name}")
    # 线程共享进程的内存空间
    shared_list.append(1)  # 可以访问并修改主线程的变量

shared_list = []  # 所有线程共享这个列表
t = threading.Thread(target=worker, args=("Worker1",))
t.start()
t.join()
print(f"主线程中的列表: {shared_list}")  # [1],被子线程修改了

# 输出:
# 主线程 PID: 12345, 线程ID: MainThread
# 子线程 'Worker1' PID: 12345, 线程ID: Thread-1
# 主线程中的列表: [1]

线程的特点

import threading
import time

# 多个线程共享全局变量
counter = 0

def increment():
    global counter
    for _ in range(1000000):
        counter += 1  # 这个操作不是原子的,需要加锁

def increment_with_lock():
    global counter
    for _ in range(1000000):
        with lock:
            counter += 1

# 无锁的情况(会有竞态条件)
counter = 0
threads = [threading.Thread(target=increment) for _ in range(4)]
for t in threads: t.start()
for t in threads: t.join()
print(f"无锁结果: {counter}")  # 结果不确定,通常小于 4000000

# 有锁的情况
counter = 0
lock = threading.Lock()
threads = [threading.Thread(target=increment_with_lock) for _ in range(4)]
for t in threads: t.start()
for t in threads: t.join()
print(f"有锁结果: {counter}")  # 正好 4000000

关键特性:

  • ✅ 创建/销毁开销小
  • ✅ 切换开销小
  • ✅ 线程间通信简单(共享内存)
  • ❌ 需要处理数据同步(锁、信号量等)
  • ❌ 一个线程崩溃可能影响整个进程
  • ❌ 在 CPython 中受 GIL 限制

三、进程 vs 线程 对比

import threading
import multiprocessing
import time
import os

def cpu_intensive(n):
    """CPU 密集型任务"""
    total = 0
    for i in range(n):
        total += i ** 2
    return total

def compare_performance():
    """对比进程和线程在 CPU 密集型任务上的表现"""
    n_workers = 4
    n_iterations = 10_000_000
  
    # 多线程(受 GIL 影响)
    start = time.time()
    threads = [threading.Thread(target=cpu_intensive, args=(n_iterations,)) 
               for _ in range(n_workers)]
    for t in threads: t.start()
    for t in threads: t.join()
    thread_time = time.time() - start
  
    # 多进程(不受 GIL 影响)
    start = time.time()
    processes = [multiprocessing.Process(target=cpu_intensive, args=(n_iterations,)) 
                 for _ in range(n_workers)]
    for p in processes: p.start()
    for p in processes: p.join()
    process_time = time.time() - start
  
    print(f"多线程耗时: {thread_time:.2f}s (受到 GIL 限制)")
    print(f"多进程耗时: {process_time:.2f}s (真正的并行)")
  
    # 输出示例(4核CPU):
    # 多线程耗时: 8.5s (串行执行)
    # 多进程耗时: 2.3s (并行执行)

# 内存隔离演示
def memory_isolation():
    shared = []  # 主进程的列表
  
    def modify_list():
        shared.append("item")
        print(f"子进程看到: {shared}")
  
    p = multiprocessing.Process(target=modify_list)
    p.start()
    p.join()
    print(f"主进程看到: {shared}")  # 还是 [],因为内存隔离

四、同步 vs 异步

同步(Synchronous)

任务按顺序执行,必须等待当前任务完成后才能继续下一个任务。

import time

def sync_example():
    """同步执行:等一个完成再做下一个"""
    print("开始同步任务")
  
    def task(name, duration):
        print(f"{name} 开始")
        time.sleep(duration)  # 模拟耗时操作
        print(f"{name} 完成")
        return f"{name} 结果"
  
    # 串行执行
    result1 = task("任务A", 2)
    result2 = task("任务B", 1)
    result3 = task("任务C", 1.5)
  
    print(f"所有结果: {result1}, {result2}, {result3}")
    # 总耗时: 2 + 1 + 1.5 = 4.5 秒

sync_example()

异步(Asynchronous)

发起任务后不等待完成,可以继续执行其他任务,完成后通过回调或轮询获取结果。

import asyncio
import time

async def async_example():
    """异步执行:并发等待,充分利用等待时间"""
    print("开始异步任务")
  
    async def task(name, duration):
        print(f"{name} 开始")
        await asyncio.sleep(duration)  # 模拟异步 I/O 操作
        print(f"{name} 完成")
        return f"{name} 结果"
  
    # 并发执行
    tasks = [
        task("任务A", 2),
        task("任务B", 1),
        task("任务C", 1.5)
    ]
    results = await asyncio.gather(*tasks)
  
    print(f"所有结果: {results}")
    # 总耗时: max(2, 1, 1.5) = 2 秒(并发执行)

# 运行异步代码
asyncio.run(async_example())

同步 vs 异步 直观对比

import asyncio
import threading
import time

# 模拟网络请求
def sync_network_request(url):
    """同步网络请求"""
    print(f"同步请求 {url} 开始")
    time.sleep(1)  # 模拟网络延迟
    print(f"同步请求 {url} 完成")
    return f"{url} 数据"

async def async_network_request(url):
    """异步网络请求"""
    print(f"异步请求 {url} 开始")
    await asyncio.sleep(1)  # 模拟异步网络延迟
    print(f"异步请求 {url} 完成")
    return f"{url} 数据"

def sync_demo():
    """同步方式:串行等待"""
    start = time.time()
    urls = ["url1", "url2", "url3"]
    results = []
    for url in urls:
        results.append(sync_network_request(url))
    print(f"同步总耗时: {time.time() - start:.2f}s")
    # 输出: 同步总耗时: 3.01s

async def async_demo():
    """异步方式:并发等待"""
    start = time.time()
    urls = ["url1", "url2", "url3"]
    tasks = [async_network_request(url) for url in urls]
    results = await asyncio.gather(*tasks)
    print(f"异步总耗时: {time.time() - start:.2f}s")
    # 输出: 异步总耗时: 1.01s

# 执行对比
sync_demo()
asyncio.run(async_demo())

五、阻塞 vs 非阻塞

阻塞(Blocking)

调用函数后,当前线程被挂起,等待操作完成。

import time
import socket

def blocking_example():
    """阻塞 I/O 示例"""
    print("1. 开始阻塞连接")
  
    # socket.connect() 会阻塞直到连接成功
    sock = socket.socket()
    sock.connect(("www.python.org", 80))  # 阻塞在这里
    print("2. 连接成功")
  
    # time.sleep() 也是阻塞的
    time.sleep(2)  # 线程被挂起 2 秒
    print("3. 睡眠结束")
  
    # input() 也是阻塞的
    user_input = input("4. 等待输入: ")  # 阻塞等待用户输入
    print(f"5. 收到输入: {user_input}")

# 阻塞期间,线程不能做任何其他事情

非阻塞(Non-blocking)

调用函数后立即返回,不等待操作完成。

import socket

def nonblocking_example():
    """非阻塞 I/O 示例(设置套接字为非阻塞)"""
    sock = socket.socket()
    sock.setblocking(False)  # 设置为非阻塞模式
  
    try:
        # 不会阻塞,但可能立即抛出错误
        sock.connect(("www.python.org", 80))
    except BlockingIOError:
        print("连接正在进行中(非阻塞模式)")
        # 可以在这里做其他事情,稍后再检查连接状态
  
    # 立即返回,不需要等待
    print("可以执行其他代码")

# 注意:非阻塞模式通常需要配合 select/poll/epoll 使用

六、并发 vs 并行

这是最容易被混淆的两个概念:

并发(Concurrency)

逻辑上同时处理多件事情,但不一定在同一时刻执行。

import threading
import time

def concurrency_demo():
    """并发:多个线程交替执行"""
  
    def task(name):
        for i in range(3):
            print(f"{name} 执行步骤 {i}")
            time.sleep(0.1)  # 主动让出 CPU
  
    # 两个线程并发执行(在单核 CPU 上)
    t1 = threading.Thread(target=task, args=("任务A",))
    t2 = threading.Thread(target=task, args=("任务B",))
    t1.start()
    t2.start()
    t1.join()
    t2.join()
  
    # 可能的输出(交错执行):
    # 任务A 执行步骤 0
    # 任务B 执行步骤 0
    # 任务A 执行步骤 1
    # 任务B 执行步骤 1
    # 任务A 执行步骤 2
    # 任务B 执行步骤 2

并行(Parallelism)

物理上同时执行多个任务(需要多核 CPU)。

from multiprocessing import Pool
import time

def parallel_demo():
    """并行:多个进程同时执行"""
  
    def heavy_computation(n):
        """CPU 密集型计算"""
        total = 0
        for i in range(n):
            total += i ** 2
        return total
  
    # 使用进程池,在多个 CPU 核心上并行执行
    with Pool(processes=4) as pool:
        results = pool.map(heavy_computation, [10_000_000] * 4)
  
    # 4 个任务真正地同时在不同核心上运行
    print(f"计算结果: {results}")

# 并发的比喻:一个人同时煮咖啡和烤面包(交替进行)
# 并行的比喻:两个人,一个煮咖啡,一个烤面包(同时进行)

七、GIL 到底影响哪部分?

这是最关键的部分!GIL 不影响所有代码

GIL 影响的代码

import threading
import time
import numpy as np

# ❌ GIL 影响:纯 Python 的 CPU 密集型代码
def pure_python_cpu():
    """纯 Python 计算 - 受 GIL 严重影响"""
    total = 0
    for i in range(100_000_000):  # 纯 Python 循环
        total += i ** 2
    return total

# 多线程执行纯 Python 计算
def test_gil_effect():
    start = time.time()
    threads = []
    for _ in range(4):
        t = threading.Thread(target=pure_python_cpu)
        t.start()
        threads.append(t)
    for t in threads:
        t.join()
    print(f"纯 Python 多线程耗时: {time.time() - start:.2f}s")
    # 输出: ~8s (几乎等于单线程 × 4,因为 GIL 串行化)

GIL 不影响的代码

# ✅ GIL 不影响 1: I/O 操作
def io_operation():
    """I/O 操作会自动释放 GIL"""
    import requests
    response = requests.get("https://www.python.org")  # 释放 GIL
    return response.status_code

# ✅ GIL 不影响 2: NumPy/SciPy 等 C 扩展
def numpy_operation():
    """NumPy 的底层 C 代码会释放 GIL"""
    import numpy as np
    a = np.random.rand(1000, 1000)
    b = np.random.rand(1000, 1000)
    # 这个矩阵乘法在 C 层执行,会释放 GIL
    c = np.dot(a, b)  # 多核并行!
    return c

# ✅ GIL 不影响 3: 系统调用
def system_call():
    """大多数系统调用会释放 GIL"""
    import os
    # 文件读写会释放 GIL
    with open("/dev/null", "w") as f:
        f.write("x" * 1000000)
  
    # time.sleep 也会释放 GIL
    time.sleep(1)

# ✅ GIL 不影响 4: 加密/压缩等 C 库
def c_library_operation():
    """调用 C 库通常会释放 GIL"""
    import hashlib
    data = b"x" * 10000000
    # hashlib 的底层 C 实现会释放 GIL
    hash_obj = hashlib.sha256(data)  # 多线程中也能并行
    return hash_obj.hexdigest()

实际测试:GIL 的边界

import threading
import time
import numpy as np

def benchmark_gil_boundary():
    """测试 GIL 影响边界"""
  
    # 1. 纯 Python 循环(受 GIL 影响)
    def python_loop():
        total = 0
        for i in range(50_000_000):
            total += i
  
    # 2. NumPy 操作(不受 GIL 影响)
    def numpy_ops():
        arr = np.random.rand(1000, 1000)
        result = np.dot(arr, arr.T)
  
    # 3. 混合操作(部分受 GIL 影响)
    def mixed_ops():
        arr = np.random.rand(1000, 1000)
        result = np.dot(arr, arr.T)  # 无 GIL
        total = 0
        for i in range(10_000_000):  # 有 GIL
            total += i
  
    # 测试纯 Python
    start = time.time()
    threads = [threading.Thread(target=python_loop) for _ in range(4)]
    for t in threads: t.start()
    for t in threads: t.join()
    python_time = time.time() - start
  
    # 测试 NumPy
    start = time.time()
    threads = [threading.Thread(target=numpy_ops) for _ in range(4)]
    for t in threads: t.start()
    for t in threads: t.join()
    numpy_time = time.time() - start
  
    print(f"纯 Python 循环(受 GIL 限制): {python_time:.2f}s")
    print(f"NumPy 操作(突破 GIL): {numpy_time:.2f}s")
  
# benchmark_gil_boundary()

八、总结对比表

特性 进程 线程(CPython) 异步
资源开销 很大 较小 极小
切换开销 几乎无
数据共享 困难(IPC) 容易(共享内存) 容易
GIL 影响 无影响 CPU 密集型受影响 I/O 密集型受影响
真正并行 ✅ 是 ❌ 否(GIL限制) ❌ 否
适用场景 CPU 密集型 I/O 密集型 高并发 I/O
调试难度 中等 困难(竞态条件) 较难
崩溃影响 隔离 可能影响整个进程 影响当前任务

九、实战选择指南

def choose_concurrency_model(task_type, requirements):
    """根据场景选择合适的并发模型"""
  
    if task_type == "cpu_intensive":
        if requirements.get("need_shared_memory", False):
            return "使用 multiprocessing + 共享内存"
        else:
            return "使用 ProcessPoolExecutor"
  
    elif task_type == "io_intensive":
        if requirements.get("high_concurrency", 0) > 1000:
            return "使用 asyncio(高并发 I/O)"
        else:
            return "使用 ThreadPoolExecutor(简单场景)"
  
    elif task_type == "mixed":
        return "组合使用:asyncio + ProcessPoolExecutor"
  
    elif task_type == "real_time":
        return "考虑使用 C 扩展或 PyPy"
  
    else:
        return "单线程足够,不要过度设计"

# 使用示例
print(choose_concurrency_model("cpu_intensive", {}))
# 输出: 使用 ProcessPoolExecutor

print(choose_concurrency_model("io_intensive", {"high_concurrency": 10000}))
# 输出: 使用 asyncio(高并发 I/O)

理解了这些基础概念后,GIL 的影响范围就清晰了:

  • GIL 只影响纯 Python 代码的 CPU 密集型任务
  • I/O 操作、C 扩展、NumPy 等都不受 GIL 限制
  • 多进程是绕过 GIL 的最简单方式
0

评论区