1.多线程、多进程、协程的区别

核心比喻:把你的电脑 CPU 想象成一个工厂。


第一层:进程 (Process) —— 厂房 / 车间

概念: 进程是操作系统分配资源的最小单位。 比喻:

  • 你的电脑是一个工业园区。
  • 当你双击运行一个 Python 脚本时,操作系统就给了你一栋厂房(进程)
  • 这个厂房有自己独立的电表(内存空间)、进货通道(文件句柄)和独立的大门钥匙。
  • 关键特点:厂房和厂房之间是完全隔离、互不干扰的。A 厂房失火(崩溃),绝对不会烧到 B 厂房。同样,A 厂房里的机器也没法直接拿 B 厂房里的零件(进程间通信很麻烦,需要专门拿货车拉——比如使用 Queue 或 Pipe)。

Python API: multiprocessing 模块。


第二层:线程 (Thread) —— 工人

概念: 线程是 CPU 调度的最小单位。 比喻:

  • 线程必须依附于进程而存在。
  • 一个厂房(进程)里,默认至少有一个**工人(主线程)**在干活。
  • 如果活太多,你可以再招聘几十个工人进到这个厂房里一起干(多线程)。
  • 关键特点:这些工人在同一个厂房里,共享这里所有的资源和工具(内存、变量全共享)。这导致一个致命问题:如果两个工人同时抢着用一台切肉机(修改同一个公共变量),手可能就被切断了(数据错乱)。所以多线程编程**必须加锁(Lock)**排队使用资源。

Python API: threading 模块。


🚨 Python 的核心特色:令人又爱又恨的 GIL 锁

在说协程前,必须先讲 Python 多线程的一个“大坑”。 你知道吗?在标准 Python (CPython) 里,多线程是假的!

为什么? 因为 Python 的厂房大门上,挂着一把超级大锁,叫 GIL(全局解释器锁)。 这把锁规定:不管你这厂子里招了多少个工人(开了多少个线程),同一时刻,只能有 1 个工人拿到钥匙,去使用 CPU 这台核心机器!

  • 结果:如果你的任务是疯狂进行数学计算(CPU 密集型),你开 10 个 Python 线程,不仅不会快,反而可能更慢!(因为 10 个工人一直在抢那一把大门钥匙,浪费了时间)。
  • 那 Python 多线程有啥用?:对于**网络爬虫、读写文件(I/O 密集型)**极度有用!当工人 A 等待网页下载时(发呆时间),他会被 GIL 踢走,工人 B 马上拿着钥匙去下载另一个网页。

第三层:协程 (Coroutine) —— 千手观音 / 超级打工人

概念: 协程是用户态的轻量级线程。由程序员自己(或框架)在代码里调度,操作系统根本不知道它的存在。 比喻:

  • 前面说,Python 因为有 GIL 锁,一个厂房只能有一个工人在用 CPU。
  • 那我们干脆就只招一个老员工(单线程)
  • 但是这个老员工练就了《千手观音》绝技!
  • 老员工在煮第一壶水(发起 HTTP 请求 A),水还没开(等待 I/O 网络返回的时间)。换作普通工人,他就在那傻等。但在协程里,他立马转身去跑去切菜(发起 HTTP 请求 B),切着切着转身又去洗锅……
  • 没有任何抢锁的冲突,也没有操作系统频繁切换工人的开销(这就避免了“上下文切换”的巨大损耗)。一个工人,硬生生干出了一万个工人的效率!

Python API: asyncio 模块 (配合 async 和 await 关键字)。这是现代 Python 开发(如 FastAPI 高并发)的核心主流!


终极实战总结:我该选哪一个?

这就是你写代码时面对的真正抉择。请死死记住这张表:

并发模型 适合什么任务? 解释 Python 实现方式
多核多进程
(多盖厂房)
CPU 密集型
(疯狂计算、图像处理、压缩视频)
既然一个厂房里有 GIL 锁,那我就直接盖 4 个相互独立的厂房(开 4 个进程),彻底绕开 GIL 锁,真正利用电脑的 4 核 CPU 同步计算! multiprocessing 库
多线程
(厂房里招工人)
I/O 密集型
(普通的网络爬虫不太多、简单的并发请求)
因为有发呆等待时间,GIL 会自动切换工人干活。代码写起来比较简单传统,适合老派程序员。 threading 库
异步协程
(单打工人开千手观音)
海量 I/O 密集型
(超高并发爬虫、FastAPI 后端服务器处理几万个连接、WebSocket)
现代 Python 的绝对王者!单核单线程跑满 CPU,没有加锁的烦恼,占用极低的内存就能扛住几万个网络连接,简直是为网络请求量身定做的。 asyncio 库 + aiohttp

