1. 进程管理的关键设计

在 Hermes RL Training 中,进程管理是系统稳定运行的核心保障。三个独立进程(run-api、Trainer、Environment)需要协调启动、监控运行状态、优雅关闭,任何一个环节出现问题都可能导致训练失败或资源泄漏。本章将深入剖析 Hermes 的进程管理设计,包括启动序列、状态监控、优雅关闭、故障恢复等关键机制。

1.1 进程管理的核心挑战

1.1.1 为什么需要复杂的进程管理?

RL 训练是一个长时间运行的任务(通常数小时甚至数天),涉及多个相互依赖的进程:

挑战 说明 后果
启动依赖 API 必须先启动,Trainer 才能连接 顺序错误导致连接失败
启动时间 各进程初始化时间不同(5秒~90秒) 过早访问导致错误
进程崩溃 任何一个进程退出,训练都应停止 孤立进程浪费资源
资源泄漏 进程异常退出时未清理子进程 GPU 内存泄漏、端口占用
状态同步 需要实时知道训练是否在进行中 无法准确监控进度
1.1.2 Hermes 的解决方案概览
┌─────────────────────────────────────────────────────────────────────┐
│                    进程管理架构                                      │
├─────────────────────────────────────────────────────────────────────┤
│                                                                     │
│  ┌─────────────────┐     ┌─────────────────┐     ┌─────────────┐   │
│  │   RunState      │     │  Async Monitor  │     │  Log Files  │   │
│  │   (状态容器)     │◀───▶│  (后台监控)      │     │ (独立日志)   │   │
│  │                 │     │                 │     │             │   │
│  │ • api_process   │     │ • 30秒轮询      │     │ • api.log   │   │
│  │ • trainer_proc  │     │ • 进程存活检查  │     │ • train.log │   │
│  │ • env_process   │     │ • 异常检测      │     │ • env.log   │   │
│  │ • status        │     │ • 自动清理      │     │             │   │
│  └─────────────────┘     └─────────────────┘     └─────────────┘   │
│                                                                     │
└─────────────────────────────────────────────────────────────────────┘

该架构围绕三个核心组件构建:RunState(状态容器) 负责存储三个子进程(api_process、trainer_proc、env_process)的句柄与运行状态;Async Monitor(后台监控) 以30秒为周期轮询检查各进程的存活状态,检测异常并执行自动清理;Log Files(独立日志) 为每个进程保留独立的日志文件(api.log、train.log、env.log),便于问题追踪与调试。其中,RunState与Async Monitor之间通过双向交互实现状态同步与异常响应,三者共同形成了一套轻量级但可靠的进程生命周期管理机制,确保了多进程训练任务的稳定运行与可观测性。

1.2. RunState:进程状态的中央容器

1.2.1 数据结构定义

RunState 是一个 dataclass,用于存储单个训练运行的完整状态:

from dataclasses import dataclass
from typing import Any, Dict, List, Optional
import subprocess

@dataclass
class RunState:
    """State for a training run."""
    # 基本信息
    run_id: str                          # 唯一标识符 (UUID)
    environment: str                     # 环境名称
    config: Dict[str, Any]               # 训练配置
    
    # 状态信息
    status: str = "pending"              # pending, starting, running, stopping, stopped, completed, failed
    error_message: str = ""              # 错误信息
    
    # WandB 信息
    wandb_project: str = ""
    wandb_run_name: str = ""
    
    # 时间信息
    start_time: float = 0.0              # 启动时间戳
    
    # 进程句柄(核心)
    api_process: Optional[subprocess.Popen] = None
    trainer_process: Optional[subprocess.Popen] = None
    env_process: Optional[subprocess.Popen] = None
    
    # 日志文件句柄(用于关闭)
    api_log_file: Optional[object] = None
    trainer_log_file: Optional[object] = None
    env_log_file: Optional[object] = None
1.2.2 全局状态管理
# 全局状态存储
_active_runs: Dict[str, RunState] = {}           # run_id -> RunState
_last_status_check: Dict[str, float] = {}        # run_id -> 上次检查时间

