Python进阶--多进程与多线程
一、核心概念与基础概述
在现代计算机中,我们经常需要程序“同时”处理多个任务,比如一边下载文件一边播放音乐、同时响应多个用户的网络请求、并行处理大规模数据。这就需要用到多进程与多线程技术,二者是操作系统并发编程的核心实现方式。
1.1 核心基础概念
1.1.1 进程(Process)
- 定义:进程是操作系统分配资源(内存、文件句柄、CPU时间片等)的最小单位,是程序的一次动态执行实例。
- 直观理解:你打开的每一个应用程序(浏览器、微信、PyCharm、Python解释器),在操作系统中都是一个或多个独立的进程。
- 核心特点:每个进程拥有完全独立的内存地址空间,不同进程之间默认无法直接访问对方的资源,进程间通信需要特殊的机制。
- 进程的生命周期状态:就绪 → 运行 → 阻塞 → 终止
-
- 就绪:进程已具备运行条件,等待CPU调度
- 运行:进程正在CPU上执行指令
- 阻塞:进程因等待IO、锁等资源暂停执行,让出CPU
- 终止:进程执行完毕或异常退出,资源被操作系统回收
1.1.2 线程(Thread)
- 定义:线程是操作系统CPU调度的最小单位,是进程内部的一条执行流,也被称为“轻量级进程”。
- 直观理解:一个进程可以包含多个线程,所有线程共享进程的内存空间和系统资源。比如浏览器进程中,一个线程负责渲染页面,一个线程负责下载资源,一个线程负责响应用户操作。
- 核心特点:同一进程内的线程共享全局变量、堆内存,通信成本极低,但并发修改共享资源时会产生线程安全问题。
1.1.3 PID与PPID(进程核心标识)
这是进程管理的核心标识,通过Python内置的os模块可以直接获取,也是本次重点补充的核心内容:
- PID(Process ID):进程ID,操作系统为每个运行的进程分配的唯一数字标识符,用于区分和管理进程。PID在进程的整个生命周期内保持不变,进程终止后,PID会被操作系统回收复用。
- PPID(Parent Process ID):父进程ID,创建子进程的进程称为父进程,子进程的PPID就是其父进程的PID。操作系统中除了内核初始化的0号进程,所有进程都有父进程,形成树形的进程层级结构。
os模块核心操作函数:
|
函数 |
功能说明 |
|
|
获取当前正在执行的进程的PID,可在任意进程内调用,用于日志记录、进程管理、问题排查 |
|
|
获取当前进程的父进程的PPID,可在任意进程内调用,用于排查进程层级关系、孤儿进程问题 |
1.1.4 并发与并行
- 并发:多个任务在宏观上“同时”执行,微观上是CPU在多个任务之间快速切换,同一时刻只有一个任务在执行(单核CPU只能实现并发)。
- 并行:多个任务在物理上同时执行,需要多核CPU支持,同一时刻多个核心分别执行不同的任务。
- 多进程可以实现并行,多线程在CPython中受GIL限制,只能实现并发(IO密集型场景除外)。
1.2 为什么需要多进程/多线程
- 提升程序执行效率:CPU密集型任务通过多进程利用多核CPU实现并行;IO密集型任务通过多线程/多进程利用IO等待的空闲时间。
- 提升用户体验:避免单线程阻塞导致程序界面卡死,实现“同时”响应用户操作和后台任务。
- 实现服务高并发:Web服务器、数据库等服务需要同时处理成千上万个客户端请求,必须依赖多进程/多线程技术。
二、Python 多进程编程详解
Python提供了内置的multiprocessing模块,专门用于实现多进程编程,完美绕过CPython的GIL限制,实现真正的多核并行。
2.1 多进程的核心创建方式
2.1.1 方式1:使用Process类直接创建(最常用)
通过实例化multiprocessing.Process类创建进程,指定进程要执行的目标函数,是最灵活、最通用的方式。
关键注意事项:
- Windows系统下,多进程的启动代码必须放在
if __name__ == "__main__":代码块内。因为Windows创建进程采用spawn模式,会重新导入当前模块,若不做判断会导致递归创建进程,触发报错。 - Linux/macOS系统默认采用
fork模式,会复制父进程的内存空间,无此限制,但为了代码跨平台兼容性,建议统一加上该判断。
import multiprocessing
import time
import os
# 定义子进程要执行的任务函数
def task(name):
# 子进程内获取自身PID和父进程PPID
print(f"[子进程 {name}] 启动,PID={os.getpid()},父进程PPID={os.getppid()}")
time.sleep(2) # 模拟耗时操作
print(f"[子进程 {name}] 执行结束,PID={os.getpid()}")
if __name__ == "__main__":
# 主进程内获取自身PID
print(f"[主进程] 启动,PID={os.getpid()}")
# 1. 创建进程对象
# target:指定子进程要执行的函数名(不要加括号,加括号会直接执行)
# args:以元组形式给目标函数传递位置参数
# name:给进程设置自定义名称,便于管理
p1 = multiprocessing.Process(target=task, args=("A",), name="Process-A")
p2 = multiprocessing.Process(target=task, args=("B",), name="Process-B")
# 2. 启动进程:调用start()后,进程进入就绪状态,等待CPU调度
p1.start()
p2.start()
print(f"[主进程] 所有子进程已启动,等待子进程执行完毕")
2.1.2 方式2:继承Process类重写run方法
适用于复杂的进程逻辑,通过继承multiprocessing.Process类,重写run()方法定义进程的执行逻辑,进程启动后会自动执行run()方法。
import multiprocessing
import time
import os
# 自定义进程类,继承multiprocessing.Process
class MyProcess(multiprocessing.Process):
def __init__(self, name):
# 必须先调用父类的构造方法,否则会报错
super().__init__()
self.task_name = name
# 重写run方法:进程的核心执行逻辑
def run(self):
print(f"[子进程 {self.task_name}] 启动,PID={os.getpid()},父进程PPID={os.getppid()}")
time.sleep(2)
print(f"[子进程 {self.task_name}] 执行结束")
if __name__ == "__main__":
print(f"[主进程] 启动,PID={os.getpid()}")
# 创建自定义进程对象
p1 = MyProcess("A")
p2 = MyProcess("B")
# 启动进程:自动调用run()方法
p1.start()
p2.start()
2.2 带参数的多进程完整演示
给子进程传递参数有两种标准方式,支持任意类型的参数传递:
args:元组形式,传递位置参数,单个参数时必须加逗号(x,),否则会被识别为普通变量而非元组。kwargs:字典形式,传递关键字参数,key为参数名,value为参数值。
import multiprocessing
import time
import os
def calculate(a, b, operation="add"):
"""带参数的计算任务:支持加法和乘法运算"""
print(f"[子进程 PID={os.getpid()}] 开始计算:{a} {operation} {b}")
time.sleep(1) # 模拟计算耗时
if operation == "add":
result = a + b
elif operation == "multiply":
result = a * b
else:
result = None
print(f"[子进程 PID={os.getpid()}] 计算完成,结果={result}")
return result
if __name__ == "__main__":
print(f"[主进程 PID={os.getpid()}] 开始执行")
# 1. 仅通过args传递位置参数
p1 = multiprocessing.Process(target=calculate, args=(10, 20))
# 2. args传递位置参数 + kwargs传递关键字参数
p2 = multiprocessing.Process(target=calculate, args=(10, 20), kwargs={"operation": "multiply"})
# 3. 仅通过kwargs传递所有参数
p3 = multiprocessing.Process(target=calculate, kwargs={"a": 5, "b": 6, "operation": "multiply"})
# 启动所有进程
for p in [p1, p2, p3]:
p.start()
# 等待所有进程执行完毕
for p in [p1, p2, p3]:
p.join()
print(f"[主进程 PID={os.getpid()}] 所有任务执行完成")
2.3 进程对象(Process)常用属性与方法
multiprocessing.Process类提供了丰富的属性和方法,用于管理和控制进程的生命周期,核心内容如下:
|
分类 |
属性/方法 |
详细功能说明 |
|
启动控制 |
|
启动进程,将进程从新建状态转为就绪状态,等待CPU调度,每个进程只能调用一次 |
|
执行逻辑 |
|
进程的核心执行方法,子类可重写,直接调用不会启动新进程,只会在当前进程执行 |
|
等待控制 |
|
阻塞当前进程(通常是主进程),直到被调用的进程执行完毕或超时。 |
|
状态判断 |
|
判断进程是否处于运行状态,返回布尔值 |
|
强制终止 |
|
强制终止进程,不会处理进程的资源释放、子进程管理,不推荐随意使用,可能导致资源泄漏、僵尸进程 |
|
安全退出 |
|
关闭进程对象,释放相关资源,只能在进程结束后调用,调用后不能再对进程进行任何操作 |
|
核心属性 |
|
进程的名称,可在创建时自定义,默认格式为 |
|
核心属性 |
|
进程的PID,进程启动前为 |
|
核心属性 |
|
布尔值,设置进程是否为守护进程,必须在 |
|
核心属性 |
|
进程的退出码,进程运行中为 |
完整演示代码:
import multiprocessing
import time
import os
def task():
print(f"[子进程] 启动,PID={os.getpid()}")
time.sleep(3)
print(f"[子进程] 执行结束")
if __name__ == "__main__":
print(f"[主进程] 启动,PID={os.getpid()}")
# 创建进程对象
p = multiprocessing.Process(target=task, name="DemoProcess")
# 进程启动前的属性
print(f"进程名称:{p.name},PID:{p.pid},是否存活:{p.is_alive()},是否守护进程:{p.daemon}")
# 启动进程
p.start()
print("="*30)
# 进程启动后的属性
print(f"进程名称:{p.name},PID:{p.pid},是否存活:{p.is_alive()}")
# 等待1秒后查看状态
time.sleep(1)
print(f"1秒后,进程是否存活:{p.is_alive()}")
# 等待进程结束
p.join()
print("="*30)
# 进程结束后的属性
print(f"进程结束后,是否存活:{p.is_alive()},退出码:{p.exitcode}")
# 关闭进程对象,释放资源
p.close()
print(f"[主进程] 执行结束")
2.4 进程PID与PPID的深度详解(os模块重点)
本节重点展开os.getpid()和os.getppid()的使用、父子进程的PID关联关系,以及不同场景下的PID变化规律,是多进程编程的核心基础。
2.4.1 父子进程的PID关联演示
import multiprocessing
import time
import os
def child_task():
"""子进程任务"""
# 子进程内获取PID和PPID
child_pid = os.getpid()
parent_pid = os.getppid()
print(f"[子进程] 我的PID={child_pid},我的父进程PPID={parent_pid}")
time.sleep(5) # 保持进程运行,便于观察
print(f"[子进程 PID={child_pid}] 执行结束")
if __name__ == "__main__":
# 主进程获取自身PID
main_pid = os.getpid()
print(f"[主进程] 我的PID={main_pid}")
# 创建并启动子进程
p = multiprocessing.Process(target=child_task)
p.start()
# 等待子进程执行完毕
p.join()
print(f"[主进程 PID={main_pid}] 执行结束")
执行结果说明:
子进程的PPID完全等于主进程的PID,清晰体现了父子进程的层级关系。
2.4.2 孤儿进程的PPID变化
孤儿进程定义:父进程提前结束退出,而子进程还在运行,此时子进程就会成为孤儿进程。
- Linux/macOS系统:孤儿进程会被系统的
init(PID=1)或systemd进程接管,此时子进程的PPID会变为1。 - Windows系统:孤儿进程会被系统的
svchost.exe进程接管,PPID会变为系统进程的PID。
演示代码:
import multiprocessing
import time
import os
def orphan_task():
"""孤儿进程任务"""
print(f"[子进程] 启动,PID={os.getpid()},初始PPID={os.getppid()}")
time.sleep(3) # 等待3秒,此时父进程已经退出
print(f"[子进程] 父进程退出后,我的PPID={os.getppid()}")
print(f"[子进程 PID={os.getpid()}] 执行结束")
if __name__ == "__main__":
print(f"[主进程] 启动,PID={os.getpid()}")
p = multiprocessing.Process(target=orphan_task)
p.start()
# 主进程只等待1秒就退出,子进程还需要运行2秒,成为孤儿进程
time.sleep(1)
print(f"[主进程 PID={os.getpid()}] 提前退出")
2.5 守护进程(Daemon Process)
2.5.1 定义与核心特性
守护进程是一种运行在后台的特殊进程,它的生命周期完全依附于主进程:当主进程的所有非守护子进程执行完毕、主进程准备退出时,会强制终止所有守护进程,无论守护进程是否执行完毕。
2.5.2 核心使用规则
- 必须在进程调用
start()启动前,设置daemon = True,启动后无法修改。 - 守护进程不能创建新的子进程,否则主进程退出时,守护进程的子进程会成为孤儿进程,导致资源泄漏。
- 主进程退出时,只会等待非守护子进程执行完毕,不会等待守护进程。
- 与守护线程的核心差异:守护进程无法创建子进程,而守护线程可以创建新的子线程,新子线程会继承daemon属性。
2.5.3 完整演示代码
import multiprocessing
import time
import os
def daemon_task():
"""守护进程任务:后台日志监控"""
print(f"[守护进程 PID={os.getpid()}] 启动,开始监控日志")
# 模拟持续的后台监控
for i in range(5):
time.sleep(1)
print(f"[守护进程] 监控运行中... 第{i+1}秒")
print(f"[守护进程 PID={os.getpid()}] 正常执行结束") # 这行代码不会执行
def normal_task():
"""非守护进程任务:核心业务逻辑"""
print(f"[非守护进程 PID={os.getpid()}] 启动,开始执行业务")
time.sleep(3)
print(f"[非守护进程 PID={os.getpid()}] 业务执行结束")
if __name__ == "__main__":
print(f"[主进程 PID={os.getpid()}] 启动")
# 创建守护进程
p_daemon = multiprocessing.Process(target=daemon_task)
p_daemon.daemon = True # 设置为守护进程
# 创建非守护进程
p_normal = multiprocessing.Process(target=normal_task)
# 启动进程
p_daemon.start()
p_normal.start()
# 等待非守护进程执行完毕
p_normal.join()
print(f"[主进程 PID={os.getpid()}] 所有非守护进程执行完毕,准备退出")
2.6 主进程与子进程的开始/结束时机
2.6.1 开始时机
- 主进程:Python程序启动时,操作系统自动创建主进程,开始执行代码。
- 子进程:调用
start()方法后,子进程进入就绪队列,等待操作系统的CPU调度,获得CPU时间片后,才会真正开始执行代码。start()调用不等于子进程立即执行。
2.6.2 结束时机
- 子进程:
-
- 正常结束:目标函数/
run()方法执行完毕,自动退出。 - 异常结束:执行过程中发生未捕获的异常,进程终止。
- 强制结束:被主进程调用
terminate()强制终止,或主进程退出时被强制终止的守护进程。
- 正常结束:目标函数/
- 主进程:
-
- 主进程执行完所有代码后,不会立即退出,会等待所有非守护子进程全部执行完毕后,才会正式退出。
- 主进程退出时,会向所有守护子进程发送终止信号,强制结束守护进程。
- 主进程退出后,会回收所有子进程的资源,避免僵尸进程。
2.7 孤儿进程与僵尸进程(重点避坑)
这是多进程编程中最常见的问题,和PID、PPID直接相关,必须重点掌握。
2.7.1 孤儿进程
- 定义:父进程提前退出,子进程仍在运行,此时子进程成为孤儿进程,会被操作系统的init/systemd进程接管,成为系统进程的子进程。
- 风险:孤儿进程没有危害,操作系统会自动回收其资源,不会造成资源泄漏。
- 常见场景:主进程异常崩溃,导致子进程成为孤儿进程。
2.7.2 僵尸进程
- 定义:子进程已经执行完毕退出,但父进程没有调用
wait()/join()回收子进程的资源,子进程的PID、退出状态等信息仍保留在操作系统的进程表中,成为僵尸进程(Zombie Process)。 - 风险:僵尸进程虽然不占用CPU和内存,但会占用操作系统的PID资源,系统的PID数量有限,大量僵尸进程会导致无法创建新进程。
- 产生原因:父进程只创建子进程,不调用
join()等待子进程,也不处理子进程的退出信号。 - 解决方法:
-
- 父进程主动调用
join()等待子进程执行完毕,回收资源。 - 注册信号处理函数,捕获子进程的退出信号,自动回收资源。
- 父进程提前退出,让僵尸进程成为孤儿进程,由系统进程接管并回收。
- 父进程主动调用
僵尸进程演示代码:
import multiprocessing
import time
import os
def zombie_task():
"""子进程任务,执行完后成为僵尸进程"""
print(f"[子进程 PID={os.getpid()}] 启动,1秒后执行结束")
time.sleep(1)
print(f"[子进程 PID={os.getpid()}] 执行结束,即将成为僵尸进程")
if __name__ == "__main__":
print(f"[主进程 PID={os.getpid()}] 启动")
p = multiprocessing.Process(target=zombie_task)
p.start()
# 主进程不调用join(),持续运行10秒,子进程结束后会成为僵尸进程
print(f"[主进程] 不等待子进程,持续运行10秒")
time.sleep(10)
print(f"[主进程] 执行结束,退出时会回收僵尸进程资源")
2.8 进程间通信(IPC)详解
由于进程拥有独立的内存空间,不同进程之间无法直接访问对方的全局变量,必须通过操作系统提供的进程间通信(IPC,Inter-Process Communication)机制实现数据交互。Python的multiprocessing模块提供了多种成熟的IPC方案。
2.8.1 进程全局变量不共享的核心验证
首先明确:子进程会复制父进程的全局变量副本,父子进程的全局变量完全独立,修改一方不会影响另一方。
import multiprocessing
import time
# 全局变量
g_num = 0
def modify_num():
"""子进程修改全局变量"""
global g_num
g_num += 100
print(f"[子进程] 修改后的全局变量g_num={g_num}")
if __name__ == "__main__":
print(f"[主进程] 初始全局变量g_num={g_num}")
p = multiprocessing.Process(target=modify_num)
p.start()
p.join()
# 子进程的修改不会影响主进程的全局变量
print(f"[主进程] 子进程修改后的全局变量g_num={g_num}")
2.8.2 方案1:队列Queue(最常用)
multiprocessing.Queue是基于管道实现的进程安全的先进先出(FIFO)队列,支持多进程同时读写,自动处理锁机制,是进程间通信最常用的方案,适用于生产者-消费者模型。
核心方法:
|
方法 |
功能说明 |
|
|
向队列中放入数据,block=True时队列满会阻塞,timeout为最长等待时间 |
|
|
从队列中取出数据,block=True时队列为空会阻塞 |
|
|
判断队列是否为空,返回布尔值 |
|
|
判断队列是否已满,返回布尔值 |
|
|
返回队列中当前的元素数量 |
完整演示代码(生产者-消费者模型):
import multiprocessing
import time
def producer(queue):
"""生产者进程:向队列中生产数据"""
print("[生产者] 启动,开始生产数据")
for i in range(5):
data = f"商品-{i}"
queue.put(data)
print(f"[生产者] 已生产并放入队列:{data}")
time.sleep(1)
print("[生产者] 生产完成,退出")
def consumer(queue):
"""消费者进程:从队列中消费数据"""
print("[消费者] 启动,开始消费数据")
while True:
# 从队列中获取数据,队列为空时阻塞等待
data = queue.get()
print(f"[消费者] 已从队列中取出并消费:{data}")
time.sleep(2)
# 队列为空且生产者已结束,退出循环
if queue.empty() and data == "商品-4":
break
print("[消费者] 消费完成,退出")
if __name__ == "__main__":
# 创建队列对象,maxsize为队列最大容量,不设置则无限制
q = multiprocessing.Queue(maxsize=10)
# 创建生产者和消费者进程
p_producer = multiprocessing.Process(target=producer, args=(q,))
p_consumer = multiprocessing.Process(target=consumer, args=(q,))
# 启动进程
p_producer.start()
p_consumer.start()
# 等待进程结束
p_producer.join()
p_consumer.join()
print("[主进程] 所有任务执行完成")
2.8.3 方案2:管道Pipe
multiprocessing.Pipe是一种点对点的双向通信管道,创建时会返回两个连接对象,分别代表管道的两端,两个进程分别持有一个端口,实现双向数据收发。适用于两个进程之间的一对一通信,效率比Queue更高。
核心方法:
|
方法 |
功能说明 |
|
|
向管道另一端发送数据,支持Python所有可序列化的对象 |
|
|
从管道另一端接收数据,管道中无数据时会阻塞等待 |
|
|
关闭管道连接,释放资源 |
完整演示代码:
import multiprocessing
import time
def sender(conn):
"""发送方进程:通过管道发送数据"""
print("[发送方] 启动,开始发送消息")
for i in range(3):
msg = f"消息-{i}"
conn.send(msg)
print(f"[发送方] 已发送:{msg}")
time.sleep(1)
# 接收对方的回复
reply = conn.recv()
print(f"[发送方] 收到回复:{reply}")
# 关闭管道
conn.close()
print("[发送方] 退出")
def receiver(conn):
"""接收方进程:通过管道接收数据并回复"""
print("[接收方] 启动,开始接收消息")
while True:
try:
msg = conn.recv()
print(f"[接收方] 已接收:{msg}")
time.sleep(1)
# 收到最后一条消息,回复并退出
if msg == "消息-2":
conn.send("所有消息已收到,感谢!")
break
except EOFError:
# 管道另一端关闭时,抛出EOFError
break
conn.close()
print("[接收方] 退出")
if __name__ == "__main__":
# 创建管道,duplex=True表示双向管道,False表示单向管道
conn1, conn2 = multiprocessing.Pipe(duplex=True)
# 创建进程
p_sender = multiprocessing.Process(target=sender, args=(conn1,))
p_receiver = multiprocessing.Process(target=receiver, args=(conn2,))
# 启动进程
p_sender.start()
p_receiver.start()
# 等待进程结束
p_sender.join()
p_receiver.join()
print("[主进程] 通信完成")
2.8.4 方案3:Manager共享对象
multiprocessing.Manager提供了一种跨进程共享Python对象的方案,支持共享列表、字典、数值、锁等多种数据类型,底层通过管道和代理对象实现,支持多进程同时读写,自动处理进程安全。适用于需要在多个进程之间共享可变对象的场景。
完整演示代码:
import multiprocessing
import time
import os
def modify_dict(shared_dict, lock):
"""子进程修改共享字典"""
with lock: # 加锁保证进程安全
shared_dict["count"] += 1
shared_dict["name"] = f"进程-{os.getpid()}"
print(f"[进程 PID={os.getpid()}] 修改后的共享字典:{shared_dict}")
if __name__ == "__main__":
# 创建Manager对象
with multiprocessing.Manager() as manager:
# 创建共享字典和锁
shared_dict = manager.dict({"count": 0, "name": "主进程"})
lock = manager.Lock()
print(f"[主进程] 初始共享字典:{shared_dict}")
# 创建多个进程,同时修改共享字典
process_list = []
for i in range(5):
p = multiprocessing.Process(target=modify_dict, args=(shared_dict, lock))
process_list.append(p)
p.start()
# 等待所有进程结束
for p in process_list:
p.join()
print(f"[主进程] 最终共享字典:{shared_dict}")
2.8.5 方案4:共享内存Value/Array
multiprocessing.Value和multiprocessing.Array提供了底层的共享内存方案,直接在操作系统的共享内存中创建数据,无需通过管道传输,效率是所有IPC方案中最高的,适用于大量数据的高速共享。
核心说明:
Value(typecode, value):创建共享的单个数值,typecode为数据类型代码(如'i'代表int,'d'代表double)。Array(typecode, size/sequence):创建共享的数组,支持固定长度的数值数组。
完整演示代码:
import multiprocessing
import time
def add_value(shared_num, lock):
"""子进程累加共享数值"""
for _ in range(100000):
with lock:
shared_num.value += 1
if __name__ == "__main__":
# 创建共享int类型数值,初始值为0
shared_num = multiprocessing.Value('i', 0)
# 创建进程安全的锁
lock = multiprocessing.Lock()
print(f"[主进程] 初始共享数值:{shared_num.value}")
# 创建两个进程,同时累加
p1 = multiprocessing.Process(target=add_value, args=(shared_num, lock))
p2 = multiprocessing.Process(target=add_value, args=(shared_num, lock))
p1.start()
p2.start()
p1.join()
p2.join()
# 最终结果为200000,验证共享内存的有效性
print(f"[主进程] 最终共享数值:{shared_num.value}")
2.9 进程池Pool
在实际开发中,不会无限制地创建进程,因为进程的创建和销毁开销极大,大量进程会耗尽系统内存和CPU资源。进程池multiprocessing.Pool可以预先创建固定数量的进程,复用这些进程执行大量任务,避免频繁创建销毁进程的开销,是生产环境中多进程编程的首选方案。
2.9.1 核心特性
- 进程池创建时指定最大进程数,默认等于CPU核心数。
- 任务数量超过最大进程数时,多余任务会在队列中等待,直到有空闲进程。
- 自动管理进程的生命周期,无需手动创建、启动、等待进程。
2.9.2 核心方法
|
方法 |
功能说明 |
|
|
同步执行任务,阻塞当前进程,直到任务执行完毕返回结果,不支持并行 |
|
|
异步执行任务,不阻塞当前进程,任务执行完毕后可调用callback回调函数 |
|
|
同步执行,将可迭代对象中的每个元素作为参数传入func,批量执行任务,返回结果列表 |
|
|
异步版本的map,批量执行任务,执行完毕后调用回调函数 |
|
|
关闭进程池,不再接受新的任务 |
|
|
等待进程池中所有任务执行完毕,必须在 |
|
|
强制终止进程池,立即停止所有正在执行的任务 |
2.9.3 完整演示代码
import multiprocessing
import time
import os
def task(num):
"""进程池执行的任务"""
print(f"[进程 PID={os.getpid()}] 开始处理任务:num={num}")
time.sleep(1)
result = num * num
print(f"[进程 PID={os.getpid()}] 任务处理完成,结果={result}")
return result
if __name__ == "__main__":
print(f"[主进程 PID={os.getpid()}] 启动,CPU核心数={multiprocessing.cpu_count()}")
# 创建进程池,最大进程数为3
pool = multiprocessing.Pool(processes=3)
# 1. 批量异步执行任务
print("="*20 + "批量执行任务" + "="*20)
task_list = [1,2,3,4,5,6]
# 使用map_async批量执行任务
result = pool.map_async(task, task_list)
# 关闭进程池,不再接受新任务
pool.close()
# 等待所有任务执行完毕
pool.join()
# 获取执行结果
print("="*20 + "任务执行结果" + "="*20)
print(f"所有任务的结果列表:{result.get()}")
print(f"[主进程] 所有任务执行完成")
三、Python 多线程编程详解
Python提供了内置的threading模块,专门用于实现多线程编程,API设计和multiprocessing高度一致,学习成本极低,适用于IO密集型任务的并发处理。
3.1 全局解释器锁(GIL)核心说明
在学习多线程之前,必须先理解CPython的GIL机制,这是Python多线程的核心限制。
3.1.1 什么是GIL
GIL(Global Interpreter Lock,全局解释器锁)是CPython解释器中的一把互斥锁,它保证同一时刻,只有一个线程可以在CPU上执行Python字节码。即使是多核CPU,同一时刻也只有一个核心在执行Python线程的代码。
3.1.2 GIL的释放时机
- IO操作时自动释放:线程遇到文件读写、网络请求、time.sleep()等IO操作时,会立即释放GIL,让其他线程执行。
- 时间分片释放:Python3.2之后,线程执行满5ms的时间片后,会自动释放GIL,让其他线程竞争执行。
3.1.3 GIL对多线程的影响
- IO密集型任务:多线程效率极高!因为线程大部分时间在等待IO,GIL会被频繁释放,多个线程可以并发执行,充分利用IO等待的空闲时间。
- CPU密集型任务:多线程效率极低,甚至不如单线程!因为线程一直在占用CPU执行字节码,GIL只会按时间分片释放,同一时刻只有一个线程在执行,无法利用多核CPU,还增加了线程切换的开销。
- 解决方案:CPU密集型任务,优先使用多进程,绕过GIL限制,实现多核并行。
3.2 多线程的核心创建方式
3.2.1 方式1:使用Thread类直接创建(最常用)
通过实例化threading.Thread类创建线程,指定目标函数,是最通用的方式。
注意:多线程代码无需放在if __name__ == "__main__"代码块内,Windows系统也无此限制。
import threading
import time
import os
def task(name):
"""线程执行的任务"""
print(f"[子线程 {name}] 启动,线程ID={threading.current_thread().ident},所属进程PID={os.getpid()}")
time.sleep(2)
print(f"[子线程 {name}] 执行结束")
# 主线程执行
print(f"[主线程] 启动,线程ID={threading.current_thread().ident},所属进程PID={os.getpid()}")
# 创建线程对象
t1 = threading.Thread(target=task, args=("A",), name="Thread-A")
t2 = threading.Thread(target=task, args=("B",), name="Thread-B")
# 启动线程
t1.start()
t2.start()
print(f"[主线程] 所有子线程已启动")
3.2.2 方式2:继承Thread类重写run方法
适用于复杂的线程逻辑,继承threading.Thread类,重写run()方法定义线程的执行逻辑。
import threading
import time
class MyThread(threading.Thread):
def __init__(self, name):
super().__init__()
self.task_name = name
def run(self):
print(f"[子线程 {self.task_name}] 启动,线程ID={self.ident}")
time.sleep(2)
print(f"[子线程 {self.task_name}] 执行结束")
print(f"[主线程] 启动,线程ID={threading.current_thread().ident}")
t1 = MyThread("A")
t2 = MyThread("B")
t1.start()
t2.start()
3.3 带参数的多线程完整演示
和多进程完全一致,通过args和kwargs传递参数:
import threading
import time
def calculate(a, b, operation="add"):
"""带参数的计算任务"""
print(f"[子线程 ID={threading.current_thread().ident}] 开始计算:{a} {operation} {b}")
time.sleep(1)
if operation == "add":
result = a + b
elif operation == "multiply":
result = a * b
else:
result = None
print(f"[子线程] 计算完成,结果={result}")
return result
print(f"[主线程] 开始执行")
# 传递位置参数
t1 = threading.Thread(target=calculate, args=(10, 20))
# 传递位置参数+关键字参数
t2 = threading.Thread(target=calculate, args=(10, 20), kwargs={"operation": "multiply"})
t1.start()
t2.start()
t1.join()
t2.join()
print(f"[主线程] 所有任务执行完成")
3.4 线程对象(Thread)常用属性与方法
threading.Thread类的API和multiprocessing.Process高度一致,核心属性和方法如下:
|
分类 |
属性/方法 |
详细功能说明 |
|
启动控制 |
|
启动线程,将线程转为就绪状态,等待CPU调度,每个线程只能调用一次 |
|
执行逻辑 |
|
线程的核心执行方法,子类可重写,直接调用不会启动新线程 |
|
等待控制 |
|
阻塞当前线程,直到被调用的线程执行完毕或超时 |
|
状态判断 |
|
判断线程是否处于运行状态,返回布尔值 |
|
核心属性 |
|
线程的名称,可自定义,默认格式为 |
|
核心属性 |
|
线程的唯一标识符,线程启动前为None,启动后为非0整数 |
|
核心属性 |
|
布尔值,设置线程是否为守护线程,必须在 |
|
核心属性 |
|
操作系统分配的原生线程ID,Python3.8+支持 |
完整演示代码:
import threading
import time
def task():
print(f"[子线程] 启动,线程ID={threading.current_thread().ident}")
time.sleep(3)
print(f"[子线程] 执行结束")
print(f"[主线程] 启动,线程ID={threading.current_thread().ident}")
t = threading.Thread(target=task, name="DemoThread")
print(f"线程名称:{t.name},线程ID:{t.ident},是否存活:{t.is_alive()}")
t.start()
print("="*30)
print(f"线程名称:{t.name},线程ID:{t.ident},是否存活:{t.is_alive()}")
time.sleep(1)
print(f"1秒后,线程是否存活:{t.is_alive()}")
t.join()
print("="*30)
print(f"线程结束后,是否存活:{t.is_alive()}")
print(f"[主线程] 执行结束")
3.5 线程编号的获取详解
Python提供了两种方式获取线程编号,适用于不同的场景:
threading.current_thread().ident:Python解释器分配的线程标识符,在当前进程内唯一,线程结束后,该ID可能会被复用。threading.get_ident():和上述方法完全一致,是更简洁的调用方式。threading.current_thread().native_id:操作系统内核分配的原生线程ID,全局唯一,可用于系统工具排查线程问题,Python3.8及以上版本支持。
演示代码:
import threading
import time
def task():
# 多种方式获取线程编号
current_thread = threading.current_thread()
print(f"[子线程] 名称:{current_thread.name}")
print(f"[子线程] Python线程ID(ident):{current_thread.ident} = {threading.get_ident()}")
print(f"[子线程] 操作系统原生线程ID(native_id):{current_thread.native_id}")
time.sleep(2)
print(f"[主线程] Python线程ID:{threading.get_ident()}")
print(f"[主线程] 操作系统原生线程ID:{threading.current_thread().native_id}")
t = threading.Thread(target=task, name="WorkerThread")
t.start()
t.join()
3.6 守护线程(Daemon Thread)
3.6.1 定义与核心特性
守护线程是运行在后台的特殊线程,生命周期依附于主线程:当主线程的所有非守护子线程执行完毕、准备退出时,会强制终止所有守护线程,无论守护线程是否执行完毕。
3.6.2 核心使用规则
- 必须在线程调用
start()启动前,设置daemon = True,启动后无法修改。 - 守护线程可以创建新的子线程,新子线程会继承父线程的daemon属性。
- 主线程退出时,会等待所有非守护子线程执行完毕,再终止守护线程。
- 常用于后台辅助任务,如日志收集、心跳检测、定时任务等。
3.6.3 完整演示代码
import threading
import time
def daemon_task():
"""守护线程任务:后台心跳检测"""
print(f"[守护线程] 启动,开始心跳检测")
for i in range(5):
time.sleep(1)
print(f"[守护线程] 心跳正常... 第{i+1}秒")
print(f"[守护线程] 正常执行结束") # 不会执行
def normal_task():
"""非守护线程任务:核心业务逻辑"""
print(f"[非守护线程] 启动,开始执行业务")
time.sleep(3)
print(f"[非守护线程] 业务执行结束")
print(f"[主线程] 启动")
# 创建守护线程
t_daemon = threading.Thread(target=daemon_task)
t_daemon.daemon = True # 设置为守护线程
# 创建非守护线程
t_normal = threading.Thread(target=normal_task)
t_daemon.start()
t_normal.start()
t_normal.join()
print(f"[主线程] 所有非守护线程执行完毕,准备退出")
3.7 主线程与子线程的开始/结束时机
3.7.1 开始时机
- 主线程:Python程序启动时,自动创建主线程,开始执行代码。
- 子线程:调用
start()方法后,进入就绪状态,等待CPU调度后开始执行。
3.7.2 结束时机
- 子线程:目标函数/
run()方法执行完毕,或发生未捕获的异常时结束。 - 主线程:执行完所有代码后,不会立即退出,会等待所有非守护子线程全部执行完毕后,才会正式退出,同时强制终止所有守护线程。
3.8 线程的资源交互与线程安全
同一进程内的所有线程共享进程的内存空间和全局变量,线程间通信无需任何额外机制,直接读写全局变量即可,但并发修改共享资源时,会产生严重的线程安全问题。
3.8.1 线程共享全局变量的验证
import threading
import time
# 全局变量
g_num = 0
def add_num():
"""子线程修改全局变量"""
global g_num
for _ in range(1000000):
g_num += 1
print(f"[主线程] 初始g_num={g_num}")
# 创建两个线程,同时累加全局变量
t1 = threading.Thread(target=add_num)
t2 = threading.Thread(target=add_num)
t1.start()
t2.start()
t1.join()
t2.join()
# 预期结果是2000000,但实际结果往往小于2000000,甚至每次运行结果都不同
print(f"[主线程] 最终g_num={g_num}")
3.8.2 线程安全问题的根源
上述代码中,g_num += 1不是原子操作,它在底层分为3步执行:
- 从内存中读取
g_num的当前值到CPU寄存器。 - 在CPU中计算
g_num + 1的结果。 - 将计算结果写回内存中的
g_num。
由于GIL的时间分片机制,线程可能在任意一步执行完后被切换,导致两个线程同时读取到同一个旧值,分别计算后写回,造成累加结果丢失,这就是线程安全问题。
3.8.3 解决方案:互斥锁Lock
互斥锁(Mutex Lock)可以保证同一时刻,只有一个线程可以执行被锁保护的代码块,从根本上解决线程安全问题。threading.Lock是Python提供的最基础的互斥锁。
核心方法:
acquire(blocking=True, timeout=None):获取锁,blocking=True时锁被占用会阻塞等待。release():释放锁,必须和acquire成对出现,否则会导致死锁。
修复后的代码:
import threading
import time
g_num = 0
# 创建互斥锁
lock = threading.Lock()
def add_num():
global g_num
for _ in range(1000000):
# 加锁:保证同一时刻只有一个线程执行下面的代码
lock.acquire()
g_num += 1
# 释放锁:必须释放,否则其他线程永远无法获取锁
lock.release()
print(f"[主线程] 初始g_num={g_num}")
t1 = threading.Thread(target=add_num)
t2 = threading.Thread(target=add_num)
t1.start()
t2.start()
t1.join()
t2.join()
# 结果正确,始终为2000000
print(f"[主线程] 最终g_num={g_num}")
更安全的写法:with语句自动管理锁
使用with语句包裹锁,会在代码块执行前自动获取锁,代码块执行完毕后自动释放锁,即使发生异常也会释放,避免死锁。
def add_num():
global g_num
for _ in range(1000000):
with lock: # 自动获取和释放锁
g_num += 1
3.9 常用线程同步工具
除了基础的Lock,threading模块还提供了多种高级同步工具,应对不同的并发场景。
3.9.1 可重入锁RLock
可重入锁(Reentrant Lock)允许同一个线程多次获取同一把锁,不会导致死锁,适用于嵌套加锁的场景。获取多少次锁,就必须释放多少次锁。
import threading
# 创建可重入锁
rlock = threading.RLock()
def func1():
with rlock:
print("func1执行,已获取锁")
func2() # 嵌套调用,再次获取同一把锁
def func2():
with rlock:
print("func2执行,嵌套获取锁成功")
t = threading.Thread(target=func1)
t.start()
t.join()
3.9.2 信号量Semaphore
信号量用于控制同时执行的最大线程数量,限制并发数,适用于爬虫、接口请求等需要控制并发量的场景。信号量内部维护了一个计数器,acquire时计数器-1,release时计数器+1,计数器为0时,acquire会阻塞等待。
import threading
import time
# 创建信号量,最大并发数为3
semaphore = threading.Semaphore(3)
def crawl_task(url):
"""爬虫任务,同时最多3个线程执行"""
with semaphore:
print(f"[线程 {threading.get_ident()}] 开始爬取:{url}")
time.sleep(2)
print(f"[线程 {threading.get_ident()}] 爬取完成:{url}")
# 创建10个线程,同时爬取10个网址
url_list = [f"https://example.com/page/{i}" for i in range(10)]
thread_list = []
for url in url_list:
t = threading.Thread(target=crawl_task, args=(url,))
thread_list.append(t)
t.start()
for t in thread_list:
t.join()
print("所有爬取任务完成")
3.9.3 事件Event
事件用于线程间的通知机制,一个线程发出事件信号,其他线程等待信号后执行。Event内部维护了一个布尔标志位,set()将标志位设为True,clear()设为False,wait()会阻塞等待标志位变为True。
import threading
import time
# 创建事件对象
event = threading.Event()
def monitor_task():
"""监控线程:等待事件信号,触发后执行操作"""
print("[监控线程] 启动,等待事件触发")
# 阻塞等待事件信号
event.wait()
print("[监控线程] 收到事件信号,开始执行任务")
time.sleep(2)
print("[监控线程] 任务执行完成")
def trigger_task():
"""触发线程:延迟后发送事件信号"""
print("[触发线程] 启动,3秒后触发事件")
time.sleep(3)
# 发送事件信号,将标志位设为True
event.set()
print("[触发线程] 事件已触发")
t1 = threading.Thread(target=monitor_task)
t2 = threading.Thread(target=trigger_task)
t1.start()
t2.start()
t1.join()
t2.join()
3.10 ThreadLocal线程本地变量
ThreadLocal(线程本地变量)解决了同一个变量在不同线程之间的隔离问题,每个线程都有自己独立的变量副本,修改不会影响其他线程,无需加锁,完美避免线程安全问题。
Python中通过threading.local()创建ThreadLocal对象。
import threading
# 创建ThreadLocal对象
local_data = threading.local()
def process_user(name):
"""处理用户信息,每个线程的user_name独立"""
local_data.user_name = name
print(f"[线程 {threading.get_ident()}] 设置用户名:{local_data.user_name}")
# 调用其他函数,无需传递参数,直接从local_data中获取
show_user()
def show_user():
"""展示用户信息"""
print(f"[线程 {threading.get_ident()}] 获取到用户名:{local_data.user_name}")
# 创建多个线程,处理不同的用户
t1 = threading.Thread(target=process_user, args=("张三",))
t2 = threading.Thread(target=process_user, args=("李四",))
t1.start()
t2.start()
t1.join()
t2.join()
3.11 死锁的产生与避免
死锁是多线程编程中最严重的问题之一,指两个或多个线程互相等待对方持有的锁,导致所有线程都被永久阻塞,无法继续执行。
3.11.1 死锁产生的四个必要条件
- 互斥条件:一个资源同一时间只能被一个线程持有。
- 请求与保持条件:线程已经持有至少一个资源,又请求其他被其他线程持有的资源,同时不释放自己持有的资源。
- 不可剥夺条件:线程持有的资源,只能由自己主动释放,其他线程无法强行剥夺。
- 循环等待条件:多个线程之间形成头尾相接的循环等待资源的关系。
四个条件必须同时满足,才会产生死锁,只要破坏其中一个条件,就能避免死锁。
3.11.2 死锁演示代码
import threading
import time
# 创建两把锁
lock1 = threading.Lock()
lock2 = threading.Lock()
def func1():
with lock1:
print("func1获取了lock1,等待lock2")
time.sleep(1) # 让func2先获取lock2,形成死锁
with lock2:
print("func1获取了lock2,执行完成")
def func2():
with lock2:
print("func2获取了lock2,等待lock1")
time.sleep(1) # 让func1先获取lock1,形成死锁
with lock1:
print("func2获取了lock1,执行完成")
t1 = threading.Thread(target=func1)
t2 = threading.Thread(target=func2)
t1.start()
t2.start()
t1.join()
t2.join()
# 执行结果:两个线程互相等待对方的锁,程序永久阻塞,无法退出
3.11.3 死锁的避免方法
- 固定加锁顺序:所有线程按照相同的顺序获取锁,比如先获取lock1,再获取lock2,破坏循环等待条件。
- 设置超时时间:给
acquire()设置超时时间,超时后释放自己持有的锁,破坏请求与保持条件。 - 一次性获取所有需要的锁:线程一次性获取所有需要的锁,获取失败就释放所有锁,破坏请求与保持条件。
- 减少锁的使用:优先使用ThreadLocal、无锁数据结构,减少锁的使用。
修复死锁的代码(固定加锁顺序):
import threading
import time
lock1 = threading.Lock()
lock2 = threading.Lock()
def func1():
# 固定顺序:先获取lock1,再获取lock2
with lock1:
print("func1获取了lock1,等待lock2")
time.sleep(1)
with lock2:
print("func1获取了lock2,执行完成")
def func2():
# 和func1保持相同的加锁顺序
with lock1:
print("func2获取了lock1,等待lock2")
time.sleep(1)
with lock2:
print("func2获取了lock2,执行完成")
t1 = threading.Thread(target=func1)
t2 = threading.Thread(target=func2)
t1.start()
t2.start()
t1.join()
t2.join()
print("程序正常执行完成,无死锁")
3.12 线程池ThreadPoolExecutor
和进程池一样,线程的创建和销毁也有开销,大量线程会导致系统资源耗尽。Python提供了concurrent.futures.ThreadPoolExecutor线程池,用于复用线程执行大量任务,是生产环境中多线程编程的首选方案。
3.12.1 核心方法
|
方法 |
功能说明 |
|
|
异步提交单个任务,返回Future对象,可通过 |
|
|
批量提交任务,将可迭代对象的元素作为参数传入func,按顺序返回结果 |
|
|
关闭线程池,不再接受新任务,wait=True时等待所有任务执行完毕 |
|
|
迭代Future对象列表,任务执行完毕后立即返回,无需按提交顺序 |
3.12.2 完整演示代码
from concurrent.futures import ThreadPoolExecutor, as_completed
import threading
import time
def crawl_task(url):
"""爬虫任务"""
print(f"[线程 {threading.get_ident()}] 开始爬取:{url}")
time.sleep(1)
result = f"{url} 爬取完成"
print(f"[线程 {threading.get_ident()}] {result}")
return result
# 主线程执行
print("[主线程] 开始执行")
url_list = [f"https://example.com/page/{i}" for i in range(10)]
# 创建线程池,最大线程数为3
with ThreadPoolExecutor(max_workers=3) as executor:
# 1. 批量提交任务,获取所有Future对象
future_list = [executor.submit(crawl_task, url) for url in url_list]
# 2. 按任务完成顺序获取结果
print("="*20 + "任务执行结果" + "="*20)
for future in as_completed(future_list):
print(future.result())
print("[主线程] 所有任务执行完成")
四、多进程与多线程核心区别对比
|
对比维度 |
多进程(Multiprocessing) |
多线程(Threading) |
|
内存空间 |
每个进程拥有完全独立的内存地址空间,资源隔离 |
同一进程内的所有线程共享内存空间和资源 |
|
资源开销 |
极大,创建、销毁、切换进程的开销极高,占用系统资源多 |
极小,线程被称为轻量级进程,创建、销毁、切换开销极低 |
|
通信难度 |
难度高,必须通过Queue、Pipe、Manager等IPC机制,通信成本高 |
难度低,直接共享全局变量,通信成本极低,但需处理线程安全问题 |
|
GIL影响 |
无影响,每个进程有独立的Python解释器和GIL,可利用多核CPU实现并行 |
受GIL严格限制,同一时刻只有一个线程执行字节码,无法利用多核CPU并行 |
|
稳定性 |
极高,进程之间完全隔离,一个进程崩溃不会影响其他进程 |
极低,线程之间共享进程资源,一个线程崩溃会导致整个进程崩溃 |
|
调试难度 |
较高,多进程调试复杂,问题复现难度大 |
较低,多线程调试相对简单,共享内存便于排查问题 |
|
适用场景 |
CPU密集型任务:科学计算、数据处理、视频转码、加密解密等 |
IO密集型任务:网络爬虫、Web服务、文件读写、数据库操作、接口请求等 |
|
并发/并行 |
可实现真正的并行,充分利用多核CPU |
只能实现并发,无法利用多核CPU并行(IO密集型场景除外) |
五、适用场景与选型建议
5.1 优先选择多线程的场景
- IO密集型任务:程序大部分时间在等待IO操作(网络请求、文件读写、数据库查询),多线程可以极大提升效率,开销极小。
- 任务之间需要频繁通信:线程间共享内存,通信成本极低,无需复杂的IPC机制。
- 轻量级并发任务:任务数量多、执行时间短,多线程的低开销可以极大提升程序吞吐量。
- GUI程序开发:使用子线程处理后台任务,避免主线程阻塞导致界面卡死。
5.2 必须选择多进程的场景
- CPU密集型任务:程序大部分时间在占用CPU进行计算,多进程可以绕过GIL,利用多核CPU实现并行,效率远超多线程。
- 高稳定性要求:任务之间需要完全隔离,一个任务崩溃不能影响其他任务,比如Web服务器的worker进程。
- 需要突破进程资源限制:单个进程的内存、文件句柄等资源有限,多进程可以突破单进程的资源限制。
5.3 混合使用场景
大型复杂系统可以采用多进程+多线程的混合架构:
- 主进程启动多个子进程,利用多核CPU处理不同的业务模块。
- 每个子进程内部启动多个线程,处理IO密集型任务,提升并发能力。
- 典型案例:Nginx、Gunicorn等Web服务器,均采用多进程+多线程/多协程的架构。
AtomGit 是由开放原子开源基金会联合 CSDN 等生态伙伴共同推出的新一代开源与人工智能协作平台。平台坚持“开放、中立、公益”的理念,把代码托管、模型共享、数据集托管、智能体开发体验和算力服务整合在一起,为开发者提供从开发、训练到部署的一站式体验。
更多推荐



所有评论(0)