总结

  • 只要是算数、处理大数据,用 多进程
  • 只要是现在的网络请求、写新项目后端,能用 协程 (asyncio) 就用协程,绝对香!
  • 要是老项目、或者库不支持协程,再退而求其次用 多线程

2.线程安全

一、 什么是“线程不安全”?(翻车现场)

想象一个极其简单的场景:厂房的墙上挂着一个小黑板,上面写着这个月卖出的货品总数 total_sales = 0。 现在有 2 个工人(线程 A 和 线程 B),他们的任务都是:去墙上把数字擦掉,加上 1,再重新写上墙。

正常情况下,如果 A 和 B 排队干活:

  1. A 看数字是 0,加 1,写 1。
  2. B 看数字是 1,加 1,写 2。 结果很完美,最终卖了 2 件。

但在“多线程”的世界里,是会随时被打断的! 看看真实的翻车情况:

  1. 线程 A 看到了黑板上写着 0
  2. 就在这个时候!操作系统的 CPU 调度把 A 强行踢开了(比如 A 的时间片用完了),换线程 B 上场。
  3. 线程 B 也跑去看黑板,此时黑板上还是 0(因为 A 还没来得及加 1 并写上去)。
  4. 线程 B 在脑子里算 0+1=1,然后自信地在黑板上写下了 1
  5. 此时线程 A 又被唤醒了,A 接着刚才的记忆,脑子里算着 0+1=1,也自信地在黑板上写下了 1

灾难发生了:两个工人明明各自加了一次,但最后黑板上的数字是 1,而不是 2!你的钱不翼而飞了。

只要多个线程同时读写同一个变量,最后的结果可能会因为 CPU 切换时机的不可控随机性导致完全错乱。这就是所谓的**“线程不安全”**(或者叫“数据竞争 / Race Condition”)。


二、 Python 代码演示“翻车现场”

你可以把下面这段代码放在你本地跑一下,结果会非常出人意料:

import threading

# 公共变量(墙上的小黑板)

money = 0

# 工人的老本行:把钱疯狂加 100 万次

def add_money():

global money

for _ in range(1000000):

# 这句代码看似简单,底层其实分三步:1.读值 2.加一 3.写回

# 随时可能在这三步的中间被打断!

money += 1

# 招两个工人(开两个线程)

thread_A = threading.Thread(target=add_money)

thread_B = threading.Thread(target=add_money)

thread_A.start()

thread_B.start()

thread_A.join()

thread_B.join()

# 理论上 2 个工人各加 100 万次,应该是 2000000

print(f"最终的钱: {money}")

# 你会发现:每次运行的结果都不一样,比如可能是 1546732,永远达不到 200万!

三、 什么是真正的“线程安全”?(以及如何实现)

线程安全的意思就是说:不管操作系统怎么丧心病狂地在各个不同线程之间切换,我的公共数据永远不会乱,永远等于预期的结果。

在 Python 里,要实现线程安全,通常有以下两种武器:

武器 1:加锁 (Lock) —— 最经典、最暴力的做法

如果怕别人打断,那我就把整个黑板锁起来! Python 提供了 threading.Lock()

import threading

money = 0

# 买一把锁

lock = threading.Lock()

def safe_add_money():

global money

for _ in range(1000000):

# 抢锁!抢到锁的人才能往下走,其他工人哪怕醒了,只要看锁着,就只能在门口死等(阻塞)

lock.acquire()

try:

money += 1 # 在锁里的操作,绝对不会被打断,称为“原子操作”

finally:

# 干完活,千万千万记得还要开锁放人!否则变成死锁(Deadlock),所有人都卡死

lock.release()

# 如果你用 with 语句,Python 会自动帮你上锁和解锁,写起来更优雅、更安全(防止你忘了 release):