# 状态检查频率限制(30分钟)
MIN_STATUS_CHECK_INTERVAL = 30 * 60
1.2.3 设计意图
设计选择 原因
Dataclass 类型安全、自动 __init__、清晰字段定义
Optional[Popen] 进程可能未启动或已终止
独立日志句柄 确保进程终止时能正确关闭文件,避免资源泄漏
全局字典 支持多训练并行,通过 run_id 隔离

1.3 启动序列:有序的依赖启动

1.3.1 启动流程图
┌─────────────────────────────────────────────────────────────────────┐
│                     进程启动序列                                     │
├─────────────────────────────────────────────────────────────────────┤
│                                                                     │
│  Step 1: 启动 API 服务器                                             │
│  ┌─────────────────────────────────────────────────────────────┐   │
│  │  命令: run-api                                               │   │
│  │  等待: 5 秒                                                  │   │
│  │  检查: process.poll() is None?                               │   │
│  │                                                              │   │
│  │  ❌ 如果已退出: status = "failed", 返回错误                   │   │
│  │  ✅ 如果运行中: 继续下一步                                    │   │
│  └─────────────────────────────────────────────────────────────┘   │
│                              │                                      │
│                              ▼                                      │
│  Step 2: 启动 Trainer                                               │
│  ┌─────────────────────────────────────────────────────────────┐   │
│  │  命令: python launch_training.py --config <config>           │   │
│  │  等待: 30 秒                                                 │   │
│  │  检查: process.poll() is None?                               │   │
│  │                                                              │   │
│  │  ❌ 如果已退出: status = "failed", 停止 API, 返回错误         │   │
│  │  ✅ 如果运行中: 继续下一步                                    │   │
│  └─────────────────────────────────────────────────────────────┘   │
│                              │                                      │
│                              ▼                                      │
│  Step 3: 启动 Environment                                           │
│  ┌─────────────────────────────────────────────────────────────┐   │
│  │  命令: python <env>.py serve --config <config>               │   │
│  │  等待: 90 秒                                                 │   │
│  │  检查: process.poll() is None?                               │   │
│  │                                                              │   │
│  │  ❌ 如果已退出: status = "failed", 停止所有进程, 返回错误     │   │
│  │  ✅ 如果运行中: status = "running", 启动后台监控              │   │
│  └─────────────────────────────────────────────────────────────┘   │
│                                                                     │
└─────────────────────────────────────────────────────────────────────┘
1.3.2 源码实现详解

