1. pipe_t 是什么?

定义pipe_t 是 ZeroMQ 中两个对象之间的双向通信通道

核心作用

  • ✅ 连接 Socket 和 Session
  • ✅ 提供无锁消息队列
  • ✅ 实现背压(Back Pressure)
  • ✅ 管理消息流控(HWM)
  • ✅ 处理 Pipe 生命周期

继承关系

class pipe_t : public object_t,          // 命令处理基类
               public array_item_t<1>,   // 入站数组身份
               public array_item_t<2>,   // 出站数组身份
               public array_item_t<3>    // 销毁数组身份
pipe_t 继承的三个 array_item_t 详解

1. 代码定义

// pipe.hpp 第 73-77 行
class pipe_t ZMQ_FINAL : public object_t,
                         public array_item_t<1>,  // ← ID=1
                         public array_item_t<2>,  // ← ID=2
                         public array_item_t<3>   // ← ID=3
{
    // ...
};

注释说明(第 70-72 行):

//  Note that pipe can be stored in three different arrays.
//  The array of inbound pipes (1), the array of outbound pipes (2) and
//  the generic array of pipes to be deallocated (3).

翻译

注意,pipe 可以存储在三个不同的数组中:

  • 入站管道数组 (ID=1)
  • 出站管道数组 (ID=2)
  • 待销毁管道数组 (ID=3)

2. 为什么需要三个继承?

问题:同一个 pipe_t 对象需要同时存在于多个不同的数组中。

场景

// DEALER Socket
class dealer_t {
    fq_t _fq;  // 入站公平队列
    lb_t _lb;  // 出站负载均衡
};

// 一个 Pipe 同时属于:
// 1. _fq._pipes (用于接收消息)
// 2. _lb._pipes (用于发送消息)
// 3. socket_base_t 的销毁列表 (Socket 关闭时)

如果不使用多个继承

// 错误的设计
class pipe_t {
    int _index;  // ❌ 只能存储一个索引
};

// 当 Pipe 同时加入 _fq 和 _lb 时:
pipe->_index = 0;  // 在_fq 中的索引
pipe->_index = 2;  // ❌ 覆盖了!在_lb 中的索引丢失

正确的设计

// 使用模板继承
class pipe_t : public array_item_t<1>,  // 身份 1
               public array_item_t<2>,  // 身份 2
               public array_item_t<3>   // 身份 3
{
    // 每个基类有自己的 _index 成员
};

// 现在可以同时存储 3 个索引:
pipe->array_item_t<1>::_index = 0;  // 在_fq 中排第 1
pipe->array_item_t<2>::_index = 2;  // 在_lb 中排第 3
pipe->array_item_t<3>::_index = 1;  // 在销毁列表中排第 2

3. array_item_t 的实现

// array.hpp 第 53-67 行
template <int ID = 0> class array_item_t
{
  public:
    array_item_t () : _array_index (-1) {}
    
    virtual ~array_item_t () ZMQ_DEFAULT;
    
    void set_array_index (int index_) { 
        _array_index = index_; 
    }
    
    int get_array_index () const { 
        return _array_index; 
    }

  private:
    int _array_index;  // ← 存储在该数组中的索引
    
    ZMQ_NON_COPYABLE_NOR_MOVABLE (array_item_t)
};

关键点

  • ✅ 每个 array_item_t<ID> 都有自己独立的 _array_index 成员
  • ✅ 模板参数 ID 用于区分不同的数组
  • ✅ 初始值为 -1(表示不在数组中)

4. 内存布局

pipe_t 对象的内存布局:
┌─────────────────────────────┐
│ object_t 部分                │
│  (对象基类,含命令队列等)     │
├─────────────────────────────┤
│ array_item_t<1> 部分         │
│  _array_index = 0           │ ← 在 fq_t 中的索引
├─────────────────────────────┤
│ array_item_t<2> 部分         │
│  _array_index = 2           │ ← 在 lb_t 中的索引
├─────────────────────────────┤
│ array_item_t<3> 部分         │
│  _array_index = 1           │ ← 在销毁列表中的索引
├─────────────────────────────┤
│ pipe_t 自己的成员            │
│  _in_pipe, _out_pipe        │
│  _event_sink, _state...     │
└─────────────────────────────┘

总大小:约 100-150 bytes

示意图

