个人异步编程与 RL 系统学习笔记,持续更新。


1. 🚀 核心概念:同步 vs 异步 vs 分布式

在复杂的深度学习或强化学习引擎中,如果让 CPU 等待 GPU,或者让数据生成(Rollout)等待模型更新(Update),会造成极大的算力浪费。

概念 核心思想 通俗理解
同步 (Sync) 串行执行,前一步没做完,后一步干等 烧水时盯着水壶,水没开就不去切菜
异步 (Async) 遇到耗时操作立刻切换去干别的 烧水的同时去切菜,水开了水壶会"滴"一声通知你
分布式 (Distributed) 多机多核并行,通过网络通信协作 雇了一群厨师,有人专门烧水,有人专门切菜,各司其职

💡 记住

  • asyncio 解决的是单进程内的并发(一个人同时处理多件事)。
  • Ray 解决的是跨进程/跨机器的并行(一群人同时处理多件事)。

2. ⏳ Python Asyncio 常用操作

在 RL 流水线中,经常需要一边生成数据,一边把数据推送到队列——这正是 asyncio 的主场。

🚪 asyncio.run():异步世界的入口

同步代码没法直接调用协程,asyncio.run() 是从同步世界进入异步世界的唯一大门。它会创建一个事件循环,跑完传入的协程后关闭。

async def main():
    await do_something()

# 整个程序通常只调用一次
asyncio.run(main())

⚠️ 不能在已有事件循环里嵌套调用 asyncio.run(),那种情况下直接 await 就好。

🪄 asyncawait

  • async def:定义一个协程函数。调用它不会立即执行,而是返回一个协程对象。
  • await交出控制权。告诉事件循环:“这步要等,你先去跑别的协程,好了再叫我。”
async def fetch_data():
    await asyncio.sleep(1)  # 挂起自己,事件循环去干别的
    return "data"

async def main():
    # 两种写法等价,拆开写是为了拿到 ref 方便后续操作(比如取消)
    ref = fetch_data()
    result = await ref

    # 更常见的简写
    result = await fetch_data()

🏃 asyncio.create_task():丢到后台,不等它

用途:启动一个任务让它在后台跑,主流程继续往下走,不阻塞。

async def main():
    # 立刻返回,后台开始跑 save_to_queue,主流程不等它
    task = asyncio.create_task(save_to_queue(data))

    # 继续做其他事...
    generate_more_data()

    # 如果后面某个时刻需要等它完成,再 await
    await task

和直接 await coroutine() 的区别:create_task立即调度协程开始执行,而直接 await 要等到执行到那行才开始。

📦 asyncio.gather() vs asyncio.wait()

命令 作用 适用场景
asyncio.gather(*tasks) 等待全部任务完成,按顺序返回结果 需要汇总多个独立请求的结果
asyncio.wait(tasks) 可设置条件(如 FIRST_COMPLETED)或超时 谁先完成就先处理谁,或者设超时兜底
# gather:等所有人
results = await asyncio.gather(task_a(), task_b(), task_c())

# wait:等最快的那个
done, pending = await asyncio.wait(tasks, return_when=asyncio.FIRST_COMPLETED)

📬 asyncio.Queue:协程间的数据管道

RL 流水线里,生产者(Rollout)和消费者(Trainer)速度往往不一致,asyncio.Queue 是解耦它们的标准方式。

async def producer(queue: asyncio.Queue):
    for i in range(10):
        trajectory = generate_trajectory()
        await queue.put(trajectory)   # 队列满了会自动挂起等待
        print(f"生产了第 {i} 条轨迹")

async def consumer(queue: asyncio.Queue):
    while True:
        trajectory = await queue.get()  # 队列空了会自动挂起等待
        train_on(trajectory)
        queue.task_done()               # 告知队列这条数据已处理完

async def main():
    queue = asyncio.Queue(maxsize=32)   # 最多缓存 32 条,防止内存爆炸
    await asyncio.gather(
        producer(queue),
        consumer(queue),
    )

🚦 asyncio.Event:协程间的信号灯

用途:一个协程等待某个事件发生,另一个协程触发它。比 Queue 更轻量,适合"通知"而非"传数据"的场景。

async def worker(event: asyncio.Event):
    print("Worker 等待启动信号...")
    await event.wait()   # 挂起,直到 event 被 set()
    print("收到信号,开始干活!")

async def main():
    event = asyncio.Event()
    asyncio.create_task(worker(event))

    await asyncio.sleep(2)   # 模拟准备工作
    event.set()              # 发出信号,worker 恢复执行

3. 🕸️ 分布式计算基础(以 Ray 为例)

RL 训练往往需要成百上千个 Worker 节点,Ray 是目前最主流的 Python 分布式框架之一。