def safe_add_money_with():

global money

for _ in range(1000000):

with lock:

money += 1

你如果用加上锁的版本重新跑,不管跑多少遍,结果绝对是精准的 2000000!

武器 2:使用线程安全的数据结构 (比如 Queue)

自己加锁很容易忘,稍微写错一点就会变成“死锁”导致程序直接卡死。 所以,Python 大神们早就帮我们写好了一些天生自带锁的“保险箱”。无论多少个工人同时去保险箱里塞东西或者拿东西,它内部都自动排好队了,绝对不会弄丢数据。

最经典的就是 queue.Queue (队列)。多线程爬虫时,通常不用 list 来保存爬到的网页链接,而是用 Queue,就是因为 Queue 是 线程安全的


总结

  • 线程不安全:多个人抢着用大铁锅,你刚放盐准备炒,别人一铲子把你的菜全盛走了。(导致数据错乱或者丢失)。
  • 线程安全:无论你这程序开多少个线程去运行,都和单线程跑出来的结果一模一样。
  • 怎么做到?:如果多线程要修改公共变量,老老实实加上 threading.Lock();如果只是传输数据,直接用 queue.Queue 之类的线程安全结构。如果你只是数据不修改它,那不加锁也是线程安全的。

3.asyncio异步编程

一、asyncio 核心定位:解决 IO 密集型并发

asyncio 是 Python 用于编写单线程并发代码的标准库,核心解决IO 密集型任务(如网络请求、文件读写、数据库查询)的效率问题。

对比:同步 vs 多线程 vs asyncio

表格

模式 原理 适用场景 缺点
同步 顺序执行,IO 时阻塞等待 简单逻辑、CPU 密集 IO 等待时浪费 CPU 资源
多线程 多线程切换执行,IO 时切换线程 IO 密集、任务数少 线程切换开销大,GIL 限制并行
asyncio 单线程内协程切换,IO 时挂起 高并发 IO 密集 代码需异步化,不能有阻塞调用

核心优势:单线程 + 协程切换,开销远小于线程切换,轻松支持上万并发连接(如爬虫、Web 服务器)。


二、核心概念(面试必问)
  1. 协程(Coroutine)

协程是 asyncio 的最小执行单元,可以理解为「用户态的轻量级线程」,由程序员主动控制切换,而非操作系统抢占。

在 Python 中,用 async def 定义的函数就是协程函数:

import asyncio # 定义协程函数
async def say_hello():
print("Hello")
await asyncio.sleep(1) # 模拟 IO 操作,挂起当前协程
print("World")
  1. 事件循环(Event Loop)

事件循环是 asyncio 的心脏,负责:

  • 调度和执行所有协程;

  • 处理 IO 事件(如网络数据到达);

  • 在协程挂起 / 恢复时切换上下文。

Python 3.7+ 推荐用 asyncio.run() 自动管理事件循环:

# 运行协程(自动创建和关闭事件循环) 
asyncio.run(say_hello())
3. await 关键字

await 是协程切换的触发点,只能在 async def 函数内使用:

  • 作用:挂起当前协程,让事件循环去执行其他协程;

  • 等待对象:只能是「可等待对象」(协程对象、TaskFuture);

  • 时机:遇到 IO 操作(如 asyncio.sleep、网络请求)时必须用 await

  1. Task(任务)

Task 是对协程的封装,用于并发执行多个协程。协程本身是「串行」的,只有包装成 Task 交给事件循环,才能实现并发。

创建 Task :

async def main():# 方式 1:asyncio.create_task()(推荐,Python 3.7+)
    task1 = asyncio.create_task(say_hello())
    task2 = asyncio.create_task(say_hello())# 等待两个 Task 完成await task1
    await task2

总结:

场景 选择
网络请求(HTTP) aiohttp 库
读写文件 aiofiles 库
操作 MySQL 数据库 aiomysql 库
操作 PostgreSQL asyncpg 库
需要同时跑多个协程 asyncio.gather()
需要等一段时间 await asyncio.sleep()
写 Web 后端 FastAPI (它天生全 async)

4.进程间通信

什么是进程间通信?