同一个 pipe_t 对象:
         ┌──────────────────────┐
         │  pipe_t              │
         │ ┌──────────────────┐ │
         │ │ array_item<1>    │ │ ← index=0 (在_fq 中)
         │ ├──────────────────┤ │
         │ │ array_item<2>    │ │ ← index=2 (在_lb 中)
         │ ├──────────────────┤ │
         │ │ array_item<3>    │ │ ← index=1 (在销毁列表)
         │ └──────────────────┘ │
         └──────────────────────┘
                ↓
    ┌───────────┴───────────┬───────────────┐
    ↓                       ↓               ↓
┌─────────┐           ┌─────────┐    ┌──────────┐
│  fq_t   │           │   lb_t  │    │  own_t   │
│ _pipes  │           │  _pipes │    │_children │
│ [Pipe]  │           │ [A,B,Pipe]│  │ [Pipe,...]│
│ ID=1    │           │  ID=2    │    │  ID=3    │
└─────────┘           └─────────┘    └──────────┘

5. 三个 ID 的具体用途

ID=1:入站管道数组

使用者fq_t (Fair Queuing)

// fq.hpp
class fq_t
{
  private:
    typedef array_t<pipe_t, 1> pipes_t;  // ← 使用 ID=1
    pipes_t _pipes;
};

使用场景

  • DEALER、ROUTER、PULL 等 Socket 的接收队列
  • 公平队列算法,避免某个连接独占

代码示例

// dealer.cpp
void zmq::dealer_t::xattach_pipe (pipe_t *pipe_)
{
    _fq.attach (pipe_);  // ← 加入入站队列 (ID=1)
    _lb.attach (pipe_);  // ← 加入出站队列 (ID=2)
}

// fq.cpp
void zmq::fq_t::attach (pipe_t *pipe_)
{
    _pipes.push_back (pipe_);  // ← 调用 array_t::push_back
    activated (pipe_);
}

// array.hpp
void push_back (T *item_)
{
    if (item_)
        // ★★★ 设置 ID=1 的索引 ★★★
        static_cast<array_item_t<1> *> (item_)
            ->set_array_index (_items.size ());
    _items.push_back (item_);
}

ID=2:出站管道数组

使用者lb_t (Load Balancer)、dist_t (Distributor)

// lb.hpp
class lb_t
{
  private:
    typedef array_t<pipe_t, 2> pipes_t;  // ← 使用 ID=2
    pipes_t _pipes;
};

// dist.hpp
class dist_t
{
  private:
    typedef array_t<pipe_t, 2> pipes_t;  // ← 也使用 ID=2
    pipes_t _pipes;
};

使用场景

  • DEALER、PUSH、CLIENT 等 Socket 的发送队列
  • 负载均衡算法,轮流发送

代码示例

// lb.cpp
void zmq::lb_t::attach (pipe_t *pipe_)
{
    _pipes.push_back (pipe_);  // ← 调用 array_t::push_back
    activated (pipe_);
}

// array.hpp
void push_back (T *item_)
{
    if (item_)
        // ★★★ 设置 ID=2 的索引 ★★★
        static_cast<array_item_t<2> *> (item_)
            ->set_array_index (_items.size ());
    _items.push_back (item_);
}

注意lb_tdist_t 都用 ID=2,但它们不会同时出现在同一个 Socket 中


ID=3:待销毁管道数组

使用者own_t (Ownership 基类)

// own.hpp
class own_t : public object_t
{
  private:
    // 子对象列表(包括 Pipe)
    array_t<own_t, 3> children;  // ← 使用 ID=3
};

使用场景

  • Socket 关闭时,需要销毁所有关联的 Pipe
  • 延迟销毁机制

代码示例

// socket_base.cpp
void zmq::socket_base_t::pipe_terminated (pipe_t *pipe_)
{
    // 从 _pipes 列表移除
    _pipes.erase (pipe_);
    
    // 如果 Socket 正在终止,确认销毁
    if (is_terminating ())
        unregister_term_ack ();
}

// own.cpp
void zmq::own_t::process_term (int linger_)
{
    // 终止所有子对象
    for (auto &child : children) {
        child->terminate ();
    }
}

6. 实际工作流程

场景:DEALER Socket 有 2 个连接
// 创建 DEALER
void *dealer = zmq_socket(ctx, ZMQ_DEALER);

