vLLM V1 Executor 深度分析 — Part2: 多进程执行器

一、模块定位

1.1 文件概览

multiproc_executor.py 是 vLLM v1 执行器体系中最核心、最复杂的实现文件(1037行),定义了多进程执行器 MultiprocExecutor。它是 Executor 抽象基类的具体实现,负责:

  • 启动和管理多个 Worker 子进程:每个 GPU 对应一个独立进程
  • 进程间通信(IPC):通过共享内存消息队列(MessageQueue)实现零拷贝广播
  • 分布式协调:管理 TP/PP/DP/PCP/EP/DCP 等多种并行维度的初始化
  • 模型执行调度:将 SchedulerOutput 广播给所有 Worker,收集 ModelRunnerOutput
  • 生命周期管理:优雅关闭、健康监控、故障恢复

1.2 核心架构思想

┌─────────────────────────────────────────────────────┐
│                  EngineCore (主进程)                   │
│                                                       │
│  ┌──────────────────────────────────────────────┐    │
│  │           MultiprocExecutor                    │    │
│  │                                                │    │
│  │  rpc_broadcast_mq ──────────┐                 │    │
│  │  (SchedulerOutput广播)       │                 │    │
│  │                              ▼                 │    │
│  │  ┌─────────┐ ┌─────────┐ ┌─────────┐        │    │
│  │  │Worker 0 │ │Worker 1 │ │Worker N │        │    │
│  │  │ (GPU0)  │ │ (GPU1)  │ │ (GPUN)  │        │    │
│  │  └────┬────┘ └────┬────┘ └────┬────┘        │    │
│  │       │            │            │              │    │
│  │  response_mq[0] response_mq[1] response_mq[N]│    │
│  │       └────────────┴────────────┘             │    │
│  │                    │                           │    │
│  │            ModelRunnerOutput                   │    │
│  └──────────────────────────────────────────────┘    │
└─────────────────────────────────────────────────────┘

关键设计决策

  1. 每个Worker运行在独立进程中,避免GIL争用
  2. 使用共享内存消息队列(而非pickle/socket),实现高效数据传输
  3. 广播-响应模式:一个写入端(Executor),多个读取端(Workers)
  4. 父子进程存活检测:通过"death pipe"实现父子进程双向监控

1.3 与其他执行器的关系

执行器类型 适用场景 进程模型
UniProcExecutor 单GPU 单进程
MultiprocExecutor 多GPU单节点 / 多节点 多进程
RayExecutor Ray集群 Ray Actor

MultiprocExecutor 是 vLLM v1 的默认多GPU执行器,直接使用 Python multiprocessing 而非 Ray,减少了外部依赖和序列化开销。


二、multiproc_executor.py 逐行分析

2.0 模块导入(第1-76行)

# SPDX-License-Identifier: Apache-2.0
# SPDX-FileCopyrightText: Copyright contributors to the vLLM project

标准 Apache 2.0 许可证头。

标准库导入
import multiprocessing          # 进程创建与管理
import os                       # 环境变量、文件描述符操作
import pickle                   # 序列化协议常量
import queue                    # 线程间队列(用于async调度模式)
import signal                   # 信号处理(SIGTERM/SIGINT)
import threading                # 线程创建
import time                     # 超时与计时
import traceback                # 异常堆栈格式化
import weakref                  # 弱引用(finalizer、monitor回调)

设计意图:大量使用标准库,减少外部依赖。weakref 的使用尤为关键——它允许对象被垃圾回收时自动触发清理,而不阻止回收本身。

集合与类型导入
from collections import deque                           # 有序双端队列
from collections.abc import Callable, Sequence          # 类型协议
from concurrent.futures import Future, InvalidStateError # 异步结果容器
from contextlib import suppress                          # 上下文管理器:静默异常
from dataclasses import dataclass                        # 数据类装饰器
from enum import Enum, auto                              # 枚举类型
from functools import cached_property, partial           # 缓存属性、偏函数
from multiprocessing.connection import Connection        # 管道连接类型
from multiprocessing.process import BaseProcess          # 进程基类
from multiprocessing.synchronize import Lock as LockType # 跨进程锁类型
from threading import Thread                             # 线程类
from typing import Any, cast                             # 类型工具

要点

  • deque:用于 futures_queue,保证FIFO顺序的响应处理
  • InvalidStateErrorFuture 设置结果/异常时可能抛出,用 suppress 静默
  • Connection:管道通信的类型注解
  • LockType:跨进程锁的类型别名,避免与 threading.Lock 混淆
第三方导入
import cloudpickle   # 增强序列化:支持lambda、局部函数
import torch         # PyTorch:线程数控制

cloudpickle 是关键——当 method 不是字符串而是可调用对象时,需要用 cloudpickle 序列化后在子进程中反序列化执行。

vLLM内部导入
import vllm.envs as envs                    # 环境变量访问
from vllm.config import VllmConfig          # 全局配置对象
from vllm.distributed import (              # 分布式环境管理
    destroy_distributed_environment,
    destroy_model_parallel,
)
from vllm.distributed.device_communicators.shm_broadcast import (
    Handle, MessageQueue,                    # 共享内存广播核心组件
)
from vllm.distributed.kv_transfer.kv_connector.utils import (
    KVOutputAggregator,                      # KV传输输出聚合器
)
from vllm.distributed.parallel_state import ( # 并行状态查询
    get_dcp_group, get_dp_group, get_ep_group,
    get_inner_dp_world_group, get_pcp_group,
    get_pp_group, get_tp_group,
    model_parallel_is_initialized,
)
from vllm.envs import enable_envs_cache      # 环境变量缓存开关
from vllm.logger import init_logger          # 日志初始化
from vllm.platforms import current_platform  # 平台抽象层
from vllm.tracing import instrument, maybe_init_worker_tracer  # 追踪
from vllm.utils import numa_utils            # NUMA绑定工具
from vllm.utils.network_utils import (       # 网络工具
    get_distributed_init_method, get_ip, get_loopback_ip, get_open_port,
)
from vllm.utils.ompmultiprocessing import OMPProcessManager  # OpenMP线程管理
from vllm.utils.system_utils import (        # 系统工具
    _maybe_force_spawn, decorate_logs, get_mp_context,
    set_process_title,
)
from vllm.v1.core.sched.output import GrammarOutput, SchedulerOutput  # 调度输出
from vllm.v1.executor.abstract import Executor, FailureCallback  # 抽象基类
from vllm.v1.outputs import (                # 模型输出类型
    AsyncModelRunnerOutput, DraftTokenIds, ModelRunnerOutput,
)
from vllm.v1.worker.worker_base import WorkerWrapperBase  # Worker包装器

核心依赖

  • MessageQueue / Handle:共享内存广播机制,是整个通信基础设施
  • parallel_state 中的各种 get_*_group():查询各并行维度的状态
  • get_inner_dp_world_group():多节点场景下的DP内通信组
  • KVOutputAggregator:聚合跨Worker的KV传输输出
logger = init_logger(__name__)

初始化模块级日志器。


2.1 FutureWrapper类(第79-109行)

class FutureWrapper(Future):

设计目的:包装 concurrent.futures.Future,实现有序响应收集

问题背景:在多Worker场景中,多个RPC调用可能并发执行,但响应必须按发起顺序处理。直接使用 Future 无法保证这个顺序——第2个RPC的响应可能先于第1个到达。

解决方案FutureWrapper 维护一个 deque,所有 FutureWrapper 实例共享同一个队列。当调用 result() 时,会从队尾开始逐个等待响应,确保先进先出。

    def __init__(
        self,
        futures_queue: deque["FutureWrapper"],   # 共享的有序队列
        get_response: Callable[[], Any],          # 从MessageQueue获取响应的闭包
        aggregate: Callable = lambda x: x,        # 可选的聚合函数
    ):
        self.futures_queue = futures_queue
        self.get_response = get_response
        self.aggregate = aggregate
        super().__init__()
        self.futures_queue.appendleft(self)      # 新Future入队首(左侧)

appendleft 的意义:deque的左侧是"最新"端,右侧是"最老"端。调用 result() 时从右侧pop,即先进先出。appendleft 确保新创建的Future排在队首,而等待处理时从队尾取出最老的。

    def result(self, timeout=None):
        if timeout is not None:
            raise RuntimeError("timeout not implemented")

不支持超时参数——因为 FutureWrapper 的等待机制不是标准 Future 的条件变量等待,而是通过队列逐个处理响应。

        # Drain any futures ahead of us in the queue.
        while not self.done():
            future = self.futures_queue.pop()   # 从队尾取最老的Future
            future._wait_for_response()          # 等待并设置其结果
        return super().result()                  # 调用父类获取已设置的结果

核心逻辑:调用者(最新Future)会先处理所有排在前面的Future。这是一个协作式设计——每个调用者帮助处理前序请求,类似于"帮同伴推进"的工作窃取思想。

示例

创建顺序: F1, F2, F3
deque状态: [F3, F2, F1]  (F1在右侧/队尾)

F3.result() 调用:
  1. pop() → F1, _wait_for_response() → F1完成
  2. pop() → F2, _wait_for_response() → F2完成
  3. F3.done() == True → 返回F3结果
    def _wait_for_response(self):
        try:
            response = self.aggregate(self.get_response())
            with suppress(InvalidStateError):
                self.set_result(response)
        except Exception as e:
            with suppress(InvalidStateError):
                self.set_exception(e)

_wait_for_response

  1. 调用 get_response() 闭包——从MessageQueue中读取响应
  2. 可选地用 aggregate 聚合(用于KV输出聚合)
  3. 设置Future结果或异常
  4. suppress(InvalidStateError):如果Future已被取消或已设置,静默忽略

2.2 UnreadyWorkerProcHandle 与 WorkerProcHandle(第439-482行)

在分析 MultiprocExecutor 之前,先理解这两个数据类,它们是Worker句柄的两种状态。

