LLM Inference Engine vLLM (20): MoE Layer Code Flow Walkthrough - Weight Repacking, Quantization, GEMM Computation, EP, and How the vllm serve Command Is Parsed
Table of Contents
- 1 vllm/model_executor/layers/fused_moe/layer.py
- 2 vllm/model_executor/layers/fused_moe/fused_moe.py
- 3 vllm/model_executor/layers/fused_moe/modular_kernel.py
- 4 Question: fused_moe.py vs. modular_kernel.py - where in the code is the branch decided?
- 5 Where is maybe_init_modular_kernel called?
- 6 The role of vllm/model_executor/layers/quantization/compressed_tensors/compressed_tensors_moe_marlin.py
- 7 Question: how does apply in layer.py reach the inference function in modular_kernel.py, step by step?
- References
During the 2026 May Day holiday, with some free time on my hands, I decided to work through a few lingering questions about the MoE code paths and write up some quick notes.
1 vllm/model_executor/layers/fused_moe/layer.py
To start with, this file implements the MoE layer itself: it defines the FusedMoE module, owns the layer's weights, and dispatches the forward pass to the installed quant_method.
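The rest of this note keeps coming back to one fact: the layer does not do the expert math itself, it delegates to self.quant_method.apply. As a mental model only (a hand-written sketch with made-up names like SimplifiedFusedMoE, not the real vLLM class), the layer roughly looks like this:

```python
# Illustrative sketch only (not vLLM source): a simplified view of how a
# FusedMoE-style layer hands its forward pass to quant_method.apply.
import torch

class SimplifiedFusedMoE(torch.nn.Module):
    def __init__(self, quant_method, gate: torch.nn.Linear, top_k: int):
        super().__init__()
        self.quant_method = quant_method  # e.g. an unquantized / Marlin / modular method
        self.gate = gate                  # router producing per-expert logits
        self.top_k = top_k

    def forward(self, hidden_states: torch.Tensor) -> torch.Tensor:
        # 1) routing: pick top-k experts per token
        router_logits = self.gate(hidden_states)
        topk_weights, topk_ids = torch.topk(
            torch.softmax(router_logits, dim=-1), self.top_k, dim=-1
        )
        # 2) the actual expert computation is delegated to the quant method;
        #    which kernel runs depends on which quant_method instance is installed
        return self.quant_method.apply(self, hidden_states, topk_weights, topk_ids)
```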
2 vllm/model_executor/layers/fused_moe/fused_moe.py
This is the MoE forward-inference file; it contains the kernels the MoE forward pass needs. Note, however, that if EP is enabled, the MoE forward pass no longer goes through fused_experts_impl in this file. The MoE only runs its forward pass through this file in cases like the following (a quick way to check which path a layer took is sketched after the list):
- the quant_method was not swapped to FusedMoEModularMethod inside maybe_init_modular_kernel;
- many single-GPU or TP-only configurations (no EP, no all2all MoE kernels) keep using this monolithic fused_experts path;
- some Marlin / compressed-tensors branches are themselves fused_moe_forward → fused_experts and never go through the modular path.
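A quick, hedged way to see which path a given layer ended up on is to look at the class of its quant_method after the model has been loaded (a debugging sketch; model is assumed to be an already-constructed vLLM model object):

```python
# Debugging sketch: report which MoE execution path each layer will take,
# judging purely by the class of the installed quant_method.
def report_moe_paths(model) -> None:
    for name, module in model.named_modules():
        if module.__class__.__name__ not in ("FusedMoE", "SharedFusedMoE"):
            continue
        qm = module.quant_method.__class__.__name__
        if qm == "FusedMoEModularMethod":
            path = "modular_kernel.py (prepare -> expert GEMMs -> finalize)"
        else:
            path = "monolithic path owned by " + qm
        print(f"{name}: quant_method={qm} -> {path}")
```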
3 vllm/model_executor/layers/fused_moe/modular_kernel.py
This file contains the dispatch → gemm1 → activation → gemm2 → combine pipeline that the MoE needs; if EP is enabled, the MoE forward pass goes through this file.
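As a rough orientation (simplified pseudocode, not the real FusedMoEModularKernel code; the method names on prepare_finalize and experts are stand-ins), the modular kernel's forward is an orchestration of pluggable pieces:

```python
# Simplified pseudocode of the modular MoE pipeline described above.
# prepare_finalize handles the all2all communication (dispatch/combine),
# experts handles the per-expert GEMM1 -> activation -> GEMM2 computation.
def modular_moe_forward(prepare_finalize, experts, hidden_states,
                        topk_weights, topk_ids, w1, w2):
    # dispatch: quantize if needed and send tokens to the ranks that own
    # the experts they were routed to
    dispatched, dispatch_metadata = prepare_finalize.prepare(
        hidden_states, topk_weights, topk_ids
    )
    # local expert computation: GEMM1 -> activation -> GEMM2
    expert_out = experts.apply(dispatched, w1, w2, dispatch_metadata)
    # combine: return results to the originating ranks and do the weighted
    # reduction over the top-k experts
    return prepare_finalize.finalize(expert_out, topk_weights, dispatch_metadata)
```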
4 Question: fused_moe.py vs. modular_kernel.py - where in the code is the branch decided?
vllm/model_executor/layers/fused_moe/layer.py has a maybe_init_modular_kernel method:
# Note: maybe_init_modular_kernel should only be called by
# prepare_communication_buffer_for_model.
# This is called after all weight loading and post-processing, so it
# should be safe to swap out the quant_method.
def maybe_init_modular_kernel(self) -> None:
# If this layer is configured for Marlin W16A16 path, we intentionally
# keep the monolithic execution route so runtime can dispatch to
# fused_experts_impl_w16a16_marlin when weights are packed.
if getattr(self, "_marlin_w16a16_moe_enabled", False):
return
self.ensure_moe_quant_config_init()
# routing_tables only needed for round-robin expert placement with
# DeepEP all2all backend.
routing_tables = self._maybe_init_expert_routing_tables()
prepare_finalize = self.quant_method.maybe_make_prepare_finalize(
routing_tables=routing_tables
)
if prepare_finalize is not None:
logger.debug(
"%s for %s(%s)", prepare_finalize.__class__.__name__, self, id(self)
)
self.quant_method = FusedMoEModularMethod.make(
self, self.quant_method, prepare_finalize, self.shared_experts
)
4.1 self.quant_method.maybe_make_prepare_finalize(
The first call of interest here is self.quant_method.maybe_make_prepare_finalize(...):
def maybe_make_prepare_finalize(
self,
routing_tables: tuple[torch.Tensor, torch.Tensor, torch.Tensor] | None = None,
) -> FusedMoEPrepareAndFinalize | None:
from .all2all_utils import maybe_make_prepare_finalize
return maybe_make_prepare_finalize(
self.moe, self.moe_quant_config, routing_tables
)
Then, in vllm/model_executor/layers/fused_moe/all2all_utils.py:
def maybe_make_prepare_finalize(
moe: FusedMoEConfig,
quant_config: FusedMoEQuantConfig | None,
routing_tables: tuple[torch.Tensor, torch.Tensor, torch.Tensor] | None = None,
) -> FusedMoEPrepareAndFinalize | None:
if not moe.moe_parallel_config.use_all2all_kernels:
return None
all2all_manager = get_ep_group().device_communicator.all2all_manager
assert all2all_manager is not None
prepare_finalize: FusedMoEPrepareAndFinalize | None = None
# TODO(rob): update this as part of the MoE refactor.
assert not moe.use_flashinfer_cutlass_kernels, (
"Must be created in modelopt.py or fp8.py"
)
if moe.use_pplx_kernels:
assert quant_config is not None
hidden_dim_bytes, hidden_scale_bytes = pplx_hidden_dim_scale_bytes(
moe.max_num_tokens,
moe.hidden_dim,
moe.in_dtype,
quant_config.quant_dtype,
per_act_token_quant=quant_config.per_act_token_quant,
block_shape=quant_config.block_shape,
)
all_to_all_args = dict(
max_num_tokens=moe.max_num_tokens,
num_experts=moe.num_experts,
experts_per_token=moe.experts_per_token, # topk
rank=all2all_manager.rank,
world_size=all2all_manager.world_size,
# dp_size actually means tp_size, bug in pplx kernels
dp_size=all2all_manager.tp_group.world_size,
hidden_dim=moe.hidden_dim,
hidden_dim_bytes=hidden_dim_bytes,
hidden_dim_scale_bytes=hidden_scale_bytes,
)
num_dispatchers = (
all2all_manager.world_size // all2all_manager.tp_group.world_size
)
# Intranode pplx a2a takes a group name while internode does not.
if not all2all_manager.internode:
all_to_all_args["group_name"] = all2all_manager.cpu_group.group_name
handle = all2all_manager.get_handle(all_to_all_args)
prepare_finalize = PplxPrepareAndFinalize(
handle,
max_num_tokens=moe.max_num_tokens,
num_local_experts=moe.num_local_experts,
num_dispatchers=num_dispatchers,
)
elif moe.use_deepep_ht_kernels:
assert moe.dp_size == all2all_manager.dp_world_size
all_to_all_args = dict()
handle = all2all_manager.get_handle(all_to_all_args)
prepare_finalize = DeepEPHTPrepareAndFinalize(
handle,
num_dispatchers=all2all_manager.world_size,
dp_size=all2all_manager.dp_world_size,
rank_expert_offset=all2all_manager.rank * moe.num_local_experts,
)
elif moe.use_deepep_ll_kernels:
assert quant_config is not None
global_to_physical = physical_to_global = local_expert_global_ids = None
if routing_tables is not None:
(
global_to_physical,
physical_to_global,
local_expert_global_ids,
) = routing_tables
all_to_all_args = dict(
max_num_tokens_per_dp_rank=moe.max_num_tokens,
token_hidden_size=moe.hidden_dim,
num_ep_ranks=all2all_manager.world_size,
num_global_experts=moe.num_experts,
num_local_experts=moe.num_experts // all2all_manager.world_size,
)
handle = all2all_manager.get_handle(all_to_all_args)
# Note: We may want to use FP8 dispatch just to reduce
# data movement.
use_fp8_dispatch = quant_config.quant_dtype == current_platform.fp8_dtype()
use_int8_dispatch = quant_config.quant_dtype == torch.int8
prepare_finalize = DeepEPLLPrepareAndFinalize(
handle,
max_tokens_per_rank=moe.max_num_tokens,
num_dispatchers=all2all_manager.world_size,
use_fp8_dispatch=use_fp8_dispatch,
global_to_physical=global_to_physical,
physical_to_global=physical_to_global,
local_expert_global_ids=local_expert_global_ids,
use_int8_dispatch=use_int8_dispatch,
)
elif moe.use_mori_kernels:
assert quant_config is not None
# Note: We may want to use FP8 dispatch just to reduce
# data movement.
use_fp8_dispatch = (
quant_config.is_per_act_token or quant_config.is_block_quantized
)
# For PTPC (per token per channel) quant, the scale dim for each token is 1
# For 1x128 quant, the scale dim for each token is hidden_dim // 128
scale_dim = 1 if quant_config.is_per_act_token else moe.hidden_dim // 128
all_to_all_args = dict(
rank=all2all_manager.rank,
num_ep_ranks=all2all_manager.world_size,
quant_dtype=quant_config.quant_dtype,
token_hidden_size=moe.hidden_dim,
scale_dim=scale_dim,
scale_type_size=torch.float32.itemsize,
max_num_tokens_per_dp_rank=moe.max_num_tokens,
input_dtype=moe.in_dtype,
num_local_experts=moe.num_experts // all2all_manager.world_size,
num_experts_per_token=moe.experts_per_token,
)
handle = all2all_manager.get_handle(all_to_all_args)
prepare_finalize = MoriPrepareAndFinalize(
handle,
max_tokens_per_rank=moe.max_num_tokens,
num_dispatchers=all2all_manager.world_size,
use_fp8_dispatch=use_fp8_dispatch,
)
return prepare_finalize
The very first check in this function is if not moe.moe_parallel_config.use_all2all_kernels:, in which case it returns None. The resulting chain of consequences (condensed into the sketch below) is:
use_all2all_kernels is False → prepare_finalize is None → quant_method is not replaced with FusedMoEModularMethod → FusedMoEModularKernel is never used; the MoE forward pass is still handled by the original quant_method, which usually means a monolithic path such as fused_moe.fused_experts.
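Condensed into a few lines (a paraphrase of the control flow above, not copied source; decide_moe_path is an invented helper name):

```python
# Paraphrased control flow of maybe_init_modular_kernel / maybe_make_prepare_finalize.
def decide_moe_path(layer, moe) -> str:
    if not moe.moe_parallel_config.use_all2all_kernels:
        # no EP / all2all backend: prepare_finalize stays None, quant_method
        # is kept, and the layer runs the monolithic fused_experts-style path
        return "monolithic (fused_moe.py or the quant method's own kernel)"
    # otherwise an all2all PrepareAndFinalize object (pplx / DeepEP / mori) is
    # built and the quant_method is swapped to FusedMoEModularMethod
    return "modular (modular_kernel.py: prepare -> expert GEMMs -> finalize)"
```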
4.2 self.quant_method = FusedMoEModularMethod.make(
self.quant_method = FusedMoEModularMethod.make(
self, self.quant_method, prepare_finalize, self.shared_experts
)
This make lives in vllm/model_executor/layers/fused_moe/fused_moe_modular_method.py:
@staticmethod
def make(
moe_layer: torch.nn.Module,
old_quant_method: FusedMoEMethodBase,
prepare_finalize: FusedMoEPrepareAndFinalize,
shared_experts: torch.nn.Module | None,
) -> "FusedMoEModularMethod":
return FusedMoEModularMethod(
old_quant_method,
FusedMoEModularKernel(
prepare_finalize,
old_quant_method.select_gemm_impl(prepare_finalize, moe_layer),
shared_experts,
moe_parallel_config=moe_layer.moe_parallel_config,
N=old_quant_method.N if hasattr(old_quant_method, "N") else -1,
K=old_quant_method.K if hasattr(old_quant_method, "K") else -1,
),
)
And from there we have arrived at vllm/model_executor/layers/fused_moe/modular_kernel.py.
5 Where is maybe_init_modular_kernel called?
In vllm/model_executor/layers/fused_moe/layer.py, maybe_init_modular_kernel carries the following comment:
# Note: maybe_init_modular_kernel should only be called by
# prepare_communication_buffer_for_model.
# This is called after all weight loading and post-processing, so it
# should be safe to swap out the quant_method.
def maybe_init_modular_kernel(self) -> None:
In other words, it is called by prepare_communication_buffer_for_model, which is defined in vllm/distributed/device_communicators/base_device_communicator.py:
def prepare_communication_buffer_for_model(self, model: torch.nn.Module) -> None:
"""
Prepare the communication buffer for the model.
"""
if not self.is_ep_communicator:
return
moe_modules = [
module
for module in model.modules()
# TODO(bnell): Should use isinstance but can't. Maybe search for
# presence of quant_method.maybe_init_modular_kernel?
if (
module.__class__.__name__ == "FusedMoE"
or module.__class__.__name__ == "SharedFusedMoE"
)
]
for module in moe_modules:
module.maybe_init_modular_kernel()
This is where the communication buffers are prepared for the model: only the EP communicator proceeds past the early return, collects every FusedMoE / SharedFusedMoE module, and calls maybe_init_modular_kernel on each of them.
5.1 Question: is this function only called when EP is enabled?
So if I have not enabled expert parallelism (EP), this function never runs - then how is the MoE layer's forward pass computed? You said this function only executes with EP enabled, and inside it, as traced step by step above, maybe_make_prepare_finalize is called; if the all2all handle is None we fall back to the ordinary fused MoE forward path, and if it is not None we go through the forward path in vllm/model_executor/layers/fused_moe/modular_kernel.py - is that right?
Answer: essentially yes. Without EP, prepare_communication_buffer_for_model returns early on the is_ep_communicator check (and use_all2all_kernels is False anyway), so the quant_method is never swapped; the MoE forward pass stays on the monolithic path driven by the original quant_method, typically fused_moe.fused_experts or the quantization method's own kernel.
5.2 Where is prepare_communication_buffer_for_model called?
vllm/v1/worker/gpu_model_runner.py has a load_model that calls it:
prepare_communication_buffer_for_model(self.model)
if (drafter := getattr(self, "drafter", None)) and (
drafter_model := getattr(drafter, "model", None)
):
prepare_communication_buffer_for_model(drafter_model)
mm_config = self.model_config.multimodal_config
self.is_multimodal_pruning_enabled = (
supports_multimodal_pruning(self.get_model())
and mm_config is not None
and mm_config.is_multimodal_pruning_enabled()
That prepare_communication_buffer_for_model then goes to vllm/distributed/parallel_state.py:
def prepare_communication_buffer_for_model(model: torch.nn.Module):
"""Prepare the communication buffer for the model.
Traditional communication libraries like NCCL are almost
model agnostic. However, emerging new communication libraries like
MoE all2all (DeepEP) usually allocate the communication buffer
based on the model shape for optimal performance.
"""
if _TP is not None:
_TP.prepare_communication_buffer_for_model(model)
if _PCP is not None:
_PCP.prepare_communication_buffer_for_model(model)
if _PP is not None:
_PP.prepare_communication_buffer_for_model(model)
if _DP is not None:
_DP.prepare_communication_buffer_for_model(model)
if _EP is not None:
_EP.prepare_communication_buffer_for_model(model)
which in turn reaches prepare_communication_buffer_for_model in vllm/distributed/device_communicators/base_device_communicator.py.
6 The role of vllm/model_executor/layers/quantization/compressed_tensors/compressed_tensors_moe_marlin.py
6.1 Question: where is get_moe_method called?
class CompressedTensorsMarlinMoEMethod(FusedMoEMethodBase):
def __init__(self, moe: FusedMoEConfig):
super().__init__(moe)
@staticmethod
def get_moe_method(
quant_config: "SlimQuantCompressedTensorsMarlinConfig", # type: ignore # noqa E501
layer: torch.nn.Module,
) -> "CompressedTensorsMarlinMoEMethod":
# are supported + check if the layer is being ignored.
weight_quant = quant_config.target_scheme_map["Linear"].get("weights")
input_quant = quant_config.target_scheme_map["Linear"].get(
"input_activations")
if quant_config._is_fp8_w8a8(weight_quant, input_quant):
return CompressedTensorsW8A8FP8MarlinMoEMethod(quant_config, layer.moe_config)
elif quant_config._is_dynamic_token_w8a8(weight_quant, input_quant):
return CompressedTensorsW8A8Int8MarlinMoEMethod(quant_config, layer.moe_config)
else:
raise RuntimeError(
f"Slimquant_marlin does not support the FusedMoe scheme: {weight_quant}, {input_quant}")
It is called from vllm/model_executor/layers/quantization/compressed_tensors/compressed_tensors_marlin.py:
if isinstance(layer, FusedMoE):
return CompressedTensorsMarlinMoEMethod.get_moe_method(self, layer)
return None
And where is that called from one level further up? In layer.py:
def _get_quant_method() -> FusedMoEMethodBase:
"""
Helper method to ensure self.quant_method is never None and
of the proper type.
"""
quant_method = None
if self.quant_config is not None:
quant_method = self.quant_config.get_quant_method(self, prefix)
if quant_method is None:
quant_method = UnquantizedFusedMoEMethod(self.moe_config)
assert isinstance(quant_method, FusedMoEMethodBase)
return quant_method
# Note: get_quant_method will look at the layer's local_num_experts
# for heuristic purposes, so it must be initialized first.
self.quant_method: FusedMoEMethodBase = _get_quant_method()
This sits inside __init__: right after _get_quant_method is defined, it is immediately called once and the result is stored in self.quant_method.
6.2 Question: how do we end up in the CompressedTensorsMarlinMoEMethod class?
While analyzing get_moe_method above, one question remained: how does the quant_config in self.quant_config.get_quant_method get tied to the CompressedTensorsMarlinMoEMethod class, so that the call eventually reaches that class's get_quant_method / get_moe_method? Let's trace it layer by layer.
Suppose we launch vLLM with a command like the following:
vllm serve /models/GLM-5-W8A8 \
--disable-log-requests \
-q slimquant_marlin \
--trust-remote-code \
-dp 8 \
-tp 1 \
--enable-expert-parallel \
--disable-custom-all-reduce \
--dtype bfloat16 \
--enable-chunked-prefill \
--max-model-len 50000 \
--max-num-batched-tokens 128 \
--max-num-seqs 32 \
--enable-prefix-caching \
--block-size 64 \
--gpu-memory-utilization 0.88 \
--kv-cache-dtype fp8_ds_mla \
-cc '{"inductor_compile_config":{"combo_kernels": false}}' \
--speculative_config '{"method":"mtp","num_speculative_tokens":3, "quantization": "slimquant_marlin"}'
Then we first land in this function in vllm/entrypoints/cli/serve.py:
class ServeSubcommand(CLISubcommand):
"""The `serve` subcommand for the vLLM CLI."""
name = "serve"
@staticmethod
def cmd(args: argparse.Namespace) -> None:
# If model is specified in CLI (as positional arg), it takes precedence
if hasattr(args, "model_tag") and args.model_tag is not None:
args.model = args.model_tag
From there, the line uvloop.run(run_server(args)) inside cmd takes us to run_server in vllm/entrypoints/openai/api_server.py:
async def run_server(args, **uvicorn_kwargs) -> None:
"""Run a single-worker API server."""
# Add process-specific prefix to stdout and stderr.
decorate_logs("APIServer")
listen_address, sock = setup_server(args)
await run_server_worker(listen_address, sock, args, **uvicorn_kwargs)
run_server_worker is in the same file:
async def run_server_worker(
listen_address, sock, args, client_config=None, **uvicorn_kwargs
) -> None:
"""Run a single API server worker."""
if args.tool_parser_plugin and len(args.tool_parser_plugin) > 3:
ToolParserManager.import_tool_parser(args.tool_parser_plugin)
if args.reasoning_parser_plugin and len(args.reasoning_parser_plugin) > 3:
ReasoningParserManager.import_reasoning_parser(args.reasoning_parser_plugin)
# Load logging config for uvicorn if specified
log_config = load_log_config(args.log_config_file)
if log_config is not None:
uvicorn_kwargs["log_config"] = log_config
async with build_async_engine_client(
args,
client_config=client_config,
) as engine_client:
...other code...
It calls build_async_engine_client, also in the same file:
@asynccontextmanager
async def build_async_engine_client(
args: Namespace,
*,
usage_context: UsageContext = UsageContext.OPENAI_API_SERVER,
disable_frontend_multiprocessing: bool | None = None,
client_config: dict[str, Any] | None = None,
) -> AsyncIterator[EngineClient]:
...other code...
async with build_async_engine_client_from_engine_args(
engine_args,
usage_context=usage_context,
disable_frontend_multiprocessing=disable_frontend_multiprocessing,
client_config=client_config,
) as engine:
yield engine
and then, still in vllm/entrypoints/openai/api_server.py:
@asynccontextmanager
async def build_async_engine_client_from_engine_args(
engine_args: AsyncEngineArgs,
*,
usage_context: UsageContext = UsageContext.OPENAI_API_SERVER,
disable_frontend_multiprocessing: bool = False,
client_config: dict[str, Any] | None = None,
) -> AsyncIterator[EngineClient]:
"""
Create EngineClient, either:
- in-process using the AsyncLLMEngine Directly
- multiprocess using AsyncLLMEngine RPC
Returns the Client or None if the creation failed.
"""
# Create the EngineConfig (determines if we can use V1).
vllm_config = engine_args.create_engine_config(usage_context=usage_context)
...other code...
try:
async_llm = AsyncLLM.from_vllm_config(
vllm_config=vllm_config,
usage_context=usage_context,
enable_log_requests=engine_args.enable_log_requests,
aggregate_engine_logging=engine_args.aggregate_engine_logging,
disable_log_stats=engine_args.disable_log_stats,
client_addresses=client_config,
client_count=client_count,
client_index=client_index,
)
build_async_engine_client_from_engine_args contains two important calls; let's look at them one at a time.
6.2.1 engine_args.create_engine_config
The call here is engine_args.create_engine_config(usage_context=usage_context).
That create_engine_config is in vllm/engine/arg_utils.py; the first part of the function assembles the various configs, and once they are ready it calls:
config = VllmConfig(
model_config=model_config,
cache_config=cache_config,
parallel_config=parallel_config,
scheduler_config=scheduler_config,
device_config=device_config,
load_config=load_config,
attention_config=attention_config,
lora_config=lora_config,
speculative_config=speculative_config,
structured_outputs_config=self.structured_outputs_config,
observability_config=observability_config,
compilation_config=compilation_config,
kv_transfer_config=self.kv_transfer_config,
kv_events_config=self.kv_events_config,
ec_transfer_config=self.ec_transfer_config,
profiler_config=self.profiler_config,
additional_config=self.additional_config,
optimization_level=self.optimization_level,
)
That brings us to the constructor of the VllmConfig class in vllm/config/vllm.py, which triggers the class's __post_init__:
def __post_init__(self):
"""Verify configs are valid & consistent with each other."""
# To give each torch profile run a unique instance name.
self.instance_id = f"{time.time_ns()}"
self.try_verify_and_update_config()
if self.model_config is not None:
self.model_config.verify_with_parallel_config(self.parallel_config)
self.model_config.verify_dual_chunk_attention_config(self.load_config)
self.parallel_config.is_moe_model = self.model_config.is_moe
self.cache_config.verify_with_parallel_config(self.parallel_config)
if self.lora_config is not None:
self.lora_config.verify_with_model_config(self.model_config)
if self.quant_config is None and self.model_config is not None:
self.quant_config = VllmConfig._get_quantization_config(
self.model_config, self.load_config
)
which in turn calls the VllmConfig._get_quantization_config function:
@staticmethod
def _get_quantization_config(
model_config: ModelConfig, load_config: LoadConfig
) -> QuantizationConfig | None:
"""Get the quantization config."""
from vllm.platforms import current_platform
if model_config.quantization is not None:
from vllm.model_executor.model_loader.weight_utils import get_quant_config
quant_config = get_quant_config(model_config, load_config)
and from there into vllm/model_executor/model_loader/weight_utils.py:
# TODO(woosuk): Move this to other place.
def get_quant_config(
model_config: ModelConfig, load_config: LoadConfig
) -> QuantizationConfig:
quant_cls = get_quantization_config(model_config.quantization)
...other code...
and then into vllm/model_executor/layers/quantization/__init__.py:
def get_quantization_config(quantization: str) -> type[QuantizationConfig]:
if quantization not in QUANTIZATION_METHODS:
raise ValueError(f"Invalid quantization method: {quantization}")
...other code...
"slimquant_marlin":SlimQuantCompressedTensorsMarlinConfig,
"slimquant_compressed_tensors_marlin":SlimQuantCompressedTensorsMarlinConfig,
}
This is how slimquant_marlin gets associated with the SlimQuantCompressedTensorsMarlinConfig class.
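The mechanism is just a name-to-class dictionary lookup; a stripped-down illustration of the same pattern (with placeholder class bodies, not the real vLLM registry) looks like this:

```python
# Stripped-down illustration of the quantization-method registry pattern:
# the string from the CLI (-q slimquant_marlin) is looked up in a dict that
# maps method names to config classes.
class QuantizationConfig: ...
class SlimQuantCompressedTensorsMarlinConfig(QuantizationConfig): ...

QUANTIZATION_METHODS: dict[str, type[QuantizationConfig]] = {
    "slimquant_marlin": SlimQuantCompressedTensorsMarlinConfig,
    "slimquant_compressed_tensors_marlin": SlimQuantCompressedTensorsMarlinConfig,
}

def get_quantization_config(quantization: str) -> type[QuantizationConfig]:
    if quantization not in QUANTIZATION_METHODS:
        raise ValueError(f"Invalid quantization method: {quantization}")
    return QUANTIZATION_METHODS[quantization]

# e.g. get_quantization_config("slimquant_marlin") returns the Marlin config
# class, whose instance later answers get_quant_method(layer, prefix) per layer.
```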
6.2.2 AsyncLLM.from_vllm_config(
This goes into vllm/v1/engine/async_llm.py:
@classmethod
def from_vllm_config(
cls,
vllm_config: VllmConfig,
start_engine_loop: bool = True,
usage_context: UsageContext = UsageContext.ENGINE_CONTEXT,
stat_loggers: list[StatLoggerFactory] | None = None,
enable_log_requests: bool = False,
aggregate_engine_logging: bool = False,
disable_log_stats: bool = False,
client_addresses: dict[str, str] | None = None,
client_count: int = 1,
client_index: int = 0,
) -> "AsyncLLM":
# Create the LLMEngine.
return cls(
vllm_config=vllm_config,
executor_class=Executor.get_class(vllm_config),
start_engine_loop=start_engine_loop,
stat_loggers=stat_loggers,
log_requests=enable_log_requests,
log_stats=not disable_log_stats,
aggregate_engine_logging=aggregate_engine_logging,
usage_context=usage_context,
client_addresses=client_addresses,
client_count=client_count,
client_index=client_index,
)
from_vllm_config is a @classmethod, so cls here is the AsyncLLM class itself; the call therefore behaves like a constructor and ends up invoking __init__ (see the minimal example below).
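For readers less used to this pattern: a classmethod that returns cls(...) is simply an alternate constructor. A minimal, generic example (nothing vLLM-specific; Engine and from_config are invented names):

```python
# Minimal illustration of the classmethod-as-alternate-constructor pattern
# used by AsyncLLM.from_vllm_config: calling the classmethod constructs an
# instance, so __init__ runs as usual.
class Engine:
    def __init__(self, model: str, tp_size: int):
        self.model = model
        self.tp_size = tp_size

    @classmethod
    def from_config(cls, config: dict) -> "Engine":
        # cls is Engine here, so this calls Engine.__init__ under the hood
        return cls(model=config["model"], tp_size=config.get("tp_size", 1))

engine = Engine.from_config({"model": "/models/GLM-5-W8A8", "tp_size": 1})
assert engine.tp_size == 1
```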
From vllm serve to CompressedTensorsMarlinMoEMethod.get_moe_method (function level):

| Step | File | Function / key point |
|---|---|---|
| 1 | vllm/entrypoints/cli/serve.py | ServeSubcommand.cmd → run_server(args) |
| 2 | vllm/entrypoints/openai/api_server.py | run_server → run_server_worker |
| 3 | vllm/entrypoints/openai/api_server.py | build_async_engine_client → build_async_engine_client_from_engine_args |
| 4 | vllm/entrypoints/openai/api_server.py | engine_args.create_engine_config(...) |
| 5 | vllm/engine/arg_utils.py | EngineArgs.create_engine_config → VllmConfig(...) |
| 6 | vllm/config/vllm.py | VllmConfig.__post_init__ → _get_quantization_config (when needed) → get_quant_config |
| 7 | vllm/model_executor/model_loader/weight_utils.py | get_quant_config → SlimQuantCompressedTensorsMarlinConfig.from_config (etc.) |
| 8 | vllm/v1/worker/gpu_model_runner.py | load_model → instantiate the model according to the model_config architecture |
| 9 | e.g. vllm/model_executor/models/glm4_moe.py | Glm4MoeForCausalLM / Glm4MoeModel → each layer's SharedFusedMoE(..., quant_config=vllm_config.quant_config) |
| 10 | vllm/model_executor/layers/fused_moe/layer.py | FusedMoE.__init__ → _get_quant_method → quant_config.get_quant_method(self, prefix) |
| 11 | vllm/model_executor/layers/quantization/compressed_tensors/compressed_tensors_marlin.py | SlimQuantCompressedTensorsMarlinConfig.get_quant_method → CompressedTensorsMarlinMoEMethod.get_moe_method |
| 12 | vllm/model_executor/layers/quantization/compressed_tensors/compressed_tensors_moe_marlin.py | CompressedTensorsMarlinMoEMethod.get_moe_method → returns a concrete CompressedTensorsW8A8*MarlinMoEMethod instance |
6.3 Question: where does self.quant_method.apply( in layer.py go next?
Question: vllm/model_executor/layers/fused_moe/layer.py calls self.quant_method.apply, which is the MoE layer's real inference entry point. Where does it go from there - into vllm/model_executor/layers/fused_moe/fused_moe.py, or into vllm/model_executor/layers/quantization/compressed_tensors/compressed_tensors_moe_marlin.py?
Answer (summarized below; a dispatch sketch follows the list):
- FusedMoEModularMethod (after maybe_init_modular_kernel has swapped the quant_method to the modular method)
  → goes to FusedMoEModularMethod.apply in fused_moe_modular_method.py
  → which drives the prepare → expert GEMM → finalize orchestration in FusedMoEModularKernel (modular_kernel.py); it generally does not pass through the fused_experts function in fused_moe.py.
- CompressedTensorsW8A8FP8MarlinMoEMethod / CompressedTensorsW8A8Int8MarlinMoEMethod (and whichever Marlin MoE quantization class the CompressedTensorsMarlinMoEMethod factory in the same file selected)
  → goes to that class's apply or fused_moe_forward in compressed_tensors_moe_marlin.py (for FP8 this is typically apply → self.fused_experts, i.e. fused_moe_forward)
  → and then branches inside the class to one of aiter_moe, fused_moe.fused_experts, fused_experts_impl_fp8_marlin / fused_experts_impl_int8_marlin (lmslim), and so on.
- Other FusedMoEMethodBase subclasses (e.g. UnquantizedFusedMoEMethod, Fp8MoEMethod in fp8.py, quark_moe / moe_wna16, each in its own file)
  → go to the apply (or apply_monolithic) in their own module
  → and the implementation decides internally whether to call fused_moe.fused_experts, Marlin CUDA kernels, or some other kernel; they do not all funnel through compressed_tensors_moe_marlin.py.
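Put as code (a schematic dispatch keyed on the quant_method class, with the branch bodies reduced to descriptive strings; not actual vLLM logic):

```python
# Schematic summary of the three routes above, keyed on the quant_method class.
def describe_apply_route(quant_method) -> str:
    name = quant_method.__class__.__name__
    if name == "FusedMoEModularMethod":
        return ("fused_moe_modular_method.apply -> FusedMoEModularKernel "
                "(prepare -> expert GEMMs -> finalize in modular_kernel.py)")
    if "MarlinMoEMethod" in name:
        return ("compressed_tensors_moe_marlin apply / fused_moe_forward -> "
                "aiter_moe | fused_moe.fused_experts | fused_experts_impl_*_marlin")
    # everything else: UnquantizedFusedMoEMethod, Fp8MoEMethod, quark_moe, ...
    return "the method's own apply, which picks its kernel internally"
```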
6.4 Question: what does the apply function do?
Question: the apply function in vllm/model_executor/layers/quantization/compressed_tensors/compressed_tensors_moe_marlin.py is where the MoE computation actually runs, right? So with Marlin quantization the MoE computation goes through the apply in this file, and with other quantization schemes it goes elsewhere - is that understanding correct?
Answer: broadly yes. With a Marlin-quantized MoE, and as long as the quant_method has not been swapped to FusedMoEModularMethod, the apply in this file is the entry point that launches the MoE computation; other quantization schemes launch it from the apply of their own method class, as listed in 6.3.
6.5 Question: where is select_gemm_impl called, and how does it reach modular_kernel.py?
Recall the function in layer.py we analyzed earlier:
def maybe_init_modular_kernel(self) -> None:
# If this layer is configured for Marlin W16A16 path, we intentionally
# keep the monolithic execution route so runtime can dispatch to
# fused_experts_impl_w16a16_marlin when weights are packed.
if getattr(self, "_marlin_w16a16_moe_enabled", False):
return
self.ensure_moe_quant_config_init()
# routing_tables only needed for round-robin expert placement with
# DeepEP all2all backend.
routing_tables = self._maybe_init_expert_routing_tables()
prepare_finalize = self.quant_method.maybe_make_prepare_finalize(
routing_tables=routing_tables
)
if prepare_finalize is not None:
logger.debug(
"%s for %s(%s)", prepare_finalize.__class__.__name__, self, id(self)
)
self.quant_method = FusedMoEModularMethod.make(
self, self.quant_method, prepare_finalize, self.shared_experts
)
Then look at this part:
self.quant_method = FusedMoEModularMethod.make(
self, self.quant_method, prepare_finalize, self.shared_experts
)
The quant_method here is still the Marlin method we selected, and it is passed into make:
def make(
moe_layer: torch.nn.Module,
old_quant_method: FusedMoEMethodBase,
prepare_finalize: FusedMoEPrepareAndFinalize,
shared_experts: torch.nn.Module | None,
) -> "FusedMoEModularMethod":
return FusedMoEModularMethod(
old_quant_method,
FusedMoEModularKernel(
prepare_finalize,
old_quant_method.select_gemm_impl(prepare_finalize, moe_layer),
shared_experts,
moe_parallel_config=moe_layer.moe_parallel_config,
N=old_quant_method.N if hasattr(old_quant_method, "N") else -1,
K=old_quant_method.K if hasattr(old_quant_method, "K") else -1,
),
)
self.quant_method is passed in as the old_quant_method parameter, and old_quant_method.select_gemm_impl(prepare_finalize, moe_layer) is invoked right there; that is where select_gemm_impl gets called.
6.6 Question: where is process_weights_after_loading called?
vllm/model_executor/model_loader/utils.py contains:
for _, module in model.named_modules():
quant_method = getattr(module, "quant_method", None)
if isinstance(quant_method, QuantizeMethodBase):
# When quant methods need to process weights after loading
# (for repacking, quantizing, etc), they expect parameters
# to be on the global target device. This scope is for the
# case where cpu offloading is used, where we will move the
# parameters onto device for processing and back off after.
with device_loading_context(module, target_device):
quant_method.process_weights_after_loading(module)
In other words, after the model weights are loaded, each quant method gets a chance to repack / rearrange its weights; this step has nothing to do with modular_kernel.py.
How this lines up with the paths traced earlier (a small illustration of the hook pattern follows the list):
- If the quant_method was not swapped to FusedMoEModularMethod: the apply here directly calls fused_moe.fused_experts, or lmslim's fused_experts_impl_*_marlin, or aiter_moe (selected by switches such as VLLM_USE_AITER_MOE_W8A8).
- If it was swapped to the modular path: select_gemm_impl picks BatchedDeepGemmExperts / DeepGemmExperts, which lives in a parallel world to the fused_experts function in fused_moe.py.
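The hook pattern itself is simple; a toy version (invented names, purely to illustrate the "repack after load" idea, not vLLM's actual Marlin repacking) might look like this:

```python
# Toy illustration of the process_weights_after_loading hook: after all
# checkpoint weights are materialized, the quant method repacks them into the
# layout its kernels expect (invented example, not vLLM's actual repacking).
import torch

class ToyMoELayer(torch.nn.Module):
    def __init__(self):
        super().__init__()
        # pretend this was just loaded from a checkpoint
        self.w13_weight = torch.nn.Parameter(
            torch.randn(8, 128, 256), requires_grad=False
        )

class ToyQuantMethod:
    def process_weights_after_loading(self, layer: torch.nn.Module) -> None:
        # repack into the layout the (imaginary) kernel expects; real Marlin
        # repacking permutes and packs weights into int-packed tiles instead
        w = layer.w13_weight.data
        layer.w13_weight = torch.nn.Parameter(
            w.transpose(-1, -2).contiguous(), requires_grad=False
        )

layer = ToyMoELayer()
ToyQuantMethod().process_weights_after_loading(layer)
print(layer.w13_weight.shape)  # torch.Size([8, 256, 128])
```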
7 Question: how does apply in layer.py reach the inference function in modular_kernel.py, step by step?
A question suddenly came to mind. layer.py (vllm/model_executor/layers/fused_moe/layer.py) calls self.quant_method.apply, which is the MoE layer's real inference entry point, and on the modular path execution is supposed to end up in vllm/model_executor/layers/fused_moe/modular_kernel.py. But looking at layer.py (around lines 770-772), the quant_method there is still the Marlin one - it was never replaced with the modular method - so how does apply get to the apply in modular_kernel.py?
Looking at the code again:
if prepare_finalize is not None:
logger.debug(
"%s for %s(%s)", prepare_finalize.__class__.__name__, self, id(self)
)
self.quant_method = FusedMoEModularMethod.make(
self, self.quant_method, prepare_finalize, self.shared_experts
)
Once this line runs, the result is assigned back to self.quant_method, so self.quant_method is now a FusedMoEModularMethod, and the apply call from layer.py lands in FusedMoEModularMethod.apply:
def apply(
self,
layer: "FusedMoE", # type: ignore[name-defined] # noqa: F821
x: torch.Tensor,
topk_weights: torch.Tensor,
topk_ids: torch.Tensor,
use_nn_moe: bool | None = False,
shared_output: torch.Tensor | None = None,
routed_scaling_factor: float = 1.0,
) -> torch.Tensor | tuple[torch.Tensor, torch.Tensor]:
return self.fused_experts(
hidden_states=x,
w1=layer.w13_weight,
w2=layer.w2_weight,
topk_weights=topk_weights,
topk_ids=topk_ids,
inplace=self.allow_inplace,
activation=layer.activation,
global_num_experts=layer.global_num_experts,
apply_router_weight_on_input=layer.apply_router_weight_on_input,
expert_map=None if self.disable_expert_map else layer.expert_map,
use_nn_moe=use_nn_moe,
shared_output=shared_output,
routed_scaling_factor=routed_scaling_factor,
)
And the code below from vllm/model_executor/layers/fused_moe/fused_moe_modular_method.py shows that self.fused_experts is simply the experts argument:
# --8<-- [start:modular_fused_moe]
@CustomOp.register("modular_fused_moe")
class FusedMoEModularMethod(FusedMoEMethodBase, CustomOp):
# --8<-- [end:modular_fused_moe]
def __init__(
self, old_quant_method: FusedMoEMethodBase, experts: FusedMoEModularKernel
):
super().__init__(old_quant_method.moe)
self.moe_quant_config = old_quant_method.moe_quant_config
self.fused_experts = experts
self.disable_expert_map = getattr(
old_quant_method,
"disable_expert_map",
not self.fused_experts.supports_expert_map(),
)
self.old_quant_method = old_quant_method
assert not self.old_quant_method.is_monolithic
logger.debug("Swapping out %s", self.old_quant_method.__class__.__name__)
And that experts is exactly the FusedMoEModularKernel instance built when FusedMoEModularMethod is constructed in make:
def make(
moe_layer: torch.nn.Module,
old_quant_method: FusedMoEMethodBase,
prepare_finalize: FusedMoEPrepareAndFinalize,
shared_experts: torch.nn.Module | None,
) -> "FusedMoEModularMethod":
return FusedMoEModularMethod(
old_quant_method,
FusedMoEModularKernel(
prepare_finalize,
old_quant_method.select_gemm_impl(prepare_finalize, moe_layer),
shared_experts,
moe_parallel_config=moe_layer.moe_parallel_config,
N=old_quant_method.N if hasattr(old_quant_method, "N") else -1,
K=old_quant_method.K if hasattr(old_quant_method, "K") else -1,
),
)
So FusedMoEModularMethod.apply effectively calls the FusedMoEModularKernel instance, and since that class inherits from torch.nn.Module, calling the instance invokes its forward function (a minimal reminder of this behavior follows).
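As a reminder of the underlying PyTorch behavior (generic example, not vLLM code): calling an nn.Module instance routes through __call__, which dispatches to forward:

```python
import torch

class Doubler(torch.nn.Module):
    def forward(self, x: torch.Tensor) -> torch.Tensor:
        return 2 * x

m = Doubler()
x = torch.tensor([1.0, 2.0])
# m(x) goes through nn.Module.__call__, which calls m.forward(x)
assert torch.equal(m(x), m.forward(x))
```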
References
LLM Inference Engine vLLM (17): reading notes on vllm/model_executor/layers/fused_moe/modular_kernel.py