// 连接第 1 个服务端
zmq_connect(dealer, "tcp://server1:5555");
// 创建 pipe1
// pipe1->array_item<1>::_index = 0  (在_fq 中)
// pipe1->array_item<2>::_index = 0  (在_lb 中)

// 连接第 2 个服务端
zmq_connect(dealer, "tcp://server2:5555");
// 创建 pipe2
// pipe2->array_item<1>::_index = 1  (在_fq 中)
// pipe2->array_item<2>::_index = 1  (在_lb 中)

// 内存状态:
pipe1 对象:
  array_item<1>::_index = 0
  array_item<2>::_index = 0
  array_item<3>::_index = -1  (不在销毁列表)

pipe2 对象:
  array_item<1>::_index = 1
  array_item<2>::_index = 1
  array_item<3>::_index = -1

// _fq._pipes: [pipe1, pipe2]
// _lb._pipes: [pipe1, pipe2]

场景:Socket 关闭
// 关闭 Socket
zmq_close(dealer);

// 内部流程:
// 1. Socket 标记为终止
// 2. 将所有 Pipe 加入销毁列表 (ID=3)
// 3. 终止所有 Pipe

pipe1->array_item<3>::_index = 0  (在销毁列表中)
pipe2->array_item<3>::_index = 1

// 4. 等待所有 Pipe 确认销毁
// 5. 销毁 Socket 自身

7. array_t 如何使用这些索引

// array.hpp
template <typename T, int ID = 0> class array_t
{
  public:
    void push_back (T *item_)
    {
        if (item_)
            // 设置索引(使用对应的 ID)
            static_cast<array_item_t<ID> *> (item_)
                ->set_array_index (_items.size ());
        _items.push_back (item_);
    }
    
    void erase (size_type index_)
    {
        if (_items.empty ())
            return;
        
        // 更新最后一个元素的索引
        static_cast<array_item_t<ID> *> (_items.back ())
            ->set_array_index (static_cast<int> (index_));
        
        // 用最后一个元素覆盖要删除的元素
        _items[index_] = _items.back ();
        _items.pop_back ();
    }
    
    void swap (size_type index1_, size_type index2_)
    {
        // 更新两个元素的索引
        if (_items[index1_])
            static_cast<array_item_t<ID> *> (_items[index1_])
                ->set_array_index (static_cast<int> (index2_));
        if (_items[index2_])
            static_cast<array_item_t<ID> *> (_items[index2_])
                ->set_array_index (static_cast<int> (index1_));
        std::swap (_items[index1_], _items[index2_]);
    }
    
    static size_type index (T *item_)
    {
        return static_cast<size_type> (
            static_cast<array_item_t<ID> *> (item_)->get_array_index ()
        );
    }
    
  private:
    std::vector<T *> _items;
};

关键点

  • ✅ 所有操作都通过 static_cast<array_item_t<ID>*> 访问对应的索引
  • ID 是模板参数,编译时确定
  • ✅ 不同 ID 的 array_t 互不干扰

8. 对比:使用 vs 不使用多重继承

方案 A:使用多重继承(实际方案)
class pipe_t : public array_item_t<1>,
               public array_item_t<2>,
               public array_item_t<3>
{
};

// 使用
fq_t _fq;   // 使用 array_item<1>
lb_t _lb;   // 使用 array_item<2>
own_t own;  // 使用 array_item<3>

// 优点:
// ✅ 类型安全:编译器检查 ID 匹配
// ✅ 零开销:索引直接存储在对象中
// ✅ 代码复用:array_t 通用
方案 B:使用独立成员变量
class pipe_t {
    int _fq_index;    // 在 fq_t 中的索引
    int _lb_index;    // 在 lb_t 中的索引
    int _own_index;   // 在 own_t 中的索引
    
    friend class fq_t;
    friend class lb_t;
    friend class own_t;
};

// 使用
void fq_t::attach(pipe_t *p) {
    p->_fq_index = _pipes.size ();
    _pipes.push_back (p);
}

// 缺点:
// ❌ 需要友元声明
// ❌ 容易混淆(哪个索引对应哪个数组)
// ❌ 不够通用
方案 C:使用外部映射
class fq_t {
    std::map<pipe_t*, int> _indices;
    std::vector<pipe_t*> _pipes;
};

void fq_t::attach(pipe_t *p) {
    _indices[p] = _pipes.size ();
    _pipes.push_back (p);
}