UnreadyWorkerProcHandle(第439-444行)
@dataclass
class UnreadyWorkerProcHandle:
    """WorkerProcess handle before READY."""
    proc: BaseProcess           # 子进程对象
    rank: int                   # 全局rank
    ready_pipe: Connection      # 子进程→父进程的就绪通知管道
    death_writer: Connection | None = None  # 父进程→子进程的死亡通知管道

设计意图:Worker子进程启动后需要经过初始化(加载模型、设置分布式等),初始化完成前用 UnreadyWorkerProcHandle 表示。ready_pipe 用于子进程向父进程报告"我准备好了"。

管道方向

  • ready_pipe:子进程写端 → 父进程读端
  • death_writer:父进程写端 → 子进程读端(父进程退出时,子进程收到EOF)
WorkerProcHandle(第447-482行)
@dataclass
class WorkerProcHandle:
    proc: BaseProcess                               # 子进程对象
    rank: int                                       # 全局rank
    worker_response_mq: MessageQueue | None         # 本Worker的响应消息队列
    peer_worker_response_mqs: list[MessageQueue | None]  # 远程Worker的响应队列
    death_writer: Connection | None = None          # 死亡通知管道

字段解析

  • worker_response_mq:本地Worker写入的响应队列。单节点模式下每个Worker都有自己的。
  • peer_worker_response_mqs多节点专属。driver节点需要从所有Worker(包括远程节点)收集响应。peer_worker_response_mqs[i] 对应 rank=i 的远程Worker的响应队列。
  • death_writer:与 UnreadyWorkerProcHandle 中含义相同
    @classmethod
    def from_unready_handle(
        cls,
        unready_handle: UnreadyWorkerProcHandle,
        worker_response_mq: MessageQueue | None,
        peer_worker_response_mqs: list[MessageQueue | None],
    ) -> "WorkerProcHandle":
        return cls(
            proc=unready_handle.proc,
            rank=unready_handle.rank,
            worker_response_mq=worker_response_mq,
            peer_worker_response_mqs=peer_worker_response_mqs,
            death_writer=unready_handle.death_writer,
        )

状态转换UnreadyWorkerProcHandleWorkerProcHandle,通过 from_unready_handle 工厂方法完成。转换发生在Worker发送READY信号后,此时响应队列已建立。


2.3 MultiprocExecutor类(第113-437行)

class MultiprocExecutor(Executor):
    supports_pp: bool = True    # 支持流水线并行

类声明:继承自 Executor 抽象基类,声明支持流水线并行(PP)。

2.3.1 构造与初始化
    def __init__(self, vllm_config: VllmConfig, monitor_workers: bool = True):
        self.monitor_workers = monitor_workers
        super().__init__(vllm_config)

monitor_workers 参数

  • True(默认):启动后台线程监控Worker进程存活状态
  • False:headless模式下不监控,由外部系统负责

调用父类 Executor.__init__,它会设置 self.vllm_configself.parallel_configself.scheduler_config 等属性,并调用 self._init_executor()

_get_parallel_sizes(第218-230行)
    def _get_parallel_sizes(self) -> tuple[int, int, int]:
        self.world_size = self.parallel_config.world_size
        assert self.world_size % self.parallel_config.nnodes_within_dp == 0, (
            f"global world_size ({self.parallel_config.world_size}) must be "
            f"divisible by nnodes_within_dp "
            f"({self.parallel_config.nnodes_within_dp}). "
        )
        self.local_world_size = self.parallel_config.local_world_size
        tp_size = self.parallel_config.tensor_parallel_size
        pp_size = self.parallel_config.pipeline_parallel_size
        pcp_size = self.parallel_config.prefill_context_parallel_size
        return tp_size, pp_size, pcp_size

功能:提取并验证并行维度参数。

关键校验world_size % nnodes_within_dp == 0——确保DP组内的节点数能整除全局世界大小,这是分布式训练的基本约束。

返回三个并行维度:TP(张量并行)、PP(流水线并行)、PCP(预填充上下文并行)。

_init_executor(第120-216行)——核心初始化

这是整个执行器最复杂的初始化流程,逐步分析:

    def _init_executor(self) -> None:
        # Call self.shutdown at exit to clean up
        # and ensure workers will be terminated.
        self._finalizer = weakref.finalize(self, self.shutdown)

弱引用终结器:当 MultiprocExecutor 对象被垃圾回收时,自动调用 self.shutdown()。使用 weakref.finalize 而非 __del__ 的原因:

  1. __del__ 在解释器关闭时可能不执行
  2. weakref.finalize 更可靠,即使对象处于循环引用中也能触发
  3. 回调函数在独立上下文中执行,不会持有对象引用
        self.is_failed = False
        self.failure_callback: FailureCallback | None = None

故障状态is_failed 标记执行器是否故障,failure_callback 是故障时的回调函数(由EngineCore注册)。

        tp_size, pp_size, pcp_size = self._get_parallel_sizes()
        assert self.world_size == tp_size * pp_size * pcp_size, (
            f"world_size ({self.world_size}) must be equal to the "
            f"tensor_parallel_size ({tp_size}) x pipeline"
            f"_parallel_size ({pp_size}) x prefill_context"
            f"_parallel_size ({pcp_size}). "
        )

关键不变量world_size = TP × PP × PCP。这三个并行维度是正交的,它们的乘积必须等于总进程数。

        set_multiprocessing_worker_envs()

设置多进程环境变量(详见2.6节)。

        # use the loopback address get_loopback_ip() for communication.
        distributed_init_method = get_distributed_init_method(
            get_loopback_ip(), get_open_port()
        )

分布式初始化方法:使用回环地址(127.0.0.1)而非外部IP,因为同节点内的Worker通过本地回环通信效率更高。get_open_port() 获取一个可用端口用于Torch分布式后端。

为什么用loopback:在单节点场景下,Worker都在同一机器上,使用loopback避免不必要的网络跳转。多节点场景则通过 master_addr 配置使用外部IP。

        self.rpc_broadcast_mq: MessageQueue | None = None
        scheduler_output_handle: Handle | None = None

消息队列初始化

  • rpc_broadcast_mq:广播消息队列,Executor写入→所有Worker读取
  • scheduler_output_handle:广播队列的序列化句柄,传递给子进程使其能连接到同一共享内存区域
        if self.parallel_config.node_rank_within_dp == 0:

Leader/Follower区分:在多节点DP(Data Parallel)场景中,每个DP组有一个leader节点。只有leader节点创建广播消息队列,follower节点通过分布式通信组接收数据。

            max_chunk_bytes = envs.VLLM_MQ_MAX_CHUNK_BYTES_MB * 1024 * 1024
            mq_connect_ip = get_ip()
            logger.info(
                "DP group leader: node_rank=%d, node_rank_within_dp=%d, "
                "master_addr=%s, mq_connect_ip=%s (local), "
                "world_size=%d, local_world_size=%d",
                self.parallel_config.node_rank,
                self.parallel_config.node_rank_within_dp,
                self.parallel_config.master_addr,
                mq_connect_ip,
                self.world_size,
                self.local_world_size,
            )

max_chunk_bytes:控制消息队列单个chunk的最大字节数。当 SchedulerOutput 较大时,会被分块传输。默认值来自环境变量 VLLM_MQ_MAX_CHUNK_BYTES_MB

mq_connect_ip:使用真实IP(get_ip())而非loopback,因为消息队列可能需要跨节点连接。

            self.rpc_broadcast_mq = MessageQueue(
                self.world_size,
                self.local_world_size,
                max_chunk_bytes=max_chunk_bytes,
                connect_ip=mq_connect_ip,
            )
            scheduler_output_handle = self.rpc_broadcast_mq.export_handle()

创建广播队列

  • self.world_size:总队列容量(全局Worker数)
  • self.local_world_size:本地读取者数量(本节点GPU数)
  • connect_ip:连接IP地址
  • export_handle():将队列的共享内存元数据序列化为 Handle 对象,可以传递给子进程

Handle的本质:包含共享内存文件路径、偏移量等元信息。子进程通过 Handle 可以映射到同一块共享内存,实现零拷贝通信。

Worker进程创建
        context = get_mp_context()
        shared_worker_lock = context.Lock()

get_mp_context():获取multiprocessing上下文,可能是 spawnforkforkserver。vLLM 通常使用 spawn 模式(GPU相关代码不能在fork后正常工作)。

shared_worker_lock:跨进程锁,用于Worker间共享资源的互斥访问。具体用途是Worker初始化时的序列化操作(如模型加载时的NCCL初始化可能需要同步)。

        unready_workers: list[UnreadyWorkerProcHandle] = []
        success = False
        try:

异常安全模式success 标志 + try/finally 确保初始化失败时清理已创建的Worker进程。

            global_start_rank = (
                self.local_world_size * self.parallel_config.node_rank_within_dp
            )

Rank计算:在多节点场景中,每个节点的Worker rank是连续的。global_start_rank 是当前节点第一个Worker的全局rank。

示例:3节点,每节点4GPU → local_world_size=4

  • node_rank_within_dp=0: global_start_rank=0, ranks=[0,1,2,3]
  • node_rank_within_dp=1: global_start_rank=4, ranks=[4,5,6,7]
  • node_rank_within_dp=2: global_start_rank=8, ranks=[8,9,10,11]
            inherited_fds: list[int] | None = (
                [] if context.get_start_method() == "fork" else None
            )

inherited_fds:仅在 fork 模式下追踪继承的文件描述符。fork时子进程继承父进程所有打开的fd,如果不关闭多余的管道fd,会导致管道无法正常关闭(EOF检测失败)。

spawn 模式下无需关注:spawn创建全新进程,只序列化必要的数据。

            cpu_omp_manager = OMPProcessManager(self.vllm_config)

OpenMP管理:针对CPU后端,为每个Worker配置OpenMP线程亲和性(CPU绑核)。

            for local_rank in range(self.local_world_size):
                global_rank = global_start_rank + local_rank
                is_driver_worker = self._is_driver_worker(global_rank)

遍历本地Worker:为每个GPU创建一个Worker进程。

