基于 Ray 的分布式 AI 训练与推理:从单机到集群的弹性扩展

cover

一、AI 训练的"单机天花板":模型太大,一张卡装不下

7B 参数的模型 FP16 精度需要 14GB 显存,训练时加上梯度和优化器状态,至少需要 56GB——单张 A100 80GB 勉强能跑,但 13B 及以上的模型就必须多卡并行。更关键的是,推理服务也需要弹性扩展——白天高峰需要 10 个推理副本,夜间低谷只需要 2 个,手动调整效率太低。

Ray 的核心价值是"用统一的编程模型实现从单机到集群的无缝扩展"。单机开发时用 Ray 本地模式,部署时切换到集群模式,代码无需修改。Ray 的 Actor 模型天然适合推理服务(有状态的长连接),Task 模型适合训练任务(无状态的并行计算)。

二、Ray 分布式架构

graph TB
    subgraph Ray 集群
        A[Head Node<br/>GCS + Dashboard] --> B[Worker Node 1<br/>GPU 0,1]
        A --> C[Worker Node 2<br/>GPU 2,3]
        A --> D[Worker Node 3<br/>CPU only]
    end

    subgraph 编程模型
        E[Remote Function<br/>无状态并行任务] --> F[训练数据并行]
        G[Actor<br/>有状态长连接] --> H[推理服务]
        I[Placement Group<br/>资源拓扑约束] --> J[GPU 亲和调度]
    end

    subgraph 弹性伸缩
        K[Autoscaler<br/>根据队列深度扩缩]
        K --> B
        K --> C
    end

Ray 集群由 Head Node(全局控制服务+Dashboard)和多个 Worker Node 组成。Remote Function 用于无状态的并行任务(数据并行训练),Actor 用于有状态的长连接服务(推理),Placement Group 用于 GPU 拓扑约束调度。

三、实现

3.1 分布式训练

import ray

# 初始化 Ray(本地模式或集群模式)
ray.init()

@ray.remote(num_gpus=1)
class TrainingWorker:
    """训练 Worker:每个 GPU 一个 Actor"""

    def __init__(self, model_config: dict):
        self.model_config = model_config
        self.model = None
        self.optimizer = None

    def setup(self):
        """初始化模型和优化器"""
        import torch
        # 实际场景中加载模型
        self.model = torch.nn.Linear(768, 768).cuda()
        self.optimizer = torch.optim.Adam(
            self.model.parameters(), lr=1e-4
        )

    def train_step(self, batch: dict) -> dict:
        """执行一步训练"""
        import torch
        self.model.train()
        inputs = torch.tensor(batch['input']).cuda()
        labels = torch.tensor(batch['label']).cuda()

        self.optimizer.zero_grad()
        outputs = self.model(inputs)
        loss = torch.nn.functional.mse_loss(outputs, labels)
        loss.backward()
        self.optimizer.step()

        return {'loss': loss.item()}

    def get_gradients(self) -> dict:
        """获取梯度(用于 AllReduce)"""
        import torch
        return {
            name: param.grad.clone()
            for name, param in self.model.named_parameters()
            if param.grad is not None
        }

    def apply_gradients(self, avg_grads: dict):
        """应用平均梯度"""
        import torch
        with torch.no_grad():
            for name, param in self.model.named_parameters():
                if name in avg_grads:
                    param.grad = avg_grads[name].to(param.device)


class DistributedTrainer:
    """分布式训练协调器"""

    def __init__(self, num_workers: int = 4):
        self.num_workers = num_workers
        self.workers = []

    def setup(self):
        """创建训练 Workers"""
        for _ in range(self.num_workers):
            worker = TrainingWorker.remote(model_config={})
            ray.get(worker.setup.remote())
            self.workers.append(worker)

    def train_epoch(
        self, data_shards: list, num_steps: int = 100
    ) -> list:
        """执行一轮训练"""
        losses = []

        for step in range(num_steps):
            # 并行执行前向+反向
            futures = []
            for i, worker in enumerate(self.workers):
                batch = data_shards[i % len(data_shards)]
                futures.append(
                    worker.train_step.remote(batch)
                )

            # 等待所有 Worker 完成
            step_results = ray.get(futures)
            step_loss = sum(
                r['loss'] for r in step_results
            ) / len(step_results)
            losses.append(step_loss)

            # AllReduce:收集梯度并平均
            grad_futures = [
                w.get_gradients.remote() for w in self.workers
            ]
            all_grads = ray.get(grad_futures)

            # 简化:平均梯度
            avg_grads = self._average_gradients(all_grads)

            # 应用平均梯度
            apply_futures = [
                w.apply_gradients.remote(avg_grads)
                for w in self.workers
            ]
            ray.get(apply_futures)

        return losses

    def _average_gradients(self, all_grads: list) -> dict:
        """平均梯度(简化版 AllReduce)"""
        import torch
        avg = {}
        for name in all_grads[0]:
            avg[name] = torch.stack(
                [g[name] for g in all_grads]
            ).mean(0)
        return avg

