zmq源码分析之pipe_t
·
文章目录
1. pipe_t 是什么?
定义:pipe_t 是 ZeroMQ 中两个对象之间的双向通信通道。
核心作用:
- ✅ 连接 Socket 和 Session
- ✅ 提供无锁消息队列
- ✅ 实现背压(Back Pressure)
- ✅ 管理消息流控(HWM)
- ✅ 处理 Pipe 生命周期
继承关系:
class pipe_t : public object_t, // 命令处理基类
public array_item_t<1>, // 入站数组身份
public array_item_t<2>, // 出站数组身份
public array_item_t<3> // 销毁数组身份
pipe_t 继承的三个 array_item_t 详解
1. 代码定义
// pipe.hpp 第 73-77 行
class pipe_t ZMQ_FINAL : public object_t,
public array_item_t<1>, // ← ID=1
public array_item_t<2>, // ← ID=2
public array_item_t<3> // ← ID=3
{
// ...
};
注释说明(第 70-72 行):
// Note that pipe can be stored in three different arrays.
// The array of inbound pipes (1), the array of outbound pipes (2) and
// the generic array of pipes to be deallocated (3).
翻译:
注意,pipe 可以存储在三个不同的数组中:
- 入站管道数组 (ID=1)
- 出站管道数组 (ID=2)
- 待销毁管道数组 (ID=3)
2. 为什么需要三个继承?
问题:同一个 pipe_t 对象需要同时存在于多个不同的数组中。
场景:
// DEALER Socket
class dealer_t {
fq_t _fq; // 入站公平队列
lb_t _lb; // 出站负载均衡
};
// 一个 Pipe 同时属于:
// 1. _fq._pipes (用于接收消息)
// 2. _lb._pipes (用于发送消息)
// 3. socket_base_t 的销毁列表 (Socket 关闭时)
如果不使用多个继承:
// 错误的设计
class pipe_t {
int _index; // ❌ 只能存储一个索引
};
// 当 Pipe 同时加入 _fq 和 _lb 时:
pipe->_index = 0; // 在_fq 中的索引
pipe->_index = 2; // ❌ 覆盖了!在_lb 中的索引丢失
正确的设计:
// 使用模板继承
class pipe_t : public array_item_t<1>, // 身份 1
public array_item_t<2>, // 身份 2
public array_item_t<3> // 身份 3
{
// 每个基类有自己的 _index 成员
};
// 现在可以同时存储 3 个索引:
pipe->array_item_t<1>::_index = 0; // 在_fq 中排第 1
pipe->array_item_t<2>::_index = 2; // 在_lb 中排第 3
pipe->array_item_t<3>::_index = 1; // 在销毁列表中排第 2
3. array_item_t 的实现
// array.hpp 第 53-67 行
template <int ID = 0> class array_item_t
{
public:
array_item_t () : _array_index (-1) {}
virtual ~array_item_t () ZMQ_DEFAULT;
void set_array_index (int index_) {
_array_index = index_;
}
int get_array_index () const {
return _array_index;
}
private:
int _array_index; // ← 存储在该数组中的索引
ZMQ_NON_COPYABLE_NOR_MOVABLE (array_item_t)
};
关键点:
- ✅ 每个
array_item_t<ID>都有自己独立的_array_index成员 - ✅ 模板参数
ID用于区分不同的数组 - ✅ 初始值为
-1(表示不在数组中)
4. 内存布局
pipe_t 对象的内存布局:
┌─────────────────────────────┐
│ object_t 部分 │
│ (对象基类,含命令队列等) │
├─────────────────────────────┤
│ array_item_t<1> 部分 │
│ _array_index = 0 │ ← 在 fq_t 中的索引
├─────────────────────────────┤
│ array_item_t<2> 部分 │
│ _array_index = 2 │ ← 在 lb_t 中的索引
├─────────────────────────────┤
│ array_item_t<3> 部分 │
│ _array_index = 1 │ ← 在销毁列表中的索引
├─────────────────────────────┤
│ pipe_t 自己的成员 │
│ _in_pipe, _out_pipe │
│ _event_sink, _state... │
└─────────────────────────────┘
总大小:约 100-150 bytes
示意图:
同一个 pipe_t 对象:
┌──────────────────────┐
│ pipe_t │
│ ┌──────────────────┐ │
│ │ array_item<1> │ │ ← index=0 (在_fq 中)
│ ├──────────────────┤ │
│ │ array_item<2> │ │ ← index=2 (在_lb 中)
│ ├──────────────────┤ │
│ │ array_item<3> │ │ ← index=1 (在销毁列表)
│ └──────────────────┘ │
└──────────────────────┘
↓
┌───────────┴───────────┬───────────────┐
↓ ↓ ↓
┌─────────┐ ┌─────────┐ ┌──────────┐
│ fq_t │ │ lb_t │ │ own_t │
│ _pipes │ │ _pipes │ │_children │
│ [Pipe] │ │ [A,B,Pipe]│ │ [Pipe,...]│
│ ID=1 │ │ ID=2 │ │ ID=3 │
└─────────┘ └─────────┘ └──────────┘
5. 三个 ID 的具体用途
ID=1:入站管道数组
使用者:fq_t (Fair Queuing)
// fq.hpp
class fq_t
{
private:
typedef array_t<pipe_t, 1> pipes_t; // ← 使用 ID=1
pipes_t _pipes;
};
使用场景:
- DEALER、ROUTER、PULL 等 Socket 的接收队列
- 公平队列算法,避免某个连接独占
代码示例:
// dealer.cpp
void zmq::dealer_t::xattach_pipe (pipe_t *pipe_)
{
_fq.attach (pipe_); // ← 加入入站队列 (ID=1)
_lb.attach (pipe_); // ← 加入出站队列 (ID=2)
}
// fq.cpp
void zmq::fq_t::attach (pipe_t *pipe_)
{
_pipes.push_back (pipe_); // ← 调用 array_t::push_back
activated (pipe_);
}
// array.hpp
void push_back (T *item_)
{
if (item_)
// ★★★ 设置 ID=1 的索引 ★★★
static_cast<array_item_t<1> *> (item_)
->set_array_index (_items.size ());
_items.push_back (item_);
}
ID=2:出站管道数组
使用者:lb_t (Load Balancer)、dist_t (Distributor)
// lb.hpp
class lb_t
{
private:
typedef array_t<pipe_t, 2> pipes_t; // ← 使用 ID=2
pipes_t _pipes;
};
// dist.hpp
class dist_t
{
private:
typedef array_t<pipe_t, 2> pipes_t; // ← 也使用 ID=2
pipes_t _pipes;
};
使用场景:
- DEALER、PUSH、CLIENT 等 Socket 的发送队列
- 负载均衡算法,轮流发送
代码示例:
// lb.cpp
void zmq::lb_t::attach (pipe_t *pipe_)
{
_pipes.push_back (pipe_); // ← 调用 array_t::push_back
activated (pipe_);
}
// array.hpp
void push_back (T *item_)
{
if (item_)
// ★★★ 设置 ID=2 的索引 ★★★
static_cast<array_item_t<2> *> (item_)
->set_array_index (_items.size ());
_items.push_back (item_);
}
注意:lb_t 和 dist_t 都用 ID=2,但它们不会同时出现在同一个 Socket 中
ID=3:待销毁管道数组
使用者:own_t (Ownership 基类)
// own.hpp
class own_t : public object_t
{
private:
// 子对象列表(包括 Pipe)
array_t<own_t, 3> children; // ← 使用 ID=3
};
使用场景:
- Socket 关闭时,需要销毁所有关联的 Pipe
- 延迟销毁机制
代码示例:
// socket_base.cpp
void zmq::socket_base_t::pipe_terminated (pipe_t *pipe_)
{
// 从 _pipes 列表移除
_pipes.erase (pipe_);
// 如果 Socket 正在终止,确认销毁
if (is_terminating ())
unregister_term_ack ();
}
// own.cpp
void zmq::own_t::process_term (int linger_)
{
// 终止所有子对象
for (auto &child : children) {
child->terminate ();
}
}
6. 实际工作流程
场景:DEALER Socket 有 2 个连接
// 创建 DEALER
void *dealer = zmq_socket(ctx, ZMQ_DEALER);
// 连接第 1 个服务端
zmq_connect(dealer, "tcp://server1:5555");
// 创建 pipe1
// pipe1->array_item<1>::_index = 0 (在_fq 中)
// pipe1->array_item<2>::_index = 0 (在_lb 中)
// 连接第 2 个服务端
zmq_connect(dealer, "tcp://server2:5555");
// 创建 pipe2
// pipe2->array_item<1>::_index = 1 (在_fq 中)
// pipe2->array_item<2>::_index = 1 (在_lb 中)
// 内存状态:
pipe1 对象:
array_item<1>::_index = 0
array_item<2>::_index = 0
array_item<3>::_index = -1 (不在销毁列表)
pipe2 对象:
array_item<1>::_index = 1
array_item<2>::_index = 1
array_item<3>::_index = -1
// _fq._pipes: [pipe1, pipe2]
// _lb._pipes: [pipe1, pipe2]
场景:Socket 关闭
// 关闭 Socket
zmq_close(dealer);
// 内部流程:
// 1. Socket 标记为终止
// 2. 将所有 Pipe 加入销毁列表 (ID=3)
// 3. 终止所有 Pipe
pipe1->array_item<3>::_index = 0 (在销毁列表中)
pipe2->array_item<3>::_index = 1
// 4. 等待所有 Pipe 确认销毁
// 5. 销毁 Socket 自身
7. array_t 如何使用这些索引
// array.hpp
template <typename T, int ID = 0> class array_t
{
public:
void push_back (T *item_)
{
if (item_)
// 设置索引(使用对应的 ID)
static_cast<array_item_t<ID> *> (item_)
->set_array_index (_items.size ());
_items.push_back (item_);
}
void erase (size_type index_)
{
if (_items.empty ())
return;
// 更新最后一个元素的索引
static_cast<array_item_t<ID> *> (_items.back ())
->set_array_index (static_cast<int> (index_));
// 用最后一个元素覆盖要删除的元素
_items[index_] = _items.back ();
_items.pop_back ();
}
void swap (size_type index1_, size_type index2_)
{
// 更新两个元素的索引
if (_items[index1_])
static_cast<array_item_t<ID> *> (_items[index1_])
->set_array_index (static_cast<int> (index2_));
if (_items[index2_])
static_cast<array_item_t<ID> *> (_items[index2_])
->set_array_index (static_cast<int> (index1_));
std::swap (_items[index1_], _items[index2_]);
}
static size_type index (T *item_)
{
return static_cast<size_type> (
static_cast<array_item_t<ID> *> (item_)->get_array_index ()
);
}
private:
std::vector<T *> _items;
};
关键点:
- ✅ 所有操作都通过
static_cast<array_item_t<ID>*>访问对应的索引 - ✅
ID是模板参数,编译时确定 - ✅ 不同 ID 的
array_t互不干扰
8. 对比:使用 vs 不使用多重继承
方案 A:使用多重继承(实际方案)
class pipe_t : public array_item_t<1>,
public array_item_t<2>,
public array_item_t<3>
{
};
// 使用
fq_t _fq; // 使用 array_item<1>
lb_t _lb; // 使用 array_item<2>
own_t own; // 使用 array_item<3>
// 优点:
// ✅ 类型安全:编译器检查 ID 匹配
// ✅ 零开销:索引直接存储在对象中
// ✅ 代码复用:array_t 通用
方案 B:使用独立成员变量
class pipe_t {
int _fq_index; // 在 fq_t 中的索引
int _lb_index; // 在 lb_t 中的索引
int _own_index; // 在 own_t 中的索引
friend class fq_t;
friend class lb_t;
friend class own_t;
};
// 使用
void fq_t::attach(pipe_t *p) {
p->_fq_index = _pipes.size ();
_pipes.push_back (p);
}
// 缺点:
// ❌ 需要友元声明
// ❌ 容易混淆(哪个索引对应哪个数组)
// ❌ 不够通用
方案 C:使用外部映射
class fq_t {
std::map<pipe_t*, int> _indices;
std::vector<pipe_t*> _pipes;
};
void fq_t::attach(pipe_t *p) {
_indices[p] = _pipes.size ();
_pipes.push_back (p);
}
// 缺点:
// ❌ 额外的内存开销(map)
// ❌ 查找开销(O(log n))
// ❌ 缓存不友好
9. 总结
| 方面 | ID=1 | ID=2 | ID=3 |
|---|---|---|---|
| 使用者 | fq_t |
lb_t / dist_t |
own_t |
| 用途 | 入站队列 | 出站队列 | 销毁列表 |
| Socket 类型 | DEALER, PULL, ROUTER | DEALER, PUSH, CLIENT | 所有 Socket |
| 算法 | Fair Queuing | Round-Robin | 延迟销毁 |
| 生命周期 | 连接建立 → 断开 | 连接建立 → 断开 | Socket 关闭 |
核心设计思想:
- ✅ 多重继承:让同一个对象可以存在于多个数组
- ✅ 模板 ID:编译时区分不同的数组
- ✅ 零开销:索引直接存储在对象中
- ✅ 类型安全:编译器检查 ID 匹配
一句话:三个 array_item_t 继承让 pipe_t 可以同时拥有三个身份,分别用于接收、发送和销毁!🎯
2. 核心成员变量
2.1 底层无锁队列
typedef ypipe_base_t<msg_t> upipe_t;
upipe_t *_in_pipe; // ← 入站队列(从网络接收)
upipe_t *_out_pipe; // ← 出站队列(发送到网络)
结构:
Pipe 对象:
┌─────────────────────────┐
│ _in_pipe (ypipe_t) │ ← 从 Peer 接收消息
│ [msg1, msg2, msg3...] │
├─────────────────────────┤
│ _out_pipe (ypipe_t) │ ← 向 Peer 发送消息
│ [msgA, msgB, msgC...] │
└─────────────────────────┘
2.2 活跃状态
bool _in_active; // 入站是否活跃
bool _out_active; // 出站是否活跃
用途:
_in_active = false:队列空,等待read_activated事件_out_active = false:达到 HWM,等待write_activated事件
2.3 水位线
int _hwm; // 出站高水位(outbound)
int _lwm; // 入站低水位(inbound)
int _in_hwm_boost; // inproc 的 HWM 增强
int _out_hwm_boost; // inproc 的 HWM 增强
计算:
int zmq::pipe_t::compute_lwm (int hwm_)
{
// LWM = HWM 的 50%(至少为 1)
return (hwm_ > 0) ? (hwm_ + 1) / 2 : 0;
}
效果:
HWM = 1000
LWM = 500
发送 1000 条消息 → 达到 HWM,阻塞
接收端读取 500 条 → 降到 LWM,发送 activate_write
发送端恢复发送
2.4 消息计数
uint64_t _msgs_read; // 已读取消息数
uint64_t _msgs_written; // 已写入消息数
uint64_t _peers_msgs_read; // 对端已读取的消息数
用途:
- 统计信息
- 流控:每读取 LWM 条消息,通知对端
2.5 对端引用
pipe_t *_peer; // 配对的 Pipe 对象
关系:
Pipe A ←→ Pipe B
↓ ↓
Peer=B Peer=A
2.6 事件接收者
i_pipe_events *_sink; // 事件回调接口
通常是:
- Socket 对象(
socket_base_t) - Session 对象(
session_base_t)
2.7 Pipe 状态机
enum {
active, // 正常状态
delimiter_received, // 收到分隔符
waiting_for_delimiter, // 等待分隔符
term_ack_sent, // 已发送终止确认
term_req_sent1, // 用户请求终止
term_req_sent2 // 双方都请求终止
} _state;
状态转换图:
active
↓ (收到 delimiter)
delimiter_received
↓ (收到 term 命令)
term_ack_sent → 销毁
active
↓ (收到 term 命令)
waiting_for_delimiter
↓ (读完所有消息)
term_ack_sent → 销毁
active
↓ (用户调用 terminate)
term_req_sent1
↓ (收到 peer 的 term 命令)
term_req_sent2 → 销毁
2.8 其他成员
bool _delay; // true=延迟终止(读完所有消息)
// false=立即终止
bool _conflate; // true=只保留最新消息(合并旧消息)
blob_t _router_socket_routing_id; // ROUTER 路由 ID
int _server_socket_routing_id; // SERVER 路由 ID
endpoint_uri_pair_t _endpoint_pair; // 端点对
msg_t _disconnect_msg; // 断开连接时发送的消息
3. 核心方法详解
3.1 创建 Pipe
// pipe.cpp 第 41 行
int zmq::pipepair (object_t *parents_[2],
pipe_t *pipes_[2],
const int hwms_[2],
const bool conflate_[2])
{
// 1. 创建两种 ypipe
typedef ypipe_t<msg_t, message_pipe_granularity> upipe_normal_t;
typedef ypipe_conflate_t<msg_t> upipe_conflate_t;
// 2. 创建两个 ypipe(方向相反)
upipe1 = conflate[0] ? new upipe_conflate_t ()
: new upipe_normal_t ();
upipe2 = conflate[1] ? new upipe_conflate_t ()
: new upipe_normal_t ();
// 3. 创建两个 Pipe 对象
pipes_[0] = new pipe_t (parents_[0], upipe1, upipe2,
hwms_[1], hwms_[0], conflate_[0]);
pipes_[1] = new pipe_t (parents_[1], upipe2, upipe1,
hwms_[0], hwms_[1], conflate_[1]);
// 4. 互相设置 Peer
pipes_[0]->set_peer (pipes_[1]);
pipes_[1]->set_peer (pipes_[0]);
return 0;
}
结构:
Pipe[0] (Socket 端) Pipe[1] (Session 端)
┌─────────────────┐ ┌─────────────────┐
│ _in_pipe = upipe1│ ←─────→ │ _out_pipe=upipe1│
│ _out_pipe=upipe2 │ ──────→ │ _in_pipe =upipe2│
└─────────────────┘ └─────────────────┘
↓ ↓
Socket Session
3.2 读取消息
// pipe.cpp 第 193 行
bool zmq::pipe_t::read (msg_t *msg_)
{
// 1. 检查状态
if (!_in_active || (_state != active && _state != waiting_for_delimiter))
return false;
// 2. 从队列读取
while (true) {
if (!_in_pipe->read (msg_)) {
_in_active = false;
return false;
}
// 跳过凭证消息
if (msg_->is_credential ()) {
msg_->close ();
continue;
}
break;
}
// 3. 检查分隔符
if (msg_->is_delimiter ()) {
process_delimiter ();
return false;
}
// 4. 更新计数
if (!(msg_->flags () & msg_t::more) && !msg_->is_routing_id ())
_msgs_read++;
// 5. 流控:每读取 LWM 条消息,通知对端
if (_lwm > 0 && _msgs_read % _lwm == 0)
send_activate_write (_peer, _msgs_read);
return true;
}
流程:
读取消息
↓
检查活跃状态
↓
从_in_pipe 读取
↓
是分隔符?→ 启动终止流程
↓
更新_msgs_read
↓
_msgs_read % _lwm == 0?
├─ 是 → send_activate_write(_peer)
└─ 否 → 返回
3.3 写入消息
// pipe.cpp 第 244 行
bool zmq::pipe_t::write (const msg_t *msg_)
{
// 1. 检查是否可写
if (!check_write ())
return false;
// 2. 写入出站队列
const bool more = (msg_->flags () & msg_t::more) != 0;
const bool is_routing_id = msg_->is_routing_id ();
_out_pipe->write (*msg_, more);
// 3. 更新计数
if (!more && !is_routing_id)
_msgs_written++;
return true;
}
// pipe.cpp 第 233 行
bool zmq::pipe_t::check_write ()
{
if (!_out_active || _state != active)
return false;
// 检查 HWM
const bool full = !check_hwm ();
if (full) {
_out_active = false;
return false;
}
return true;
}
流程:
写入消息
↓
检查_out_active
↓
检查 HWM
├─ 达到 HWM → _out_active=false, 返回 false
└─ 未达 HWM ↓
写入_out_pipe
↓
更新_msgs_written
↓
返回 true
3.4 Flush 操作
// pipe.cpp 第 267 行
void zmq::pipe_t::flush ()
{
// 1. 检查状态
if (_state == term_ack_sent)
return;
// 2. 刷新出站队列
if (_out_pipe && !_out_pipe->flush ()) {
// 3. 如果读者休眠,发送激活命令
send_activate_read (_peer);
}
}
作用:
- ✅ 将消息标记为"已完成"
- ✅ 唤醒休眠的读者(I/O 线程)
3.5 终止 Pipe
// pipe.cpp 第 411 行
void zmq::pipe_t::terminate (bool delay_)
{
// 1. 覆盖延迟设置
_delay = delay_;
// 2. 如果已经终止,忽略
if (_state == term_req_sent1 || _state == term_req_sent2)
return;
// 3. 如果正在等待分隔符,直接确认
if (_state == waiting_for_delimiter) {
_state = term_ack_sent;
_out_pipe = NULL;
send_pipe_term_ack (_peer);
return;
}
// 4. 进入 term_req_sent1 状态
_state = term_req_sent1;
// 5. 如果没有待处理消息,立即发送确认
if (!_in_pipe->check_read ()) {
_out_pipe = NULL;
send_pipe_term_ack (_peer);
}
}
流程:
用户调用 terminate()
↓
设置_state = term_req_sent1
↓
检查是否有待读消息
├─ 有 → 等待读完
└─ 无 → 发送 pipe_term_ack
↓
等待 peer 确认
↓
销毁
4. 命令处理
pipe_t 继承自 object_t,可以接收命令:
// 命令处理函数
void process_activate_read (); // 激活读取
void process_activate_write (uint64_t); // 激活写入
void process_hiccup (void *); // 管道打嗝
void process_pipe_term (); // 终止 Pipe
void process_pipe_term_ack (); // 终止确认
void process_pipe_hwm (int, int); // 更新 HWM
4.1 process_activate_read
void zmq::pipe_t::process_activate_read ()
{
if (!_in_active && (_state == active || _state == waiting_for_delimiter)) {
_in_active = true;
_sink->read_activated (this); // ← 通知 Socket
}
}
触发时机:
- Peer 写入消息后
flush()发现读者休眠
4.2 process_activate_write
void zmq::pipe_t::process_activate_write (uint64_t msgs_read_)
{
// 记录对端读取进度
_peers_msgs_read = msgs_read_;
// 恢复写入
if (!_out_active && _state == active) {
_out_active = true;
_sink->write_activated (this); // ← 通知 Socket
}
}
触发时机:
- 对端读取了 LWM 条消息
4.3 process_pipe_term
void zmq::pipe_t::process_pipe_term ()
{
if (_state == active) {
if (_delay)
_state = waiting_for_delimiter; // 读完再终止
else {
_state = term_ack_sent; // 立即终止
_out_pipe = NULL;
send_pipe_term_ack (_peer);
}
}
else if (_state == delimiter_received) {
_state = term_ack_sent;
send_pipe_term_ack (_peer);
}
else if (_state == term_req_sent1) {
_state = term_req_sent2;
send_pipe_term_ack (_peer);
}
}
4.4 process_pipe_term_ack
void zmq::pipe_t::process_pipe_term_ack ()
{
// 1. 通知事件接收者
_sink->pipe_terminated (this);
// 2. 清理未读消息
if (!_conflate) {
msg_t msg;
while (_in_pipe->read (&msg)) {
msg.close ();
}
}
// 3. 销毁入站队列
LIBZMQ_DELETE (_in_pipe);
// 4. 自毁
delete this;
}
5. Pipe 状态机详解
┌─────────────────────────────────────────────────────────────┐
│ pipe_t 状态机 │
└─────────────────────────────────────────────────────────────┘
状态 1: active (正常状态)
┌───────────────────────────────────────────────┐
│ - 可以正常读写 │
│ - 处理消息流 │
│ │
│ 转移: │
│ - 收到 delimiter → delimiter_received │
│ - 收到 term 命令 → waiting_for_delimiter │
│ - 用户 terminate() → term_req_sent1 │
└───────────────────────────────────────────────┘
↓
状态 2: delimiter_received (收到分隔符)
┌───────────────────────────────────────────────┐
│ - Peer 已发送终止分隔符 │
│ - 等待 term 命令 │
│ │
│ 转移: │
│ - 收到 term 命令 → term_ack_sent │
└───────────────────────────────────────────────┘
↓
状态 3: waiting_for_delimiter (等待分隔符)
┌───────────────────────────────────────────────┐
│ - 已收到 term 命令 │
│ - 继续读取剩余消息(_delay=true) │
│ │
│ 转移: │
│ - 读完所有消息 → term_ack_sent │
│ - 收到 delimiter → term_ack_sent │
└───────────────────────────────────────────────┘
↓
状态 4: term_ack_sent (已发送确认)
┌───────────────────────────────────────────────┐
│ - 所有消息已处理完毕 │
│ - 等待销毁 │
│ │
│ 转移: │
│ - process_pipe_term_ack() → 销毁 │
└───────────────────────────────────────────────┘
状态 5: term_req_sent1 (用户请求终止)
┌───────────────────────────────────────────────┐
│ - 用户调用 terminate() │
│ - 等待 Peer 确认 │
│ │
│ 转移: │
│ - 收到 term 命令 → term_req_sent2 │
│ - 无待读消息 → 发送 ack → term_ack_sent │
└───────────────────────────────────────────────┘
↓
状态 6: term_req_sent2 (双方都请求终止)
┌───────────────────────────────────────────────┐
│ - 双方都请求终止 │
│ - 立即销毁 │
│ │
│ 转移: │
│ - process_pipe_term_ack() → 销毁 │
└───────────────────────────────────────────────┘
6. Pipe 的生命周期
创建阶段:
┌─────────────────────────────────────┐
│ pipepair() │
│ - 创建两个 ypipe │
│ - 创建两个 pipe_t │
│ - 互相设置 peer │
└─────────────────────────────────────┘
↓
使用阶段:
┌─────────────────────────────────────┐
│ Socket ↔ Pipe ↔ Session │
│ - write() 写入消息 │
│ - flush() 刷新到对端 │
│ - read() 从对端读取 │
│ - 流控:HWM/LWM │
└─────────────────────────────────────┘
↓
终止阶段:
┌─────────────────────────────────────┐
│ terminate() / process_pipe_term() │
│ - 进入终止状态 │
│ - 处理剩余消息 │
│ - 发送/接收确认 │
└─────────────────────────────────────┘
↓
销毁阶段:
┌─────────────────────────────────────┐
│ process_pipe_term_ack() │
│ - 通知_pipe_terminated 事件 │
│ - 清理未读消息 │
│ - 删除_in_pipe │
│ - delete this │
└─────────────────────────────────────┘
7. Pipe 与 Socket 的关系
Socket (DEALER)
┌─────────────────────────┐
│ _pipes: [pipe1, pipe2] │
│ _fq: 入站队列 │
│ _lb: 出站队列 │
└───────────┬─────────────┘
│ i_pipe_events
↓
┌───────────┐
│ pipe1 │
│ _sink │ ← 指向 Socket
│ _peer │ ← 指向 Session 的 pipe
└─────┬─────┘
│
↓
Session
┌───────────┐
│ pipe │
│ _engine │ ← 网络 Socket
└───────────┘
8. 总结
| 方面 | 详情 |
|---|---|
| 本质 | 双向无锁消息队列 |
| 组成 | _in_pipe + _out_pipe |
| 身份 | 3 个 array_item_t 继承 |
| 状态 | 6 种状态的状态机 |
| 流控 | HWM/LWM 背压机制 |
| 生命周期 | 创建 → 使用 → 终止 → 销毁 |
| 线程模型 | 应用线程写,I/O 线程读 |
核心设计:
- ✅ 无锁队列:高性能消息传递
- ✅ 背压机制:防止内存爆炸
- ✅ 状态机:优雅处理终止
- ✅ 事件驱动:高效通知机制
- ✅ 多身份:同时属于多个数组
一句话:pipe_t 是 ZeroMQ 的智能消息通道,连接应用层和网络层,实现高效、可靠的消息传递!🎯
AtomGit 是由开放原子开源基金会联合 CSDN 等生态伙伴共同推出的新一代开源与人工智能协作平台。平台坚持“开放、中立、公益”的理念,把代码托管、模型共享、数据集托管、智能体开发体验和算力服务整合在一起,为开发者提供从开发、训练到部署的一站式体验。
更多推荐



所有评论(0)