is_driver_worker:rank % tp_size == 0 的Worker是driver worker。在TP组中,只有rank=0的Worker负责采样token和发送输出。

                with cpu_omp_manager.configure_omp_envs(
                    rank=global_rank, local_rank=local_rank
                ):
                    unready_worker_handle = WorkerProc.make_worker_process(
                        vllm_config=self.vllm_config,
                        local_rank=local_rank,
                        rank=global_rank,
                        distributed_init_method=distributed_init_method,
                        input_shm_handle=scheduler_output_handle,
                        shared_worker_lock=shared_worker_lock,
                        is_driver_worker=is_driver_worker,
                        inherited_fds=inherited_fds,
                    )

创建Worker进程:调用 WorkerProc.make_worker_process 静态方法(详见2.4节)。传入的参数包括配置、rank信息、分布式初始化方法、共享内存句柄等。

                unready_workers.append(unready_worker_handle)
                if inherited_fds is not None:
                    inherited_fds.append(unready_worker_handle.death_writer.fileno())
                    inherited_fds.append(unready_worker_handle.ready_pipe.fileno())

Fork模式下的FD追踪:记录已创建管道的文件描述符,后续Worker进程需要关闭这些"不属于自己"的fd。

等待Worker就绪
            # Workers must be created before wait_for_ready to avoid
            # deadlock, since worker.init_device() does a device sync.

            # Wait for all local workers to be ready.
            self.workers = WorkerProc.wait_for_ready(unready_workers)

关键注释:所有Worker进程必须先创建完毕,再等待就绪。因为 init_device() 中包含设备同步操作(NCCL通信初始化),如果Worker进程尚未全部启动,同步操作会死锁。

wait_for_ready 使用 multiprocessing.connection.wait() 高效等待多个管道,类似 select/poll

启动Worker监控
            if self.monitor_workers:
                self.start_worker_monitor()

启动后台线程监控Worker进程存活状态(详见2.3.2节)。

构建响应队列列表
            self.response_mqs = []
            if self.parallel_config.node_rank_within_dp == 0:
                for rank in range(self.world_size):
                    if rank < self.local_world_size:
                        local_message_queue = self.workers[rank].worker_response_mq
                        assert local_message_queue is not None
                        self.response_mqs.append(local_message_queue)
                    else:
                        remote_message_queue = self.workers[0].peer_worker_response_mqs[rank]
                        assert remote_message_queue is not None
                        self.response_mqs.append(remote_message_queue)

构建全局响应队列列表self.response_mqs[rank] 可以获取 rank 对应 Worker 的响应队列。

单节点rank < local_world_size 都走本地路径,直接从 self.workers[rank] 获取。

多节点:远程Worker的响应队列通过 peer_worker_response_mqs 获取。这些队列实际上是远程Worker通过分布式通信组暴露的句柄创建的。

注意self.workers[0].peer_worker_response_mqs[rank]——即使远程Worker不在本节点,其响应队列句柄也通过Worker 0代理(因为所有Worker的 peer_response_handles 在READY消息中传递给了父进程)。

等待消息队列就绪
            # Ensure message queues are ready. Will deadlock if re-ordered
            # Must be kept consistent with the WorkerProc.

            if self.rpc_broadcast_mq is not None:
                self.rpc_broadcast_mq.wait_until_ready()
            for response_mq in self.response_mqs:
                response_mq.wait_until_ready()

死锁风险:消息队列的握手顺序必须与Worker端一致。wait_until_ready 会阻塞直到所有读者/写者都完成注册。如果顺序不一致,可能A端在等B端注册,而B端在等A端发送数据,形成死锁。

            self.futures_queue = deque[FutureWrapper]()
            self._post_init_executor()
            success = True

futures_queue:所有 FutureWrapper 共享的有序队列。

_post_init_executor:钩子方法,子类可覆写以添加额外初始化逻辑。

异常处理
        finally:
            if not success:
                for uw in unready_workers:
                    if uw.death_writer is not None:
                        uw.death_writer.close()
                        uw.death_writer = None
                self._ensure_worker_termination([uw.proc for uw in unready_workers])

失败清理:关闭 death_writer 通知子进程退出,然后等待/强制终止所有Worker进程。

2.3.2 Worker健康监控
    def start_worker_monitor(self, inline=False) -> None:
        workers = self.workers
        self_ref = weakref.ref(self)

self_ref = weakref.ref(self):监控线程持有Executor的弱引用,避免循环引用阻止垃圾回收。

        def monitor_workers():
            sentinels = [h.proc.sentinel for h in workers]
            died = multiprocessing.connection.wait(sentinels)

proc.sentinel:每个进程对象的哨兵fd,进程退出时变为可读。multiprocessing.connection.wait() 类似 select(),等待任一fd变为可读。

died:返回第一个变为可读的sentinel列表(可能有多个同时退出)。

            _self = self_ref()
            if not _self or getattr(_self, "shutting_down", False):
                logger.debug("MultiprocWorkerMonitor: shutdown already initiated")
                return

检查Executor状态

  • _selfNone:Executor已被GC回收,无需处理
  • shutting_downTrue:正在正常关闭,无需额外处理
            _self.is_failed = True
            proc_name = next(h.proc.name for h in workers if h.proc.sentinel == died[0])
            logger.error(
                "Worker proc %s died unexpectedly, shutting down executor.", proc_name
            )
            _self.shutdown()
            callback = _self.failure_callback
            if callback is not None:
                _self.failure_callback = None
                callback()

故障处理流程

  1. 标记执行器为故障状态
  2. 找到死亡进程名称
  3. 执行关闭流程
  4. 调用故障回调(通知EngineCore)

_self.failure_callback = None:先保存再清空,防止回调中再次触发shutdown。

        if not inline:
            Thread(
                target=monitor_workers, daemon=True, name="MultiprocWorkerMonitor"
            ).start()
            return
        monitor_workers()

inline 模式:某些测试场景可能需要在当前线程中直接运行监控,而非后台线程。

2.3.3 故障回调注册
    def register_failure_callback(self, callback: FailureCallback):
        if self.is_failed:
            callback()
        else:
            self.failure_callback = callback

设计精妙:如果执行器已经故障,立即调用回调;否则注册回调等待将来触发。这处理了竞态条件——注册回调时执行器可能已经故障。

2.3.4 execute_model(第278-288行)
    def execute_model(
        self, scheduler_output: SchedulerOutput, non_block: bool = False
    ) -> ModelRunnerOutput | None | Future[ModelRunnerOutput | None]:
        return self.collective_rpc(
            "execute_model",
            args=(scheduler_output,),
            unique_reply_rank=self.output_rank,
            non_block=non_block,
            timeout=envs.VLLM_EXECUTE_MODEL_TIMEOUT_SECONDS,
            kv_output_aggregator=self.kv_output_aggregator,
        )

执行模型:将 SchedulerOutput 广播给所有Worker执行模型推理。

关键参数

  • unique_reply_rank=self.output_rank:只需要最后一个PP阶段的TP rank=0的Worker返回结果
  • non_block:异步模式下返回 Future 而非阻塞等待
  • kv_output_aggregator:聚合KV传输输出(跨Worker的KV缓存传输)
  • timeout:从环境变量读取超时时间,防止无限等待
2.3.5 sample_tokens(第290-300行)
    def sample_tokens(
        self, grammar_output: GrammarOutput | None, non_block: bool = False
    ) -> ModelRunnerOutput | Future[ModelRunnerOutput]:
        return self.collective_rpc(
            "sample_tokens",
            args=(grammar_output,),
            unique_reply_rank=self.output_rank,
            non_block=non_block,
            timeout=envs.VLLM_EXECUTE_MODEL_TIMEOUT_SECONDS,
            kv_output_aggregator=self.kv_output_aggregator,
        )

语法约束采样:将 GrammarOutput 发送给Worker进行受限token采样(如JSON模式、正则约束等)。

2.3.6 execute_dummy_batch / take_draft_token_ids
    def execute_dummy_batch(self) -> None:
        self.collective_rpc("execute_dummy_batch", unique_reply_rank=self.output_rank)

空批处理:用于预热(warmup)或填充流水线气泡。在PP场景中,需要保持流水线满载,空批处理填充不完整的流水线阶段。

    def take_draft_token_ids(self) -> DraftTokenIds | None:
        return self.collective_rpc(
            "take_draft_token_ids", unique_reply_rank=self.output_rank
        )

推测解码:从draft模型获取候选token ID。只从 output_rank 获取即可。

2.3.7 collective_rpc——核心RPC机制(第314-380行)
    def collective_rpc(
        self,
        method: str | Callable,
        timeout: float | None = None,
        args: tuple = (),
        kwargs: dict | None = None,
        non_block: bool = False,
        unique_reply_rank: int | None = None,
        kv_output_aggregator: KVOutputAggregator | None = None,
    ) -> Any:
        """Returns single result if unique_reply_rank and/or kv_output_aggregator
        is provided, otherwise list."""

这是整个执行器最核心的方法——所有Worker通信都通过它。

参数解析

  • method:字符串方法名或可调用对象。字符串→直接 getattr,可调用→cloudpickle 序列化
  • timeout:RPC超时时间
  • args/kwargs:方法参数
  • non_block:是否异步(返回Future而非阻塞)
  • unique_reply_rank:指定只有某个rank回复(优化:不需要所有Worker都发回结果)
  • kv_output_aggregator:KV输出聚合器(用于KV传输场景)
        assert self.rpc_broadcast_mq is not None, (
            "collective_rpc should not be called on follower node"
        )
        if self.is_failed:
            raise RuntimeError("Executor failed.")

前置检查:follower节点不应调用此方法(它没有广播队列),故障状态也不应继续。

        deadline = None if timeout is None else time.monotonic() + timeout
        kwargs = kwargs or {}

超时截止时间:使用单调时钟(time.monotonic),不受系统时间调整影响。

        if kv_output_aggregator is not None:
            output_rank = None
            aggregate: Callable[[Any], Any] = partial(
                kv_output_aggregator.aggregate, output_rank=unique_reply_rank or 0
            )
        else:
            output_rank = unique_reply_rank
            aggregate = lambda x: x