进程是操作系统中资源分配的基本单位,每个进程都有独立的地址空间。一个进程无法直接访问另一个进程的内存。进程间通信(IPC) 就是操作系统提供的一组机制,让不同进程之间能够交换数据、同步执行。

主要的 IPC 方式
1. 🔧 管道(Pipe)
  • 匿名管道:最简单的 IPC 方式,半双工(数据只能单向流动),通常用于父子进程之间。
  • 命名管道(FIFO):存在于文件系统中,无亲缘关系的进程也可以通信。
  • from multiprocessing import Process, Pipe
    
    def child_process(conn):
        """子进程:发送数据"""
        conn.send("你好,我是子进程!")
        conn.send([1, 2, 3, {"key": "value"}])  # 可以发送任意 Python 对象
        conn.close()
    
    def pipe_example():
        parent_conn, child_conn = Pipe()  # 创建双向管道
        
        p = Process(target=child_process, args=(child_conn,))
        p.start()
        
        print(f"收到消息: {parent_conn.recv()}")
        print(f"收到数据: {parent_conn.recv()}")
        
        p.join()
    
    if __name__ == '__main__':
        pipe_example()
    
    #输出
    收到消息: 你好,我是子进程!
    收到数据: [1, 2, 3, {'key': 'value'}]
    

特点:简单高效,但只能单向传输,且匿名管道仅限于有亲缘关系的进程。


2. 📨 消息队列(Message Queue)
  • 内核中维护的一个消息链表,进程可以按类型选择性地读取消息。
  • 消息有明确的边界(不像管道是字节流)。
  • from multiprocessing import Process, Queue
    import time
    
    def producer(queue):
        """生产者进程"""
        for i in range(5):
            msg = f"消息 #{i}"
            queue.put(msg)
            print(f"[生产者] 发送: {msg}")
            time.sleep(0.5)
        queue.put(None)  # 发送结束信号
    
    def consumer(queue):
        """消费者进程"""
        while True:
            msg = queue.get()  # 阻塞等待消息
            if msg is None:
                break
            print(f"[消费者] 收到: {msg}")
    
    if __name__ == '__main__':
        queue = Queue()
        
        p1 = Process(target=producer, args=(queue,))
        p2 = Process(target=consumer, args=(queue,))
        
        p1.start()
        p2.start()
        
        p1.join()
        p2.join()
        print("通信完成!")
    
    #输出
    [生产者] 发送: 消息 #0
    [消费者] 收到: 消息 #0
    [生产者] 发送: 消息 #1
    [消费者] 收到: 消息 #1
    ...
    通信完成!
    

特点:支持消息分类、独立于进程生命周期、但有大小限制。


3. 🧠 共享内存(Shared Memory)
  • 最快的 IPC 方式!多个进程映射同一块物理内存到自己的地址空间,直接读写。
  • 需要配合同步机制(如信号量)来避免竞态条件。
  • from multiprocessing import shared_memory, Process
    import numpy as np
    
    def writer(shm_name):
        """写入进程"""
        existing_shm = shared_memory.SharedMemory(name=shm_name)
        # 将共享内存映射为 numpy 数组
        data = np.ndarray((5,), dtype=np.int64, buffer=existing_shm.buf)
        data[:] = [10, 20, 30, 40, 50]
        print(f"[写入者] 写入数据: {list(data)}")
        existing_shm.close()
    
    def reader(shm_name):
        """读取进程"""
        import time
        time.sleep(0.5)
        existing_shm = shared_memory.SharedMemory(name=shm_name)
        data = np.ndarray((5,), dtype=np.int64, buffer=existing_shm.buf)
        print(f"[读取者] 读取数据: {list(data)}")
        existing_shm.close()
    
    if __name__ == '__main__':
        # 创建共享内存块
        shm = shared_memory.SharedMemory(create=True, size=5 * 8)  # 5个int64
        
        p1 = Process(target=writer, args=(shm.name,))
        p2 = Process(target=reader, args=(shm.name,))
        
        p1.start()
        p2.start()
        p1.join()
        p2.join()
        
        shm.close()
        shm.unlink()  # 释放共享内存
    

特点:速度最快(无需内核拷贝),但需要自行处理同步问题。


4. 🚦 信号量(Semaphore)
  • 本质是一个计数器,用于控制多个进程对共享资源的同步访问
  • 常配合共享内存使用。