async def _spawn_training_run(run_state: RunState, config_path: Path):
    """
    启动训练的三个进程,按依赖顺序启动,每步都有健康检查。
    """
    run_id = run_state.run_id
    _ensure_logs_dir()  # 确保日志目录存在

    # ==================== Step 1: 启动 API 服务器 ====================
    logger.info("[%s] Starting Atropos API server (run-api)...", run_id)
    
    api_log = LOGS_DIR / f"api_{run_id}.log"
    api_log_file = open(api_log, "w")  # 打开日志文件
    run_state.api_log_file = api_log_file  # 保存句柄用于后续关闭
    
    run_state.api_process = subprocess.Popen(
        ["run-api"],
        stdout=api_log_file,           # 重定向 stdout 到日志
        stderr=subprocess.STDOUT,       # stderr 合并到 stdout
        cwd=str(TINKER_ATROPOS_ROOT),  # 工作目录
    )
    
    # 等待 API 初始化
    await asyncio.sleep(5)
    
    # 健康检查:API 是否正常启动?
    if run_state.api_process.poll() is not None:
        # poll() 返回退出码表示进程已结束
        exit_code = run_state.api_process.returncode
        run_state.status = "failed"
        run_state.error_message = f"API server exited with code {exit_code}. Check {api_log}"
        _stop_training_run(run_state)  # 清理已启动的资源
        return
    
    logger.info("[%s] Atropos API server started", run_id)

    # ==================== Step 2: 启动 Trainer ====================
    logger.info("[%s] Starting Tinker trainer...", run_id)
    
    trainer_log = LOGS_DIR / f"trainer_{run_id}.log"
    trainer_log_file = open(trainer_log, "w")
    run_state.trainer_log_file = trainer_log_file
    
    run_state.trainer_process = subprocess.Popen(
        [sys.executable, "launch_training.py", "--config", str(config_path)],
        stdout=trainer_log_file,
        stderr=subprocess.STDOUT,
        cwd=str(TINKER_ATROPOS_ROOT),
        env={**os.environ, "TINKER_API_KEY": os.getenv("TINKER_API_KEY", "")},
    )
    
    # Trainer 需要更长时间初始化(启动 SGLang 推理服务器)
    logger.info("[%s] Waiting 30 seconds for trainer to initialize...", run_id)
    await asyncio.sleep(30)
    
    if run_state.trainer_process.poll() is not None:
        exit_code = run_state.trainer_process.returncode
        run_state.status = "failed"
        run_state.error_message = f"Trainer exited with code {exit_code}. Check {trainer_log}"
        _stop_training_run(run_state)  # 会停止 API 和 Trainer
        return
    
    logger.info("[%s] Trainer started, inference server on port 8001", run_id)

    # ==================== Step 3: 启动 Environment ====================
    logger.info("[%s] Waiting 90 seconds before starting environment...", run_id)
    await asyncio.sleep(90)  # Environment 需要等待 Trainer 完全就绪
    
    # 查找环境文件
    env_info = None
    for env in _environments:
        if env.name == run_state.environment:
            env_info = env
            break
    
    if not env_info:
        run_state.status = "failed"
        run_state.error_message = f"Environment '{run_state.environment}' not found"
        _stop_training_run(run_state)
        return
    
    logger.info("[%s] Starting environment: %s serve", run_id, env_info.file_path)
    
    env_log = LOGS_DIR / f"env_{run_id}.log"
    env_log_file = open(env_log, "w")
    run_state.env_log_file = env_log_file
    
    run_state.env_process = subprocess.Popen(
        [sys.executable, str(env_info.file_path), "serve", "--config", str(config_path)],
        stdout=env_log_file,
        stderr=subprocess.STDOUT,
        cwd=str(TINKER_ATROPOS_ROOT),
    )
    
    # 等待 Environment 连接
    await asyncio.sleep(10)
    
    if run_state.env_process.poll() is not None:
        exit_code = run_state.env_process.returncode
        run_state.status = "failed"
        run_state.error_message = f"Environment exited with code {exit_code}. Check {env_log}"
        _stop_training_run(run_state)  # 停止所有三个进程
        return
    
    # ==================== 启动成功 ====================
    run_state.status = "running"
    run_state.start_time = time.time()
    logger.info("[%s] Training run started successfully!", run_id)
    
    # 启动后台监控任务
    asyncio.create_task(_monitor_training_run(run_state))
1.3.3 等待时间的设计考量
进程 等待时间 原因
API 5 秒 轻量级服务,快速启动
Trainer 30 秒 需要加载模型权重、启动 SGLang 推理服务器
Environment 90 秒 需要等待 Trainer 完全就绪,建立连接

为什么不使用健康检查 API 而是固定等待?

  • 简化实现,避免复杂的轮询逻辑
  • 训练任务通常运行数小时,几十秒的启动时间可以接受
  • 日志文件可以在失败后查看具体原因

1.4 后台监控:实时状态检测

1.4.1 监控机制
async def _monitor_training_run(run_state: RunState):
    """
    后台任务:持续监控训练运行状态。
    每 30 秒检查一次进程是否存活。
    """
    while run_state.status == "running":
        await asyncio.sleep(30)  # 30 秒轮询间隔
        
        # 检查 Environment 进程
        if run_state.env_process and run_state.env_process.poll() is not None:
            exit_code = run_state.env_process.returncode
            if exit_code == 0:
                run_state.status = "completed"  # 正常完成
            else:
                run_state.status = "failed"     # 异常退出
                run_state.error_message = f"Environment process exited with code {exit_code}"
            _stop_training_run(run_state)
            break
        
        # 检查 Trainer 进程
        if run_state.trainer_process and run_state.trainer_process.poll() is not None:
            exit_code = run_state.trainer_process.returncode
            if exit_code == 0:
                run_state.status = "completed"
            else:
                run_state.status = "failed"
                run_state.error_message = f"Trainer process exited with code {exit_code}"
            _stop_training_run(run_state)
            break
        
        # 检查 API 进程
        if run_state.api_process and run_state.api_process.poll() is not None:
            run_state.status = "failed"
            run_state.error_message = "API server exited unexpectedly"
            _stop_training_run(run_state)
            break