KV聚合 vs 唯一回复

  • 如果有 kv_output_aggregator:需要所有Worker的输出来聚合KV信息,所以 output_rank=None(所有Worker都回复),然后通过 aggregate 函数合并
  • 否则:只有 unique_reply_rank 的Worker回复

这体现了两种通信模式

  1. 聚合模式:所有Worker回复 → 聚合器合并 → 单一结果
  2. 唯一回复模式:只有指定rank回复 → 直接返回
        if isinstance(method, str):
            send_method = method
        else:
            send_method = cloudpickle.dumps(method, protocol=pickle.HIGHEST_PROTOCOL)
        self.rpc_broadcast_mq.enqueue((send_method, args, kwargs, output_rank))

广播RPC请求

  1. 字符串方法名直接发送(零开销)
  2. 可调用对象用 cloudpickle 序列化(支持lambda、局部函数等)
  3. 将四元组 (method, args, kwargs, output_rank) 入队

所有Worker从同一个广播队列读取相同的数据——这就是"广播"的含义。MessageQueue 的共享内存设计确保零拷贝。

        response_mqs: Sequence[MessageQueue] = self.response_mqs
        if output_rank is not None:
            response_mqs = (response_mqs[output_rank],)

响应队列选择:如果只需要特定rank的回复,缩小监听范围。

        def get_response():
            responses = []
            for mq in response_mqs:
                dequeue_timeout = (
                    None if deadline is None else (deadline - time.monotonic())
                )
                try:
                    status, result = mq.dequeue(timeout=dequeue_timeout)
                except TimeoutError as e:
                    raise TimeoutError(f"RPC call to {method} timed out.") from e
                if status != WorkerProc.ResponseStatus.SUCCESS:
                    raise RuntimeError(
                        f"Worker failed with error '{result}', please check the"
                        " stack trace above for the root cause"
                    )
                responses.append(result)
            return responses[0] if output_rank is not None else responses

get_response 闭包:从响应队列中读取结果。

超时处理:每次 dequeue 的剩余超时 = deadline - 当前时间。这确保总等待时间不超过 timeout

错误处理

  • TimeoutError:重新抛出更清晰的错误消息
  • FAILURE 状态:Worker执行异常,抛出 RuntimeError

返回格式

  • 唯一回复模式:单个结果
  • 全部回复模式:结果列表
        future = FutureWrapper(
            self.futures_queue,
            get_response=get_response,
            aggregate=aggregate,
        )
        return future if non_block else future.result()

创建Future并返回

  • non_block=True:返回Future,调用者稍后获取结果
  • non_block=False:直接调用 future.result() 阻塞等待
2.3.8 _ensure_worker_termination(第382-405行)
    @staticmethod
    def _ensure_worker_termination(worker_procs: list[BaseProcess]):
        """Ensure that all worker processes are terminated."""

三级终止策略

        def wait_for_termination(procs, timeout):
            if not time:
                # If we are in late stage shutdown, the interpreter may replace
                # `time` with `None`.
                return all(not proc.is_alive() for proc in procs)
            start_time = time.time()
            while time.time() - start_time < timeout:
                if all(not proc.is_alive() for proc in procs):
                    return True
                time.sleep(0.1)
            return False

if not time 检查:Python解释器关闭时,模块级变量可能被设为 None。这是一个防御性编程技巧,确保在解释器关闭阶段也能安全处理。

        active_procs = lambda: [proc for proc in worker_procs if proc.is_alive()]
        logger.debug("Worker Termination: allow workers to gracefully shutdown")
        if wait_for_termination(active_procs(), 4):
            return

第一级:等待4秒,允许Worker优雅关闭。

        logger.debug("Worker Termination: workers still running sending SIGTERM")
        for p in active_procs():
            p.terminate()
        if not wait_for_termination(active_procs(), 4):

第二级:发送 SIGTERM,再等4秒。

            logger.debug(
                "Worker Termination: resorting to SIGKILL to take down workers"
            )
            for p in active_procs():
                p.kill()

第三级:发送 SIGKILL,强制终止。这是最终手段,不留给Worker任何清理机会。

设计哲学:优雅→礼貌→强制,与Linux进程管理惯例一致。

2.3.9 shutdown(第407-434行)
    def shutdown(self):
        if not getattr(self, "shutting_down", False):
            logger.debug("Triggering shutdown of workers")
            self.shutting_down = True

幂等性:通过 shutting_down 标志确保只执行一次。getattr 而非 self.shutting_down 是因为 shutdown 可能在 _init_executor 完成前被调用(此时属性可能不存在)。

            if workers := getattr(self, "workers", None):
                for w in workers:
                    if w.death_writer is not None:
                        w.death_writer.close()
                        w.death_writer = None
                self._ensure_worker_termination([w.proc for w in workers])

关闭流程

  1. 关闭 death_writer → 子进程的 death_reader 收到EOF → 子进程知道父进程要退出
  2. 等待Worker进程终止
                for w in workers:
                    if w.worker_response_mq is not None:
                        w.worker_response_mq.shutdown()
                        w.worker_response_mq = None

关闭响应队列:释放共享内存资源。

        if rpc_broadcast_mq := getattr(self, "rpc_broadcast_mq", None):
            rpc_broadcast_mq.shutdown()
            self.rpc_broadcast_mq = None
        if response_mqs := getattr(self, "response_mqs", None):
            for mq in response_mqs:
                mq.shutdown()
            self.response_mqs = []

关闭广播队列和远程响应队列:注意关闭顺序——先关闭Worker级别的资源,再关闭Executor级别的资源。

2.3.10 其他方法
    def check_health(self) -> None:
        self.collective_rpc("check_health", timeout=10)

健康检查:向所有Worker发送 check_health RPC,10秒超时。任何一个Worker无响应或异常都会传播到EngineCore。

    @cached_property
    def max_concurrent_batches(self) -> int:
        pp_size = self.parallel_config.pipeline_parallel_size
        return 2 if pp_size <= 1 and self.scheduler_config.async_scheduling else pp_size

最大并发批次数

  • PP > 1:需要PP-size个并发批次填满流水线
  • PP = 1 + 异步调度:允许2个并发批次(overlap调度和执行)
  • PP = 1 + 同步调度:隐式返回1(由父类默认值或调用方决定)
    def _get_output_rank(self) -> int:
        return (
            self.world_size
            - self.parallel_config.tensor_parallel_size
            * self.parallel_config.prefill_context_parallel_size
        )
    ```

**输出rank计算**:最后一个PP阶段的第一个TP Worker。

**示例**(TP=8, PP=4, PCP=1, world_size=32):
- PP rank 0: ranks 0-7
- PP rank 1: ranks 8-15
- PP rank 2: ranks 16-23
- PP rank 3: ranks 24-31
- output_rank = 32 - 8 * 1 = 24(PP rank 3的TP rank 0)

```python
    @classmethod
    def supports_async_scheduling(cls) -> bool:
        return True

声明支持异步调度模式。


2.4 WorkerProc类(第485-940行)

WorkerProc 是在子进程中运行的Worker封装,是整个多进程执行器的"另一半"。

class WorkerProc:
    """Wrapper that runs one Worker in a separate process."""
    READY_STR = "READY"
    rpc_broadcast_mq: MessageQueue | None
    worker_response_mq: MessageQueue | None

类属性READY_STR 是就绪信号字符串,两个消息队列属性在运行时赋值。

2.4.1 _init_message_queues(第494-528行)
    def _init_message_queues(
        self, input_shm_handle: Handle, vllm_config: VllmConfig
    ) -> None:

消息队列初始化——单节点和多节点走不同路径。

        if vllm_config.parallel_config.nnodes_within_dp == 1:
            # Initialize MessageQueue for receiving SchedulerOutput
            self.rpc_broadcast_mq = MessageQueue.create_from_handle(
                input_shm_handle, self.worker.rank
            )

单节点模式

  • 通过 Handle 创建连接到Executor端广播队列的读取端
  • self.worker.rank 指定当前Worker是第几个读取者
            # Initializes a message queue for sending the model output
            self.worker_response_mq = MessageQueue(1, 1)
            self.peer_response_handles = []

响应队列:创建一个容量=1、读者=1的消息队列。Executor端通过READY消息中传递的Handle连接到此队列。

        else:
            # Initialize remote MessageQueue for receiving SchedulerOutput across nodes
            self.rpc_broadcast_mq = get_inner_dp_world_group().create_mq_broadcaster(
                external_writer_handle=input_shm_handle,
                blocking=False,
            )

多节点模式:使用分布式通信组创建跨节点广播器。external_writer_handle 来自Executor端,blocking=False 因为握手机制在 wait_until_ready 中完成。

            self.worker_response_mq, self.peer_response_handles = (
                get_inner_dp_world_group().create_single_reader_mq_broadcasters(
                    reader_rank_in_group=0
                )
            )

远程响应队列:创建单读者广播器,reader_rank_in_group=0 表示只有driver节点(rank 0)读取。peer_response_handles 包含所有Worker的响应队列句柄,通过READY消息传回Executor。

2.4.2 WorkerProc.init(第530-590行)
    @instrument(span_name="Worker init")
    def __init__(
        self,
        vllm_config: VllmConfig,
        local_rank: int,
        rank: int,
        distributed_init_method: str,
        input_shm_handle: Handle,
        shared_worker_lock: LockType,
        is_driver_worker: bool,
    ):

@instrument:追踪装饰器,记录Worker初始化耗时。

        self.rank = rank
        wrapper = WorkerWrapperBase(rpc_rank=local_rank, global_rank=rank)

