zmq源码分析之pair和请求响应模式对比
·
文章目录
PAIR(一对一)vs REQ-REP(请求响应)的区别
这两种模式有本质区别,让我从多个维度详细对比:
1. 核心设计理念
PAIR - 平等对话
┌────────────┐◄════════════►┌────────────┐
│ PAIR A │ 双向平等 │ PAIR B │
│ │ 自由通信 │ │
└────────────┘ └────────────┘
特点:
- 两个端点完全平等
- 可以随时发送,无需遵循固定模式
- 像电话通话,谁都可以先说话
REQ-REP - 严格主从
┌────────────┐ ┌────────────┐
│ REQ │ 请求 │ REP │
│ (客户端) │ ──────► │ (服务端) │
│ │ ◄────── │ │
│ │ 响应 │ │
└────────────┘ └────────────┘
特点:
- 严格的主从关系
- 必须遵循"请求 - 响应"顺序
- 像顾客 - 服务员,顾客先点单,服务员后上菜
2. 代码行为对比
PAIR - 自由通信
// 端点 A
void *pair_a = zmq_socket(context, ZMQ_PAIR);
zmq_bind(pair_a, "inproc://step1");
// 可以随时发送,无需等待
zmq_send(pair_a, "Hello", 5, 0);
// 也可以随时接收
char buffer[256];
zmq_recv(pair_a, buffer, 256, 0);
// 连续发送多次(完全合法)
zmq_send(pair_a, "Msg1", 4, 0);
zmq_send(pair_a, "Msg2", 4, 0);
zmq_send(pair_a, "Msg3", 4, 0);
// 连续接收多次
zmq_recv(pair_a, buffer, 256, 0);
zmq_recv(pair_a, buffer, 256, 0);
zmq_recv(pair_a, buffer, 256, 0);
REQ-REP - 严格状态机
// REQ 端点
void *req = zmq_socket(context, ZMQ_REQ);
zmq_connect(req, "tcp://localhost:5555");
// ✅ 正确用法:发送请求 → 等待响应 → 再发送
zmq_send(req, "Request1", 8, 0);
zmq_recv(req, buffer, 256, 0); // 必须等待响应
zmq_send(req, "Request2", 8, 0);
zmq_recv(req, buffer, 256, 0);
// ❌ 错误用法:连续发送(会返回 EFSM 错误)
zmq_send(req, "Request1", 8, 0);
zmq_send(req, "Request2", 8, 0); // 报错:EFSM (状态机错误)
// ❌ 错误用法:先接收
zmq_recv(req, buffer, 256, 0); // 报错:EFSM
// REP 端点
void *rep = zmq_socket(context, ZMQ_REP);
zmq_bind(rep, "tcp://*:5555");
// ✅ 正确用法:接收请求 → 发送响应 → 再接收
while (true) {
zmq_recv(rep, buffer, 256, 0); // 必须先接收
zmq_send(rep, "Response", 8, 0); // 再发送
}
// ❌ 错误用法:先发送
zmq_send(rep, "Response", 8, 0); // 报错:EFSM
3. 内部实现差异
PAIR 实现(简单直接)
// pair.cpp - 发送
int zmq::pair_t::xsend (msg_t *msg_)
{
// 直接写入管道,没有任何状态检查
if (!_pipe || !_pipe->write (msg_)) {
errno = EAGAIN;
return -1;
}
if (!(msg_->flags () & msg_t::more))
_pipe->flush ();
msg_->init (); // 清空消息
return 0;
}
// pair.cpp - 接收
int zmq::pair_t::xrecv (msg_t *msg_)
{
// 直接从管道读取,没有任何状态检查
if (!_pipe || !_pipe->read (msg_)) {
msg_->init ();
errno = EAGAIN;
return -1;
}
return 0;
}
REQ 实现(状态机)
// req.cpp - 发送
int zmq::req_t::xsend (msg_t *msg_)
{
// 状态检查:如果还在等待响应,不能发送新请求
if (_receiving_reply) {
if (_strict) {
errno = EFSM; // 状态机错误!
return -1;
}
_receiving_reply = false;
}
// 消息开头自动添加空帧(信封)
if (_message_begins) {
msg_t bottom;
bottom.init ();
bottom.set_flags (msg_t::more);
dealer_t::sendpipe (&bottom, &_reply_pipe);
_message_begins = false;
}
// 发送完成后,切换到"等待响应"状态
if (!(msg_->flags () & msg_t::more)) {
_receiving_reply = true; // 状态翻转
_message_begins = true;
}
return dealer_t::xsend (msg_);
}
// req.cpp - 接收
int zmq::req_t::xrecv (msg_t *msg_)
{
// 状态检查:如果没有发送请求,不能接收
if (!_receiving_reply) {
errno = EFSM; // 状态机错误!
return -1;
}
// 跳过不匹配的响应...
while (_message_begins) {
// 检查请求 ID 匹配...
}
// 接收完成后,切换回"可以发送"状态
if (!(msg_->flags () & msg_t::more)) {
_receiving_reply = false; // 状态翻转
}
return dealer_t::xrecv (msg_);
}
4. 连接数差异
PAIR - 严格一对一
// pair.cpp - 只接受一个连接
void zmq::pair_t::xattach_pipe (pipe_t *pipe_, ...)
{
// 只能连接一个 peer
if (_pipe == NULL)
_pipe = pipe_; // 第一个连接
else
pipe_->terminate (false); // 拒绝后续连接!
}
// 图示
┌────────────┐ ┌────────────┐
│ PAIR A │◄───────►│ PAIR B │
└────────────┘ └────────────┘
│
└── ❌ 不能再连接其他端点
REQ-REP - 多对一
// REP 可以连接多个 REQ
┌────────────┐
│ REQ 1 │───┐
├────────────┤ │
│ REQ 2 │───┼─────┐
├────────────┤ │ │
│ REQ 3 │───┤ ▼
└────────────┘ │ ┌────────────┐
└─│ REP │
│ (服务端) │
└────────────┘
// REP 自动负载均衡(轮询)
REQ1 发送 → REP 处理 → 回复 REQ1
REQ2 发送 → REP 处理 → 回复 REQ2
REQ3 发送 → REP 处理 → 回复 REQ3
5. 消息格式差异
PAIR - 原始数据
┌─────────────────────────────┐
│ 用户数据 │
│ "Hello" │
└─────────────────────────────┘
直接发送,不添加任何额外内容
REQ-REP - 带信封
REQ 发送:
┌──────────┬──────────┬─────────────┐
│ 空帧 │ 分隔符 │ 用户数据 │
│ (Envelope)│(Delimiter)│ "Request" │
└──────────┴──────────┴─────────────┘
↑ ↑
自动添加 自动添加
REP 接收时自动保存信封
REP 回复时自动恢复信封
6. 使用场景对比
| 场景 | PAIR | REQ-REP | 说明 |
|---|---|---|---|
| 线程间通信 | ✅ 最佳 | ❌ 不推荐 | PAIR 延迟最低 |
| 进程间简单通信 | ✅ 适合 | ⚠️ 可以 | 简单场景用 PAIR |
| RPC 调用 | ❌ 不适合 | ✅ 最佳 | REQ-REP 自动处理顺序 |
| 任务分发 | ❌ 不适合 | ✅ 适合 | REP 自动负载均衡 |
| 双向对话 | ✅ 最佳 | ❌ 不适合 | PAIR 可自由发言 |
| 严格一问一答 | ⚠️ 需手动 | ✅ 自动保证 | REQ-REP 强制顺序 |
| 多客户端 | ❌ 不支持 | ✅ 支持 | REP 可连接多个 REQ |
7. 性能对比
| 指标 | PAIR | REQ-REP | 说明 |
|---|---|---|---|
| 延迟 | ⭐⭐⭐⭐⭐ | ⭐⭐⭐⭐ | PAIR 无额外开销 |
| 吞吐量 | ⭐⭐⭐⭐⭐ | ⭐⭐⭐⭐ | PAIR 无状态检查 |
| 灵活性 | ⭐⭐⭐⭐⭐ | ⭐⭐ | PAIR 完全自由 |
| 安全性 | ⭐⭐⭐ | ⭐⭐⭐⭐⭐ | REQ-REP 强制顺序 |
| 可扩展性 | ⭐ | ⭐⭐⭐⭐ | REP 支持多客户端 |
8. 实际例子对比
场景:线程间通信
// ✅ 使用 PAIR - 简单直接
void thread_a() {
void *pair = zmq_socket(context, ZMQ_PAIR);
zmq_bind(pair, "inproc://step1");
// 可以随时发送/接收
zmq_send(pair, "Data", 4, 0);
char buffer[256];
zmq_recv(pair, buffer, 256, 0);
}
void thread_b() {
void *pair = zmq_socket(context, ZMQ_PAIR);
zmq_connect(pair, "inproc://step1");
char buffer[256];
zmq_recv(pair, buffer, 256, 0);
zmq_send(pair, "ACK", 3, 0);
}
// ❌ 使用 REQ-REP - 过度复杂
// 需要严格遵循请求 - 响应顺序
// 不适合平等的线程通信
场景:RPC 服务
// ✅ 使用 REQ-REP - 自动保证顺序
// 客户端
void *req = zmq_socket(context, ZMQ_REQ);
zmq_connect(req, "tcp://server:5555");
zmq_send(req, "GET_TIME", 8, 0);
zmq_recv(req, time_buffer, 256, 0); // 自动等待
zmq_send(req, "GET_USER", 8, 0);
zmq_recv(req, user_buffer, 256, 0);
// 服务端
void *rep = zmq_socket(context, ZMQ_REP);
zmq_bind(rep, "tcp://*:5555");
while (true) {
zmq_recv(rep, buffer, 256, 0); // 接收请求
// 处理请求...
zmq_send(rep, response, len, 0); // 发送响应
}
// ❌ 使用 PAIR - 需要手动管理顺序
// 可能发送混乱,需要自己实现请求 - 响应逻辑
9. 错误处理对比
PAIR - 简单错误
// 可能的错误
zmq_send(pair, "Msg", 3, 0);
// EAGAIN - 管道满了/未连接
// EFAULT - 无效的消息指针
REQ-REP - 状态机错误
// 可能的错误
zmq_send(req, "Msg", 3, 0);
// EAGAIN - 管道满了/未连接
// EFAULT - 无效的消息指针
// EFSM - 状态机错误(在错误的时间发送/接收)
zmq_recv(req, buffer, 256, 0);
// EAGAIN - 没有消息
// EFSM - 还没发送请求就想接收
10. 总结对比表
| 特性 | PAIR | REQ-REP |
|---|---|---|
| 连接数 | 1 对 1 | 多对 1 |
| 通信方向 | 双向平等 | 严格主从 |
| 发送顺序 | 完全自由 | 必须请求 - 响应 |
| 状态机 | 无 | 有(严格) |
| 信封 | 无 | 有(自动) |
| 负载均衡 | 无 | 自动(轮询) |
| 延迟 | 最低 | 略高 |
| 灵活性 | 最高 | 较低 |
| 安全性 | 低(需手动管理) | 高(自动保证) |
| 适用场景 | 线程通信、简单对话 | RPC、任务分发 |
一句话总结
- PAIR = 平等的电话通话,谁都可以随时说话
- REQ-REP = 顾客 - 服务员,顾客先点单,服务员后上菜,顺序严格
选择建议:
- 线程间简单通信 → PAIR
- RPC 调用、任务分发 → REQ-REP
AtomGit 是由开放原子开源基金会联合 CSDN 等生态伙伴共同推出的新一代开源与人工智能协作平台。平台坚持“开放、中立、公益”的理念,把代码托管、模型共享、数据集托管、智能体开发体验和算力服务整合在一起,为开发者提供从开发、训练到部署的一站式体验。
更多推荐

所有评论(0)