🎯 标记分布式任务(@ray.remote

把一个普通的 Python 函数或类变成可以在集群任意节点上运行的分布式任务

@ray.remote
class RolloutWorker:
    def generate(self):
        return "trajectory"

# 在集群里找一台机器拉起这个 Worker 进程
worker = RolloutWorker.remote()

# 瞬间返回 ObjectRef(快递单号),远端已经开始执行了
future_ref = worker.generate.remote()

🧱 异步句柄(ObjectRef

调用 .remote()立刻返回一个 ObjectRef,类似快递单号——远端已经开始算了,你拿着单号可以继续干别的,随时凭单号取货。

ObjectRef 支持两种取货方式:

# 方式1:同步阻塞取货(主进程卡住等)
result = ray.get(future_ref)

# 方式2:在 async 上下文中非阻塞等待(推荐)
result = await future_ref

🛑 阻塞获取结果(ray.get()

用途:拿着单号,死等快递送达。主进程会卡在这里直到远端返回。

result = ray.get(future_ref)

⚠️ 如果传入一个列表 ray.get([ref1, ref2, ref3]),会等所有任务都完成才返回,最慢的那个会拖累所有人。

🔄 非阻塞轮询监控(ray.wait()

用途:监控一群 Worker 的进度,谁干完了就收谁的结果,没干完的继续等。比 ray.get 灵活得多。

pending_tasks = [worker1.run.remote(), worker2.run.remote(), worker3.run.remote()]

while pending_tasks:
    # 只要有 1 个完成了(或超时 1 秒),就立刻返回
    ready, pending_tasks = ray.wait(pending_tasks, num_returns=1, timeout=1.0)

    if ready:
        print("有 Worker 完成了!", ray.get(ready))
    else:
        print("还没人完成,打印一下进度...")
ray.get() ray.wait()
返回值 实际数据 (已完成列表, 未完成列表)
阻塞行为 等所有任务完成 可设数量或超时,灵活返回
适用场景 需要全部结果才能继续 流式处理,谁快先处理谁

4. 🛠️ 微服务部署与绑定(Ray Serve)

在工业级 RL 引擎中,各个阶段(Actor、Rollout、Reward)通常不会直接裸跑,而是包装成微服务,自带 HTTP 路由、健康检查和负载均衡。

📦 @serve.deployment 与蓝图绑定

把一个类标记为微服务模板。

@serve.deployment
class ActorService:
    def __init__(self, config):
        self.model = load_model(config)

    async def __call__(self, request):
        data = await request.json()
        return self.model.predict(data)

.bind().options() 分别干什么?

  • .bind(config):给 __init__ 传业务参数,生成一份部署蓝图。此时还没有拉起任何进程。
  • .options(num_gpus=1):给调度框架传资源参数,告诉集群"拉起这个服务时必须给它分配 1 张 GPU"。
# 生成蓝图:描述"将来怎么部署",不立即执行
app_blueprint = ActorService.options(num_gpus=1).bind(config_dict)

💡 bind() 的本质是延迟初始化——真正的对象创建发生在 serve.run() 被调用、Serve 在 worker 上启动副本的时候。

🚀 真正拉起服务

拿着蓝图,向集群下达部署指令:

# 集群分配机器、拉起进程,返回一个调用句柄(DeploymentHandle)
handle = serve.run(app_blueprint, name="actor")

# 之后对句柄发号施令,请求会通过 RPC 打到远端微服务
ref = handle.predict.remote(input_data)
result = await ref

@ray.remote vs ray.serve

@ray.remote ray.serve
定位 底层分布式计算原语 高层在线服务框架
调用方式 Ray 集群内部 Python 调用 HTTP 请求 / Python Handle
生命周期 Task 用完销毁,Actor 手动管理 长期运行,自动管理
扩缩容 手动 自动(配置副本数/并发)
典型场景 离线批处理、训练、并行计算 在线推理、对外 API 服务

5. 💡 RL 引擎中的经典异步设计模式

🎭 隐藏的 .remote():本地代理类(Proxy)

场景:主控代码里调用的是普通函数 service.run(),远端进程却神奇地开始跑了?

原因:框架通常会写一个本地的 Wrapper 类,把 RPC 逻辑封装起来,对外暴露干净的接口。

class ServiceWrapper:
    def __init__(self, serve_handle):
        self.handle = serve_handle

    def run(self):
        # 表面是普通调用,内部偷偷发起了异步 RPC
        self.task_ref = self.handle.run.remote()
        return self.task_ref   # 返回 ObjectRef,调用方可以选择 await 或 ray.get

    async def wait(self):
        # 等待上一次 run 的结果
        return await self.task_ref

🏃 后台死循环与事件通知

场景:远端服务启动后,如何让它持续跑,同时又不阻塞 RPC 消息接收?

解法:在 run() 里开一个后台线程执行死循环,主协程用 asyncio.Event 挂起等待停止信号。

class Worker:
    async def run(self):
        self.stop_event = asyncio.Event()
        # 后台线程跑死循环,不占用 async 事件循环
        self._thread = threading.Thread(target=self._background_loop, daemon=True)
        self._thread.start()

        # 挂起当前协程,直到收到 stop 信号
        await self.stop_event.wait()

    def _background_loop(self):
        while not self.stop_event.is_set():
            # 不断从队列取数据、计算、写回队列...
            data = self.queue.get()
            self.process(data)

    async def stop(self):
        self.stop_event.set()   # 触发信号,run() 恢复并返回

🌊 生产消费解耦(Pipeline Parallelism)

全异步 RL 流水线的标准分工:

  1. 生产者(Rollout Worker):队列没满就疯狂生成轨迹,异步写入分布式队列,不等下游。
  2. 分布式队列:隔离不同阶段,支持多路复用,削峰填谷。
  3. 消费者(Trainer):流式读取数据,训完一批就更新权重,处理完后通知队列释放空间。
  4. 主控(Driver):不写 for step in range(N) 的阻塞大循环,而是用 ray.wait() 异步监控所有 Worker 句柄,谁完成了就收谁的结果。
Rollout Worker ──→ [ 分布式队列 ] ──→ Trainer
     ↑                                   ↓
     └──────── 权重同步(广播)─────────────┘
                       ↑
                    Driver(非阻塞监控)

总结
好的异步架构里,没有全局的阻塞大管家。每个 Worker 自己判断进度,自己生产消费,首尾相接,Driver 只负责监控和协调,不插手具体执行。

Logo

AtomGit 是由开放原子开源基金会联合 CSDN 等生态伙伴共同推出的新一代开源与人工智能协作平台。平台坚持“开放、中立、公益”的理念,把代码托管、模型共享、数据集托管、智能体开发体验和算力服务整合在一起,为开发者提供从开发、训练到部署的一站式体验。

更多推荐