设计1: Loan/Publish 双阶段模型

用户层可以直接往共享(Sample直接使用)内存填充数据

Publisher详细参考:https://blog.csdn.net/hanyuan1993/article/details/158929006?spm=1001.2014.3001.5502

设计2: Publisher直接放到存储队列

这部分的实现就比较复杂了,涉及到服务发现里的步骤

2.1 Subscriber 向 Roudi 请求订阅

  // subscriber 进程中
  subscriber.subscribe();
    → getMembers()->m_subscribeRequested.store(true);  // 只是写了个 atomic 标记

Subscriber详细参考:https://blog.csdn.net/hanyuan1993/article/details/158885128

2.2 Roudi 将 Subscriber 的 队列地址 直接添加到 Publisher的 订阅队列中

 RouDi discovery 循环检测到订阅请求
// port_manager.cpp — RouDi 进程中
  void PortManager::handleSubscriberPorts() {
      for (auto& subscriberPort : subscriberPorts) {
          auto caproMessage = subscriberPort.tryGetCaProMessage();
          // 检测到 m_subscribeRequested==true → 生成 SUB 消息
          // 消息里携带了 subscriber 的 ChunkQueueData 指针
      }
  }

找到匹配的 Publisher,把队列指针塞进去
  void PortManager::sendToAllMatchingPublisherPorts(caproMessage) {
      for (auto& publisherPort : publisherPorts) {
          if (服务描述匹配) {
              publisherPort.dispatchCaProMessageAndGetPossibleResponse(caproMessage);
          }
      }
  }

// 此类为Roudi侧的PublisherPort(publisher_port_roudi.cpp:92-101)
PublisherPortRouDi::dispatchCaProMessageAndGetPossibleResponse(caProMessage) {
      if (caProMessage.m_type == SUB) {
          m_chunkSender.tryAddQueue(
              static_cast<ChunkQueueData_t*>(caProMessage.m_chunkQueueData)
              //                             ^^^^^^^^^^^^^^^^^^^^^^^^^^
              //                             这就是 Subscriber 的队列地址!
          );
      }
  }


ChunkDistributor::tryAddQueue(queueData) {
      getMembers()->m_queues.push_back(RelativePointer(queueData));
      //                               ^^^^^^^^^^^^^^^^^^^^^^^^^^
      //                               存为 RelativePointer 放入投递名单
  }

2.3 共享内存分布情况

补充说明:

  • PublisherPortUser 是用户层操控共享内存数据的接口(只提供offer,stop offer等逻辑)
  • PublisherPortRoudi 是Roudi层操控共享内存的数据接口(可以添加Subscriber侧的接收Queue,Publisher进程崩溃时回收所有内存)
  • SubscriberPortUser (只提供subscriber,unsubscriber逻辑)
  • SubscriberPortRoudi (提供队列地址,Subscriber进程崩溃时回收所有内存)
Logo

AtomGit 是由开放原子开源基金会联合 CSDN 等生态伙伴共同推出的新一代开源与人工智能协作平台。平台坚持“开放、中立、公益”的理念,把代码托管、模型共享、数据集托管、智能体开发体验和算力服务整合在一起,为开发者提供从开发、训练到部署的一站式体验。

更多推荐