《把 Hermes Agent 养成你的专属帕鲁:进程管理的关键设计》(六)
·
1. 进程管理的关键设计
在 Hermes RL Training 中,进程管理是系统稳定运行的核心保障。三个独立进程(run-api、Trainer、Environment)需要协调启动、监控运行状态、优雅关闭,任何一个环节出现问题都可能导致训练失败或资源泄漏。本章将深入剖析 Hermes 的进程管理设计,包括启动序列、状态监控、优雅关闭、故障恢复等关键机制。
1.1 进程管理的核心挑战
1.1.1 为什么需要复杂的进程管理?
RL 训练是一个长时间运行的任务(通常数小时甚至数天),涉及多个相互依赖的进程:
| 挑战 | 说明 | 后果 |
|---|---|---|
| 启动依赖 | API 必须先启动,Trainer 才能连接 | 顺序错误导致连接失败 |
| 启动时间 | 各进程初始化时间不同(5秒~90秒) | 过早访问导致错误 |
| 进程崩溃 | 任何一个进程退出,训练都应停止 | 孤立进程浪费资源 |
| 资源泄漏 | 进程异常退出时未清理子进程 | GPU 内存泄漏、端口占用 |
| 状态同步 | 需要实时知道训练是否在进行中 | 无法准确监控进度 |
1.1.2 Hermes 的解决方案概览
┌─────────────────────────────────────────────────────────────────────┐
│ 进程管理架构 │
├─────────────────────────────────────────────────────────────────────┤
│ │
│ ┌─────────────────┐ ┌─────────────────┐ ┌─────────────┐ │
│ │ RunState │ │ Async Monitor │ │ Log Files │ │
│ │ (状态容器) │◀───▶│ (后台监控) │ │ (独立日志) │ │
│ │ │ │ │ │ │ │
│ │ • api_process │ │ • 30秒轮询 │ │ • api.log │ │
│ │ • trainer_proc │ │ • 进程存活检查 │ │ • train.log │ │
│ │ • env_process │ │ • 异常检测 │ │ • env.log │ │
│ │ • status │ │ • 自动清理 │ │ │ │
│ └─────────────────┘ └─────────────────┘ └─────────────┘ │
│ │
└─────────────────────────────────────────────────────────────────────┘
该架构围绕三个核心组件构建:RunState(状态容器) 负责存储三个子进程(api_process、trainer_proc、env_process)的句柄与运行状态;Async Monitor(后台监控) 以30秒为周期轮询检查各进程的存活状态,检测异常并执行自动清理;Log Files(独立日志) 为每个进程保留独立的日志文件(api.log、train.log、env.log),便于问题追踪与调试。其中,RunState与Async Monitor之间通过双向交互实现状态同步与异常响应,三者共同形成了一套轻量级但可靠的进程生命周期管理机制,确保了多进程训练任务的稳定运行与可观测性。
1.2. RunState:进程状态的中央容器
1.2.1 数据结构定义
RunState 是一个 dataclass,用于存储单个训练运行的完整状态:
from dataclasses import dataclass
from typing import Any, Dict, List, Optional
import subprocess
@dataclass
class RunState:
"""State for a training run."""
# 基本信息
run_id: str # 唯一标识符 (UUID)
environment: str # 环境名称
config: Dict[str, Any] # 训练配置
# 状态信息
status: str = "pending" # pending, starting, running, stopping, stopped, completed, failed
error_message: str = "" # 错误信息
# WandB 信息
wandb_project: str = ""
wandb_run_name: str = ""
# 时间信息
start_time: float = 0.0 # 启动时间戳
# 进程句柄(核心)
api_process: Optional[subprocess.Popen] = None
trainer_process: Optional[subprocess.Popen] = None
env_process: Optional[subprocess.Popen] = None
# 日志文件句柄(用于关闭)
api_log_file: Optional[object] = None
trainer_log_file: Optional[object] = None
env_log_file: Optional[object] = None
1.2.2 全局状态管理
# 全局状态存储
_active_runs: Dict[str, RunState] = {} # run_id -> RunState
_last_status_check: Dict[str, float] = {} # run_id -> 上次检查时间
# 状态检查频率限制(30分钟)
MIN_STATUS_CHECK_INTERVAL = 30 * 60
1.2.3 设计意图
| 设计选择 | 原因 |
|---|---|
| Dataclass | 类型安全、自动 __init__、清晰字段定义 |
| Optional[Popen] | 进程可能未启动或已终止 |
| 独立日志句柄 | 确保进程终止时能正确关闭文件,避免资源泄漏 |
| 全局字典 | 支持多训练并行,通过 run_id 隔离 |
1.3 启动序列:有序的依赖启动
1.3.1 启动流程图
┌─────────────────────────────────────────────────────────────────────┐
│ 进程启动序列 │
├─────────────────────────────────────────────────────────────────────┤
│ │
│ Step 1: 启动 API 服务器 │
│ ┌─────────────────────────────────────────────────────────────┐ │
│ │ 命令: run-api │ │
│ │ 等待: 5 秒 │ │
│ │ 检查: process.poll() is None? │ │
│ │ │ │
│ │ ❌ 如果已退出: status = "failed", 返回错误 │ │
│ │ ✅ 如果运行中: 继续下一步 │ │
│ └─────────────────────────────────────────────────────────────┘ │
│ │ │
│ ▼ │
│ Step 2: 启动 Trainer │
│ ┌─────────────────────────────────────────────────────────────┐ │
│ │ 命令: python launch_training.py --config <config> │ │
│ │ 等待: 30 秒 │ │
│ │ 检查: process.poll() is None? │ │
│ │ │ │
│ │ ❌ 如果已退出: status = "failed", 停止 API, 返回错误 │ │
│ │ ✅ 如果运行中: 继续下一步 │ │
│ └─────────────────────────────────────────────────────────────┘ │
│ │ │
│ ▼ │
│ Step 3: 启动 Environment │
│ ┌─────────────────────────────────────────────────────────────┐ │
│ │ 命令: python <env>.py serve --config <config> │ │
│ │ 等待: 90 秒 │ │
│ │ 检查: process.poll() is None? │ │
│ │ │ │
│ │ ❌ 如果已退出: status = "failed", 停止所有进程, 返回错误 │ │
│ │ ✅ 如果运行中: status = "running", 启动后台监控 │ │
│ └─────────────────────────────────────────────────────────────┘ │
│ │
└─────────────────────────────────────────────────────────────────────┘
1.3.2 源码实现详解
async def _spawn_training_run(run_state: RunState, config_path: Path):
"""
启动训练的三个进程,按依赖顺序启动,每步都有健康检查。
"""
run_id = run_state.run_id
_ensure_logs_dir() # 确保日志目录存在
# ==================== Step 1: 启动 API 服务器 ====================
logger.info("[%s] Starting Atropos API server (run-api)...", run_id)
api_log = LOGS_DIR / f"api_{run_id}.log"
api_log_file = open(api_log, "w") # 打开日志文件
run_state.api_log_file = api_log_file # 保存句柄用于后续关闭
run_state.api_process = subprocess.Popen(
["run-api"],
stdout=api_log_file, # 重定向 stdout 到日志
stderr=subprocess.STDOUT, # stderr 合并到 stdout
cwd=str(TINKER_ATROPOS_ROOT), # 工作目录
)
# 等待 API 初始化
await asyncio.sleep(5)
# 健康检查:API 是否正常启动?
if run_state.api_process.poll() is not None:
# poll() 返回退出码表示进程已结束
exit_code = run_state.api_process.returncode
run_state.status = "failed"
run_state.error_message = f"API server exited with code {exit_code}. Check {api_log}"
_stop_training_run(run_state) # 清理已启动的资源
return
logger.info("[%s] Atropos API server started", run_id)
# ==================== Step 2: 启动 Trainer ====================
logger.info("[%s] Starting Tinker trainer...", run_id)
trainer_log = LOGS_DIR / f"trainer_{run_id}.log"
trainer_log_file = open(trainer_log, "w")
run_state.trainer_log_file = trainer_log_file
run_state.trainer_process = subprocess.Popen(
[sys.executable, "launch_training.py", "--config", str(config_path)],
stdout=trainer_log_file,
stderr=subprocess.STDOUT,
cwd=str(TINKER_ATROPOS_ROOT),
env={**os.environ, "TINKER_API_KEY": os.getenv("TINKER_API_KEY", "")},
)
# Trainer 需要更长时间初始化(启动 SGLang 推理服务器)
logger.info("[%s] Waiting 30 seconds for trainer to initialize...", run_id)
await asyncio.sleep(30)
if run_state.trainer_process.poll() is not None:
exit_code = run_state.trainer_process.returncode
run_state.status = "failed"
run_state.error_message = f"Trainer exited with code {exit_code}. Check {trainer_log}"
_stop_training_run(run_state) # 会停止 API 和 Trainer
return
logger.info("[%s] Trainer started, inference server on port 8001", run_id)
# ==================== Step 3: 启动 Environment ====================
logger.info("[%s] Waiting 90 seconds before starting environment...", run_id)
await asyncio.sleep(90) # Environment 需要等待 Trainer 完全就绪
# 查找环境文件
env_info = None
for env in _environments:
if env.name == run_state.environment:
env_info = env
break
if not env_info:
run_state.status = "failed"
run_state.error_message = f"Environment '{run_state.environment}' not found"
_stop_training_run(run_state)
return
logger.info("[%s] Starting environment: %s serve", run_id, env_info.file_path)
env_log = LOGS_DIR / f"env_{run_id}.log"
env_log_file = open(env_log, "w")
run_state.env_log_file = env_log_file
run_state.env_process = subprocess.Popen(
[sys.executable, str(env_info.file_path), "serve", "--config", str(config_path)],
stdout=env_log_file,
stderr=subprocess.STDOUT,
cwd=str(TINKER_ATROPOS_ROOT),
)
# 等待 Environment 连接
await asyncio.sleep(10)
if run_state.env_process.poll() is not None:
exit_code = run_state.env_process.returncode
run_state.status = "failed"
run_state.error_message = f"Environment exited with code {exit_code}. Check {env_log}"
_stop_training_run(run_state) # 停止所有三个进程
return
# ==================== 启动成功 ====================
run_state.status = "running"
run_state.start_time = time.time()
logger.info("[%s] Training run started successfully!", run_id)
# 启动后台监控任务
asyncio.create_task(_monitor_training_run(run_state))
1.3.3 等待时间的设计考量
| 进程 | 等待时间 | 原因 |
|---|---|---|
| API | 5 秒 | 轻量级服务,快速启动 |
| Trainer | 30 秒 | 需要加载模型权重、启动 SGLang 推理服务器 |
| Environment | 90 秒 | 需要等待 Trainer 完全就绪,建立连接 |
为什么不使用健康检查 API 而是固定等待?
- 简化实现,避免复杂的轮询逻辑
- 训练任务通常运行数小时,几十秒的启动时间可以接受
- 日志文件可以在失败后查看具体原因
1.4 后台监控:实时状态检测
1.4.1 监控机制
async def _monitor_training_run(run_state: RunState):
"""
后台任务:持续监控训练运行状态。
每 30 秒检查一次进程是否存活。
"""
while run_state.status == "running":
await asyncio.sleep(30) # 30 秒轮询间隔
# 检查 Environment 进程
if run_state.env_process and run_state.env_process.poll() is not None:
exit_code = run_state.env_process.returncode
if exit_code == 0:
run_state.status = "completed" # 正常完成
else:
run_state.status = "failed" # 异常退出
run_state.error_message = f"Environment process exited with code {exit_code}"
_stop_training_run(run_state)
break
# 检查 Trainer 进程
if run_state.trainer_process and run_state.trainer_process.poll() is not None:
exit_code = run_state.trainer_process.returncode
if exit_code == 0:
run_state.status = "completed"
else:
run_state.status = "failed"
run_state.error_message = f"Trainer process exited with code {exit_code}"
_stop_training_run(run_state)
break
# 检查 API 进程
if run_state.api_process and run_state.api_process.poll() is not None:
run_state.status = "failed"
run_state.error_message = "API server exited unexpectedly"
_stop_training_run(run_state)
break
1.4.2 监控设计要点
| 设计 | 说明 |
|---|---|
| 异步任务 | asyncio.create_task() 创建独立协程,不阻塞主流程 |
| 30 秒间隔 | 平衡实时性和系统开销 |
| poll() 检查 | 非阻塞检查进程状态,None 表示仍在运行 |
| exit_code 判断 | 0 表示正常完成,非 0 表示异常 |
| 自动清理 | 任一进程退出,自动停止其他进程 |
1.5 优雅关闭:逆序终止与资源清理
1.5.1 关闭顺序的重要性
启动顺序: API → Trainer → Environment
关闭顺序: Environment → Trainer → API (逆序)
原因:
- Environment 依赖 Trainer 的推理服务
- Trainer 依赖 API 的状态同步
- 逆序关闭避免连接错误和日志异常
1.5.2 源码实现
def _stop_training_run(run_state: RunState):
"""
停止训练运行的所有进程。
按逆序停止:env -> trainer -> api
"""
logger.info("[%s] Stopping training run...", run_state.run_id)
# ==================== Step 1: 停止 Environment ====================
if run_state.env_process and run_state.env_process.poll() is None:
logger.info("[%s] Stopping environment process...", run_state.run_id)
run_state.env_process.terminate() # 发送 SIGTERM
try:
# 等待进程优雅退出,超时 10 秒
run_state.env_process.wait(timeout=10)
except subprocess.TimeoutExpired:
# 超时后强制杀死
logger.warning("[%s] Environment did not terminate gracefully, killing...", run_state.run_id)
run_state.env_process.kill()
# ==================== Step 2: 停止 Trainer ====================
if run_state.trainer_process and run_state.trainer_process.poll() is None:
logger.info("[%s] Stopping trainer process...", run_state.run_id)
run_state.trainer_process.terminate()
try:
run_state.trainer_process.wait(timeout=10)
except subprocess.TimeoutExpired:
logger.warning("[%s] Trainer did not terminate gracefully, killing...", run_state.run_id)
run_state.trainer_process.kill()
# ==================== Step 3: 停止 API ====================
if run_state.api_process and run_state.api_process.poll() is None:
logger.info("[%s] Stopping API server...", run_state.run_id)
run_state.api_process.terminate()
try:
run_state.api_process.wait(timeout=10)
except subprocess.TimeoutExpired:
logger.warning("[%s] API server did not terminate gracefully, killing...", run_state.run_id)
run_state.api_process.kill()
# 更新状态
if run_state.status == "running":
run_state.status = "stopped"
# ==================== 清理日志文件句柄 ====================
for attr in ("env_log_file", "trainer_log_file", "api_log_file"):
fh = getattr(run_state, attr, None)
if fh is not None:
try:
fh.close()
except Exception:
pass
setattr(run_state, attr, None)
logger.info("[%s] Training run stopped", run_state.run_id)
1.5.3 优雅关闭的策略
| 步骤 | 操作 | 超时 | 备用方案 |
|---|---|---|---|
| 1 | terminate() (SIGTERM) |
10 秒 | kill() (SIGKILL) |
| 2 | wait() 等待进程结束 |
- | - |
| 3 | 关闭日志文件句柄 | - | 忽略异常 |
为什么需要两步关闭?
terminate()(SIGTERM) 给进程机会保存状态、清理资源kill()(SIGKILL) 强制结束,用于进程无响应的情况- 避免直接
kill()导致的数据丢失或资源泄漏
1.6 优雅启动(带等待时间)
# Step 1: 启动 API,等待 5 秒
await asyncio.sleep(5)
if run_state.api_process.poll() is not None:
run_state.status = "failed"
return
# Step 2: 启动 Trainer,等待 30 秒
await asyncio.sleep(30)
if run_state.trainer_process.poll() is not None:
run_state.status = "failed"
return
# Step 3: 启动 Environment,等待 90 秒
await asyncio.sleep(90)
AtomGit 是由开放原子开源基金会联合 CSDN 等生态伙伴共同推出的新一代开源与人工智能协作平台。平台坚持“开放、中立、公益”的理念,把代码托管、模型共享、数据集托管、智能体开发体验和算力服务整合在一起,为开发者提供从开发、训练到部署的一站式体验。
更多推荐


所有评论(0)