Worker包装器WorkerWrapperBase 封装了实际的模型执行逻辑。rpc_rankglobal_rank 用于区分不同并行组的角色。

        all_kwargs: list[dict] = [
            {} for _ in range(vllm_config.parallel_config.world_size)
        ]
        all_kwargs[local_rank] = {
            "vllm_config": vllm_config,
            "local_rank": local_rank,
            "rank": rank,
            "distributed_init_method": distributed_init_method,
            "is_driver_worker": is_driver_worker,
            "shared_worker_lock": shared_worker_lock,
        }
        wrapper.init_worker(all_kwargs)

初始化Worker:创建一个长度为 world_size 的参数列表,只填充当前Worker的参数。init_worker 方法会根据 rpc_rank 选取自己的参数。这种设计允许在Ray执行器中所有Worker共享同一个参数列表,而在MultiprocExecutor中只有当前Worker的参数非空。

        self.worker = wrapper
        self.setup_proc_title_and_log_prefix(
            enable_ep=vllm_config.parallel_config.enable_expert_parallel
        )

设置进程标题:如 Worker_TP0_PP0 等,便于在 ps / top 中识别。

        # Load model
        self.worker.init_device()

初始化设备:设置CUDA设备、初始化分布式后端(NCCL)、创建并行通信组。这是最耗时的初始化步骤之一。

        self.setup_proc_title_and_log_prefix(
            enable_ep=vllm_config.parallel_config.enable_expert_parallel
        )

再次设置标题init_device 后并行组已初始化,标题可以包含更详细的并行信息(DP/PP/TP/PCP/DCP/EP)。

        if envs.VLLM_ELASTIC_EP_SCALE_UP_LAUNCH:
            self.worker.elastic_ep_execute("load_model")
        else:
            self.worker.load_model()

加载模型:支持弹性专家并行(Elastic EP)模式,使用 elastic_ep_execute 进行特殊加载。

        scheduler_config = vllm_config.scheduler_config
        self.use_async_scheduling = scheduler_config.async_scheduling
        if self.use_async_scheduling:
            self.async_output_queue: queue.Queue = queue.Queue()
            self.async_output_copy_thread = Thread(
                target=self.async_output_busy_loop,
                daemon=True,
                name="WorkerAsyncOutputCopy",
            )
            self.async_output_copy_thread.start()

异步调度模式

  • 启动一个后台线程 async_output_busy_loop
  • Worker主循环的输出先放入 async_output_queue
  • 后台线程从队列取出输出,通过 worker_response_mq 发送
  • 这样Worker主循环可以立即处理下一个请求,无需等待输出传输
        current_platform.update_block_size_for_backend(vllm_config)

更新块大小:根据注意力后端更新KV缓存的块大小配置。

        self._init_message_queues(input_shm_handle, vllm_config)
        enable_envs_cache()

最后初始化消息队列:必须在 init_device 之后,因为多节点场景需要分布式组已初始化。enable_envs_cache 冻结环境变量,之后不再从 os.environ 读取。

2.4.3 make_worker_process——创建子进程(第592-642行)
    @staticmethod
    def make_worker_process(
        vllm_config: VllmConfig,
        local_rank: int,
        rank: int,
        distributed_init_method: str,
        input_shm_handle,
        shared_worker_lock: LockType,
        is_driver_worker: bool,
        inherited_fds: list[int] | None = None,
    ) -> UnreadyWorkerProcHandle:

工厂方法:在父进程中调用,创建Worker子进程并返回未就绪句柄。

        context = get_mp_context()
        ready_reader, ready_writer = context.Pipe(duplex=False)
        death_reader, death_writer = context.Pipe(duplex=False)

创建两对管道

  • ready_pipe:子进程 → 父进程,报告初始化完成
  • death_pipe:父进程 → 子进程,检测父进程退出

duplex=False:单向管道,更高效且语义更清晰。

        if inherited_fds is not None:
            inherited_fds = inherited_fds.copy()
            inherited_fds.extend((ready_reader.fileno(), death_writer.fileno()))

Fork模式FD追踪:记录本Worker创建的管道中"属于父进程端"的fd(ready_readerdeath_writer)。后续Worker需要关闭这些fd。

为什么不追踪 ready_writerdeath_reader?这两个在子进程中使用,子进程应该保留它们。

        process_kwargs = {
            "vllm_config": vllm_config,
            "local_rank": local_rank,
            "rank": rank,
            "distributed_init_method": distributed_init_method,
            "input_shm_handle": input_shm_handle,
            "ready_pipe": ready_writer,
            "death_pipe": death_reader,
            "shared_worker_lock": shared_worker_lock,
            "is_driver_worker": is_driver_worker,
            "inherited_fds": inherited_fds if inherited_fds is not None else [],
        }

子进程参数:注意传递的是管道的"子进程端"。

        proc = context.Process(
            target=WorkerProc.worker_main,
            kwargs=process_kwargs,
            name=f"VllmWorker-{rank}",
            daemon=True,
        )

创建进程对象

  • target=WorkerProc.worker_main:子进程入口函数
  • daemon=True:守护进程,主进程退出时自动终止
  • name=f"VllmWorker-{rank}":便于调试
        with numa_utils.configure_subprocess(
            vllm_config, local_rank, process_kind="worker"
        ):
            proc.start()

NUMA绑定:如果配置了NUMA亲和性,在启动子进程时设置CPU绑核。

        ready_writer.close()
        death_reader.close()

关闭子进程端的管道:父进程不需要写入ready_pipe或读取death_pipe,关闭以避免fd泄漏。

        return UnreadyWorkerProcHandle(proc, rank, ready_reader, death_writer)

返回未就绪句柄,包含父进程需要的信息。

2.4.4 wait_for_ready——等待Worker就绪(第661-699行)
    @staticmethod
    def wait_for_ready(
        unready_proc_handles: list[UnreadyWorkerProcHandle],
    ) -> list[WorkerProcHandle]:

等待所有Worker完成初始化,将 UnreadyWorkerProcHandle 转换为 WorkerProcHandle

        e = Exception(
            "WorkerProc initialization failed due to an exception in a "
            "background process. See stack trace for the root cause."
        )

预构造异常对象——如果Worker初始化失败,抛出此异常。

        pipes = {handle.ready_pipe: handle for handle in unready_proc_handles}
        ready_proc_handles: list[WorkerProcHandle | None] = [None] * len(
            unready_proc_handles
        )

管道→句柄映射:通过 multiprocessing.connection.wait() 等待多个管道。

        while pipes:
            ready = multiprocessing.connection.wait(pipes.keys())

高效等待wait() 类似 select(),返回可读的管道列表。不会CPU空转。

            for pipe in ready:
                assert isinstance(pipe, Connection)
                try:
                    unready_proc_handle = pipes.pop(pipe)
                    response: dict[str, Any] = pipe.recv()
                    if response["status"] != "READY":
                        raise e

处理就绪信号

  1. 从映射中移除已就绪的管道(pop 确保不会重复处理)
  2. 读取Worker发送的字典
  3. 验证状态为 READY
                    idx = unready_proc_handle.rank % len(ready_proc_handles)
                    ready_proc_handles[idx] = WorkerProc.wait_for_response_handle_ready(
                        response, unready_proc_handle
                    )

索引计算rank % len(ready_proc_handles) 将全局rank映射到本地索引。单节点下 rank == idx;多节点下需要取模。

                except EOFError:
                    e.__suppress_context__ = True
                    raise e from None

EOFError处理:Worker进程在发送READY前崩溃,管道关闭产生EOF。from None__suppress_context__ 清理异常链,避免暴露内部细节。

                finally:
                    pipe.close()

关闭管道:就绪信号读取完毕,不再需要。

        return cast(list[WorkerProcHandle], ready_proc_handles)

类型断言:所有 None 都应被替换为实际的 WorkerProcHandle,否则上面的检查会抛出异常。

2.4.5 wait_for_response_handle_ready(第644-659行)
    @staticmethod
    def wait_for_response_handle_ready(
        handles: dict[str, Any], proc_handle: UnreadyWorkerProcHandle
    ) -> WorkerProcHandle:
        response_handle = handles["handle"]
        worker_response_mq: MessageQueue | None = None
        if len(response_handle.local_reader_ranks) > 0:
            worker_response_mq = MessageQueue.create_from_handle(response_handle, 0)

创建本地响应队列连接:如果有本地读取者,创建消息队列连接。reader_rank=0 因为Executor是唯一的读取者。

        peer_response_handles = handles["peer_response_handles"]
        peer_worker_response_mqs = [
            MessageQueue.create_from_handle(handle, -1)
            if handle.remote_subscribe_addr is not None
            else None
            for handle in peer_response_handles
        ]

创建远程响应队列连接:遍历所有远程Worker的句柄,创建连接。reader_rank=-1 表示远程订阅者(不直接读取,通过分布式通信中继)。

2.4.6 WorkerProc.shutdown(第701-710行)
    def shutdown(self):
        if self.rpc_broadcast_mq is not None:
            self.rpc_broadcast_mq.shutdown()
        if self.worker_response_mq is not None:
            self.worker_response_mq.shutdown()
        self.worker.shutdown()
        self.rpc_broadcast_mq = None
        self.worker_response_mq = None
        destroy_model_parallel()
        destroy_distributed_environment()

Worker关闭流程

  1. 关闭消息队列
  2. 关闭Worker(释放GPU内存等)
  3. 销毁模型并行状态
  4. 销毁分布式环境
2.4.7 monitor_death_pipe——父进程存活监控(第712-738行)
    def monitor_death_pipe(self, death_pipe, shutdown_requested: threading.Event):
        if death_pipe is None:
            return

可选监控:如果没有death_pipe(某些特殊配置),跳过。

        def death_pipe_monitor(queues_to_shutdown: list[MessageQueue]):
            try:
                death_pipe.recv()   # 阻塞等待,父进程写入或关闭
            except EOFError:
                logger.info_once("Parent process exited, terminating worker queues")
                shutdown_requested.set()
                for mq in queues_to_shutdown:
                    if mq is not None:
                        mq.shutdown()
            except Exception as e:
                logger.warning("Death monitoring error: %s", e)

EOF检测death_pipe.recv() 正常返回意味着父进程写入了数据(不应该发生)。EOFError 表示管道关闭——父进程退出了。