操作 含义
P 操作(wait) 计数器 -1,若 < 0 则阻塞
V 操作(signal) 计数器 +1,唤醒等待的进程

特点:主要用于同步,不用于传输数据。


5. ⚡ 信号(Signal)
  • 一种异步通知机制,用于通知进程某个事件发生。
  • 例如 SIGKILL(强制终止)、SIGINT(Ctrl+C)、SIGUSR1(用户自定义)。

特点:传递的信息量极少(只有信号编号),适合通知型场景。


6. 🌐 Socket(套接字)
  • 最通用的 IPC 方式,不仅支持本机进程通信,还支持跨网络通信。
  • 分为 Unix Domain Socket(本机,高效)和 网络 Socket(TCP/UDP)。
# Python 简单 Socket 示例

import socket

# 服务端

server = socket.socket(socket.AF_INET, socket.SOCK_STREAM)

server.bind(('localhost', 8080))

server.listen(1)

conn, addr = server.accept()

data = conn.recv(1024)

特点:功能最强大、跨平台跨网络,但相比共享内存开销更大。


📊 各种 IPC 方式对比
方式 速度 方向 进程关系 数据量 适用场景
匿名管道 单向 父子进程 字节流 简单的父子通信
命名管道 单向/双向 任意进程 字节流 无亲缘关系的进程
消息队列 中等 双向 任意进程 有边界的消息 按类型分发消息
共享内存 最快 双向 任意进程 大数据量 高性能数据共享
信号量 任意进程 无数据 同步/互斥控制
信号 单向 任意进程 极少 事件通知
Socket 较慢 双向 任意/跨机器 字节流 网络通信、通用 IPC

💡 如何选择?
  1. 父子进程简单通信 → 匿名管道
  2. 无关进程交换少量数据 → 消息队列 / 命名管道
  3. 大量数据高速共享 → 共享内存 + 信号量
  4. 跨机器通信 → Socket
  5. 事件通知 → 信号

5.协程 vs 线程 vs 进程

import asyncio
import threading
import multiprocessing
import time

# ---- 协程版本 ----
async def async_io_task(task_id):
    await asyncio.sleep(1)

async def run_async():
    start = time.perf_counter()
    await asyncio.gather(*[async_io_task(i) for i in range(1000)])
    return time.perf_counter() - start

# ---- 线程版本 ----
def thread_io_task(task_id):
    time.sleep(1)

def run_threads():
    start = time.perf_counter()
    threads = [threading.Thread(target=thread_io_task, args=(i,)) for i in range(1000)]
    for t in threads: t.start()
    for t in threads: t.join()
    return time.perf_counter() - start

# 对比
print("1000 个 I/O 任务(每个等待 1 秒):\n")

async_time = asyncio.run(run_async())
print(f"  协程:   {async_time:.2f}s  (1 个线程,极低开销)")

thread_time = run_threads()
print(f"  线程:   {thread_time:.2f}s  (1000 个线程,高内存开销)")

print(f"\n协程快 {thread_time/async_time:.1f}x,且内存消耗极小")

核心概念总结

概念 解释 类比
协程 可以暂停/恢复的函数 做饭时水烧着,去切菜
async def 定义协程函数 声明"我是可暂停的"
await 暂停当前协程,等待结果 "水没开,我先干别的"
事件循环 调度器,决定谁来执行 大脑决定先做什么
asyncio.run() 启动事件循环 启动大脑
asyncio.gather() 并发运行多个协程 同时煮饭、炒菜、煲汤
create_task() 把协程变成后台任务 安排一项工作
async for 异步迭代 逐条异步获取数据
async with 异步上下文管理 异步开关资源

关键理解:协程是在 单线程 内实现的并发,靠 await 主动让出控制权,由事件循环调度。没有多线程的锁问题,也没有 GIL 的困扰!

Logo

AtomGit 是由开放原子开源基金会联合 CSDN 等生态伙伴共同推出的新一代开源与人工智能协作平台。平台坚持“开放、中立、公益”的理念,把代码托管、模型共享、数据集托管、智能体开发体验和算力服务整合在一起,为开发者提供从开发、训练到部署的一站式体验。

更多推荐