图解式讲解 KVStore 多命令批处理:一条请求执行多条指令
1. 背景:为什么要支持多条指令同时执行?
1.1 单条指令执行的问题
传统的 KV 存储协议是这样的:
客户端 → SET key value → 服务端 OK
客户端 → GET key → 服务端 value
客户端 → DEL key → 服务端 OK
每次指令都要一次网络往返(RTT),延迟累积。如果一次要执行 10 条命令,就得等 10 次 RTT。
1.2 解决方案:MULTI 批处理模式
客户端 → MULTI\nSET k1 v1\nGET k1\nDEL k1\nEND → 服务端
[1] OK
[2] v1
[3] OK
MULTI DONE (3 cmds)
一次网络往返,服务器顺序执行所有指令,带序号返回结果。
1.3 设计目标
| 目标 | 说明 |
|---|---|
| 一条请求,多条指令 | 一次 RTT 完成多次操作 |
| 语义兼容 | 复用所有现有引擎逻辑,不改底层 |
| 清晰返回 | 每条指令带序号返回,方便调试 |
| 可扩展 | 为未来事务化/原子执行留接口 |
2. 协议设计
2.1 消息格式
MULTI\n
[CMD1]\n
[CMD2]\n
[CMD3]\n
...\n
END\n
MULTI必须是首行,表示进入批处理模式END表示批处理结束- 中间每一行是一条独立指令
2.2 返回格式
[1] OK
[2] value
[3] EXIST
[4] OK
MULTI DONE (4 cmds)
每条结果前加 [n] 序号,末尾汇总执行条数。
2.3 支持的指令
批处理模式下可以混用所有引擎的指令:
| 引擎 | 命令 | 示例 |
|---|---|---|
| Array | SET/GET/DEL/MOD/EXIST | SET name Bob |
| 红黑树 | RSET/RGET/RDEL/RMOD/REXIST | RSET score 99 |
| 哈希表 | HSET/HGET/HDEL/HMOD/HEXIST | HSET addr NJ |
| 跳表 | SSET/SGET/SDEL/SMOD/SEXIST | SSET rank 1 |
| 持久化 | FUSAVE/FULOAD/INLOAD | FUSAVE |
3. 实现详解
3.1 核心数据结构
#define MAX_MULTI_CMDS 128 // 最多 128 条指令
#define MAX_MULTI_LINE 512 // 每行最长 512 字节
static int kvs_handle_multi(const char *msg, char *response);
为什么不直接在 kvs_protocol() 里处理?
因为 kvs_protocol() 是单条指令的入口,职责单一。MULTI 是批处理协议,有自己的解析逻辑,独立成函数更清晰。
3.2 分词逻辑
// Step 1:跳过 "MULTI\n",从第二行开始逐行读取
const char *p = msg;
while (*p && *p != '\n') p++; // 跳过 "MULTI"
if (*p == '\n') p++; // 移到第二行
// Step 2:逐行读取,直到 END 或结束
while (*p && cmd_count < MAX_MULTI_CMDS) {
// 读到 END 就停
if (strncmp(p, "END", 3) == 0) break;
int len = 0;
// 读一行
while (*p && *p != '\n' && len < MAX_MULTI_LINE - 1) {
cmds[cmd_count][len++] = *p++;
}
// 去掉行尾 \r 和空格(兼容 Windows 换行)
while (len > 0 && (cmds[cmd_count][len - 1] == '\r' ||
cmds[cmd_count][len - 1] == ' ')) {
len--;
}
cmds[cmd_count][len] = '\0';
if (*p == '\n') p++;
if (len == 0) continue; // 空行跳过
cmd_count++;
}
关键点:
- 用
strncmp检测END,不需要解析整行 \r处理是为了兼容 Windows 换行(\r\n)- 空行直接跳过,不计入统计
3.3 逐条执行
// Step 3:逐条执行
char result_buf[8192];
result_buf[0] = '\0';
char single_resp[512];
for (int i = 0; i < cmd_count; ++i) {
char *sub_tokens[KVS_MAX_TOKENS] = {0};
char line_copy[MAX_MULTI_LINE];
// 复制一份,避免修改原数据
strncpy(line_copy, cmds[i], sizeof(line_copy) - 1);
line_copy[sizeof(line_copy) - 1] = '\0';
// 分词
int sub_count = kvs_split_token(line_copy, sub_tokens);
if (sub_count <= 0) {
snprintf(result_buf + strlen(result_buf),
sizeof(result_buf) - strlen(result_buf),
"[%d] ERR empty\r\n", i + 1);
continue;
}
// 调用现有协议栈执行指令
memset(single_resp, 0, sizeof(single_resp));
kvs_filter_protocol(sub_tokens, sub_count, single_resp);
// 追加结果
snprintf(result_buf + strlen(result_buf),
sizeof(result_buf) - strlen(result_buf),
"[%d] %s", i + 1, single_resp);
}
核心思想:递归复用!
每一条子指令都调用 kvs_filter_protocol() —— 这就是 KVStore 原有单条指令的协议解析函数。它本身已经处理了 SET/GET/DEL/MOD… 各种引擎的各种情况,我们直接复用,零修改底层逻辑。
kvs_handle_multi()
↓ 每行调用一次
kvs_filter_protocol()
↓ 原有逻辑
SET/GET/RSET/HGET/... ← 不需要改动
3.4 入口集成
在 kvs_protocol() 里加一行检测:
int kvs_protocol(char *msg, int length, char *response) {
if (msg == NULL || length <= 0 || response == NULL) return -1;
// 先检查是不是 MULTI 开头
if (strncmp(msg, "MULTI", 5) == 0) {
return kvs_handle_multi(msg, response);
}
// 普通单条指令
char *tokens[KVS_MAX_TOKENS] = {0};
int count = kvs_split_token(msg, tokens);
if (count == -1) return -1;
return kvs_filter_protocol(tokens, count, response);
}
4. 代码改动点
4.1 kvstore.c — 完整改动
① command 数组加 MULTI 和 END
const char *command[] = {
"HELP",
// ... 现有命令 ...
"FUSAVE", "FULOAD", "INLOAD", "EXIT",
"MULTI", "END" // ← 新增
};
② 枚举加两个新命令
enum {
KVS_CMD_START = 0,
// ... 现有枚举 ...
KVS_CMD_EXIT,
KVS_CMD_MULTI, // ← 新增
KVS_CMD_END, // ← 新增
KVS_CMD_COUNT,
};
③ kvs_protocol() 加 MULTI 检测
if (strncmp(msg, "MULTI", 5) == 0) {
return kvs_handle_multi(msg, response);
}
④ HELP 加说明
" MULTI - Batch mode (MULTI + cmds + END)\n"
⑤ MULTI 和 END 的 no-op case(防止 assert 0)
case KVS_CMD_MULTI:
length = sprintf(response, "ERROR: MULTI must be first token\r\n");
break;
case KVS_CMD_END:
length = sprintf(response, "ERROR: END without MULTI\r\n");
break;
⑥ kvs_handle_multi() 函数(完整代码)
#define MAX_MULTI_CMDS 128
#define MAX_MULTI_LINE 512
static int kvs_handle_multi(const char *msg, char *response) {
if (!msg || !response) return -1;
// 跳过 "MULTI\n"
const char *p = msg;
while (*p && *p != '\n') p++;
if (*p == '\n') p++;
char cmds[MAX_MULTI_CMDS][MAX_MULTI_LINE];
int cmd_count = 0;
// 逐行读取,直到 END 或结束
while (*p && cmd_count < MAX_MULTI_CMDS) {
if (strncmp(p, "END", 3) == 0) break;
int len = 0;
while (*p && *p != '\n' && len < MAX_MULTI_LINE - 1) {
cmds[cmd_count][len++] = *p++;
}
while (len > 0 && (cmds[cmd_count][len - 1] == '\r' ||
cmds[cmd_count][len - 1] == ' ')) {
len--;
}
cmds[cmd_count][len] = '\0';
if (*p == '\n') p++;
if (len == 0) continue;
cmd_count++;
}
if (cmd_count == 0) {
return sprintf(response, "ERROR empty MULTI\r\n");
}
char result_buf[8192];
result_buf[0] = '\0';
char single_resp[512];
for (int i = 0; i < cmd_count; ++i) {
char *sub_tokens[KVS_MAX_TOKENS] = {0};
char line_copy[MAX_MULTI_LINE];
strncpy(line_copy, cmds[i], sizeof(line_copy) - 1);
line_copy[sizeof(line_copy) - 1] = '\0';
int sub_count = kvs_split_token(line_copy, sub_tokens);
if (sub_count <= 0) {
snprintf(result_buf + strlen(result_buf),
sizeof(result_buf) - strlen(result_buf),
"[%d] ERR empty\r\n", i + 1);
continue;
}
memset(single_resp, 0, sizeof(single_resp));
kvs_filter_protocol(sub_tokens, sub_count, single_resp);
snprintf(result_buf + strlen(result_buf),
sizeof(result_buf) - strlen(result_buf),
"[%d] %s", i + 1, single_resp);
}
snprintf(result_buf + strlen(result_buf),
sizeof(result_buf) - strlen(result_buf),
"MULTI DONE (%d cmds)\r\n", cmd_count);
return snprintf(response, 8192, "%s", result_buf);
}
5. 测试示例
5.1 客户端输入
MULTI
SET name Bob
SET age 25
GET name
RSET score 99
GET score
HSET addr Nanjing
HGET addr
SSET rank 1
SGET rank
END
5.2 服务器返回
[1] OK
[2] OK
[3] Bob
[4] OK
[5] 99
[6] OK
[7] Nanjing
[8] OK
[9] 1
MULTI DONE (9 cmds)
5.3 异常处理
空块:
MULTI
END
ERROR empty MULTI
END 缺失(不会触发死循环,因为有 MAX_MULTI_CMDS 保护):
MULTI
SET k v
(客户端断开)
→ 处理到 MAX_MULTI_CMDS 条自动停止
6. 架构图解
客户端请求
│
▼
kvs_protocol()
│
├── strncmp(msg, "MULTI", 5) == 0 ──→ kvs_handle_multi()
│ │
│ ├── 逐行拆分 → cmds[]
│ ├── for each cmd
│ │ └── kvs_filter_protocol()
│ │ ├── SET/GET/DEL → kvs_array_*
│ │ ├── RSET/RGET → kvs_rbtree_*
│ │ ├── HSET/HGET → kvs_hash_*
│ │ └── SSET/SGET → kvs_skiplist_*
│ └── 汇总返回 [n] result
│
└── 普通指令 ──→ kvs_filter_protocol() ──→ 原有逻辑
7. 总结
多命令批处理核心思想:
→ 不改底层引擎,只在协议层拦截 MULTI
→ 递归复用 kvs_filter_protocol(),零侵入
→ 一次 RTT 执行多条指令,减少网络往返
→ 为未来事务化/原子执行/流水线打好架构基础
技术特征:
✓ 无需新增线程模型
✓ 无需修改底层存储逻辑
✓ 兼容所有引擎(ARRAY/RBTREE/HASH/SKIPTABLE)
✓ 兼容持久化(FUSAVE/FULOAD/INLOAD 可混用)
✓ 返回值带序号,方便调试
参考https://blog.csdn.net/2301_76218177/article/details/154556729
根据零声教育教学写作https://github.com/0voice
AtomGit 是由开放原子开源基金会联合 CSDN 等生态伙伴共同推出的新一代开源与人工智能协作平台。平台坚持“开放、中立、公益”的理念,把代码托管、模型共享、数据集托管、智能体开发体验和算力服务整合在一起,为开发者提供从开发、训练到部署的一站式体验。
更多推荐

所有评论(0)