为什么要关闭消息队列:如果Worker的busy_loop正在等待 rpc_broadcast_mq.dequeue(),需要通过shutdown唤醒它。

        Thread(
            target=death_pipe_monitor,
            args=([self.rpc_broadcast_mq, self.worker_response_mq],),
            daemon=True,
            name="DeathPipeMonitor",
        ).start()

启动监控线程:直接传递队列引用而非self,避免GC问题。

2.4.8 worker_main——子进程入口(第740-816行)

这是Worker子进程的入口函数,是理解整个多进程架构的关键。

    @staticmethod
    def worker_main(*args, **kwargs):
        """Worker initialization and execution loops.
        This runs a background process"""

注意:这是一个静态方法,在子进程中执行。WorkerProc 对象在 worker_main 内部创建。

信号处理
        shutdown_requested = threading.Event()

        def signal_handler(signum, frame):
            nonlocal shutdown_requested
            if not shutdown_requested.is_set():
                shutdown_requested.set()
                logger.debug(
                    "WorkerProc handling signal %d, raising SystemExit", signum
                )
                raise SystemExit()

优雅信号处理

  • shutdown_requested 事件确保信号只处理一次
  • raise SystemExit() 而非直接 sys.exit()——SystemExit 是异常,可以被 except 捕获做清理
        signal.signal(signal.SIGTERM, signal_handler)
        signal.signal(signal.SIGINT, signal_handler)

注册SIGTERM和SIGINT处理器。

参数提取
        worker = None
        ready_writer = kwargs.pop("ready_pipe")
        death_pipe = kwargs.pop("death_pipe", None)

从kwargs中提取管道:这些是 make_worker_process 传入的参数,提取后不再传递给 WorkerProc.__init__

        for fd in kwargs.pop("inherited_fds", []):
            try:
                os.close(fd)
            except Exception as e:
                logger.warning("Error closing inherited connection: %s: %s", type(e), e)

关闭继承的fd:Fork模式下,子进程继承了父进程中其他Worker的管道fd。关闭它们确保管道的正确生命周期。

初始化Worker
        try:
            rank = kwargs.get("rank", 0)
            maybe_init_worker_tracer(
                instrumenting_module_name="vllm.worker",
                process_kind="worker",
                process_name=f"Worker_{rank}",
            )

追踪初始化:为Worker进程设置分布式追踪。

            worker = WorkerProc(*args, **kwargs)

创建WorkerProc实例:在子进程中执行完整的初始化流程(设备初始化、模型加载、消息队列建立)。

            assert worker.worker_response_mq is not None
            if kwargs["vllm_config"].parallel_config.numa_bind:
                numa_utils.log_current_affinity_state(f"Worker_{worker.rank}")

验证和日志:确保响应队列已创建,可选地记录NUMA亲和性状态。

启动死亡监控
            worker.monitor_death_pipe(death_pipe, shutdown_requested)
发送就绪信号
            ready_writer.send(
                {
                    "status": WorkerProc.READY_STR,
                    "handle": worker.worker_response_mq.export_handle(),
                    "peer_response_handles": worker.peer_response_handles,
                }
            )

READY消息内容

  • status:“READY”
  • handle:响应消息队列的序列化句柄
  • peer_response_handles:远程Worker的响应队列句柄列表

这是父子进程握手的关键步骤:父进程通过这些句柄建立到Worker响应队列的连接。

等待消息队列就绪
            if worker.rpc_broadcast_mq is not None:
                worker.rpc_broadcast_mq.wait_until_ready()
            worker.worker_response_mq.wait_until_ready()
            ready_writer.close()
            ready_writer = None

握手同步:确保所有消息队列的读取者和写入者都已注册。完成后关闭ready_pipe(不再需要)。

进入主循环
            worker.worker_busy_loop()
异常处理
        except Exception:
            if ready_writer is not None:
                logger.exception("WorkerProc failed to start.")
            elif shutdown_requested.is_set():
                logger.info("WorkerProc shutting down.")
            else:
                logger.exception("WorkerProc failed.")
            shutdown_requested.set()

异常分类

  • ready_writer is not None:初始化阶段失败(还没发送READY)
  • shutdown_requested.is_set():信号触发的关闭
  • 其他:运行时异常

shutdown_requested.set():防止后续信号处理再次抛出 SystemExit

        except SystemExit as e:
            logger.warning("WorkerProc was terminated")
            raise e

SystemExit不静默:必须重新抛出,否则子进程不会退出。

        finally:
            if ready_writer is not None:
                ready_writer.close()
            if death_pipe is not None:
                death_pipe.close()
            if worker is not None:
                worker.shutdown()

最终清理:关闭管道、释放资源。

2.4.9 ResponseStatus枚举(第818-820行)
    class ResponseStatus(Enum):
        SUCCESS = auto()
        FAILURE = auto()

响应状态:Worker的RPC结果通过 (status, result) 元组返回。SUCCESS 表示正常结果,FAILURE 表示异常。

2.4.10 enqueue_output——输出入队(第822-837行)
    def enqueue_output(self, output: Any):
        if isinstance(output, AsyncModelRunnerOutput):
            output = output.get_output()

处理异步输出AsyncModelRunnerOutput 是延迟计算的结果,调用 get_output() 获取实际值。

        if isinstance(output, Exception):
            result = (WorkerProc.ResponseStatus.FAILURE, str(output))
        else:
            result = (WorkerProc.ResponseStatus.SUCCESS, output)

异常序列化:异常对象可能不可pickle,转换为字符串以确保可靠传输。

        if (response_mq := self.worker_response_mq) is not None:
            response_mq.enqueue(result)

入队:注意使用walrus运算符+None检查,因为消息队列可能在shutdown后变为None。

2.4.11 handle_output——输出处理分发(第839-847行)
    def handle_output(self, output: Any):
        if self.use_async_scheduling:
            self.async_output_queue.put(output)
        else:
            self.enqueue_output(output)

调度模式分发

  • 异步调度:输出先进入线程间队列,由后台线程负责入队到MessageQueue
  • 同步调度:直接入队

异步模式的好处:Worker的 worker_busy_loop 不需要等待输出传输完成,可以立即开始处理下一个输入,实现计算与通信的overlap。

2.4.12 async_output_busy_loop——异步输出线程(第849-867行)
    def async_output_busy_loop(self):
        from vllm.platforms import current_platform
        if hasattr(self.worker, "device"):
            current_platform.set_device(self.worker.device)

设置CUDA设备:关键步骤!新线程不会继承主线程的CUDA上下文。如果不设置,任何CUDA操作都会在设备0上创建新上下文,浪费GPU内存。

        while True:
            output = self.async_output_queue.get()
            self.enqueue_output(output)

无限循环:从线程间队列取出输出,入队到消息队列。queue.get() 在队列为空时阻塞,不会浪费CPU。

没有退出机制:线程是daemon的,随主进程退出而终止。这在正常关闭时没问题,但如果需要优雅退出,可能需要添加哨兵值。

2.4.13 worker_busy_loop——Worker主循环(第869-895行)

这是Worker的核心执行循环。

    def worker_busy_loop(self):
        """Main busy loop for Multiprocessing Workers"""
        assert self.rpc_broadcast_mq is not None

前置断言:确保广播队列已初始化。

        while True:
            method, args, kwargs, output_rank = self.rpc_broadcast_mq.dequeue(
                indefinite=True
            )

等待RPC调用dequeue(indefinite=True) 无限等待新消息。返回四元组 (method, args, kwargs, output_rank)

            try:
                if isinstance(method, str):
                    func = getattr(self.worker, method)
                elif isinstance(method, bytes):
                    func = partial(cloudpickle.loads(method), self.worker)

