基于 Ray 的分布式 AI 训练与推理:从单机到集群的弹性扩展
基于 Ray 的分布式 AI 训练与推理:从单机到集群的弹性扩展

一、AI 训练的"单机天花板":模型太大,一张卡装不下
7B 参数的模型 FP16 精度需要 14GB 显存,训练时加上梯度和优化器状态,至少需要 56GB——单张 A100 80GB 勉强能跑,但 13B 及以上的模型就必须多卡并行。更关键的是,推理服务也需要弹性扩展——白天高峰需要 10 个推理副本,夜间低谷只需要 2 个,手动调整效率太低。
Ray 的核心价值是"用统一的编程模型实现从单机到集群的无缝扩展"。单机开发时用 Ray 本地模式,部署时切换到集群模式,代码无需修改。Ray 的 Actor 模型天然适合推理服务(有状态的长连接),Task 模型适合训练任务(无状态的并行计算)。
二、Ray 分布式架构
graph TB
subgraph Ray 集群
A[Head Node<br/>GCS + Dashboard] --> B[Worker Node 1<br/>GPU 0,1]
A --> C[Worker Node 2<br/>GPU 2,3]
A --> D[Worker Node 3<br/>CPU only]
end
subgraph 编程模型
E[Remote Function<br/>无状态并行任务] --> F[训练数据并行]
G[Actor<br/>有状态长连接] --> H[推理服务]
I[Placement Group<br/>资源拓扑约束] --> J[GPU 亲和调度]
end
subgraph 弹性伸缩
K[Autoscaler<br/>根据队列深度扩缩]
K --> B
K --> C
end
Ray 集群由 Head Node(全局控制服务+Dashboard)和多个 Worker Node 组成。Remote Function 用于无状态的并行任务(数据并行训练),Actor 用于有状态的长连接服务(推理),Placement Group 用于 GPU 拓扑约束调度。
三、实现
3.1 分布式训练
import ray
# 初始化 Ray(本地模式或集群模式)
ray.init()
@ray.remote(num_gpus=1)
class TrainingWorker:
"""训练 Worker:每个 GPU 一个 Actor"""
def __init__(self, model_config: dict):
self.model_config = model_config
self.model = None
self.optimizer = None
def setup(self):
"""初始化模型和优化器"""
import torch
# 实际场景中加载模型
self.model = torch.nn.Linear(768, 768).cuda()
self.optimizer = torch.optim.Adam(
self.model.parameters(), lr=1e-4
)
def train_step(self, batch: dict) -> dict:
"""执行一步训练"""
import torch
self.model.train()
inputs = torch.tensor(batch['input']).cuda()
labels = torch.tensor(batch['label']).cuda()
self.optimizer.zero_grad()
outputs = self.model(inputs)
loss = torch.nn.functional.mse_loss(outputs, labels)
loss.backward()
self.optimizer.step()
return {'loss': loss.item()}
def get_gradients(self) -> dict:
"""获取梯度(用于 AllReduce)"""
import torch
return {
name: param.grad.clone()
for name, param in self.model.named_parameters()
if param.grad is not None
}
def apply_gradients(self, avg_grads: dict):
"""应用平均梯度"""
import torch
with torch.no_grad():
for name, param in self.model.named_parameters():
if name in avg_grads:
param.grad = avg_grads[name].to(param.device)
class DistributedTrainer:
"""分布式训练协调器"""
def __init__(self, num_workers: int = 4):
self.num_workers = num_workers
self.workers = []
def setup(self):
"""创建训练 Workers"""
for _ in range(self.num_workers):
worker = TrainingWorker.remote(model_config={})
ray.get(worker.setup.remote())
self.workers.append(worker)
def train_epoch(
self, data_shards: list, num_steps: int = 100
) -> list:
"""执行一轮训练"""
losses = []
for step in range(num_steps):
# 并行执行前向+反向
futures = []
for i, worker in enumerate(self.workers):
batch = data_shards[i % len(data_shards)]
futures.append(
worker.train_step.remote(batch)
)
# 等待所有 Worker 完成
step_results = ray.get(futures)
step_loss = sum(
r['loss'] for r in step_results
) / len(step_results)
losses.append(step_loss)
# AllReduce:收集梯度并平均
grad_futures = [
w.get_gradients.remote() for w in self.workers
]
all_grads = ray.get(grad_futures)
# 简化:平均梯度
avg_grads = self._average_gradients(all_grads)
# 应用平均梯度
apply_futures = [
w.apply_gradients.remote(avg_grads)
for w in self.workers
]
ray.get(apply_futures)
return losses
def _average_gradients(self, all_grads: list) -> dict:
"""平均梯度(简化版 AllReduce)"""
import torch
avg = {}
for name in all_grads[0]:
avg[name] = torch.stack(
[g[name] for g in all_grads]
).mean(0)
return avg
3.2 弹性推理服务
@ray.remote(num_gpus=0.5)
class InferenceActor:
"""推理 Actor:有状态的推理服务"""
def __init__(self, model_path: str):
self.model_path = model_path
self.model = None
self.request_count = 0
def load_model(self):
"""加载模型"""
import torch
# 实际场景中加载模型
self.model = torch.nn.Linear(768, 768).cuda()
self.model.eval()
def predict(self, input_data: dict) -> dict:
"""执行推理"""
import torch
with torch.no_grad():
inputs = torch.tensor(
input_data['features']
).cuda()
outputs = self.model(inputs)
self.request_count += 1
return {
'output': outputs.cpu().tolist(),
'request_id': input_data.get('id'),
}
def get_load(self) -> int:
"""获取当前负载"""
return self.request_count
class ElasticInferenceService:
"""弹性推理服务"""
def __init__(
self,
model_path: str,
min_replicas: int = 1,
max_replicas: int = 8,
):
self.model_path = model_path
self.min_replicas = min_replicas
self.max_replicas = max_replicas
self.actors = []
def start(self):
"""启动最小副本数"""
for _ in range(self.min_replicas):
actor = InferenceActor.remote(self.model_path)
ray.get(actor.load_model.remote())
self.actors.append(actor)
def predict(self, input_data: dict) -> dict:
"""路由推理请求到负载最低的 Actor"""
# 获取所有 Actor 的负载
load_futures = [
a.get_load.remote() for a in self.actors
]
loads = ray.get(load_futures)
# 选择负载最低的 Actor
min_idx = loads.index(min(loads))
return ray.get(
self.actors[min_idx].predict.remote(input_data)
)
def scale(self, target_replicas: int):
"""调整副本数"""
target = max(
self.min_replicas,
min(self.max_replicas, target_replicas)
)
current = len(self.actors)
if target > current:
# 扩容
for _ in range(target - current):
actor = InferenceActor.remote(self.model_path)
ray.get(actor.load_model.remote())
self.actors.append(actor)
elif target < current:
# 缩容
removed = self.actors[target:]
self.actors = self.actors[:target]
for actor in removed:
ray.kill(actor)
四、Ray 分布式方案的 Trade-offs 分析
Ray vs. Horovod/DeepSpeed:Horovod 和 DeepSpeed 是专门的分布式训练框架,对通信优化更深入(梯度压缩、混合精度 AllReduce)。Ray 的优势是通用性——训练和推理用同一套编程模型,部署和运维更简单。如果只需要训练,Horovod/DeepSpeed 更优;如果训练+推理+数据处理需要统一调度,Ray 更合适。
Actor 模型的内存开销:每个 Actor 是独立的进程,有自己的 Python 解释器和 GPU 上下文。大量小 Actor 会浪费 GPU 内存(每个 Actor 至少占用 1-2GB 上下文内存)。建议每个 GPU 运行 1-2 个 Actor,避免过度拆分。
Autoscaler 的延迟:Ray Autoscaler 从检测到队列积压到新 Worker 加入集群,约需 1-3 分钟(启动 EC2 实例+注册到集群)。对于突发流量,需要预热池或超额配置。
GCS 单点:Ray 的全局控制服务(GCS)运行在 Head Node 上,是单点。Head Node 故障会导致整个集群不可用。生产环境建议 Head Node 部署在高可用实例上,并定期备份 GCS 状态。
五、总结
Ray 的核心价值是"用统一的编程模型实现从单机到集群的无缝扩展"。Remote Function 用于无状态并行任务,Actor 用于有状态推理服务,Placement Group 用于 GPU 拓扑调度。单机开发和集群部署代码无需修改。
落地建议:先用 Ray 本地模式开发和调试,确认逻辑正确后切换到集群模式。训练任务用 Remote Function + 手动 AllReduce,推理服务用 Actor + 负载均衡路由。Autoscaler 配合预热池应对突发流量。Head Node 部署在高可用实例上。
AtomGit 是由开放原子开源基金会联合 CSDN 等生态伙伴共同推出的新一代开源与人工智能协作平台。平台坚持“开放、中立、公益”的理念,把代码托管、模型共享、数据集托管、智能体开发体验和算力服务整合在一起,为开发者提供从开发、训练到部署的一站式体验。
更多推荐



所有评论(0)