// 缺点:
// ❌ 额外的内存开销(map)
// ❌ 查找开销(O(log n))
// ❌ 缓存不友好

9. 总结

方面 ID=1 ID=2 ID=3
使用者 fq_t lb_t / dist_t own_t
用途 入站队列 出站队列 销毁列表
Socket 类型 DEALER, PULL, ROUTER DEALER, PUSH, CLIENT 所有 Socket
算法 Fair Queuing Round-Robin 延迟销毁
生命周期 连接建立 → 断开 连接建立 → 断开 Socket 关闭

核心设计思想

  1. 多重继承:让同一个对象可以存在于多个数组
  2. 模板 ID:编译时区分不同的数组
  3. 零开销:索引直接存储在对象中
  4. 类型安全:编译器检查 ID 匹配

一句话:三个 array_item_t 继承让 pipe_t 可以同时拥有三个身份,分别用于接收发送销毁!🎯

2. 核心成员变量

2.1 底层无锁队列
typedef ypipe_base_t<msg_t> upipe_t;

upipe_t *_in_pipe;   // ← 入站队列(从网络接收)
upipe_t *_out_pipe;  // ← 出站队列(发送到网络)

结构

Pipe 对象:
┌─────────────────────────┐
│ _in_pipe (ypipe_t)      │ ← 从 Peer 接收消息
│  [msg1, msg2, msg3...]  │
├─────────────────────────┤
│ _out_pipe (ypipe_t)     │ ← 向 Peer 发送消息
│  [msgA, msgB, msgC...]  │
└─────────────────────────┘

2.2 活跃状态
bool _in_active;   // 入站是否活跃
bool _out_active;  // 出站是否活跃

用途

  • _in_active = false:队列空,等待 read_activated 事件
  • _out_active = false:达到 HWM,等待 write_activated 事件

2.3 水位线
int _hwm;  // 出站高水位(outbound)
int _lwm;  // 入站低水位(inbound)

int _in_hwm_boost;   // inproc 的 HWM 增强
int _out_hwm_boost;  // inproc 的 HWM 增强

计算

int zmq::pipe_t::compute_lwm (int hwm_)
{
    // LWM = HWM 的 50%(至少为 1)
    return (hwm_ > 0) ? (hwm_ + 1) / 2 : 0;
}

效果

HWM = 1000
LWM = 500

发送 1000 条消息 → 达到 HWM,阻塞
接收端读取 500 条 → 降到 LWM,发送 activate_write
发送端恢复发送

2.4 消息计数
uint64_t _msgs_read;       // 已读取消息数
uint64_t _msgs_written;    // 已写入消息数
uint64_t _peers_msgs_read; // 对端已读取的消息数

用途

  • 统计信息
  • 流控:每读取 LWM 条消息,通知对端

2.5 对端引用
pipe_t *_peer;  // 配对的 Pipe 对象

关系

Pipe A ←→ Pipe B
  ↓        ↓
Peer=B    Peer=A

2.6 事件接收者
i_pipe_events *_sink;  // 事件回调接口