方法解析

  • 字符串:通过 getattr 获取Worker的属性方法(如 "execute_model"
  • 字节串:用 cloudpickle 反序列化,返回一个接受Worker作为第一个参数的函数,用 partial 绑定
                output = func(*args, **kwargs)

执行方法:调用Worker的方法并获取输出。

            except Exception as e:
                if hasattr(e, "add_note"):
                    e.add_note(traceback.format_exc())
                logger.exception("WorkerProc hit an exception.")
                if output_rank is None or self.rank == output_rank:
                    self.handle_output(e)
                continue

异常处理

  1. 附带堆栈信息(Python 3.11+的 add_note
  2. 记录日志
  3. 只向需要回复的rank发送异常

output_rank is None:表示所有Worker都需要回复(聚合模式),当前Worker是其中之一。

            if output_rank is None or self.rank == output_rank:
                self.handle_output(output)

正常输出处理:同样只向需要的rank发送输出。这是性能优化——不需要的Worker无需序列化和传输结果。

2.4.14 setup_proc_title_and_log_prefix(第897-940行)
    @staticmethod
    def setup_proc_title_and_log_prefix(enable_ep: bool) -> None:
        if not model_parallel_is_initialized():
            set_process_title(name="Worker")
            decorate_logs("Worker")
            return

初始化前:并行组未初始化时使用默认标题。

        dp_size = get_dp_group().world_size
        dp_rank = get_dp_group().rank_in_group
        pp_size = get_pp_group().world_size
        pp_rank = get_pp_group().rank_in_group
        pcp_size = get_pcp_group().world_size
        pcp_rank = get_pcp_group().rank_in_group
        tp_size = get_tp_group().world_size
        tp_rank = get_tp_group().rank_in_group
        dcp_size = get_dcp_group().world_size
        dcp_rank = get_dcp_group().rank_in_group

查询各并行维度:获取每个维度的世界大小和当前rank。

        process_name = "Worker"
        if dp_size > 1:
            process_name += f"_DP{dp_rank}"
        if pp_size > 1:
            process_name += f"_PP{pp_rank}"
        if pcp_size > 1:
            process_name += f"_PCP{pcp_rank}"
        if tp_size > 1:
            process_name += f"_TP{tp_rank}"
        if dcp_size > 1:
            process_name += f"_DCP{dcp_rank}"
        if enable_ep:
            ep_rank = get_ep_group().rank_in_group
            process_name += f"_EP{ep_rank}"
        set_process_title(name=process_name)
        decorate_logs(process_name)

构建进程名称:按需追加各并行维度信息。例如 Worker_DP0_PP2_TP3

只显示大小>1的维度:避免 Worker_DP0_PP0_TP0_PCP0_DCP0_EP0 这种冗余名称。


2.5 通信机制详解

2.5.1 整体通信架构
Executor (父进程)                          Workers (子进程)
┌─────────────────┐                  ┌─────────────────────┐
│                 │  rpc_broadcast_mq │                     │
│  collective_rpc │ ────enqueue─────→ │  worker_busy_loop   │
│                 │   (SchedulerOutput)│   │dequeue          │
│                 │                   │   ▼                 │
│                 │                   │  func(*args,**kwargs)│
│                 │                   │   │                 │
│                 │  response_mqs[i]  │   ▼                 │
│  get_response   │ ←──dequeue────── │  handle_output      │
│                 │  (ModelRunnerOutput)│  │enqueue          │
└─────────────────┘                  └─────────────────────┘
2.5.2 消息队列(MessageQueue)详解

MessageQueue 是 vLLM 自研的共享内存广播机制(定义在 shm_broadcast.py 中),具有以下特性:

  1. 零拷贝广播:写入一次,多个读取者通过映射同一块共享内存读取
  2. 分块传输:大消息自动分块,每块大小不超过 max_chunk_bytes
  3. 读/写同步:通过原子变量和fence确保读写顺序
  4. 跨进程安全:使用POSIX共享内存,可被不同进程映射

Handle机制

# 创建端
mq = MessageQueue(num_writers, num_readers, ...)
handle = mq.export_handle()   # 序列化共享内存元信息

# 连接端
mq = MessageQueue.create_from_handle(handle, reader_rank)

Handle包含:

  • 共享内存文件路径
  • 数据区偏移量
  • 元数据区偏移量
  • 连接IP和端口(用于跨节点场景)
2.5.3 单节点通信流程
步骤1: Executor创建rpc_broadcast_mq并export_handle
步骤2: handle传递给Worker子进程
步骤3: Worker通过create_from_handle连接到同一共享内存
步骤4: Worker创建worker_response_mq并export_handle
步骤5: handle通过ready_pipe传回Executor
步骤6: Executor通过create_from_handle连接到Worker的响应队列
步骤7: 双方wait_until_ready完成握手
步骤8: 通信就绪
2.5.4 多节点通信流程
步骤1: Leader节点Executor创建rpc_broadcast_mq
步骤2: handle通过分布式通信组广播给所有节点
步骤3: Worker通过get_inner_dp_world_group().create_mq_broadcaster连接
步骤4: Worker创建响应队列并通过create_single_reader_mq_broadcasters暴露句柄
步骤5: 句柄通过ready_pipe传回Executor
步骤6: Executor通过peer_worker_response_mqs访问远程Worker的响应
步骤7: 握手完成
2.5.5 广播-响应模式的效率分析

优点

  • SchedulerOutput对所有Worker相同,一次写入多次读取
  • 共享内存避免序列化/反序列化开销
  • 只有output_rank的Worker需要回复,减少通信量

缺点

  • Worker必须同步消费广播消息(最慢的Worker决定整体吞吐)
  • 共享内存有固定容量,极端情况下可能溢出

2.6 生命周期管理

2.6.1 启动流程
1. MultiprocExecutor._init_executor()
   ├── 设置环境变量
   ├── 创建distributed_init_method
   ├── 创建rpc_broadcast_mq (leader节点)
   ├── 为每个local_rank创建Worker子进程
   │   └── WorkerProc.make_worker_process()
   │       ├── 创建ready_pipe和death_pipe
   │       ├── 启动子进程 (WorkerProc.worker_main)
   │       └── 返回UnreadyWorkerProcHandle
   ├── wait_for_ready()
   │   ├── 等待所有Worker发送READY
   │   ├── 从READY消息中提取响应队列句柄
   │   └── 返回WorkerProcHandle列表
   ├── 构建response_mqs列表
   ├── wait_until_ready() 所有消息队列
   └── _post_init_executor()

2. Worker子进程 (worker_main)
   ├── 设置信号处理器
   ├── 关闭继承的fd
   ├── 创建WorkerProc实例
   │   ├── WorkerWrapperBase初始化
   │   ├── init_device() (CUDA, NCCL, 并行组)
   │   ├── load_model()
   │   ├── 初始化消息队列
   │   └── 启动async_output线程(如果启用)
   ├── monitor_death_pipe()
   ├── 发送READY信号
   ├── wait_until_ready() 消息队列
   └── worker_busy_loop() 进入主循环
2.6.2 运行时流程
execute_model(scheduler_output) 调用
   │
   ▼
collective_rpc("execute_model", args=(scheduler_output,), ...)
   │
   ├── rpc_broadcast_mq.enqueue((method, args, kwargs, output_rank))
   │   └── 所有Worker从共享内存读取相同数据
   │
   ├── 创建FutureWrapper(futures_queue, get_response, aggregate)
   │
   ├── Worker端:
   │   ├── worker_busy_loop: dequeue → func(*args, **kwargs) → output
   │   ├── handle_output(output)
   │   │   ├── 同步模式: 直接enqueue_output
   │   │   └── 异步模式: put to async_output_queue
   │   │       └── async_output_busy_loop: get → enqueue_output
   │   └── worker_response_mq.enqueue((SUCCESS, output))
   │
   └── Executor端:
       ├── get_response(): response_mqs[output_rank].dequeue()
       ├── aggregate(responses) 如果有kv_output_aggregator
       └── future.set_result(response) → 返回给调用者
2.6.3 关闭流程
shutdown() 调用
   │
   ├── shutting_down = True (幂等保护)
   │
   ├── 关闭所有death_writer
   │   └── Worker子进程检测到EOF
   │       ├── death_pipe_monitor: shutdown_requested.set()
   │       ├── shutdown消息队列
   │       └── worker_busy_loop退出 (mq.dequeue返回错误/空)
   │
   ├── _ensure_worker_termination()
   │   ├── 等待4秒 (优雅退出)
   │   ├── SIGTERM + 等待4秒 (礼貌退出)
   │   └── SIGKILL (强制退出)
   │
   ├── 关闭worker_response_mq
   ├── 关闭rpc_broadcast_mq
   └── 关闭response_mqs
2.6.4 故障处理流程
Worker异常 → worker_busy_loop捕获
   │
   ├── logger.exception() 记录
   ├── handle_output(exception) → enqueue_output → (FAILURE, str(e))
   └── Executor端get_response() 检测到FAILURE → raise RuntimeError

Worker进程崩溃 → WorkerMonitor检测
   │
   ├── is_failed = True
   ├── logger.error()
   ├── shutdown()
   └── failure_callback() → 通知EngineCore

父进程退出 → Worker检测
   │
   ├── death_pipe EOFError
   ├── shutdown_requested.set()
   ├── shutdown消息队列 (唤醒busy_loop)
   └── Worker自行退出

2.7 set_multiprocessing_worker_envs(第942-972行)

def set_multiprocessing_worker_envs():
    _maybe_force_spawn()

强制spawn模式:GPU相关代码通常不支持fork(CUDA上下文不能在fork后重用),此函数检查并可能强制使用spawn。

    if not current_platform.is_cpu():
        default_omp_num_threads = 1
        if (
            "OMP_NUM_THREADS" not in os.environ
            and (current_parallelism := torch.get_num_threads())
            > default_omp_num_threads
        ):
            logger.warning_once(
                "Reducing Torch parallelism from %d threads to %d to avoid "
                "unnecessary CPU contention. Set OMP_NUM_THREADS in the "
                "external environment to tune this value as needed.",
                current_parallelism,
                default_omp_num_threads,
            )
            os.environ["OMP_NUM_THREADS"] = str(default_omp_num_threads)
            torch.set_num_threads(default_omp_num_threads)

OMP线程数控制

  • GPU模式下,每个Worker进程默认只使用1个OpenMP线程
  • 原因:多GPU × 多OMP线程 = CPU争用严重,导致性能下降
  • 容器环境下CPU限制更严格,争用问题更突出
  • 用户可通过 OMP_NUM_THREADS 环境变量覆盖

warning_once:避免每个Worker都打印相同警告。


三、Mermaid图表汇总

3.1 MultiprocExecutor 类图

futures_queue

workers

creates

converts to

from_unready_handle

«abstract»

Executor

+execute_model(scheduler_output, non_block)

+check_health()

+shutdown()

+register_failure_callback(callback)

MultiprocExecutor

+supports_pp: bool = True

-monitor_workers: bool

-is_failed: bool

-failure_callback: FailureCallback | None

-rpc_broadcast_mq: MessageQueue | None

-response_mqs: list<MessageQueue>

-futures_queue: deque<FutureWrapper>

-workers: list<WorkerProcHandle>

-shutting_down: bool

-_finalizer: weakref.finalize

+max_concurrent_batches: int

+_init_executor()

+execute_model(scheduler_output, non_block)

+collective_rpc(method, timeout, args, kwargs, non_block, unique_reply_rank, kv_output_aggregator)

+shutdown()

+check_health()

+start_worker_monitor(inline)

+register_failure_callback(callback)

-_get_parallel_sizes() : tuple

-_is_driver_worker(rank) : bool

-_ensure_worker_termination(worker_procs)

-_get_output_rank() : int

FutureWrapper

-futures_queue: deque<FutureWrapper>

-get_response: Callable

-aggregate: Callable

+result(timeout)

-_wait_for_response()

UnreadyWorkerProcHandle

+proc: BaseProcess

+rank: int

+ready_pipe: Connection

+death_writer: Connection | None

WorkerProcHandle

+proc: BaseProcess

+rank: int

+worker_response_mq: MessageQueue | None

+peer_worker_response_mqs: list<MessageQueue | None>

+death_writer: Connection | None

+from_unready_handle(unready_handle, worker_response_mq, peer_worker_response_mqs)

WorkerProc

+READY_STR: str = "READY"

-rank: int

-worker: WorkerWrapperBase

-rpc_broadcast_mq: MessageQueue | None

-worker_response_mq: MessageQueue | None

-use_async_scheduling: bool

-async_output_queue: Queue

+init(vllm_config, local_rank, rank, ...)

+make_worker_process() : UnreadyWorkerProcHandle

+wait_for_ready(unready_handles) : list<WorkerProcHandle>

+worker_main(*args, **kwargs)

+worker_busy_loop()

+enqueue_output(output)

+handle_output(output)

+async_output_busy_loop()

+monitor_death_pipe(death_pipe, shutdown_requested)

+shutdown()

+setup_proc_title_and_log_prefix(enable_ep)

-_init_message_queues(input_shm_handle, vllm_config)

Future

3.2 初始化时序图

Worker 1 (子进程) Worker 0 (子进程) Parent Process MultiprocExecutor Worker 1 (子进程) Worker 0 (子进程) Parent Process MultiprocExecutor init_device() → load_model() _init_message_queues() init_device() → load_model() _init_message_queues() 初始化完成,进入运行状态 _init_executor() set_multiprocessing_worker_envs() get_distributed_init_method(loopback_ip, open_port) MessageQueue(world_size, local_world_size) → rpc_broadcast_mq export_handle() → scheduler_output_handle make_worker_process(rank=0, handle=scheduler_output_handle) Pipe(ready) + Pipe(death) Process.start() → worker_main() ready_pipe.send({status: "READY", handle, peer_response_handles}) make_worker_process(rank=1, handle=scheduler_output_handle) Pipe(ready) + Pipe(death) Process.start() → worker_main() ready_pipe.send({status: "READY", handle, peer_response_handles}) wait_for_ready([W0_handle, W1_handle]) wait_for_response_handle_ready() → WorkerProcHandle 构建response_mqs列表 rpc_broadcast_mq.wait_until_ready() response_mqs[*].wait_until_ready()

3.3 collective_rpc 通信时序图

response_mq[output_rank] Worker 1 Worker 0 rpc_broadcast_mq (共享内存) MultiprocExecutor EngineCore response_mq[output_rank] Worker 1 Worker 0 rpc_broadcast_mq (共享内存) MultiprocExecutor EngineCore 序列化method(字符串) + args getattr(worker, "execute_model")(scheduler_output) getattr(worker, "execute_model")(scheduler_output) 不发送输出(优化) alt [output_rank == W0.rank] [W0.rank != output_rank] 不发送输出(优化) alt [output_rank == W1.rank] [W1.rank != output_rank] execute_model(scheduler_output) collective_rpc("execute_model", args=(scheduler_output,)) enqueue(("execute_model", (scheduler_output,), {}, output_rank)) dequeue() → (method, args, kwargs, output_rank) dequeue() → (method, args, kwargs, output_rank) enqueue((SUCCESS, model_output)) enqueue((SUCCESS, model_output)) dequeue() → (SUCCESS, model_output) ModelRunnerOutput

3.4 WorkerBusyLoop 状态机

worker_busy_loop 启动

dequeue(method, args, kwargs, output_rank)

method是字符串 → getattr(worker, method)

method是bytes → cloudpickle.loads + partial

正常返回 output

抛出异常

output_rank is None

self.rank == output_rank

rank不匹配,跳过

use_async_scheduling == True

use_async_scheduling == False

直接enqueue_output

async_output_queue.put(output)

async_output_busy_loop线程取出

isinstance AsyncModelRunnerOutput

output.get_output()

不是AsyncOutput

isinstance Exception

正常结果

enqueue((FAILURE, str(e)))

enqueue((SUCCESS, output))

output_rank is None 或 rank匹配

rank不匹配,跳过

handle_output(exception)

WaitingForRPC

MethodDispatch

阻塞等待共享内存广播

Executing

OutputHandling

ErrorHandling

ShouldReply

AsyncPath

SyncPath

EnqueueOutput

PutToQueue

CheckAsyncOutput

GetActualOutput

CheckException

FailureResponse

SuccessResponse

ShouldReplyError

3.5 关闭流程图

shutdown 被调用

shutting_down?

直接返回 - 幂等

shutting_down = True

关闭所有 death_writer

Worker子进程收到EOF

death_pipe_monitor

shutdown_requested.set

关闭消息队列 → 唤醒busy_loop

worker_busy_loop退出

_ensure_worker_termination

4秒内全部退出?

优雅关闭成功

SIGTERM + 4秒等待

4秒内全部退出?

SIGKILL 强制终止

关闭 worker_response_mq

关闭 rpc_broadcast_mq

关闭 response_mqs

清理完成

3.6 故障检测与传播图

Executor 父进程

Worker 子进程

共享内存

WorkerMonitor 线程

proc.sentinel 可读

shutting_down?

忽略 - 正在关闭

is_failed = True

shutdown

failure_callback

通知 EngineCore

worker_busy_loop 异常

handle_output exception

enqueue_output: FAILURE, str_e

worker_response_mq.enqueue

get_response: response_mq.dequeue

status == SUCCESS?

raise RuntimeError: Worker failed

返回正常结果

3.7 管道与消息队列拓扑图

渲染错误: Mermaid 渲染失败: Parse error on line 4: ... RMQ0[response_mq[0]
读取端] -----------------------^ Expecting 'SQE', 'DOUBLECIRCLEEND', 'PE', '-)', 'STADIUMEND', 'SUBROUTINEEND', 'PIPE', 'CYLINDEREND', 'DIAMOND_STOP', 'TAGEND', 'TRAPEND', 'INVTRAPEND', 'UNICODE_TEXT', 'TEXT', 'TAGSTART', got 'SQS'

3.8 FutureWrapper 队列处理时序图

response_mq FutureWrapper 2 FutureWrapper 1 futures_queue (deque) 调用2: execute_model 调用1: execute_model response_mq FutureWrapper 2 FutureWrapper 1 futures_queue (deque) 调用2: execute_model 调用1: execute_model [FW1] [FW2, FW1] [FW2] C1的result()也会得到response1 调用顺序保证: response1先于response2 appendleft(FW1) appendleft(FW2) result() (调用2先等待) pop() → FW1 _wait_for_response() dequeue() → response1 set_result(response1) pop() → FW2 _wait_for_response() dequeue() → response2 set_result(response2) response2

四、关键设计模式与权衡

4.1 共享内存广播 vs Socket/RPC

方面 共享内存广播 (vLLM) Socket/RPC (传统)
延迟 微秒级(内存映射) 毫秒级(网络栈)
拷贝 零拷贝 至少2次拷贝
序列化 仅metadata 整个SchedulerOutput
扩展性 单节点最优 天然跨节点
复杂度 共享内存管理复杂 成熟简单

vLLM的选择:单节点用共享内存,多节点用分布式通信组——兼顾了性能和扩展性。

4.2 有序响应 vs 无序响应

FutureWrapper 的有序设计是必要的:

  • 调度器发送的请求有严格顺序
  • 响应乱序会导致错误的请求-响应匹配
  • 例如:request1要采样A,request2要采样B,如果响应乱序可能把B的结果当A的

4.3 Daemon进程 + Death Pipe

Worker设为daemon进程但还使用death pipe的原因:

  • Daemon确保主进程退出时Worker也被终止(兜底)
  • Death pipe提供更早的退出检测——主进程还在但Executor已关闭时,Worker可以优雅退出
  • 不依赖daemon的粗暴终止,给Worker清理资源的机会

4.4 唯一回复优化

unique_reply_rank 的设计避免了不必要的通信:

  • TP组中只有rank=0的Worker需要返回采样结果
  • 其他Worker的输出只在TP组内部使用(通过NCCL all-reduce)
  • 节省了N-1个Worker的响应序列化和传输开销

4.5 弱引用的使用

多处使用 weakref

  1. _finalizer = weakref.finalize(self, self.shutdown):GC时自动清理
  2. self_ref = weakref.ref(self):monitor线程不阻止Executor被GC
  3. death_pipe_monitor 直接传递队列引用:避免通过self间接持有Executor引用

这些设计避免了循环引用和内存泄漏,特别是在长期运行的服务中。

4.6 Fork模式的FD管理

fork模式下文件描述符的继承是一个经典陷阱:

  • 子进程继承父进程所有打开的fd
  • 如果不关闭其他Worker的管道fd,管道的写端永远不会关闭
  • 读端无法收到EOF,导致阻塞等待

vLLM的解决方案:显式追踪所有已创建的fd,在新Worker中关闭不属于自己的fd。


五、总结

MultiprocExecutor 是 vLLM v1 多GPU推理的核心执行器,其设计体现了以下工程智慧:

  1. 性能优先的通信架构:共享内存广播实现零拷贝、低延迟的SchedulerOutput分发
  2. 分层抽象的状态管理UnreadyWorkerProcHandleWorkerProcHandle 的状态转换清晰表达了Worker生命周期
  3. 健壮的故障处理:三级进程终止策略、death pipe双向监控、failure callback传播链
  4. 灵活的并行支持:TP/PP/DP/PCP/EP/DCP多维度并行,通过rank计算和并行组查询统一管理
  5. 优雅的资源管理:weakref终结器、幂等shutdown、daemon+pipe双重保障
  6. 可扩展的RPC框架collective_rpc 支持字符串方法名和cloudpickle序列化的可调用对象,支持聚合和唯一回复模式

这个1037行的文件浓缩了多进程GPU推理的所有关键机制,是理解vLLM v1执行引擎的必经之路。

Logo

AtomGit 是由开放原子开源基金会联合 CSDN 等生态伙伴共同推出的新一代开源与人工智能协作平台。平台坚持“开放、中立、公益”的理念,把代码托管、模型共享、数据集托管、智能体开发体验和算力服务整合在一起,为开发者提供从开发、训练到部署的一站式体验。

更多推荐