Python并发编程
1.多线程、多进程、协程的区别
核心比喻:把你的电脑 CPU 想象成一个工厂。
第一层:进程 (Process) —— 厂房 / 车间
概念: 进程是操作系统分配资源的最小单位。 比喻:
- 你的电脑是一个工业园区。
- 当你双击运行一个 Python 脚本时,操作系统就给了你一栋厂房(进程)。
- 这个厂房有自己独立的电表(内存空间)、进货通道(文件句柄)和独立的大门钥匙。
- 关键特点:厂房和厂房之间是完全隔离、互不干扰的。A 厂房失火(崩溃),绝对不会烧到 B 厂房。同样,A 厂房里的机器也没法直接拿 B 厂房里的零件(进程间通信很麻烦,需要专门拿货车拉——比如使用 Queue 或 Pipe)。
Python API: multiprocessing 模块。
第二层:线程 (Thread) —— 工人
概念: 线程是 CPU 调度的最小单位。 比喻:
- 线程必须依附于进程而存在。
- 一个厂房(进程)里,默认至少有一个**工人(主线程)**在干活。
- 如果活太多,你可以再招聘几十个工人进到这个厂房里一起干(多线程)。
- 关键特点:这些工人在同一个厂房里,共享这里所有的资源和工具(内存、变量全共享)。这导致一个致命问题:如果两个工人同时抢着用一台切肉机(修改同一个公共变量),手可能就被切断了(数据错乱)。所以多线程编程**必须加锁(Lock)**排队使用资源。
Python API: threading 模块。
🚨 Python 的核心特色:令人又爱又恨的 GIL 锁
在说协程前,必须先讲 Python 多线程的一个“大坑”。 你知道吗?在标准 Python (CPython) 里,多线程是假的!
为什么? 因为 Python 的厂房大门上,挂着一把超级大锁,叫 GIL(全局解释器锁)。 这把锁规定:不管你这厂子里招了多少个工人(开了多少个线程),同一时刻,只能有 1 个工人拿到钥匙,去使用 CPU 这台核心机器!
- 结果:如果你的任务是疯狂进行数学计算(CPU 密集型),你开 10 个 Python 线程,不仅不会快,反而可能更慢!(因为 10 个工人一直在抢那一把大门钥匙,浪费了时间)。
- 那 Python 多线程有啥用?:对于**网络爬虫、读写文件(I/O 密集型)**极度有用!当工人 A 等待网页下载时(发呆时间),他会被 GIL 踢走,工人 B 马上拿着钥匙去下载另一个网页。
第三层:协程 (Coroutine) —— 千手观音 / 超级打工人
概念: 协程是用户态的轻量级线程。由程序员自己(或框架)在代码里调度,操作系统根本不知道它的存在。 比喻:
- 前面说,Python 因为有 GIL 锁,一个厂房只能有一个工人在用 CPU。
- 那我们干脆就只招一个老员工(单线程)!
- 但是这个老员工练就了《千手观音》绝技!
- 老员工在煮第一壶水(发起 HTTP 请求 A),水还没开(等待 I/O 网络返回的时间)。换作普通工人,他就在那傻等。但在协程里,他立马转身去跑去切菜(发起 HTTP 请求 B),切着切着转身又去洗锅……。
- 没有任何抢锁的冲突,也没有操作系统频繁切换工人的开销(这就避免了“上下文切换”的巨大损耗)。一个工人,硬生生干出了一万个工人的效率!
Python API: asyncio 模块 (配合 async 和 await 关键字)。这是现代 Python 开发(如 FastAPI 高并发)的核心主流!
终极实战总结:我该选哪一个?
这就是你写代码时面对的真正抉择。请死死记住这张表:
| 并发模型 | 适合什么任务? | 解释 | Python 实现方式 |
|---|---|---|---|
| 多核多进程 (多盖厂房) |
CPU 密集型 (疯狂计算、图像处理、压缩视频) |
既然一个厂房里有 GIL 锁,那我就直接盖 4 个相互独立的厂房(开 4 个进程),彻底绕开 GIL 锁,真正利用电脑的 4 核 CPU 同步计算! | multiprocessing 库 |
| 多线程 (厂房里招工人) |
I/O 密集型 (普通的网络爬虫不太多、简单的并发请求) |
因为有发呆等待时间,GIL 会自动切换工人干活。代码写起来比较简单传统,适合老派程序员。 | threading 库 |
| 异步协程 (单打工人开千手观音) |
海量 I/O 密集型 (超高并发爬虫、FastAPI 后端服务器处理几万个连接、WebSocket) |
现代 Python 的绝对王者!单核单线程跑满 CPU,没有加锁的烦恼,占用极低的内存就能扛住几万个网络连接,简直是为网络请求量身定做的。 | asyncio 库 + aiohttp |
总结:
- 只要是算数、处理大数据,用 多进程!
- 只要是现在的网络请求、写新项目后端,能用 协程 (asyncio) 就用协程,绝对香!
- 要是老项目、或者库不支持协程,再退而求其次用 多线程。
2.线程安全
一、 什么是“线程不安全”?(翻车现场)
想象一个极其简单的场景:厂房的墙上挂着一个小黑板,上面写着这个月卖出的货品总数 total_sales = 0。 现在有 2 个工人(线程 A 和 线程 B),他们的任务都是:去墙上把数字擦掉,加上 1,再重新写上墙。
正常情况下,如果 A 和 B 排队干活:
- A 看数字是 0,加 1,写 1。
- B 看数字是 1,加 1,写 2。 结果很完美,最终卖了 2 件。
但在“多线程”的世界里,是会随时被打断的! 看看真实的翻车情况:
- 线程 A 看到了黑板上写着
0。 - 就在这个时候!操作系统的 CPU 调度把 A 强行踢开了(比如 A 的时间片用完了),换线程 B 上场。
- 线程 B 也跑去看黑板,此时黑板上还是
0(因为 A 还没来得及加 1 并写上去)。 - 线程 B 在脑子里算
0+1=1,然后自信地在黑板上写下了1。 - 此时线程 A 又被唤醒了,A 接着刚才的记忆,脑子里算着
0+1=1,也自信地在黑板上写下了1。
灾难发生了:两个工人明明各自加了一次,但最后黑板上的数字是 1,而不是 2!你的钱不翼而飞了。
只要多个线程同时读写同一个变量,最后的结果可能会因为 CPU 切换时机的不可控随机性导致完全错乱。这就是所谓的**“线程不安全”**(或者叫“数据竞争 / Race Condition”)。
二、 Python 代码演示“翻车现场”
你可以把下面这段代码放在你本地跑一下,结果会非常出人意料:
import threading
# 公共变量(墙上的小黑板)
money = 0
# 工人的老本行:把钱疯狂加 100 万次
def add_money():
global money
for _ in range(1000000):
# 这句代码看似简单,底层其实分三步:1.读值 2.加一 3.写回
# 随时可能在这三步的中间被打断!
money += 1
# 招两个工人(开两个线程)
thread_A = threading.Thread(target=add_money)
thread_B = threading.Thread(target=add_money)
thread_A.start()
thread_B.start()
thread_A.join()
thread_B.join()
# 理论上 2 个工人各加 100 万次,应该是 2000000
print(f"最终的钱: {money}")
# 你会发现:每次运行的结果都不一样,比如可能是 1546732,永远达不到 200万!
三、 什么是真正的“线程安全”?(以及如何实现)
线程安全的意思就是说:不管操作系统怎么丧心病狂地在各个不同线程之间切换,我的公共数据永远不会乱,永远等于预期的结果。
在 Python 里,要实现线程安全,通常有以下两种武器:
武器 1:加锁 (Lock) —— 最经典、最暴力的做法
如果怕别人打断,那我就把整个黑板锁起来! Python 提供了 threading.Lock()。
import threading
money = 0
# 买一把锁
lock = threading.Lock()
def safe_add_money():
global money
for _ in range(1000000):
# 抢锁!抢到锁的人才能往下走,其他工人哪怕醒了,只要看锁着,就只能在门口死等(阻塞)
lock.acquire()
try:
money += 1 # 在锁里的操作,绝对不会被打断,称为“原子操作”
finally:
# 干完活,千万千万记得还要开锁放人!否则变成死锁(Deadlock),所有人都卡死
lock.release()
# 如果你用 with 语句,Python 会自动帮你上锁和解锁,写起来更优雅、更安全(防止你忘了 release):
def safe_add_money_with():
global money
for _ in range(1000000):
with lock:
money += 1
你如果用加上锁的版本重新跑,不管跑多少遍,结果绝对是精准的 2000000!
武器 2:使用线程安全的数据结构 (比如 Queue)
自己加锁很容易忘,稍微写错一点就会变成“死锁”导致程序直接卡死。 所以,Python 大神们早就帮我们写好了一些天生自带锁的“保险箱”。无论多少个工人同时去保险箱里塞东西或者拿东西,它内部都自动排好队了,绝对不会弄丢数据。
最经典的就是 queue.Queue (队列)。多线程爬虫时,通常不用 list 来保存爬到的网页链接,而是用 Queue,就是因为 Queue 是 线程安全的。
总结
- 线程不安全:多个人抢着用大铁锅,你刚放盐准备炒,别人一铲子把你的菜全盛走了。(导致数据错乱或者丢失)。
- 线程安全:无论你这程序开多少个线程去运行,都和单线程跑出来的结果一模一样。
- 怎么做到?:如果多线程要修改公共变量,老老实实加上
threading.Lock();如果只是传输数据,直接用queue.Queue之类的线程安全结构。如果你只是读数据不修改它,那不加锁也是线程安全的。
3.asyncio异步编程
一、asyncio 核心定位:解决 IO 密集型并发
asyncio 是 Python 用于编写单线程并发代码的标准库,核心解决IO 密集型任务(如网络请求、文件读写、数据库查询)的效率问题。
对比:同步 vs 多线程 vs asyncio
表格
| 模式 | 原理 | 适用场景 | 缺点 |
| 同步 | 顺序执行,IO 时阻塞等待 | 简单逻辑、CPU 密集 | IO 等待时浪费 CPU 资源 |
| 多线程 | 多线程切换执行,IO 时切换线程 | IO 密集、任务数少 | 线程切换开销大,GIL 限制并行 |
| asyncio | 单线程内协程切换,IO 时挂起 | 高并发 IO 密集 | 代码需异步化,不能有阻塞调用 |
核心优势:单线程 + 协程切换,开销远小于线程切换,轻松支持上万并发连接(如爬虫、Web 服务器)。
二、核心概念(面试必问)
-
协程(Coroutine)
协程是 asyncio 的最小执行单元,可以理解为「用户态的轻量级线程」,由程序员主动控制切换,而非操作系统抢占。
在 Python 中,用 async def 定义的函数就是协程函数:
import asyncio # 定义协程函数
async def say_hello():
print("Hello")
await asyncio.sleep(1) # 模拟 IO 操作,挂起当前协程
print("World")
-
事件循环(Event Loop)
事件循环是 asyncio 的心脏,负责:
-
调度和执行所有协程;
-
处理 IO 事件(如网络数据到达);
-
在协程挂起 / 恢复时切换上下文。
Python 3.7+ 推荐用 asyncio.run() 自动管理事件循环:
# 运行协程(自动创建和关闭事件循环)
asyncio.run(say_hello())
3. await 关键字
await 是协程切换的触发点,只能在 async def 函数内使用:
-
作用:挂起当前协程,让事件循环去执行其他协程;
-
等待对象:只能是「可等待对象」(协程对象、
Task、Future); -
时机:遇到 IO 操作(如
asyncio.sleep、网络请求)时必须用await。
-
Task(任务)
Task 是对协程的封装,用于并发执行多个协程。协程本身是「串行」的,只有包装成 Task 交给事件循环,才能实现并发。
创建 Task :
async def main():# 方式 1:asyncio.create_task()(推荐,Python 3.7+)
task1 = asyncio.create_task(say_hello())
task2 = asyncio.create_task(say_hello())# 等待两个 Task 完成await task1
await task2
总结:
| 场景 | 选择 |
|---|---|
| 网络请求(HTTP) | aiohttp 库 |
| 读写文件 | aiofiles 库 |
| 操作 MySQL 数据库 | aiomysql 库 |
| 操作 PostgreSQL | asyncpg 库 |
| 需要同时跑多个协程 | asyncio.gather() |
| 需要等一段时间 | await asyncio.sleep() |
| 写 Web 后端 | FastAPI (它天生全 async) |
4.进程间通信
什么是进程间通信?
进程是操作系统中资源分配的基本单位,每个进程都有独立的地址空间。一个进程无法直接访问另一个进程的内存。进程间通信(IPC) 就是操作系统提供的一组机制,让不同进程之间能够交换数据、同步执行。
主要的 IPC 方式
1. 🔧 管道(Pipe)
- 匿名管道:最简单的 IPC 方式,半双工(数据只能单向流动),通常用于父子进程之间。
- 命名管道(FIFO):存在于文件系统中,无亲缘关系的进程也可以通信。
-
from multiprocessing import Process, Pipe def child_process(conn): """子进程:发送数据""" conn.send("你好,我是子进程!") conn.send([1, 2, 3, {"key": "value"}]) # 可以发送任意 Python 对象 conn.close() def pipe_example(): parent_conn, child_conn = Pipe() # 创建双向管道 p = Process(target=child_process, args=(child_conn,)) p.start() print(f"收到消息: {parent_conn.recv()}") print(f"收到数据: {parent_conn.recv()}") p.join() if __name__ == '__main__': pipe_example() #输出 收到消息: 你好,我是子进程! 收到数据: [1, 2, 3, {'key': 'value'}]
特点:简单高效,但只能单向传输,且匿名管道仅限于有亲缘关系的进程。
2. 📨 消息队列(Message Queue)
- 内核中维护的一个消息链表,进程可以按类型选择性地读取消息。
- 消息有明确的边界(不像管道是字节流)。
-
from multiprocessing import Process, Queue import time def producer(queue): """生产者进程""" for i in range(5): msg = f"消息 #{i}" queue.put(msg) print(f"[生产者] 发送: {msg}") time.sleep(0.5) queue.put(None) # 发送结束信号 def consumer(queue): """消费者进程""" while True: msg = queue.get() # 阻塞等待消息 if msg is None: break print(f"[消费者] 收到: {msg}") if __name__ == '__main__': queue = Queue() p1 = Process(target=producer, args=(queue,)) p2 = Process(target=consumer, args=(queue,)) p1.start() p2.start() p1.join() p2.join() print("通信完成!") #输出 [生产者] 发送: 消息 #0 [消费者] 收到: 消息 #0 [生产者] 发送: 消息 #1 [消费者] 收到: 消息 #1 ... 通信完成!
特点:支持消息分类、独立于进程生命周期、但有大小限制。
3. 🧠 共享内存(Shared Memory)
- 最快的 IPC 方式!多个进程映射同一块物理内存到自己的地址空间,直接读写。
- 需要配合同步机制(如信号量)来避免竞态条件。
-
from multiprocessing import shared_memory, Process import numpy as np def writer(shm_name): """写入进程""" existing_shm = shared_memory.SharedMemory(name=shm_name) # 将共享内存映射为 numpy 数组 data = np.ndarray((5,), dtype=np.int64, buffer=existing_shm.buf) data[:] = [10, 20, 30, 40, 50] print(f"[写入者] 写入数据: {list(data)}") existing_shm.close() def reader(shm_name): """读取进程""" import time time.sleep(0.5) existing_shm = shared_memory.SharedMemory(name=shm_name) data = np.ndarray((5,), dtype=np.int64, buffer=existing_shm.buf) print(f"[读取者] 读取数据: {list(data)}") existing_shm.close() if __name__ == '__main__': # 创建共享内存块 shm = shared_memory.SharedMemory(create=True, size=5 * 8) # 5个int64 p1 = Process(target=writer, args=(shm.name,)) p2 = Process(target=reader, args=(shm.name,)) p1.start() p2.start() p1.join() p2.join() shm.close() shm.unlink() # 释放共享内存
特点:速度最快(无需内核拷贝),但需要自行处理同步问题。
4. 🚦 信号量(Semaphore)
- 本质是一个计数器,用于控制多个进程对共享资源的同步访问。
- 常配合共享内存使用。
| 操作 | 含义 |
|---|---|
P 操作(wait) |
计数器 -1,若 < 0 则阻塞 |
V 操作(signal) |
计数器 +1,唤醒等待的进程 |
特点:主要用于同步,不用于传输数据。
5. ⚡ 信号(Signal)
- 一种异步通知机制,用于通知进程某个事件发生。
- 例如
SIGKILL(强制终止)、SIGINT(Ctrl+C)、SIGUSR1(用户自定义)。
特点:传递的信息量极少(只有信号编号),适合通知型场景。
6. 🌐 Socket(套接字)
- 最通用的 IPC 方式,不仅支持本机进程通信,还支持跨网络通信。
- 分为 Unix Domain Socket(本机,高效)和 网络 Socket(TCP/UDP)。
# Python 简单 Socket 示例
import socket
# 服务端
server = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
server.bind(('localhost', 8080))
server.listen(1)
conn, addr = server.accept()
data = conn.recv(1024)
特点:功能最强大、跨平台跨网络,但相比共享内存开销更大。
📊 各种 IPC 方式对比
| 方式 | 速度 | 方向 | 进程关系 | 数据量 | 适用场景 |
|---|---|---|---|---|---|
| 匿名管道 | 快 | 单向 | 父子进程 | 字节流 | 简单的父子通信 |
| 命名管道 | 快 | 单向/双向 | 任意进程 | 字节流 | 无亲缘关系的进程 |
| 消息队列 | 中等 | 双向 | 任意进程 | 有边界的消息 | 按类型分发消息 |
| 共享内存 | 最快 | 双向 | 任意进程 | 大数据量 | 高性能数据共享 |
| 信号量 | — | — | 任意进程 | 无数据 | 同步/互斥控制 |
| 信号 | 快 | 单向 | 任意进程 | 极少 | 事件通知 |
| Socket | 较慢 | 双向 | 任意/跨机器 | 字节流 | 网络通信、通用 IPC |
💡 如何选择?
- 父子进程简单通信 → 匿名管道
- 无关进程交换少量数据 → 消息队列 / 命名管道
- 大量数据高速共享 → 共享内存 + 信号量
- 跨机器通信 → Socket
- 事件通知 → 信号
5.协程 vs 线程 vs 进程
import asyncio
import threading
import multiprocessing
import time
# ---- 协程版本 ----
async def async_io_task(task_id):
await asyncio.sleep(1)
async def run_async():
start = time.perf_counter()
await asyncio.gather(*[async_io_task(i) for i in range(1000)])
return time.perf_counter() - start
# ---- 线程版本 ----
def thread_io_task(task_id):
time.sleep(1)
def run_threads():
start = time.perf_counter()
threads = [threading.Thread(target=thread_io_task, args=(i,)) for i in range(1000)]
for t in threads: t.start()
for t in threads: t.join()
return time.perf_counter() - start
# 对比
print("1000 个 I/O 任务(每个等待 1 秒):\n")
async_time = asyncio.run(run_async())
print(f" 协程: {async_time:.2f}s (1 个线程,极低开销)")
thread_time = run_threads()
print(f" 线程: {thread_time:.2f}s (1000 个线程,高内存开销)")
print(f"\n协程快 {thread_time/async_time:.1f}x,且内存消耗极小")
核心概念总结
| 概念 | 解释 | 类比 |
|---|---|---|
| 协程 | 可以暂停/恢复的函数 | 做饭时水烧着,去切菜 |
async def |
定义协程函数 | 声明"我是可暂停的" |
await |
暂停当前协程,等待结果 | "水没开,我先干别的" |
| 事件循环 | 调度器,决定谁来执行 | 大脑决定先做什么 |
asyncio.run() |
启动事件循环 | 启动大脑 |
asyncio.gather() |
并发运行多个协程 | 同时煮饭、炒菜、煲汤 |
create_task() |
把协程变成后台任务 | 安排一项工作 |
async for |
异步迭代 | 逐条异步获取数据 |
async with |
异步上下文管理 | 异步开关资源 |
关键理解:协程是在 单线程 内实现的并发,靠
await主动让出控制权,由事件循环调度。没有多线程的锁问题,也没有 GIL 的困扰!
AtomGit 是由开放原子开源基金会联合 CSDN 等生态伙伴共同推出的新一代开源与人工智能协作平台。平台坚持“开放、中立、公益”的理念,把代码托管、模型共享、数据集托管、智能体开发体验和算力服务整合在一起,为开发者提供从开发、训练到部署的一站式体验。
更多推荐


所有评论(0)