本篇先说Publisher的源码设计,共享内存数据相关下篇解析

设计1: 分层设计

  • 用户层API封装目的:不暴露过多接口
  • 类型适配层:使用port进行数据交互的逻辑
  • 公共行为层:封装了Publisher的服务发现行为
  • 端口层:封装共享内存数据使用逻辑
  • 数据层:管理数据

设计2: Loan/Publish 双阶段模型

发布数据分两步:loan(借用) 和 publish(发布)

2.1 loan — 从共享内存池借出一块 chunk

  // publisher_impl.inl:45-48
  template <typename... Args>
  expected<Sample<T, H>, AllocationError> loan(Args&&... args) noexcept
  {
      return loanSample().and_then([&](auto& sample) {
          new (sample.get()) T(std::forward<Args>(args)...);  // placement new 就地构造
      });
  }

loan内部调用链:
  loan(args...)
    → loanSample()
        → port().tryAllocateChunk(sizeof(T), alignof(T), userHeaderSize, alignof(H))
            → ChunkSender 从 MemoryManager 申请共享内存
        → convertChunkHeaderToSample(chunkHeader)
                关键!涉及到共享内存指针到Sample转换
            → unique_ptr<T>(payload, custom_deleter) + *this 引用 RAII自动释放设计)
            → Sample<T, H>
    → placement new T(args...) 在 sample 内存上就地构造

 2.2 publish — 转移所有权并发送

// publisher_impl.inl:94-99
  void publish(Sample<T, H>&& sample) noexcept
  {
      auto userPayload = sample.release();    // 关键!从 Sample 的 unique_ptr 中取出裸指针(阻止 deleter 执行)
      auto chunkHeader = ChunkHeader::fromUserPayload(userPayload);
      port().sendChunk(chunkHeader);          // ② 发送给所有连接的 subscriber
  }

关键流程:

  1.  sample.release() — 调用 SmartChunk::release(),夺取 unique_ptr 的所有权,使其不会在析构时调 deleter
  2.  sendChunk() — 将 chunk 放入所有连接的 subscriber 的队列

时序总结

用户调用 publisher.loan(args...)
      │
      ▼
  PublisherImpl::loan()
      │ 调用 loanSample()
      ▼
  loanSample()
      │ 调用 port().tryAllocateChunk(sizeof(T), alignof(T), ...)
      ▼
  PublisherPortUser::tryAllocateChunk()
      │ ChunkSender 从 MemoryManager 获取共享内存 chunk
      ▼
  返回 ChunkHeader* → convertChunkHeaderToSample()
      │ unique_ptr<T>(payload, deleter=releaseChunk) + publisher 引用
      ▼
  Sample<T, H> 返回给用户
      │ placement new T(args...) 就地构造
      ▼
  用户操作 sample->field = value
      │
      ▼
  用户调用 sample.publish()  或  publisher.publish(std::move(sample))
      │
      ▼
  PublisherImpl::publish(Sample<T,H>&& sample)
      │ sample.release() — 夺取 unique_ptr 的指针,阻止 deleter
      │ port().sendChunk(chunkHeader)
      ▼
  PublisherPortUser::sendChunk()
      │ ChunkSender → ChunkDistributor → 推入所有连接的 subscriber 队列
      ▼
  Subscriber 端 take() 接收(零拷贝,读同一块共享内存)

Logo

AtomGit 是由开放原子开源基金会联合 CSDN 等生态伙伴共同推出的新一代开源与人工智能协作平台。平台坚持“开放、中立、公益”的理念,把代码托管、模型共享、数据集托管、智能体开发体验和算力服务整合在一起,为开发者提供从开发、训练到部署的一站式体验。

更多推荐