3.2 弹性推理服务

@ray.remote(num_gpus=0.5)
class InferenceActor:
    """推理 Actor:有状态的推理服务"""

    def __init__(self, model_path: str):
        self.model_path = model_path
        self.model = None
        self.request_count = 0

    def load_model(self):
        """加载模型"""
        import torch
        # 实际场景中加载模型
        self.model = torch.nn.Linear(768, 768).cuda()
        self.model.eval()

    def predict(self, input_data: dict) -> dict:
        """执行推理"""
        import torch
        with torch.no_grad():
            inputs = torch.tensor(
                input_data['features']
            ).cuda()
            outputs = self.model(inputs)
        self.request_count += 1
        return {
            'output': outputs.cpu().tolist(),
            'request_id': input_data.get('id'),
        }

    def get_load(self) -> int:
        """获取当前负载"""
        return self.request_count


class ElasticInferenceService:
    """弹性推理服务"""

    def __init__(
        self,
        model_path: str,
        min_replicas: int = 1,
        max_replicas: int = 8,
    ):
        self.model_path = model_path
        self.min_replicas = min_replicas
        self.max_replicas = max_replicas
        self.actors = []

    def start(self):
        """启动最小副本数"""
        for _ in range(self.min_replicas):
            actor = InferenceActor.remote(self.model_path)
            ray.get(actor.load_model.remote())
            self.actors.append(actor)

    def predict(self, input_data: dict) -> dict:
        """路由推理请求到负载最低的 Actor"""
        # 获取所有 Actor 的负载
        load_futures = [
            a.get_load.remote() for a in self.actors
        ]
        loads = ray.get(load_futures)

        # 选择负载最低的 Actor
        min_idx = loads.index(min(loads))
        return ray.get(
            self.actors[min_idx].predict.remote(input_data)
        )

    def scale(self, target_replicas: int):
        """调整副本数"""
        target = max(
            self.min_replicas,
            min(self.max_replicas, target_replicas)
        )
        current = len(self.actors)

        if target > current:
            # 扩容
            for _ in range(target - current):
                actor = InferenceActor.remote(self.model_path)
                ray.get(actor.load_model.remote())
                self.actors.append(actor)
        elif target < current:
            # 缩容
            removed = self.actors[target:]
            self.actors = self.actors[:target]
            for actor in removed:
                ray.kill(actor)

四、Ray 分布式方案的 Trade-offs 分析

Ray vs. Horovod/DeepSpeed:Horovod 和 DeepSpeed 是专门的分布式训练框架,对通信优化更深入(梯度压缩、混合精度 AllReduce)。Ray 的优势是通用性——训练和推理用同一套编程模型,部署和运维更简单。如果只需要训练,Horovod/DeepSpeed 更优;如果训练+推理+数据处理需要统一调度,Ray 更合适。

Actor 模型的内存开销:每个 Actor 是独立的进程,有自己的 Python 解释器和 GPU 上下文。大量小 Actor 会浪费 GPU 内存(每个 Actor 至少占用 1-2GB 上下文内存)。建议每个 GPU 运行 1-2 个 Actor,避免过度拆分。

Autoscaler 的延迟:Ray Autoscaler 从检测到队列积压到新 Worker 加入集群,约需 1-3 分钟(启动 EC2 实例+注册到集群)。对于突发流量,需要预热池或超额配置。

GCS 单点:Ray 的全局控制服务(GCS)运行在 Head Node 上,是单点。Head Node 故障会导致整个集群不可用。生产环境建议 Head Node 部署在高可用实例上,并定期备份 GCS 状态。

五、总结

Ray 的核心价值是"用统一的编程模型实现从单机到集群的无缝扩展"。Remote Function 用于无状态并行任务,Actor 用于有状态推理服务,Placement Group 用于 GPU 拓扑调度。单机开发和集群部署代码无需修改。

落地建议:先用 Ray 本地模式开发和调试,确认逻辑正确后切换到集群模式。训练任务用 Remote Function + 手动 AllReduce,推理服务用 Actor + 负载均衡路由。Autoscaler 配合预热池应对突发流量。Head Node 部署在高可用实例上。

Logo

AtomGit 是由开放原子开源基金会联合 CSDN 等生态伙伴共同推出的新一代开源与人工智能协作平台。平台坚持“开放、中立、公益”的理念,把代码托管、模型共享、数据集托管、智能体开发体验和算力服务整合在一起,为开发者提供从开发、训练到部署的一站式体验。

更多推荐