RL系统中的异步编程:async & Ray
个人异步编程与 RL 系统学习笔记,持续更新。
1. 🚀 核心概念:同步 vs 异步 vs 分布式
在复杂的深度学习或强化学习引擎中,如果让 CPU 等待 GPU,或者让数据生成(Rollout)等待模型更新(Update),会造成极大的算力浪费。
| 概念 | 核心思想 | 通俗理解 |
|---|---|---|
| 同步 (Sync) | 串行执行,前一步没做完,后一步干等 | 烧水时盯着水壶,水没开就不去切菜 |
| 异步 (Async) | 遇到耗时操作立刻切换去干别的 | 烧水的同时去切菜,水开了水壶会"滴"一声通知你 |
| 分布式 (Distributed) | 多机多核并行,通过网络通信协作 | 雇了一群厨师,有人专门烧水,有人专门切菜,各司其职 |
💡 记住:
asyncio解决的是单进程内的并发(一个人同时处理多件事)。Ray解决的是跨进程/跨机器的并行(一群人同时处理多件事)。
2. ⏳ Python Asyncio 常用操作
在 RL 流水线中,经常需要一边生成数据,一边把数据推送到队列——这正是 asyncio 的主场。
🚪 asyncio.run():异步世界的入口
同步代码没法直接调用协程,asyncio.run() 是从同步世界进入异步世界的唯一大门。它会创建一个事件循环,跑完传入的协程后关闭。
async def main():
await do_something()
# 整个程序通常只调用一次
asyncio.run(main())
⚠️ 不能在已有事件循环里嵌套调用
asyncio.run(),那种情况下直接await就好。
🪄 async 与 await
async def:定义一个协程函数。调用它不会立即执行,而是返回一个协程对象。await:交出控制权。告诉事件循环:“这步要等,你先去跑别的协程,好了再叫我。”
async def fetch_data():
await asyncio.sleep(1) # 挂起自己,事件循环去干别的
return "data"
async def main():
# 两种写法等价,拆开写是为了拿到 ref 方便后续操作(比如取消)
ref = fetch_data()
result = await ref
# 更常见的简写
result = await fetch_data()
🏃 asyncio.create_task():丢到后台,不等它
用途:启动一个任务让它在后台跑,主流程继续往下走,不阻塞。
async def main():
# 立刻返回,后台开始跑 save_to_queue,主流程不等它
task = asyncio.create_task(save_to_queue(data))
# 继续做其他事...
generate_more_data()
# 如果后面某个时刻需要等它完成,再 await
await task
和直接
await coroutine()的区别:create_task会立即调度协程开始执行,而直接await要等到执行到那行才开始。
📦 asyncio.gather() vs asyncio.wait()
| 命令 | 作用 | 适用场景 |
|---|---|---|
asyncio.gather(*tasks) |
等待全部任务完成,按顺序返回结果 | 需要汇总多个独立请求的结果 |
asyncio.wait(tasks) |
可设置条件(如 FIRST_COMPLETED)或超时 |
谁先完成就先处理谁,或者设超时兜底 |
# gather:等所有人
results = await asyncio.gather(task_a(), task_b(), task_c())
# wait:等最快的那个
done, pending = await asyncio.wait(tasks, return_when=asyncio.FIRST_COMPLETED)
📬 asyncio.Queue:协程间的数据管道
RL 流水线里,生产者(Rollout)和消费者(Trainer)速度往往不一致,asyncio.Queue 是解耦它们的标准方式。
async def producer(queue: asyncio.Queue):
for i in range(10):
trajectory = generate_trajectory()
await queue.put(trajectory) # 队列满了会自动挂起等待
print(f"生产了第 {i} 条轨迹")
async def consumer(queue: asyncio.Queue):
while True:
trajectory = await queue.get() # 队列空了会自动挂起等待
train_on(trajectory)
queue.task_done() # 告知队列这条数据已处理完
async def main():
queue = asyncio.Queue(maxsize=32) # 最多缓存 32 条,防止内存爆炸
await asyncio.gather(
producer(queue),
consumer(queue),
)
🚦 asyncio.Event:协程间的信号灯
用途:一个协程等待某个事件发生,另一个协程触发它。比 Queue 更轻量,适合"通知"而非"传数据"的场景。
async def worker(event: asyncio.Event):
print("Worker 等待启动信号...")
await event.wait() # 挂起,直到 event 被 set()
print("收到信号,开始干活!")
async def main():
event = asyncio.Event()
asyncio.create_task(worker(event))
await asyncio.sleep(2) # 模拟准备工作
event.set() # 发出信号,worker 恢复执行
3. 🕸️ 分布式计算基础(以 Ray 为例)
RL 训练往往需要成百上千个 Worker 节点,Ray 是目前最主流的 Python 分布式框架之一。
🎯 标记分布式任务(@ray.remote)
把一个普通的 Python 函数或类变成可以在集群任意节点上运行的分布式任务。
@ray.remote
class RolloutWorker:
def generate(self):
return "trajectory"
# 在集群里找一台机器拉起这个 Worker 进程
worker = RolloutWorker.remote()
# 瞬间返回 ObjectRef(快递单号),远端已经开始执行了
future_ref = worker.generate.remote()
🧱 异步句柄(ObjectRef)
调用 .remote() 会立刻返回一个 ObjectRef,类似快递单号——远端已经开始算了,你拿着单号可以继续干别的,随时凭单号取货。
ObjectRef 支持两种取货方式:
# 方式1:同步阻塞取货(主进程卡住等)
result = ray.get(future_ref)
# 方式2:在 async 上下文中非阻塞等待(推荐)
result = await future_ref
🛑 阻塞获取结果(ray.get())
用途:拿着单号,死等快递送达。主进程会卡在这里直到远端返回。
result = ray.get(future_ref)
⚠️ 如果传入一个列表
ray.get([ref1, ref2, ref3]),会等所有任务都完成才返回,最慢的那个会拖累所有人。
🔄 非阻塞轮询监控(ray.wait())
用途:监控一群 Worker 的进度,谁干完了就收谁的结果,没干完的继续等。比 ray.get 灵活得多。
pending_tasks = [worker1.run.remote(), worker2.run.remote(), worker3.run.remote()]
while pending_tasks:
# 只要有 1 个完成了(或超时 1 秒),就立刻返回
ready, pending_tasks = ray.wait(pending_tasks, num_returns=1, timeout=1.0)
if ready:
print("有 Worker 完成了!", ray.get(ready))
else:
print("还没人完成,打印一下进度...")
ray.get() |
ray.wait() |
|
|---|---|---|
| 返回值 | 实际数据 | (已完成列表, 未完成列表) |
| 阻塞行为 | 等所有任务完成 | 可设数量或超时,灵活返回 |
| 适用场景 | 需要全部结果才能继续 | 流式处理,谁快先处理谁 |
4. 🛠️ 微服务部署与绑定(Ray Serve)
在工业级 RL 引擎中,各个阶段(Actor、Rollout、Reward)通常不会直接裸跑,而是包装成微服务,自带 HTTP 路由、健康检查和负载均衡。
📦 @serve.deployment 与蓝图绑定
把一个类标记为微服务模板。
@serve.deployment
class ActorService:
def __init__(self, config):
self.model = load_model(config)
async def __call__(self, request):
data = await request.json()
return self.model.predict(data)
.bind() 和 .options() 分别干什么?
.bind(config):给__init__传业务参数,生成一份部署蓝图。此时还没有拉起任何进程。.options(num_gpus=1):给调度框架传资源参数,告诉集群"拉起这个服务时必须给它分配 1 张 GPU"。
# 生成蓝图:描述"将来怎么部署",不立即执行
app_blueprint = ActorService.options(num_gpus=1).bind(config_dict)
💡
bind()的本质是延迟初始化——真正的对象创建发生在serve.run()被调用、Serve 在 worker 上启动副本的时候。
🚀 真正拉起服务
拿着蓝图,向集群下达部署指令:
# 集群分配机器、拉起进程,返回一个调用句柄(DeploymentHandle)
handle = serve.run(app_blueprint, name="actor")
# 之后对句柄发号施令,请求会通过 RPC 打到远端微服务
ref = handle.predict.remote(input_data)
result = await ref
@ray.remote vs ray.serve
@ray.remote |
ray.serve |
|
|---|---|---|
| 定位 | 底层分布式计算原语 | 高层在线服务框架 |
| 调用方式 | Ray 集群内部 Python 调用 | HTTP 请求 / Python Handle |
| 生命周期 | Task 用完销毁,Actor 手动管理 | 长期运行,自动管理 |
| 扩缩容 | 手动 | 自动(配置副本数/并发) |
| 典型场景 | 离线批处理、训练、并行计算 | 在线推理、对外 API 服务 |
5. 💡 RL 引擎中的经典异步设计模式
🎭 隐藏的 .remote():本地代理类(Proxy)
场景:主控代码里调用的是普通函数 service.run(),远端进程却神奇地开始跑了?
原因:框架通常会写一个本地的 Wrapper 类,把 RPC 逻辑封装起来,对外暴露干净的接口。
class ServiceWrapper:
def __init__(self, serve_handle):
self.handle = serve_handle
def run(self):
# 表面是普通调用,内部偷偷发起了异步 RPC
self.task_ref = self.handle.run.remote()
return self.task_ref # 返回 ObjectRef,调用方可以选择 await 或 ray.get
async def wait(self):
# 等待上一次 run 的结果
return await self.task_ref
🏃 后台死循环与事件通知
场景:远端服务启动后,如何让它持续跑,同时又不阻塞 RPC 消息接收?
解法:在 run() 里开一个后台线程执行死循环,主协程用 asyncio.Event 挂起等待停止信号。
class Worker:
async def run(self):
self.stop_event = asyncio.Event()
# 后台线程跑死循环,不占用 async 事件循环
self._thread = threading.Thread(target=self._background_loop, daemon=True)
self._thread.start()
# 挂起当前协程,直到收到 stop 信号
await self.stop_event.wait()
def _background_loop(self):
while not self.stop_event.is_set():
# 不断从队列取数据、计算、写回队列...
data = self.queue.get()
self.process(data)
async def stop(self):
self.stop_event.set() # 触发信号,run() 恢复并返回
🌊 生产消费解耦(Pipeline Parallelism)
全异步 RL 流水线的标准分工:
- 生产者(Rollout Worker):队列没满就疯狂生成轨迹,异步写入分布式队列,不等下游。
- 分布式队列:隔离不同阶段,支持多路复用,削峰填谷。
- 消费者(Trainer):流式读取数据,训完一批就更新权重,处理完后通知队列释放空间。
- 主控(Driver):不写
for step in range(N)的阻塞大循环,而是用ray.wait()异步监控所有 Worker 句柄,谁完成了就收谁的结果。
Rollout Worker ──→ [ 分布式队列 ] ──→ Trainer
↑ ↓
└──────── 权重同步(广播)─────────────┘
↑
Driver(非阻塞监控)
✅ 总结:
好的异步架构里,没有全局的阻塞大管家。每个 Worker 自己判断进度,自己生产消费,首尾相接,Driver 只负责监控和协调,不插手具体执行。
AtomGit 是由开放原子开源基金会联合 CSDN 等生态伙伴共同推出的新一代开源与人工智能协作平台。平台坚持“开放、中立、公益”的理念,把代码托管、模型共享、数据集托管、智能体开发体验和算力服务整合在一起,为开发者提供从开发、训练到部署的一站式体验。
更多推荐
所有评论(0)