通常是

  • Socket 对象(socket_base_t
  • Session 对象(session_base_t

2.7 Pipe 状态机
enum {
    active,              // 正常状态
    delimiter_received,  // 收到分隔符
    waiting_for_delimiter, // 等待分隔符
    term_ack_sent,       // 已发送终止确认
    term_req_sent1,      // 用户请求终止
    term_req_sent2       // 双方都请求终止
} _state;

状态转换图

active
  ↓ (收到 delimiter)
delimiter_received
  ↓ (收到 term 命令)
term_ack_sent → 销毁

active
  ↓ (收到 term 命令)
waiting_for_delimiter
  ↓ (读完所有消息)
term_ack_sent → 销毁

active
  ↓ (用户调用 terminate)
term_req_sent1
  ↓ (收到 peer 的 term 命令)
term_req_sent2 → 销毁

2.8 其他成员
bool _delay;  // true=延迟终止(读完所有消息)
              // false=立即终止

bool _conflate;  // true=只保留最新消息(合并旧消息)

blob_t _router_socket_routing_id;  // ROUTER 路由 ID
int _server_socket_routing_id;     // SERVER 路由 ID

endpoint_uri_pair_t _endpoint_pair; // 端点对

msg_t _disconnect_msg;  // 断开连接时发送的消息

3. 核心方法详解

3.1 创建 Pipe
// pipe.cpp 第 41 行
int zmq::pipepair (object_t *parents_[2],
                   pipe_t *pipes_[2],
                   const int hwms_[2],
                   const bool conflate_[2])
{
    // 1. 创建两种 ypipe
    typedef ypipe_t<msg_t, message_pipe_granularity> upipe_normal_t;
    typedef ypipe_conflate_t<msg_t> upipe_conflate_t;
    
    // 2. 创建两个 ypipe(方向相反)
    upipe1 = conflate[0] ? new upipe_conflate_t () 
                         : new upipe_normal_t ();
    upipe2 = conflate[1] ? new upipe_conflate_t () 
                         : new upipe_normal_t ();
    
    // 3. 创建两个 Pipe 对象
    pipes_[0] = new pipe_t (parents_[0], upipe1, upipe2, 
                            hwms_[1], hwms_[0], conflate_[0]);
    pipes_[1] = new pipe_t (parents_[1], upipe2, upipe1, 
                            hwms_[0], hwms_[1], conflate_[1]);
    
    // 4. 互相设置 Peer
    pipes_[0]->set_peer (pipes_[1]);
    pipes_[1]->set_peer (pipes_[0]);
    
    return 0;
}

结构

Pipe[0] (Socket 端)          Pipe[1] (Session 端)
┌─────────────────┐         ┌─────────────────┐
│ _in_pipe = upipe1│ ←─────→ │ _out_pipe=upipe1│
│ _out_pipe=upipe2 │ ──────→ │ _in_pipe =upipe2│
└─────────────────┘         └─────────────────┘
      ↓                           ↓
   Socket                      Session

3.2 读取消息
// pipe.cpp 第 193 行
bool zmq::pipe_t::read (msg_t *msg_)
{
    // 1. 检查状态
    if (!_in_active || (_state != active && _state != waiting_for_delimiter))
        return false;
    
    // 2. 从队列读取
    while (true) {
        if (!_in_pipe->read (msg_)) {
            _in_active = false;
            return false;
        }
        
        // 跳过凭证消息
        if (msg_->is_credential ()) {
            msg_->close ();
            continue;
        }
        break;
    }
    
    // 3. 检查分隔符
    if (msg_->is_delimiter ()) {
        process_delimiter ();
        return false;
    }
    
    // 4. 更新计数
    if (!(msg_->flags () & msg_t::more) && !msg_->is_routing_id ())
        _msgs_read++;
    
    // 5. 流控:每读取 LWM 条消息,通知对端
    if (_lwm > 0 && _msgs_read % _lwm == 0)
        send_activate_write (_peer, _msgs_read);
    
    return true;
}

流程

读取消息
    ↓
检查活跃状态
    ↓
从_in_pipe 读取
    ↓
是分隔符?→ 启动终止流程
    ↓
更新_msgs_read
    ↓
_msgs_read % _lwm == 0?
    ├─ 是 → send_activate_write(_peer)
    └─ 否 → 返回

3.3 写入消息
// pipe.cpp 第 244 行
bool zmq::pipe_t::write (const msg_t *msg_)
{
    // 1. 检查是否可写
    if (!check_write ())
        return false;
    
    // 2. 写入出站队列
    const bool more = (msg_->flags () & msg_t::more) != 0;
    const bool is_routing_id = msg_->is_routing_id ();
    
    _out_pipe->write (*msg_, more);
    
    // 3. 更新计数
    if (!more && !is_routing_id)
        _msgs_written++;
    
    return true;
}

// pipe.cpp 第 233 行
bool zmq::pipe_t::check_write ()
{
    if (!_out_active || _state != active)
        return false;
    
    // 检查 HWM
    const bool full = !check_hwm ();
    if (full) {
        _out_active = false;
        return false;
    }
    
    return true;
}

流程

写入消息
    ↓
检查_out_active
    ↓
检查 HWM
    ├─ 达到 HWM → _out_active=false, 返回 false
    └─ 未达 HWM ↓
        写入_out_pipe
        ↓
        更新_msgs_written
        ↓
        返回 true

3.4 Flush 操作
// pipe.cpp 第 267 行
void zmq::pipe_t::flush ()
{
    // 1. 检查状态
    if (_state == term_ack_sent)
        return;
    
    // 2. 刷新出站队列
    if (_out_pipe && !_out_pipe->flush ()) {
        // 3. 如果读者休眠,发送激活命令
        send_activate_read (_peer);
    }
}

作用

  • ✅ 将消息标记为"已完成"
  • ✅ 唤醒休眠的读者(I/O 线程)

3.5 终止 Pipe
// pipe.cpp 第 411 行
void zmq::pipe_t::terminate (bool delay_)
{
    // 1. 覆盖延迟设置
    _delay = delay_;
    
    // 2. 如果已经终止,忽略
    if (_state == term_req_sent1 || _state == term_req_sent2)
        return;
    
    // 3. 如果正在等待分隔符,直接确认
    if (_state == waiting_for_delimiter) {
        _state = term_ack_sent;
        _out_pipe = NULL;
        send_pipe_term_ack (_peer);
        return;
    }
    
    // 4. 进入 term_req_sent1 状态
    _state = term_req_sent1;
    
    // 5. 如果没有待处理消息,立即发送确认
    if (!_in_pipe->check_read ()) {
        _out_pipe = NULL;
        send_pipe_term_ack (_peer);
    }
}

流程

用户调用 terminate()
    ↓
设置_state = term_req_sent1
    ↓
检查是否有待读消息
    ├─ 有 → 等待读完
    └─ 无 → 发送 pipe_term_ack
        ↓
        等待 peer 确认
        ↓
        销毁

4. 命令处理

pipe_t 继承自 object_t,可以接收命令:

// 命令处理函数
void process_activate_read ();           // 激活读取
void process_activate_write (uint64_t);  // 激活写入
void process_hiccup (void *);            // 管道打嗝
void process_pipe_term ();               // 终止 Pipe
void process_pipe_term_ack ();           // 终止确认
void process_pipe_hwm (int, int);        // 更新 HWM

4.1 process_activate_read
void zmq::pipe_t::process_activate_read ()
{
    if (!_in_active && (_state == active || _state == waiting_for_delimiter)) {
        _in_active = true;
        _sink->read_activated (this);  // ← 通知 Socket
    }
}

触发时机

  • Peer 写入消息后 flush() 发现读者休眠

4.2 process_activate_write
void zmq::pipe_t::process_activate_write (uint64_t msgs_read_)
{
    // 记录对端读取进度
    _peers_msgs_read = msgs_read_;
    
    // 恢复写入
    if (!_out_active && _state == active) {
        _out_active = true;
        _sink->write_activated (this);  // ← 通知 Socket
    }
}

触发时机

  • 对端读取了 LWM 条消息

4.3 process_pipe_term
void zmq::pipe_t::process_pipe_term ()
{
    if (_state == active) {
        if (_delay)
            _state = waiting_for_delimiter;  // 读完再终止
        else {
            _state = term_ack_sent;  // 立即终止
            _out_pipe = NULL;
            send_pipe_term_ack (_peer);
        }
    }
    else if (_state == delimiter_received) {
        _state = term_ack_sent;
        send_pipe_term_ack (_peer);
    }
    else if (_state == term_req_sent1) {
        _state = term_req_sent2;
        send_pipe_term_ack (_peer);
    }
}

4.4 process_pipe_term_ack
void zmq::pipe_t::process_pipe_term_ack ()
{
    // 1. 通知事件接收者
    _sink->pipe_terminated (this);
    
    // 2. 清理未读消息
    if (!_conflate) {
        msg_t msg;
        while (_in_pipe->read (&msg)) {
            msg.close ();
        }
    }
    
    // 3. 销毁入站队列
    LIBZMQ_DELETE (_in_pipe);
    
    // 4. 自毁
    delete this;
}

5. Pipe 状态机详解

┌─────────────────────────────────────────────────────────────┐
│                      pipe_t 状态机                           │
└─────────────────────────────────────────────────────────────┘

状态 1: active (正常状态)
┌───────────────────────────────────────────────┐
│ - 可以正常读写                                 │
│ - 处理消息流                                   │
│                                               │
│ 转移:                                        │
│   - 收到 delimiter → delimiter_received        │
│   - 收到 term 命令 → waiting_for_delimiter     │
│   - 用户 terminate() → term_req_sent1          │
└───────────────────────────────────────────────┘
              ↓
状态 2: delimiter_received (收到分隔符)
┌───────────────────────────────────────────────┐
│ - Peer 已发送终止分隔符                        │
│ - 等待 term 命令                                │
│                                               │
│ 转移:                                        │
│   - 收到 term 命令 → term_ack_sent             │
└───────────────────────────────────────────────┘
              ↓
状态 3: waiting_for_delimiter (等待分隔符)
┌───────────────────────────────────────────────┐
│ - 已收到 term 命令                              │
│ - 继续读取剩余消息(_delay=true)              │
│                                               │
│ 转移:                                        │
│   - 读完所有消息 → term_ack_sent               │
│   - 收到 delimiter → term_ack_sent             │
└───────────────────────────────────────────────┘
              ↓
状态 4: term_ack_sent (已发送确认)
┌───────────────────────────────────────────────┐
│ - 所有消息已处理完毕                           │
│ - 等待销毁                                     │
│                                               │
│ 转移:                                        │
│   - process_pipe_term_ack() → 销毁            │
└───────────────────────────────────────────────┘

状态 5: term_req_sent1 (用户请求终止)
┌───────────────────────────────────────────────┐
│ - 用户调用 terminate()                         │
│ - 等待 Peer 确认                                │
│                                               │
│ 转移:                                        │
│   - 收到 term 命令 → term_req_sent2            │
│   - 无待读消息 → 发送 ack → term_ack_sent      │
└───────────────────────────────────────────────┘
              ↓
状态 6: term_req_sent2 (双方都请求终止)
┌───────────────────────────────────────────────┐
│ - 双方都请求终止                               │
│ - 立即销毁                                     │
│                                               │
│ 转移:                                        │
│   - process_pipe_term_ack() → 销毁            │
└───────────────────────────────────────────────┘

6. Pipe 的生命周期

创建阶段:
┌─────────────────────────────────────┐
│ pipepair()                          │
│  - 创建两个 ypipe                    │
│  - 创建两个 pipe_t                  │
│  - 互相设置 peer                     │
└─────────────────────────────────────┘
         ↓
使用阶段:
┌─────────────────────────────────────┐
│ Socket ↔ Pipe ↔ Session            │
│  - write() 写入消息                  │
│  - flush() 刷新到对端                │
│  - read() 从对端读取                 │
│  - 流控:HWM/LWM                    │
└─────────────────────────────────────┘
         ↓
终止阶段:
┌─────────────────────────────────────┐
│ terminate() / process_pipe_term()  │
│  - 进入终止状态                      │
│  - 处理剩余消息                      │
│  - 发送/接收确认                     │
└─────────────────────────────────────┘
         ↓
销毁阶段:
┌─────────────────────────────────────┐
│ process_pipe_term_ack()            │
│  - 通知_pipe_terminated 事件          │
│  - 清理未读消息                      │
│  - 删除_in_pipe                     │
│  - delete this                      │
└─────────────────────────────────────┘

7. Pipe 与 Socket 的关系

Socket (DEALER)
┌─────────────────────────┐
│ _pipes: [pipe1, pipe2]  │
│ _fq: 入站队列            │
│ _lb: 出站队列            │
└───────────┬─────────────┘
            │ i_pipe_events
            ↓
      ┌───────────┐
      │  pipe1    │
      │ _sink     │ ← 指向 Socket
      │ _peer     │ ← 指向 Session 的 pipe
      └─────┬─────┘
            │
            ↓
      Session
      ┌───────────┐
      │  pipe     │
      │ _engine   │ ← 网络 Socket
      └───────────┘

8. 总结

方面 详情
本质 双向无锁消息队列
组成 _in_pipe + _out_pipe
身份 3 个 array_item_t 继承
状态 6 种状态的状态机
流控 HWM/LWM 背压机制
生命周期 创建 → 使用 → 终止 → 销毁
线程模型 应用线程写,I/O 线程读

核心设计

  1. 无锁队列:高性能消息传递
  2. 背压机制:防止内存爆炸
  3. 状态机:优雅处理终止
  4. 事件驱动:高效通知机制
  5. 多身份:同时属于多个数组

一句话pipe_t 是 ZeroMQ 的智能消息通道,连接应用层和网络层,实现高效、可靠的消息传递!🎯

Logo

AtomGit 是由开放原子开源基金会联合 CSDN 等生态伙伴共同推出的新一代开源与人工智能协作平台。平台坚持“开放、中立、公益”的理念,把代码托管、模型共享、数据集托管、智能体开发体验和算力服务整合在一起,为开发者提供从开发、训练到部署的一站式体验。

更多推荐