1.4.2 监控设计要点
设计 说明
异步任务 asyncio.create_task() 创建独立协程,不阻塞主流程
30 秒间隔 平衡实时性和系统开销
poll() 检查 非阻塞检查进程状态,None 表示仍在运行
exit_code 判断 0 表示正常完成,非 0 表示异常
自动清理 任一进程退出,自动停止其他进程

1.5 优雅关闭:逆序终止与资源清理

1.5.1 关闭顺序的重要性
启动顺序: API → Trainer → Environment
关闭顺序: Environment → Trainer → API (逆序)

原因:
- Environment 依赖 Trainer 的推理服务
- Trainer 依赖 API 的状态同步
- 逆序关闭避免连接错误和日志异常
1.5.2 源码实现
def _stop_training_run(run_state: RunState):
    """
    停止训练运行的所有进程。
    按逆序停止:env -> trainer -> api
    """
    logger.info("[%s] Stopping training run...", run_state.run_id)
    
    # ==================== Step 1: 停止 Environment ====================
    if run_state.env_process and run_state.env_process.poll() is None:
        logger.info("[%s] Stopping environment process...", run_state.run_id)
        run_state.env_process.terminate()  # 发送 SIGTERM
        try:
            # 等待进程优雅退出,超时 10 秒
            run_state.env_process.wait(timeout=10)
        except subprocess.TimeoutExpired:
            # 超时后强制杀死
            logger.warning("[%s] Environment did not terminate gracefully, killing...", run_state.run_id)
            run_state.env_process.kill()
    
    # ==================== Step 2: 停止 Trainer ====================
    if run_state.trainer_process and run_state.trainer_process.poll() is None:
        logger.info("[%s] Stopping trainer process...", run_state.run_id)
        run_state.trainer_process.terminate()
        try:
            run_state.trainer_process.wait(timeout=10)
        except subprocess.TimeoutExpired:
            logger.warning("[%s] Trainer did not terminate gracefully, killing...", run_state.run_id)
            run_state.trainer_process.kill()
    
    # ==================== Step 3: 停止 API ====================
    if run_state.api_process and run_state.api_process.poll() is None:
        logger.info("[%s] Stopping API server...", run_state.run_id)
        run_state.api_process.terminate()
        try:
            run_state.api_process.wait(timeout=10)
        except subprocess.TimeoutExpired:
            logger.warning("[%s] API server did not terminate gracefully, killing...", run_state.run_id)
            run_state.api_process.kill()
    
    # 更新状态
    if run_state.status == "running":
        run_state.status = "stopped"
    
    # ==================== 清理日志文件句柄 ====================
    for attr in ("env_log_file", "trainer_log_file", "api_log_file"):
        fh = getattr(run_state, attr, None)
        if fh is not None:
            try:
                fh.close()
            except Exception:
                pass
            setattr(run_state, attr, None)
    
    logger.info("[%s] Training run stopped", run_state.run_id)
1.5.3 优雅关闭的策略
步骤 操作 超时 备用方案
1 terminate() (SIGTERM) 10 秒 kill() (SIGKILL)
2 wait() 等待进程结束 - -
3 关闭日志文件句柄 - 忽略异常

为什么需要两步关闭?

  • terminate() (SIGTERM) 给进程机会保存状态、清理资源
  • kill() (SIGKILL) 强制结束,用于进程无响应的情况
  • 避免直接 kill() 导致的数据丢失或资源泄漏

1.6 优雅启动(带等待时间)

# Step 1: 启动 API,等待 5 秒
await asyncio.sleep(5)
if run_state.api_process.poll() is not None:
    run_state.status = "failed"
    return

# Step 2: 启动 Trainer,等待 30 秒
await asyncio.sleep(30)
if run_state.trainer_process.poll() is not None:
    run_state.status = "failed"
    return

# Step 3: 启动 Environment,等待 90 秒
await asyncio.sleep(90)

Logo

AtomGit 是由开放原子开源基金会联合 CSDN 等生态伙伴共同推出的新一代开源与人工智能协作平台。平台坚持“开放、中立、公益”的理念,把代码托管、模型共享、数据集托管、智能体开发体验和算力服务整合在一起,为开发者提供从开发、训练到部署的一站式体验。

更多推荐