【vllm】(v1 Executor)vLLM V1 Executor 深度分析 — Part2: 多进程执行器
vLLM V1 Executor 深度分析 — Part2: 多进程执行器
一、模块定位
1.1 文件概览
multiproc_executor.py 是 vLLM v1 执行器体系中最核心、最复杂的实现文件(1037行),定义了多进程执行器 MultiprocExecutor。它是 Executor 抽象基类的具体实现,负责:
- 启动和管理多个 Worker 子进程:每个 GPU 对应一个独立进程
- 进程间通信(IPC):通过共享内存消息队列(
MessageQueue)实现零拷贝广播 - 分布式协调:管理 TP/PP/DP/PCP/EP/DCP 等多种并行维度的初始化
- 模型执行调度:将 SchedulerOutput 广播给所有 Worker,收集 ModelRunnerOutput
- 生命周期管理:优雅关闭、健康监控、故障恢复
1.2 核心架构思想
┌─────────────────────────────────────────────────────┐
│ EngineCore (主进程) │
│ │
│ ┌──────────────────────────────────────────────┐ │
│ │ MultiprocExecutor │ │
│ │ │ │
│ │ rpc_broadcast_mq ──────────┐ │ │
│ │ (SchedulerOutput广播) │ │ │
│ │ ▼ │ │
│ │ ┌─────────┐ ┌─────────┐ ┌─────────┐ │ │
│ │ │Worker 0 │ │Worker 1 │ │Worker N │ │ │
│ │ │ (GPU0) │ │ (GPU1) │ │ (GPUN) │ │ │
│ │ └────┬────┘ └────┬────┘ └────┬────┘ │ │
│ │ │ │ │ │ │
│ │ response_mq[0] response_mq[1] response_mq[N]│ │
│ │ └────────────┴────────────┘ │ │
│ │ │ │ │
│ │ ModelRunnerOutput │ │
│ └──────────────────────────────────────────────┘ │
└─────────────────────────────────────────────────────┘
关键设计决策:
- 每个Worker运行在独立进程中,避免GIL争用
- 使用共享内存消息队列(而非pickle/socket),实现高效数据传输
- 广播-响应模式:一个写入端(Executor),多个读取端(Workers)
- 父子进程存活检测:通过"death pipe"实现父子进程双向监控
1.3 与其他执行器的关系
| 执行器类型 | 适用场景 | 进程模型 |
|---|---|---|
UniProcExecutor |
单GPU | 单进程 |
MultiprocExecutor |
多GPU单节点 / 多节点 | 多进程 |
RayExecutor |
Ray集群 | Ray Actor |
MultiprocExecutor 是 vLLM v1 的默认多GPU执行器,直接使用 Python multiprocessing 而非 Ray,减少了外部依赖和序列化开销。
二、multiproc_executor.py 逐行分析
2.0 模块导入(第1-76行)
# SPDX-License-Identifier: Apache-2.0
# SPDX-FileCopyrightText: Copyright contributors to the vLLM project
标准 Apache 2.0 许可证头。
标准库导入
import multiprocessing # 进程创建与管理
import os # 环境变量、文件描述符操作
import pickle # 序列化协议常量
import queue # 线程间队列(用于async调度模式)
import signal # 信号处理(SIGTERM/SIGINT)
import threading # 线程创建
import time # 超时与计时
import traceback # 异常堆栈格式化
import weakref # 弱引用(finalizer、monitor回调)
设计意图:大量使用标准库,减少外部依赖。weakref 的使用尤为关键——它允许对象被垃圾回收时自动触发清理,而不阻止回收本身。
集合与类型导入
from collections import deque # 有序双端队列
from collections.abc import Callable, Sequence # 类型协议
from concurrent.futures import Future, InvalidStateError # 异步结果容器
from contextlib import suppress # 上下文管理器:静默异常
from dataclasses import dataclass # 数据类装饰器
from enum import Enum, auto # 枚举类型
from functools import cached_property, partial # 缓存属性、偏函数
from multiprocessing.connection import Connection # 管道连接类型
from multiprocessing.process import BaseProcess # 进程基类
from multiprocessing.synchronize import Lock as LockType # 跨进程锁类型
from threading import Thread # 线程类
from typing import Any, cast # 类型工具
要点:
deque:用于futures_queue,保证FIFO顺序的响应处理InvalidStateError:Future设置结果/异常时可能抛出,用suppress静默Connection:管道通信的类型注解LockType:跨进程锁的类型别名,避免与threading.Lock混淆
第三方导入
import cloudpickle # 增强序列化:支持lambda、局部函数
import torch # PyTorch:线程数控制
cloudpickle 是关键——当 method 不是字符串而是可调用对象时,需要用 cloudpickle 序列化后在子进程中反序列化执行。
vLLM内部导入
import vllm.envs as envs # 环境变量访问
from vllm.config import VllmConfig # 全局配置对象
from vllm.distributed import ( # 分布式环境管理
destroy_distributed_environment,
destroy_model_parallel,
)
from vllm.distributed.device_communicators.shm_broadcast import (
Handle, MessageQueue, # 共享内存广播核心组件
)
from vllm.distributed.kv_transfer.kv_connector.utils import (
KVOutputAggregator, # KV传输输出聚合器
)
from vllm.distributed.parallel_state import ( # 并行状态查询
get_dcp_group, get_dp_group, get_ep_group,
get_inner_dp_world_group, get_pcp_group,
get_pp_group, get_tp_group,
model_parallel_is_initialized,
)
from vllm.envs import enable_envs_cache # 环境变量缓存开关
from vllm.logger import init_logger # 日志初始化
from vllm.platforms import current_platform # 平台抽象层
from vllm.tracing import instrument, maybe_init_worker_tracer # 追踪
from vllm.utils import numa_utils # NUMA绑定工具
from vllm.utils.network_utils import ( # 网络工具
get_distributed_init_method, get_ip, get_loopback_ip, get_open_port,
)
from vllm.utils.ompmultiprocessing import OMPProcessManager # OpenMP线程管理
from vllm.utils.system_utils import ( # 系统工具
_maybe_force_spawn, decorate_logs, get_mp_context,
set_process_title,
)
from vllm.v1.core.sched.output import GrammarOutput, SchedulerOutput # 调度输出
from vllm.v1.executor.abstract import Executor, FailureCallback # 抽象基类
from vllm.v1.outputs import ( # 模型输出类型
AsyncModelRunnerOutput, DraftTokenIds, ModelRunnerOutput,
)
from vllm.v1.worker.worker_base import WorkerWrapperBase # Worker包装器
核心依赖:
MessageQueue/Handle:共享内存广播机制,是整个通信基础设施parallel_state中的各种get_*_group():查询各并行维度的状态get_inner_dp_world_group():多节点场景下的DP内通信组KVOutputAggregator:聚合跨Worker的KV传输输出
logger = init_logger(__name__)
初始化模块级日志器。
2.1 FutureWrapper类(第79-109行)
class FutureWrapper(Future):
设计目的:包装 concurrent.futures.Future,实现有序响应收集。
问题背景:在多Worker场景中,多个RPC调用可能并发执行,但响应必须按发起顺序处理。直接使用 Future 无法保证这个顺序——第2个RPC的响应可能先于第1个到达。
解决方案:FutureWrapper 维护一个 deque,所有 FutureWrapper 实例共享同一个队列。当调用 result() 时,会从队尾开始逐个等待响应,确保先进先出。
def __init__(
self,
futures_queue: deque["FutureWrapper"], # 共享的有序队列
get_response: Callable[[], Any], # 从MessageQueue获取响应的闭包
aggregate: Callable = lambda x: x, # 可选的聚合函数
):
self.futures_queue = futures_queue
self.get_response = get_response
self.aggregate = aggregate
super().__init__()
self.futures_queue.appendleft(self) # 新Future入队首(左侧)
appendleft 的意义:deque的左侧是"最新"端,右侧是"最老"端。调用 result() 时从右侧pop,即先进先出。appendleft 确保新创建的Future排在队首,而等待处理时从队尾取出最老的。
def result(self, timeout=None):
if timeout is not None:
raise RuntimeError("timeout not implemented")
不支持超时参数——因为 FutureWrapper 的等待机制不是标准 Future 的条件变量等待,而是通过队列逐个处理响应。
# Drain any futures ahead of us in the queue.
while not self.done():
future = self.futures_queue.pop() # 从队尾取最老的Future
future._wait_for_response() # 等待并设置其结果
return super().result() # 调用父类获取已设置的结果
核心逻辑:调用者(最新Future)会先处理所有排在前面的Future。这是一个协作式设计——每个调用者帮助处理前序请求,类似于"帮同伴推进"的工作窃取思想。
示例:
创建顺序: F1, F2, F3
deque状态: [F3, F2, F1] (F1在右侧/队尾)
F3.result() 调用:
1. pop() → F1, _wait_for_response() → F1完成
2. pop() → F2, _wait_for_response() → F2完成
3. F3.done() == True → 返回F3结果
def _wait_for_response(self):
try:
response = self.aggregate(self.get_response())
with suppress(InvalidStateError):
self.set_result(response)
except Exception as e:
with suppress(InvalidStateError):
self.set_exception(e)
_wait_for_response:
- 调用
get_response()闭包——从MessageQueue中读取响应 - 可选地用
aggregate聚合(用于KV输出聚合) - 设置Future结果或异常
suppress(InvalidStateError):如果Future已被取消或已设置,静默忽略
2.2 UnreadyWorkerProcHandle 与 WorkerProcHandle(第439-482行)
在分析 MultiprocExecutor 之前,先理解这两个数据类,它们是Worker句柄的两种状态。
UnreadyWorkerProcHandle(第439-444行)
@dataclass
class UnreadyWorkerProcHandle:
"""WorkerProcess handle before READY."""
proc: BaseProcess # 子进程对象
rank: int # 全局rank
ready_pipe: Connection # 子进程→父进程的就绪通知管道
death_writer: Connection | None = None # 父进程→子进程的死亡通知管道
设计意图:Worker子进程启动后需要经过初始化(加载模型、设置分布式等),初始化完成前用 UnreadyWorkerProcHandle 表示。ready_pipe 用于子进程向父进程报告"我准备好了"。
管道方向:
ready_pipe:子进程写端 → 父进程读端death_writer:父进程写端 → 子进程读端(父进程退出时,子进程收到EOF)
WorkerProcHandle(第447-482行)
@dataclass
class WorkerProcHandle:
proc: BaseProcess # 子进程对象
rank: int # 全局rank
worker_response_mq: MessageQueue | None # 本Worker的响应消息队列
peer_worker_response_mqs: list[MessageQueue | None] # 远程Worker的响应队列
death_writer: Connection | None = None # 死亡通知管道
字段解析:
worker_response_mq:本地Worker写入的响应队列。单节点模式下每个Worker都有自己的。peer_worker_response_mqs:多节点专属。driver节点需要从所有Worker(包括远程节点)收集响应。peer_worker_response_mqs[i]对应 rank=i 的远程Worker的响应队列。death_writer:与UnreadyWorkerProcHandle中含义相同
@classmethod
def from_unready_handle(
cls,
unready_handle: UnreadyWorkerProcHandle,
worker_response_mq: MessageQueue | None,
peer_worker_response_mqs: list[MessageQueue | None],
) -> "WorkerProcHandle":
return cls(
proc=unready_handle.proc,
rank=unready_handle.rank,
worker_response_mq=worker_response_mq,
peer_worker_response_mqs=peer_worker_response_mqs,
death_writer=unready_handle.death_writer,
)
状态转换:UnreadyWorkerProcHandle → WorkerProcHandle,通过 from_unready_handle 工厂方法完成。转换发生在Worker发送READY信号后,此时响应队列已建立。
2.3 MultiprocExecutor类(第113-437行)
class MultiprocExecutor(Executor):
supports_pp: bool = True # 支持流水线并行
类声明:继承自 Executor 抽象基类,声明支持流水线并行(PP)。
2.3.1 构造与初始化
def __init__(self, vllm_config: VllmConfig, monitor_workers: bool = True):
self.monitor_workers = monitor_workers
super().__init__(vllm_config)
monitor_workers 参数:
True(默认):启动后台线程监控Worker进程存活状态False:headless模式下不监控,由外部系统负责
调用父类 Executor.__init__,它会设置 self.vllm_config、self.parallel_config、self.scheduler_config 等属性,并调用 self._init_executor()。
_get_parallel_sizes(第218-230行)
def _get_parallel_sizes(self) -> tuple[int, int, int]:
self.world_size = self.parallel_config.world_size
assert self.world_size % self.parallel_config.nnodes_within_dp == 0, (
f"global world_size ({self.parallel_config.world_size}) must be "
f"divisible by nnodes_within_dp "
f"({self.parallel_config.nnodes_within_dp}). "
)
self.local_world_size = self.parallel_config.local_world_size
tp_size = self.parallel_config.tensor_parallel_size
pp_size = self.parallel_config.pipeline_parallel_size
pcp_size = self.parallel_config.prefill_context_parallel_size
return tp_size, pp_size, pcp_size
功能:提取并验证并行维度参数。
关键校验:world_size % nnodes_within_dp == 0——确保DP组内的节点数能整除全局世界大小,这是分布式训练的基本约束。
返回三个并行维度:TP(张量并行)、PP(流水线并行)、PCP(预填充上下文并行)。
_init_executor(第120-216行)——核心初始化
这是整个执行器最复杂的初始化流程,逐步分析:
def _init_executor(self) -> None:
# Call self.shutdown at exit to clean up
# and ensure workers will be terminated.
self._finalizer = weakref.finalize(self, self.shutdown)
弱引用终结器:当 MultiprocExecutor 对象被垃圾回收时,自动调用 self.shutdown()。使用 weakref.finalize 而非 __del__ 的原因:
__del__在解释器关闭时可能不执行weakref.finalize更可靠,即使对象处于循环引用中也能触发- 回调函数在独立上下文中执行,不会持有对象引用
self.is_failed = False
self.failure_callback: FailureCallback | None = None
故障状态:is_failed 标记执行器是否故障,failure_callback 是故障时的回调函数(由EngineCore注册)。
tp_size, pp_size, pcp_size = self._get_parallel_sizes()
assert self.world_size == tp_size * pp_size * pcp_size, (
f"world_size ({self.world_size}) must be equal to the "
f"tensor_parallel_size ({tp_size}) x pipeline"
f"_parallel_size ({pp_size}) x prefill_context"
f"_parallel_size ({pcp_size}). "
)
关键不变量:world_size = TP × PP × PCP。这三个并行维度是正交的,它们的乘积必须等于总进程数。
set_multiprocessing_worker_envs()
设置多进程环境变量(详见2.6节)。
# use the loopback address get_loopback_ip() for communication.
distributed_init_method = get_distributed_init_method(
get_loopback_ip(), get_open_port()
)
分布式初始化方法:使用回环地址(127.0.0.1)而非外部IP,因为同节点内的Worker通过本地回环通信效率更高。get_open_port() 获取一个可用端口用于Torch分布式后端。
为什么用loopback:在单节点场景下,Worker都在同一机器上,使用loopback避免不必要的网络跳转。多节点场景则通过 master_addr 配置使用外部IP。
self.rpc_broadcast_mq: MessageQueue | None = None
scheduler_output_handle: Handle | None = None
消息队列初始化:
rpc_broadcast_mq:广播消息队列,Executor写入→所有Worker读取scheduler_output_handle:广播队列的序列化句柄,传递给子进程使其能连接到同一共享内存区域
if self.parallel_config.node_rank_within_dp == 0:
Leader/Follower区分:在多节点DP(Data Parallel)场景中,每个DP组有一个leader节点。只有leader节点创建广播消息队列,follower节点通过分布式通信组接收数据。
max_chunk_bytes = envs.VLLM_MQ_MAX_CHUNK_BYTES_MB * 1024 * 1024
mq_connect_ip = get_ip()
logger.info(
"DP group leader: node_rank=%d, node_rank_within_dp=%d, "
"master_addr=%s, mq_connect_ip=%s (local), "
"world_size=%d, local_world_size=%d",
self.parallel_config.node_rank,
self.parallel_config.node_rank_within_dp,
self.parallel_config.master_addr,
mq_connect_ip,
self.world_size,
self.local_world_size,
)
max_chunk_bytes:控制消息队列单个chunk的最大字节数。当 SchedulerOutput 较大时,会被分块传输。默认值来自环境变量 VLLM_MQ_MAX_CHUNK_BYTES_MB。
mq_connect_ip:使用真实IP(get_ip())而非loopback,因为消息队列可能需要跨节点连接。
self.rpc_broadcast_mq = MessageQueue(
self.world_size,
self.local_world_size,
max_chunk_bytes=max_chunk_bytes,
connect_ip=mq_connect_ip,
)
scheduler_output_handle = self.rpc_broadcast_mq.export_handle()
创建广播队列:
self.world_size:总队列容量(全局Worker数)self.local_world_size:本地读取者数量(本节点GPU数)connect_ip:连接IP地址export_handle():将队列的共享内存元数据序列化为Handle对象,可以传递给子进程
Handle的本质:包含共享内存文件路径、偏移量等元信息。子进程通过 Handle 可以映射到同一块共享内存,实现零拷贝通信。
Worker进程创建
context = get_mp_context()
shared_worker_lock = context.Lock()
get_mp_context():获取multiprocessing上下文,可能是 spawn、fork 或 forkserver。vLLM 通常使用 spawn 模式(GPU相关代码不能在fork后正常工作)。
shared_worker_lock:跨进程锁,用于Worker间共享资源的互斥访问。具体用途是Worker初始化时的序列化操作(如模型加载时的NCCL初始化可能需要同步)。
unready_workers: list[UnreadyWorkerProcHandle] = []
success = False
try:
异常安全模式:success 标志 + try/finally 确保初始化失败时清理已创建的Worker进程。
global_start_rank = (
self.local_world_size * self.parallel_config.node_rank_within_dp
)
Rank计算:在多节点场景中,每个节点的Worker rank是连续的。global_start_rank 是当前节点第一个Worker的全局rank。
示例:3节点,每节点4GPU → local_world_size=4
- node_rank_within_dp=0: global_start_rank=0, ranks=[0,1,2,3]
- node_rank_within_dp=1: global_start_rank=4, ranks=[4,5,6,7]
- node_rank_within_dp=2: global_start_rank=8, ranks=[8,9,10,11]
inherited_fds: list[int] | None = (
[] if context.get_start_method() == "fork" else None
)
inherited_fds:仅在 fork 模式下追踪继承的文件描述符。fork时子进程继承父进程所有打开的fd,如果不关闭多余的管道fd,会导致管道无法正常关闭(EOF检测失败)。
spawn 模式下无需关注:spawn创建全新进程,只序列化必要的数据。
cpu_omp_manager = OMPProcessManager(self.vllm_config)
OpenMP管理:针对CPU后端,为每个Worker配置OpenMP线程亲和性(CPU绑核)。
for local_rank in range(self.local_world_size):
global_rank = global_start_rank + local_rank
is_driver_worker = self._is_driver_worker(global_rank)
遍历本地Worker:为每个GPU创建一个Worker进程。
is_driver_worker:rank % tp_size == 0 的Worker是driver worker。在TP组中,只有rank=0的Worker负责采样token和发送输出。
with cpu_omp_manager.configure_omp_envs(
rank=global_rank, local_rank=local_rank
):
unready_worker_handle = WorkerProc.make_worker_process(
vllm_config=self.vllm_config,
local_rank=local_rank,
rank=global_rank,
distributed_init_method=distributed_init_method,
input_shm_handle=scheduler_output_handle,
shared_worker_lock=shared_worker_lock,
is_driver_worker=is_driver_worker,
inherited_fds=inherited_fds,
)
创建Worker进程:调用 WorkerProc.make_worker_process 静态方法(详见2.4节)。传入的参数包括配置、rank信息、分布式初始化方法、共享内存句柄等。
unready_workers.append(unready_worker_handle)
if inherited_fds is not None:
inherited_fds.append(unready_worker_handle.death_writer.fileno())
inherited_fds.append(unready_worker_handle.ready_pipe.fileno())
Fork模式下的FD追踪:记录已创建管道的文件描述符,后续Worker进程需要关闭这些"不属于自己"的fd。
等待Worker就绪
# Workers must be created before wait_for_ready to avoid
# deadlock, since worker.init_device() does a device sync.
# Wait for all local workers to be ready.
self.workers = WorkerProc.wait_for_ready(unready_workers)
关键注释:所有Worker进程必须先创建完毕,再等待就绪。因为 init_device() 中包含设备同步操作(NCCL通信初始化),如果Worker进程尚未全部启动,同步操作会死锁。
wait_for_ready 使用 multiprocessing.connection.wait() 高效等待多个管道,类似 select/poll。
启动Worker监控
if self.monitor_workers:
self.start_worker_monitor()
启动后台线程监控Worker进程存活状态(详见2.3.2节)。
构建响应队列列表
self.response_mqs = []
if self.parallel_config.node_rank_within_dp == 0:
for rank in range(self.world_size):
if rank < self.local_world_size:
local_message_queue = self.workers[rank].worker_response_mq
assert local_message_queue is not None
self.response_mqs.append(local_message_queue)
else:
remote_message_queue = self.workers[0].peer_worker_response_mqs[rank]
assert remote_message_queue is not None
self.response_mqs.append(remote_message_queue)
构建全局响应队列列表:self.response_mqs[rank] 可以获取 rank 对应 Worker 的响应队列。
单节点:rank < local_world_size 都走本地路径,直接从 self.workers[rank] 获取。
多节点:远程Worker的响应队列通过 peer_worker_response_mqs 获取。这些队列实际上是远程Worker通过分布式通信组暴露的句柄创建的。
注意:self.workers[0].peer_worker_response_mqs[rank]——即使远程Worker不在本节点,其响应队列句柄也通过Worker 0代理(因为所有Worker的 peer_response_handles 在READY消息中传递给了父进程)。
等待消息队列就绪
# Ensure message queues are ready. Will deadlock if re-ordered
# Must be kept consistent with the WorkerProc.
if self.rpc_broadcast_mq is not None:
self.rpc_broadcast_mq.wait_until_ready()
for response_mq in self.response_mqs:
response_mq.wait_until_ready()
死锁风险:消息队列的握手顺序必须与Worker端一致。wait_until_ready 会阻塞直到所有读者/写者都完成注册。如果顺序不一致,可能A端在等B端注册,而B端在等A端发送数据,形成死锁。
self.futures_queue = deque[FutureWrapper]()
self._post_init_executor()
success = True
futures_queue:所有 FutureWrapper 共享的有序队列。
_post_init_executor:钩子方法,子类可覆写以添加额外初始化逻辑。
异常处理
finally:
if not success:
for uw in unready_workers:
if uw.death_writer is not None:
uw.death_writer.close()
uw.death_writer = None
self._ensure_worker_termination([uw.proc for uw in unready_workers])
失败清理:关闭 death_writer 通知子进程退出,然后等待/强制终止所有Worker进程。
2.3.2 Worker健康监控
def start_worker_monitor(self, inline=False) -> None:
workers = self.workers
self_ref = weakref.ref(self)
self_ref = weakref.ref(self):监控线程持有Executor的弱引用,避免循环引用阻止垃圾回收。
def monitor_workers():
sentinels = [h.proc.sentinel for h in workers]
died = multiprocessing.connection.wait(sentinels)
proc.sentinel:每个进程对象的哨兵fd,进程退出时变为可读。multiprocessing.connection.wait() 类似 select(),等待任一fd变为可读。
died:返回第一个变为可读的sentinel列表(可能有多个同时退出)。
_self = self_ref()
if not _self or getattr(_self, "shutting_down", False):
logger.debug("MultiprocWorkerMonitor: shutdown already initiated")
return
检查Executor状态:
_self为None:Executor已被GC回收,无需处理shutting_down为True:正在正常关闭,无需额外处理
_self.is_failed = True
proc_name = next(h.proc.name for h in workers if h.proc.sentinel == died[0])
logger.error(
"Worker proc %s died unexpectedly, shutting down executor.", proc_name
)
_self.shutdown()
callback = _self.failure_callback
if callback is not None:
_self.failure_callback = None
callback()
故障处理流程:
- 标记执行器为故障状态
- 找到死亡进程名称
- 执行关闭流程
- 调用故障回调(通知EngineCore)
_self.failure_callback = None:先保存再清空,防止回调中再次触发shutdown。
if not inline:
Thread(
target=monitor_workers, daemon=True, name="MultiprocWorkerMonitor"
).start()
return
monitor_workers()
inline 模式:某些测试场景可能需要在当前线程中直接运行监控,而非后台线程。
2.3.3 故障回调注册
def register_failure_callback(self, callback: FailureCallback):
if self.is_failed:
callback()
else:
self.failure_callback = callback
设计精妙:如果执行器已经故障,立即调用回调;否则注册回调等待将来触发。这处理了竞态条件——注册回调时执行器可能已经故障。
2.3.4 execute_model(第278-288行)
def execute_model(
self, scheduler_output: SchedulerOutput, non_block: bool = False
) -> ModelRunnerOutput | None | Future[ModelRunnerOutput | None]:
return self.collective_rpc(
"execute_model",
args=(scheduler_output,),
unique_reply_rank=self.output_rank,
non_block=non_block,
timeout=envs.VLLM_EXECUTE_MODEL_TIMEOUT_SECONDS,
kv_output_aggregator=self.kv_output_aggregator,
)
执行模型:将 SchedulerOutput 广播给所有Worker执行模型推理。
关键参数:
unique_reply_rank=self.output_rank:只需要最后一个PP阶段的TP rank=0的Worker返回结果non_block:异步模式下返回Future而非阻塞等待kv_output_aggregator:聚合KV传输输出(跨Worker的KV缓存传输)timeout:从环境变量读取超时时间,防止无限等待
2.3.5 sample_tokens(第290-300行)
def sample_tokens(
self, grammar_output: GrammarOutput | None, non_block: bool = False
) -> ModelRunnerOutput | Future[ModelRunnerOutput]:
return self.collective_rpc(
"sample_tokens",
args=(grammar_output,),
unique_reply_rank=self.output_rank,
non_block=non_block,
timeout=envs.VLLM_EXECUTE_MODEL_TIMEOUT_SECONDS,
kv_output_aggregator=self.kv_output_aggregator,
)
语法约束采样:将 GrammarOutput 发送给Worker进行受限token采样(如JSON模式、正则约束等)。
2.3.6 execute_dummy_batch / take_draft_token_ids
def execute_dummy_batch(self) -> None:
self.collective_rpc("execute_dummy_batch", unique_reply_rank=self.output_rank)
空批处理:用于预热(warmup)或填充流水线气泡。在PP场景中,需要保持流水线满载,空批处理填充不完整的流水线阶段。
def take_draft_token_ids(self) -> DraftTokenIds | None:
return self.collective_rpc(
"take_draft_token_ids", unique_reply_rank=self.output_rank
)
推测解码:从draft模型获取候选token ID。只从 output_rank 获取即可。
2.3.7 collective_rpc——核心RPC机制(第314-380行)
def collective_rpc(
self,
method: str | Callable,
timeout: float | None = None,
args: tuple = (),
kwargs: dict | None = None,
non_block: bool = False,
unique_reply_rank: int | None = None,
kv_output_aggregator: KVOutputAggregator | None = None,
) -> Any:
"""Returns single result if unique_reply_rank and/or kv_output_aggregator
is provided, otherwise list."""
这是整个执行器最核心的方法——所有Worker通信都通过它。
参数解析:
method:字符串方法名或可调用对象。字符串→直接getattr,可调用→cloudpickle序列化timeout:RPC超时时间args/kwargs:方法参数non_block:是否异步(返回Future而非阻塞)unique_reply_rank:指定只有某个rank回复(优化:不需要所有Worker都发回结果)kv_output_aggregator:KV输出聚合器(用于KV传输场景)
assert self.rpc_broadcast_mq is not None, (
"collective_rpc should not be called on follower node"
)
if self.is_failed:
raise RuntimeError("Executor failed.")
前置检查:follower节点不应调用此方法(它没有广播队列),故障状态也不应继续。
deadline = None if timeout is None else time.monotonic() + timeout
kwargs = kwargs or {}
超时截止时间:使用单调时钟(time.monotonic),不受系统时间调整影响。
if kv_output_aggregator is not None:
output_rank = None
aggregate: Callable[[Any], Any] = partial(
kv_output_aggregator.aggregate, output_rank=unique_reply_rank or 0
)
else:
output_rank = unique_reply_rank
aggregate = lambda x: x
KV聚合 vs 唯一回复:
- 如果有
kv_output_aggregator:需要所有Worker的输出来聚合KV信息,所以output_rank=None(所有Worker都回复),然后通过aggregate函数合并 - 否则:只有
unique_reply_rank的Worker回复
这体现了两种通信模式:
- 聚合模式:所有Worker回复 → 聚合器合并 → 单一结果
- 唯一回复模式:只有指定rank回复 → 直接返回
if isinstance(method, str):
send_method = method
else:
send_method = cloudpickle.dumps(method, protocol=pickle.HIGHEST_PROTOCOL)
self.rpc_broadcast_mq.enqueue((send_method, args, kwargs, output_rank))
广播RPC请求:
- 字符串方法名直接发送(零开销)
- 可调用对象用
cloudpickle序列化(支持lambda、局部函数等) - 将四元组
(method, args, kwargs, output_rank)入队
所有Worker从同一个广播队列读取相同的数据——这就是"广播"的含义。MessageQueue 的共享内存设计确保零拷贝。
response_mqs: Sequence[MessageQueue] = self.response_mqs
if output_rank is not None:
response_mqs = (response_mqs[output_rank],)
响应队列选择:如果只需要特定rank的回复,缩小监听范围。
def get_response():
responses = []
for mq in response_mqs:
dequeue_timeout = (
None if deadline is None else (deadline - time.monotonic())
)
try:
status, result = mq.dequeue(timeout=dequeue_timeout)
except TimeoutError as e:
raise TimeoutError(f"RPC call to {method} timed out.") from e
if status != WorkerProc.ResponseStatus.SUCCESS:
raise RuntimeError(
f"Worker failed with error '{result}', please check the"
" stack trace above for the root cause"
)
responses.append(result)
return responses[0] if output_rank is not None else responses
get_response 闭包:从响应队列中读取结果。
超时处理:每次 dequeue 的剩余超时 = deadline - 当前时间。这确保总等待时间不超过 timeout。
错误处理:
TimeoutError:重新抛出更清晰的错误消息FAILURE状态:Worker执行异常,抛出RuntimeError
返回格式:
- 唯一回复模式:单个结果
- 全部回复模式:结果列表
future = FutureWrapper(
self.futures_queue,
get_response=get_response,
aggregate=aggregate,
)
return future if non_block else future.result()
创建Future并返回:
non_block=True:返回Future,调用者稍后获取结果non_block=False:直接调用future.result()阻塞等待
2.3.8 _ensure_worker_termination(第382-405行)
@staticmethod
def _ensure_worker_termination(worker_procs: list[BaseProcess]):
"""Ensure that all worker processes are terminated."""
三级终止策略:
def wait_for_termination(procs, timeout):
if not time:
# If we are in late stage shutdown, the interpreter may replace
# `time` with `None`.
return all(not proc.is_alive() for proc in procs)
start_time = time.time()
while time.time() - start_time < timeout:
if all(not proc.is_alive() for proc in procs):
return True
time.sleep(0.1)
return False
if not time 检查:Python解释器关闭时,模块级变量可能被设为 None。这是一个防御性编程技巧,确保在解释器关闭阶段也能安全处理。
active_procs = lambda: [proc for proc in worker_procs if proc.is_alive()]
logger.debug("Worker Termination: allow workers to gracefully shutdown")
if wait_for_termination(active_procs(), 4):
return
第一级:等待4秒,允许Worker优雅关闭。
logger.debug("Worker Termination: workers still running sending SIGTERM")
for p in active_procs():
p.terminate()
if not wait_for_termination(active_procs(), 4):
第二级:发送 SIGTERM,再等4秒。
logger.debug(
"Worker Termination: resorting to SIGKILL to take down workers"
)
for p in active_procs():
p.kill()
第三级:发送 SIGKILL,强制终止。这是最终手段,不留给Worker任何清理机会。
设计哲学:优雅→礼貌→强制,与Linux进程管理惯例一致。
2.3.9 shutdown(第407-434行)
def shutdown(self):
if not getattr(self, "shutting_down", False):
logger.debug("Triggering shutdown of workers")
self.shutting_down = True
幂等性:通过 shutting_down 标志确保只执行一次。getattr 而非 self.shutting_down 是因为 shutdown 可能在 _init_executor 完成前被调用(此时属性可能不存在)。
if workers := getattr(self, "workers", None):
for w in workers:
if w.death_writer is not None:
w.death_writer.close()
w.death_writer = None
self._ensure_worker_termination([w.proc for w in workers])
关闭流程:
- 关闭
death_writer→ 子进程的death_reader收到EOF → 子进程知道父进程要退出 - 等待Worker进程终止
for w in workers:
if w.worker_response_mq is not None:
w.worker_response_mq.shutdown()
w.worker_response_mq = None
关闭响应队列:释放共享内存资源。
if rpc_broadcast_mq := getattr(self, "rpc_broadcast_mq", None):
rpc_broadcast_mq.shutdown()
self.rpc_broadcast_mq = None
if response_mqs := getattr(self, "response_mqs", None):
for mq in response_mqs:
mq.shutdown()
self.response_mqs = []
关闭广播队列和远程响应队列:注意关闭顺序——先关闭Worker级别的资源,再关闭Executor级别的资源。
2.3.10 其他方法
def check_health(self) -> None:
self.collective_rpc("check_health", timeout=10)
健康检查:向所有Worker发送 check_health RPC,10秒超时。任何一个Worker无响应或异常都会传播到EngineCore。
@cached_property
def max_concurrent_batches(self) -> int:
pp_size = self.parallel_config.pipeline_parallel_size
return 2 if pp_size <= 1 and self.scheduler_config.async_scheduling else pp_size
最大并发批次数:
- PP > 1:需要PP-size个并发批次填满流水线
- PP = 1 + 异步调度:允许2个并发批次(overlap调度和执行)
- PP = 1 + 同步调度:隐式返回1(由父类默认值或调用方决定)
def _get_output_rank(self) -> int:
return (
self.world_size
- self.parallel_config.tensor_parallel_size
* self.parallel_config.prefill_context_parallel_size
)
```
**输出rank计算**:最后一个PP阶段的第一个TP Worker。
**示例**(TP=8, PP=4, PCP=1, world_size=32):
- PP rank 0: ranks 0-7
- PP rank 1: ranks 8-15
- PP rank 2: ranks 16-23
- PP rank 3: ranks 24-31
- output_rank = 32 - 8 * 1 = 24(PP rank 3的TP rank 0)
```python
@classmethod
def supports_async_scheduling(cls) -> bool:
return True
声明支持异步调度模式。
2.4 WorkerProc类(第485-940行)
WorkerProc 是在子进程中运行的Worker封装,是整个多进程执行器的"另一半"。
class WorkerProc:
"""Wrapper that runs one Worker in a separate process."""
READY_STR = "READY"
rpc_broadcast_mq: MessageQueue | None
worker_response_mq: MessageQueue | None
类属性:READY_STR 是就绪信号字符串,两个消息队列属性在运行时赋值。
2.4.1 _init_message_queues(第494-528行)
def _init_message_queues(
self, input_shm_handle: Handle, vllm_config: VllmConfig
) -> None:
消息队列初始化——单节点和多节点走不同路径。
if vllm_config.parallel_config.nnodes_within_dp == 1:
# Initialize MessageQueue for receiving SchedulerOutput
self.rpc_broadcast_mq = MessageQueue.create_from_handle(
input_shm_handle, self.worker.rank
)
单节点模式:
- 通过
Handle创建连接到Executor端广播队列的读取端 self.worker.rank指定当前Worker是第几个读取者
# Initializes a message queue for sending the model output
self.worker_response_mq = MessageQueue(1, 1)
self.peer_response_handles = []
响应队列:创建一个容量=1、读者=1的消息队列。Executor端通过READY消息中传递的Handle连接到此队列。
else:
# Initialize remote MessageQueue for receiving SchedulerOutput across nodes
self.rpc_broadcast_mq = get_inner_dp_world_group().create_mq_broadcaster(
external_writer_handle=input_shm_handle,
blocking=False,
)
多节点模式:使用分布式通信组创建跨节点广播器。external_writer_handle 来自Executor端,blocking=False 因为握手机制在 wait_until_ready 中完成。
self.worker_response_mq, self.peer_response_handles = (
get_inner_dp_world_group().create_single_reader_mq_broadcasters(
reader_rank_in_group=0
)
)
远程响应队列:创建单读者广播器,reader_rank_in_group=0 表示只有driver节点(rank 0)读取。peer_response_handles 包含所有Worker的响应队列句柄,通过READY消息传回Executor。
2.4.2 WorkerProc.init(第530-590行)
@instrument(span_name="Worker init")
def __init__(
self,
vllm_config: VllmConfig,
local_rank: int,
rank: int,
distributed_init_method: str,
input_shm_handle: Handle,
shared_worker_lock: LockType,
is_driver_worker: bool,
):
@instrument:追踪装饰器,记录Worker初始化耗时。
self.rank = rank
wrapper = WorkerWrapperBase(rpc_rank=local_rank, global_rank=rank)
Worker包装器:WorkerWrapperBase 封装了实际的模型执行逻辑。rpc_rank 和 global_rank 用于区分不同并行组的角色。
all_kwargs: list[dict] = [
{} for _ in range(vllm_config.parallel_config.world_size)
]
all_kwargs[local_rank] = {
"vllm_config": vllm_config,
"local_rank": local_rank,
"rank": rank,
"distributed_init_method": distributed_init_method,
"is_driver_worker": is_driver_worker,
"shared_worker_lock": shared_worker_lock,
}
wrapper.init_worker(all_kwargs)
初始化Worker:创建一个长度为 world_size 的参数列表,只填充当前Worker的参数。init_worker 方法会根据 rpc_rank 选取自己的参数。这种设计允许在Ray执行器中所有Worker共享同一个参数列表,而在MultiprocExecutor中只有当前Worker的参数非空。
self.worker = wrapper
self.setup_proc_title_and_log_prefix(
enable_ep=vllm_config.parallel_config.enable_expert_parallel
)
设置进程标题:如 Worker_TP0_PP0 等,便于在 ps / top 中识别。
# Load model
self.worker.init_device()
初始化设备:设置CUDA设备、初始化分布式后端(NCCL)、创建并行通信组。这是最耗时的初始化步骤之一。
self.setup_proc_title_and_log_prefix(
enable_ep=vllm_config.parallel_config.enable_expert_parallel
)
再次设置标题:init_device 后并行组已初始化,标题可以包含更详细的并行信息(DP/PP/TP/PCP/DCP/EP)。
if envs.VLLM_ELASTIC_EP_SCALE_UP_LAUNCH:
self.worker.elastic_ep_execute("load_model")
else:
self.worker.load_model()
加载模型:支持弹性专家并行(Elastic EP)模式,使用 elastic_ep_execute 进行特殊加载。
scheduler_config = vllm_config.scheduler_config
self.use_async_scheduling = scheduler_config.async_scheduling
if self.use_async_scheduling:
self.async_output_queue: queue.Queue = queue.Queue()
self.async_output_copy_thread = Thread(
target=self.async_output_busy_loop,
daemon=True,
name="WorkerAsyncOutputCopy",
)
self.async_output_copy_thread.start()
异步调度模式:
- 启动一个后台线程
async_output_busy_loop - Worker主循环的输出先放入
async_output_queue - 后台线程从队列取出输出,通过
worker_response_mq发送 - 这样Worker主循环可以立即处理下一个请求,无需等待输出传输
current_platform.update_block_size_for_backend(vllm_config)
更新块大小:根据注意力后端更新KV缓存的块大小配置。
self._init_message_queues(input_shm_handle, vllm_config)
enable_envs_cache()
最后初始化消息队列:必须在 init_device 之后,因为多节点场景需要分布式组已初始化。enable_envs_cache 冻结环境变量,之后不再从 os.environ 读取。
2.4.3 make_worker_process——创建子进程(第592-642行)
@staticmethod
def make_worker_process(
vllm_config: VllmConfig,
local_rank: int,
rank: int,
distributed_init_method: str,
input_shm_handle,
shared_worker_lock: LockType,
is_driver_worker: bool,
inherited_fds: list[int] | None = None,
) -> UnreadyWorkerProcHandle:
工厂方法:在父进程中调用,创建Worker子进程并返回未就绪句柄。
context = get_mp_context()
ready_reader, ready_writer = context.Pipe(duplex=False)
death_reader, death_writer = context.Pipe(duplex=False)
创建两对管道:
ready_pipe:子进程 → 父进程,报告初始化完成death_pipe:父进程 → 子进程,检测父进程退出
duplex=False:单向管道,更高效且语义更清晰。
if inherited_fds is not None:
inherited_fds = inherited_fds.copy()
inherited_fds.extend((ready_reader.fileno(), death_writer.fileno()))
Fork模式FD追踪:记录本Worker创建的管道中"属于父进程端"的fd(ready_reader 和 death_writer)。后续Worker需要关闭这些fd。
为什么不追踪 ready_writer 和 death_reader?这两个在子进程中使用,子进程应该保留它们。
process_kwargs = {
"vllm_config": vllm_config,
"local_rank": local_rank,
"rank": rank,
"distributed_init_method": distributed_init_method,
"input_shm_handle": input_shm_handle,
"ready_pipe": ready_writer,
"death_pipe": death_reader,
"shared_worker_lock": shared_worker_lock,
"is_driver_worker": is_driver_worker,
"inherited_fds": inherited_fds if inherited_fds is not None else [],
}
子进程参数:注意传递的是管道的"子进程端"。
proc = context.Process(
target=WorkerProc.worker_main,
kwargs=process_kwargs,
name=f"VllmWorker-{rank}",
daemon=True,
)
创建进程对象:
target=WorkerProc.worker_main:子进程入口函数daemon=True:守护进程,主进程退出时自动终止name=f"VllmWorker-{rank}":便于调试
with numa_utils.configure_subprocess(
vllm_config, local_rank, process_kind="worker"
):
proc.start()
NUMA绑定:如果配置了NUMA亲和性,在启动子进程时设置CPU绑核。
ready_writer.close()
death_reader.close()
关闭子进程端的管道:父进程不需要写入ready_pipe或读取death_pipe,关闭以避免fd泄漏。
return UnreadyWorkerProcHandle(proc, rank, ready_reader, death_writer)
返回未就绪句柄,包含父进程需要的信息。
2.4.4 wait_for_ready——等待Worker就绪(第661-699行)
@staticmethod
def wait_for_ready(
unready_proc_handles: list[UnreadyWorkerProcHandle],
) -> list[WorkerProcHandle]:
等待所有Worker完成初始化,将 UnreadyWorkerProcHandle 转换为 WorkerProcHandle。
e = Exception(
"WorkerProc initialization failed due to an exception in a "
"background process. See stack trace for the root cause."
)
预构造异常对象——如果Worker初始化失败,抛出此异常。
pipes = {handle.ready_pipe: handle for handle in unready_proc_handles}
ready_proc_handles: list[WorkerProcHandle | None] = [None] * len(
unready_proc_handles
)
管道→句柄映射:通过 multiprocessing.connection.wait() 等待多个管道。
while pipes:
ready = multiprocessing.connection.wait(pipes.keys())
高效等待:wait() 类似 select(),返回可读的管道列表。不会CPU空转。
for pipe in ready:
assert isinstance(pipe, Connection)
try:
unready_proc_handle = pipes.pop(pipe)
response: dict[str, Any] = pipe.recv()
if response["status"] != "READY":
raise e
处理就绪信号:
- 从映射中移除已就绪的管道(
pop确保不会重复处理) - 读取Worker发送的字典
- 验证状态为
READY
idx = unready_proc_handle.rank % len(ready_proc_handles)
ready_proc_handles[idx] = WorkerProc.wait_for_response_handle_ready(
response, unready_proc_handle
)
索引计算:rank % len(ready_proc_handles) 将全局rank映射到本地索引。单节点下 rank == idx;多节点下需要取模。
except EOFError:
e.__suppress_context__ = True
raise e from None
EOFError处理:Worker进程在发送READY前崩溃,管道关闭产生EOF。from None 和 __suppress_context__ 清理异常链,避免暴露内部细节。
finally:
pipe.close()
关闭管道:就绪信号读取完毕,不再需要。
return cast(list[WorkerProcHandle], ready_proc_handles)
类型断言:所有 None 都应被替换为实际的 WorkerProcHandle,否则上面的检查会抛出异常。
2.4.5 wait_for_response_handle_ready(第644-659行)
@staticmethod
def wait_for_response_handle_ready(
handles: dict[str, Any], proc_handle: UnreadyWorkerProcHandle
) -> WorkerProcHandle:
response_handle = handles["handle"]
worker_response_mq: MessageQueue | None = None
if len(response_handle.local_reader_ranks) > 0:
worker_response_mq = MessageQueue.create_from_handle(response_handle, 0)
创建本地响应队列连接:如果有本地读取者,创建消息队列连接。reader_rank=0 因为Executor是唯一的读取者。
peer_response_handles = handles["peer_response_handles"]
peer_worker_response_mqs = [
MessageQueue.create_from_handle(handle, -1)
if handle.remote_subscribe_addr is not None
else None
for handle in peer_response_handles
]
创建远程响应队列连接:遍历所有远程Worker的句柄,创建连接。reader_rank=-1 表示远程订阅者(不直接读取,通过分布式通信中继)。
2.4.6 WorkerProc.shutdown(第701-710行)
def shutdown(self):
if self.rpc_broadcast_mq is not None:
self.rpc_broadcast_mq.shutdown()
if self.worker_response_mq is not None:
self.worker_response_mq.shutdown()
self.worker.shutdown()
self.rpc_broadcast_mq = None
self.worker_response_mq = None
destroy_model_parallel()
destroy_distributed_environment()
Worker关闭流程:
- 关闭消息队列
- 关闭Worker(释放GPU内存等)
- 销毁模型并行状态
- 销毁分布式环境
2.4.7 monitor_death_pipe——父进程存活监控(第712-738行)
def monitor_death_pipe(self, death_pipe, shutdown_requested: threading.Event):
if death_pipe is None:
return
可选监控:如果没有death_pipe(某些特殊配置),跳过。
def death_pipe_monitor(queues_to_shutdown: list[MessageQueue]):
try:
death_pipe.recv() # 阻塞等待,父进程写入或关闭
except EOFError:
logger.info_once("Parent process exited, terminating worker queues")
shutdown_requested.set()
for mq in queues_to_shutdown:
if mq is not None:
mq.shutdown()
except Exception as e:
logger.warning("Death monitoring error: %s", e)
EOF检测:death_pipe.recv() 正常返回意味着父进程写入了数据(不应该发生)。EOFError 表示管道关闭——父进程退出了。
为什么要关闭消息队列:如果Worker的busy_loop正在等待 rpc_broadcast_mq.dequeue(),需要通过shutdown唤醒它。
Thread(
target=death_pipe_monitor,
args=([self.rpc_broadcast_mq, self.worker_response_mq],),
daemon=True,
name="DeathPipeMonitor",
).start()
启动监控线程:直接传递队列引用而非self,避免GC问题。
2.4.8 worker_main——子进程入口(第740-816行)
这是Worker子进程的入口函数,是理解整个多进程架构的关键。
@staticmethod
def worker_main(*args, **kwargs):
"""Worker initialization and execution loops.
This runs a background process"""
注意:这是一个静态方法,在子进程中执行。WorkerProc 对象在 worker_main 内部创建。
信号处理
shutdown_requested = threading.Event()
def signal_handler(signum, frame):
nonlocal shutdown_requested
if not shutdown_requested.is_set():
shutdown_requested.set()
logger.debug(
"WorkerProc handling signal %d, raising SystemExit", signum
)
raise SystemExit()
优雅信号处理:
shutdown_requested事件确保信号只处理一次raise SystemExit()而非直接sys.exit()——SystemExit是异常,可以被except捕获做清理
signal.signal(signal.SIGTERM, signal_handler)
signal.signal(signal.SIGINT, signal_handler)
注册SIGTERM和SIGINT处理器。
参数提取
worker = None
ready_writer = kwargs.pop("ready_pipe")
death_pipe = kwargs.pop("death_pipe", None)
从kwargs中提取管道:这些是 make_worker_process 传入的参数,提取后不再传递给 WorkerProc.__init__。
for fd in kwargs.pop("inherited_fds", []):
try:
os.close(fd)
except Exception as e:
logger.warning("Error closing inherited connection: %s: %s", type(e), e)
关闭继承的fd:Fork模式下,子进程继承了父进程中其他Worker的管道fd。关闭它们确保管道的正确生命周期。
初始化Worker
try:
rank = kwargs.get("rank", 0)
maybe_init_worker_tracer(
instrumenting_module_name="vllm.worker",
process_kind="worker",
process_name=f"Worker_{rank}",
)
追踪初始化:为Worker进程设置分布式追踪。
worker = WorkerProc(*args, **kwargs)
创建WorkerProc实例:在子进程中执行完整的初始化流程(设备初始化、模型加载、消息队列建立)。
assert worker.worker_response_mq is not None
if kwargs["vllm_config"].parallel_config.numa_bind:
numa_utils.log_current_affinity_state(f"Worker_{worker.rank}")
验证和日志:确保响应队列已创建,可选地记录NUMA亲和性状态。
启动死亡监控
worker.monitor_death_pipe(death_pipe, shutdown_requested)
发送就绪信号
ready_writer.send(
{
"status": WorkerProc.READY_STR,
"handle": worker.worker_response_mq.export_handle(),
"peer_response_handles": worker.peer_response_handles,
}
)
READY消息内容:
status:“READY”handle:响应消息队列的序列化句柄peer_response_handles:远程Worker的响应队列句柄列表
这是父子进程握手的关键步骤:父进程通过这些句柄建立到Worker响应队列的连接。
等待消息队列就绪
if worker.rpc_broadcast_mq is not None:
worker.rpc_broadcast_mq.wait_until_ready()
worker.worker_response_mq.wait_until_ready()
ready_writer.close()
ready_writer = None
握手同步:确保所有消息队列的读取者和写入者都已注册。完成后关闭ready_pipe(不再需要)。
进入主循环
worker.worker_busy_loop()
异常处理
except Exception:
if ready_writer is not None:
logger.exception("WorkerProc failed to start.")
elif shutdown_requested.is_set():
logger.info("WorkerProc shutting down.")
else:
logger.exception("WorkerProc failed.")
shutdown_requested.set()
异常分类:
ready_writer is not None:初始化阶段失败(还没发送READY)shutdown_requested.is_set():信号触发的关闭- 其他:运行时异常
shutdown_requested.set():防止后续信号处理再次抛出 SystemExit。
except SystemExit as e:
logger.warning("WorkerProc was terminated")
raise e
SystemExit不静默:必须重新抛出,否则子进程不会退出。
finally:
if ready_writer is not None:
ready_writer.close()
if death_pipe is not None:
death_pipe.close()
if worker is not None:
worker.shutdown()
最终清理:关闭管道、释放资源。
2.4.9 ResponseStatus枚举(第818-820行)
class ResponseStatus(Enum):
SUCCESS = auto()
FAILURE = auto()
响应状态:Worker的RPC结果通过 (status, result) 元组返回。SUCCESS 表示正常结果,FAILURE 表示异常。
2.4.10 enqueue_output——输出入队(第822-837行)
def enqueue_output(self, output: Any):
if isinstance(output, AsyncModelRunnerOutput):
output = output.get_output()
处理异步输出:AsyncModelRunnerOutput 是延迟计算的结果,调用 get_output() 获取实际值。
if isinstance(output, Exception):
result = (WorkerProc.ResponseStatus.FAILURE, str(output))
else:
result = (WorkerProc.ResponseStatus.SUCCESS, output)
异常序列化:异常对象可能不可pickle,转换为字符串以确保可靠传输。
if (response_mq := self.worker_response_mq) is not None:
response_mq.enqueue(result)
入队:注意使用walrus运算符+None检查,因为消息队列可能在shutdown后变为None。
2.4.11 handle_output——输出处理分发(第839-847行)
def handle_output(self, output: Any):
if self.use_async_scheduling:
self.async_output_queue.put(output)
else:
self.enqueue_output(output)
调度模式分发:
- 异步调度:输出先进入线程间队列,由后台线程负责入队到MessageQueue
- 同步调度:直接入队
异步模式的好处:Worker的 worker_busy_loop 不需要等待输出传输完成,可以立即开始处理下一个输入,实现计算与通信的overlap。
2.4.12 async_output_busy_loop——异步输出线程(第849-867行)
def async_output_busy_loop(self):
from vllm.platforms import current_platform
if hasattr(self.worker, "device"):
current_platform.set_device(self.worker.device)
设置CUDA设备:关键步骤!新线程不会继承主线程的CUDA上下文。如果不设置,任何CUDA操作都会在设备0上创建新上下文,浪费GPU内存。
while True:
output = self.async_output_queue.get()
self.enqueue_output(output)
无限循环:从线程间队列取出输出,入队到消息队列。queue.get() 在队列为空时阻塞,不会浪费CPU。
没有退出机制:线程是daemon的,随主进程退出而终止。这在正常关闭时没问题,但如果需要优雅退出,可能需要添加哨兵值。
2.4.13 worker_busy_loop——Worker主循环(第869-895行)
这是Worker的核心执行循环。
def worker_busy_loop(self):
"""Main busy loop for Multiprocessing Workers"""
assert self.rpc_broadcast_mq is not None
前置断言:确保广播队列已初始化。
while True:
method, args, kwargs, output_rank = self.rpc_broadcast_mq.dequeue(
indefinite=True
)
等待RPC调用:dequeue(indefinite=True) 无限等待新消息。返回四元组 (method, args, kwargs, output_rank)。
try:
if isinstance(method, str):
func = getattr(self.worker, method)
elif isinstance(method, bytes):
func = partial(cloudpickle.loads(method), self.worker)
方法解析:
- 字符串:通过
getattr获取Worker的属性方法(如"execute_model") - 字节串:用
cloudpickle反序列化,返回一个接受Worker作为第一个参数的函数,用partial绑定
output = func(*args, **kwargs)
执行方法:调用Worker的方法并获取输出。
except Exception as e:
if hasattr(e, "add_note"):
e.add_note(traceback.format_exc())
logger.exception("WorkerProc hit an exception.")
if output_rank is None or self.rank == output_rank:
self.handle_output(e)
continue
异常处理:
- 附带堆栈信息(Python 3.11+的
add_note) - 记录日志
- 只向需要回复的rank发送异常
output_rank is None:表示所有Worker都需要回复(聚合模式),当前Worker是其中之一。
if output_rank is None or self.rank == output_rank:
self.handle_output(output)
正常输出处理:同样只向需要的rank发送输出。这是性能优化——不需要的Worker无需序列化和传输结果。
2.4.14 setup_proc_title_and_log_prefix(第897-940行)
@staticmethod
def setup_proc_title_and_log_prefix(enable_ep: bool) -> None:
if not model_parallel_is_initialized():
set_process_title(name="Worker")
decorate_logs("Worker")
return
初始化前:并行组未初始化时使用默认标题。
dp_size = get_dp_group().world_size
dp_rank = get_dp_group().rank_in_group
pp_size = get_pp_group().world_size
pp_rank = get_pp_group().rank_in_group
pcp_size = get_pcp_group().world_size
pcp_rank = get_pcp_group().rank_in_group
tp_size = get_tp_group().world_size
tp_rank = get_tp_group().rank_in_group
dcp_size = get_dcp_group().world_size
dcp_rank = get_dcp_group().rank_in_group
查询各并行维度:获取每个维度的世界大小和当前rank。
process_name = "Worker"
if dp_size > 1:
process_name += f"_DP{dp_rank}"
if pp_size > 1:
process_name += f"_PP{pp_rank}"
if pcp_size > 1:
process_name += f"_PCP{pcp_rank}"
if tp_size > 1:
process_name += f"_TP{tp_rank}"
if dcp_size > 1:
process_name += f"_DCP{dcp_rank}"
if enable_ep:
ep_rank = get_ep_group().rank_in_group
process_name += f"_EP{ep_rank}"
set_process_title(name=process_name)
decorate_logs(process_name)
构建进程名称:按需追加各并行维度信息。例如 Worker_DP0_PP2_TP3。
只显示大小>1的维度:避免 Worker_DP0_PP0_TP0_PCP0_DCP0_EP0 这种冗余名称。
2.5 通信机制详解
2.5.1 整体通信架构
Executor (父进程) Workers (子进程)
┌─────────────────┐ ┌─────────────────────┐
│ │ rpc_broadcast_mq │ │
│ collective_rpc │ ────enqueue─────→ │ worker_busy_loop │
│ │ (SchedulerOutput)│ │dequeue │
│ │ │ ▼ │
│ │ │ func(*args,**kwargs)│
│ │ │ │ │
│ │ response_mqs[i] │ ▼ │
│ get_response │ ←──dequeue────── │ handle_output │
│ │ (ModelRunnerOutput)│ │enqueue │
└─────────────────┘ └─────────────────────┘
2.5.2 消息队列(MessageQueue)详解
MessageQueue 是 vLLM 自研的共享内存广播机制(定义在 shm_broadcast.py 中),具有以下特性:
- 零拷贝广播:写入一次,多个读取者通过映射同一块共享内存读取
- 分块传输:大消息自动分块,每块大小不超过
max_chunk_bytes - 读/写同步:通过原子变量和fence确保读写顺序
- 跨进程安全:使用POSIX共享内存,可被不同进程映射
Handle机制:
# 创建端
mq = MessageQueue(num_writers, num_readers, ...)
handle = mq.export_handle() # 序列化共享内存元信息
# 连接端
mq = MessageQueue.create_from_handle(handle, reader_rank)
Handle包含:
- 共享内存文件路径
- 数据区偏移量
- 元数据区偏移量
- 连接IP和端口(用于跨节点场景)
2.5.3 单节点通信流程
步骤1: Executor创建rpc_broadcast_mq并export_handle
步骤2: handle传递给Worker子进程
步骤3: Worker通过create_from_handle连接到同一共享内存
步骤4: Worker创建worker_response_mq并export_handle
步骤5: handle通过ready_pipe传回Executor
步骤6: Executor通过create_from_handle连接到Worker的响应队列
步骤7: 双方wait_until_ready完成握手
步骤8: 通信就绪
2.5.4 多节点通信流程
步骤1: Leader节点Executor创建rpc_broadcast_mq
步骤2: handle通过分布式通信组广播给所有节点
步骤3: Worker通过get_inner_dp_world_group().create_mq_broadcaster连接
步骤4: Worker创建响应队列并通过create_single_reader_mq_broadcasters暴露句柄
步骤5: 句柄通过ready_pipe传回Executor
步骤6: Executor通过peer_worker_response_mqs访问远程Worker的响应
步骤7: 握手完成
2.5.5 广播-响应模式的效率分析
优点:
- SchedulerOutput对所有Worker相同,一次写入多次读取
- 共享内存避免序列化/反序列化开销
- 只有output_rank的Worker需要回复,减少通信量
缺点:
- Worker必须同步消费广播消息(最慢的Worker决定整体吞吐)
- 共享内存有固定容量,极端情况下可能溢出
2.6 生命周期管理
2.6.1 启动流程
1. MultiprocExecutor._init_executor()
├── 设置环境变量
├── 创建distributed_init_method
├── 创建rpc_broadcast_mq (leader节点)
├── 为每个local_rank创建Worker子进程
│ └── WorkerProc.make_worker_process()
│ ├── 创建ready_pipe和death_pipe
│ ├── 启动子进程 (WorkerProc.worker_main)
│ └── 返回UnreadyWorkerProcHandle
├── wait_for_ready()
│ ├── 等待所有Worker发送READY
│ ├── 从READY消息中提取响应队列句柄
│ └── 返回WorkerProcHandle列表
├── 构建response_mqs列表
├── wait_until_ready() 所有消息队列
└── _post_init_executor()
2. Worker子进程 (worker_main)
├── 设置信号处理器
├── 关闭继承的fd
├── 创建WorkerProc实例
│ ├── WorkerWrapperBase初始化
│ ├── init_device() (CUDA, NCCL, 并行组)
│ ├── load_model()
│ ├── 初始化消息队列
│ └── 启动async_output线程(如果启用)
├── monitor_death_pipe()
├── 发送READY信号
├── wait_until_ready() 消息队列
└── worker_busy_loop() 进入主循环
2.6.2 运行时流程
execute_model(scheduler_output) 调用
│
▼
collective_rpc("execute_model", args=(scheduler_output,), ...)
│
├── rpc_broadcast_mq.enqueue((method, args, kwargs, output_rank))
│ └── 所有Worker从共享内存读取相同数据
│
├── 创建FutureWrapper(futures_queue, get_response, aggregate)
│
├── Worker端:
│ ├── worker_busy_loop: dequeue → func(*args, **kwargs) → output
│ ├── handle_output(output)
│ │ ├── 同步模式: 直接enqueue_output
│ │ └── 异步模式: put to async_output_queue
│ │ └── async_output_busy_loop: get → enqueue_output
│ └── worker_response_mq.enqueue((SUCCESS, output))
│
└── Executor端:
├── get_response(): response_mqs[output_rank].dequeue()
├── aggregate(responses) 如果有kv_output_aggregator
└── future.set_result(response) → 返回给调用者
2.6.3 关闭流程
shutdown() 调用
│
├── shutting_down = True (幂等保护)
│
├── 关闭所有death_writer
│ └── Worker子进程检测到EOF
│ ├── death_pipe_monitor: shutdown_requested.set()
│ ├── shutdown消息队列
│ └── worker_busy_loop退出 (mq.dequeue返回错误/空)
│
├── _ensure_worker_termination()
│ ├── 等待4秒 (优雅退出)
│ ├── SIGTERM + 等待4秒 (礼貌退出)
│ └── SIGKILL (强制退出)
│
├── 关闭worker_response_mq
├── 关闭rpc_broadcast_mq
└── 关闭response_mqs
2.6.4 故障处理流程
Worker异常 → worker_busy_loop捕获
│
├── logger.exception() 记录
├── handle_output(exception) → enqueue_output → (FAILURE, str(e))
└── Executor端get_response() 检测到FAILURE → raise RuntimeError
Worker进程崩溃 → WorkerMonitor检测
│
├── is_failed = True
├── logger.error()
├── shutdown()
└── failure_callback() → 通知EngineCore
父进程退出 → Worker检测
│
├── death_pipe EOFError
├── shutdown_requested.set()
├── shutdown消息队列 (唤醒busy_loop)
└── Worker自行退出
2.7 set_multiprocessing_worker_envs(第942-972行)
def set_multiprocessing_worker_envs():
_maybe_force_spawn()
强制spawn模式:GPU相关代码通常不支持fork(CUDA上下文不能在fork后重用),此函数检查并可能强制使用spawn。
if not current_platform.is_cpu():
default_omp_num_threads = 1
if (
"OMP_NUM_THREADS" not in os.environ
and (current_parallelism := torch.get_num_threads())
> default_omp_num_threads
):
logger.warning_once(
"Reducing Torch parallelism from %d threads to %d to avoid "
"unnecessary CPU contention. Set OMP_NUM_THREADS in the "
"external environment to tune this value as needed.",
current_parallelism,
default_omp_num_threads,
)
os.environ["OMP_NUM_THREADS"] = str(default_omp_num_threads)
torch.set_num_threads(default_omp_num_threads)
OMP线程数控制:
- GPU模式下,每个Worker进程默认只使用1个OpenMP线程
- 原因:多GPU × 多OMP线程 = CPU争用严重,导致性能下降
- 容器环境下CPU限制更严格,争用问题更突出
- 用户可通过
OMP_NUM_THREADS环境变量覆盖
warning_once:避免每个Worker都打印相同警告。
三、Mermaid图表汇总
3.1 MultiprocExecutor 类图
3.2 初始化时序图
3.3 collective_rpc 通信时序图
3.4 WorkerBusyLoop 状态机
3.5 关闭流程图
3.6 故障检测与传播图
3.7 管道与消息队列拓扑图
读取端] -----------------------^ Expecting 'SQE', 'DOUBLECIRCLEEND', 'PE', '-)', 'STADIUMEND', 'SUBROUTINEEND', 'PIPE', 'CYLINDEREND', 'DIAMOND_STOP', 'TAGEND', 'TRAPEND', 'INVTRAPEND', 'UNICODE_TEXT', 'TEXT', 'TAGSTART', got 'SQS'
3.8 FutureWrapper 队列处理时序图
四、关键设计模式与权衡
4.1 共享内存广播 vs Socket/RPC
| 方面 | 共享内存广播 (vLLM) | Socket/RPC (传统) |
|---|---|---|
| 延迟 | 微秒级(内存映射) | 毫秒级(网络栈) |
| 拷贝 | 零拷贝 | 至少2次拷贝 |
| 序列化 | 仅metadata | 整个SchedulerOutput |
| 扩展性 | 单节点最优 | 天然跨节点 |
| 复杂度 | 共享内存管理复杂 | 成熟简单 |
vLLM的选择:单节点用共享内存,多节点用分布式通信组——兼顾了性能和扩展性。
4.2 有序响应 vs 无序响应
FutureWrapper 的有序设计是必要的:
- 调度器发送的请求有严格顺序
- 响应乱序会导致错误的请求-响应匹配
- 例如:request1要采样A,request2要采样B,如果响应乱序可能把B的结果当A的
4.3 Daemon进程 + Death Pipe
Worker设为daemon进程但还使用death pipe的原因:
- Daemon确保主进程退出时Worker也被终止(兜底)
- Death pipe提供更早的退出检测——主进程还在但Executor已关闭时,Worker可以优雅退出
- 不依赖daemon的粗暴终止,给Worker清理资源的机会
4.4 唯一回复优化
unique_reply_rank 的设计避免了不必要的通信:
- TP组中只有rank=0的Worker需要返回采样结果
- 其他Worker的输出只在TP组内部使用(通过NCCL all-reduce)
- 节省了N-1个Worker的响应序列化和传输开销
4.5 弱引用的使用
多处使用 weakref:
_finalizer = weakref.finalize(self, self.shutdown):GC时自动清理self_ref = weakref.ref(self):monitor线程不阻止Executor被GCdeath_pipe_monitor直接传递队列引用:避免通过self间接持有Executor引用
这些设计避免了循环引用和内存泄漏,特别是在长期运行的服务中。
4.6 Fork模式的FD管理
fork模式下文件描述符的继承是一个经典陷阱:
- 子进程继承父进程所有打开的fd
- 如果不关闭其他Worker的管道fd,管道的写端永远不会关闭
- 读端无法收到EOF,导致阻塞等待
vLLM的解决方案:显式追踪所有已创建的fd,在新Worker中关闭不属于自己的fd。
五、总结
MultiprocExecutor 是 vLLM v1 多GPU推理的核心执行器,其设计体现了以下工程智慧:
- 性能优先的通信架构:共享内存广播实现零拷贝、低延迟的SchedulerOutput分发
- 分层抽象的状态管理:
UnreadyWorkerProcHandle→WorkerProcHandle的状态转换清晰表达了Worker生命周期 - 健壮的故障处理:三级进程终止策略、death pipe双向监控、failure callback传播链
- 灵活的并行支持:TP/PP/DP/PCP/EP/DCP多维度并行,通过rank计算和并行组查询统一管理
- 优雅的资源管理:weakref终结器、幂等shutdown、daemon+pipe双重保障
- 可扩展的RPC框架:
collective_rpc支持字符串方法名和cloudpickle序列化的可调用对象,支持聚合和唯一回复模式
这个1037行的文件浓缩了多进程GPU推理的所有关键机制,是理解vLLM v1执行引擎的必经之路。
AtomGit 是由开放原子开源基金会联合 CSDN 等生态伙伴共同推出的新一代开源与人工智能协作平台。平台坚持“开放、中立、公益”的理念,把代码托管、模型共享、数据集托管、智能体开发体验和算力服务整合在一起,为开发者提供从开发、训练到部署的一站式体验。
更多推荐


所有评论(0)