1. 背景:为什么要支持多条指令同时执行?

1.1 单条指令执行的问题

传统的 KV 存储协议是这样的:

客户端 → SET key value     → 服务端 OK
客户端 → GET key           → 服务端 value
客户端 → DEL key           → 服务端 OK

每次指令都要一次网络往返(RTT),延迟累积。如果一次要执行 10 条命令,就得等 10 次 RTT。

1.2 解决方案:MULTI 批处理模式

客户端 → MULTI\nSET k1 v1\nGET k1\nDEL k1\nEND  → 服务端
                                            [1] OK
                                            [2] v1
                                            [3] OK
                                            MULTI DONE (3 cmds)

一次网络往返,服务器顺序执行所有指令,带序号返回结果。

1.3 设计目标

目标 说明
一条请求,多条指令 一次 RTT 完成多次操作
语义兼容 复用所有现有引擎逻辑,不改底层
清晰返回 每条指令带序号返回,方便调试
可扩展 为未来事务化/原子执行留接口

2. 协议设计

2.1 消息格式

MULTI\n
[CMD1]\n
[CMD2]\n
[CMD3]\n
...\n
END\n
  • MULTI 必须是首行,表示进入批处理模式
  • END 表示批处理结束
  • 中间每一行是一条独立指令

2.2 返回格式

[1] OK
[2] value
[3] EXIST
[4] OK
MULTI DONE (4 cmds)

每条结果前加 [n] 序号,末尾汇总执行条数。

2.3 支持的指令

批处理模式下可以混用所有引擎的指令:

引擎 命令 示例
Array SET/GET/DEL/MOD/EXIST SET name Bob
红黑树 RSET/RGET/RDEL/RMOD/REXIST RSET score 99
哈希表 HSET/HGET/HDEL/HMOD/HEXIST HSET addr NJ
跳表 SSET/SGET/SDEL/SMOD/SEXIST SSET rank 1
持久化 FUSAVE/FULOAD/INLOAD FUSAVE

3. 实现详解

3.1 核心数据结构

#define MAX_MULTI_CMDS  128    // 最多 128 条指令
#define MAX_MULTI_LINE  512    // 每行最长 512 字节

static int kvs_handle_multi(const char *msg, char *response);

为什么不直接在 kvs_protocol() 里处理?

因为 kvs_protocol() 是单条指令的入口,职责单一。MULTI 是批处理协议,有自己的解析逻辑,独立成函数更清晰。

3.2 分词逻辑

// Step 1:跳过 "MULTI\n",从第二行开始逐行读取
const char *p = msg;
while (*p && *p != '\n') p++;  // 跳过 "MULTI"
if (*p == '\n') p++;            // 移到第二行
// Step 2:逐行读取,直到 END 或结束
while (*p && cmd_count < MAX_MULTI_CMDS) {
    // 读到 END 就停
    if (strncmp(p, "END", 3) == 0) break;

    int len = 0;
    // 读一行
    while (*p && *p != '\n' && len < MAX_MULTI_LINE - 1) {
        cmds[cmd_count][len++] = *p++;
    }

    // 去掉行尾 \r 和空格(兼容 Windows 换行)
    while (len > 0 && (cmds[cmd_count][len - 1] == '\r' ||
                       cmds[cmd_count][len - 1] == ' ')) {
        len--;
    }

    cmds[cmd_count][len] = '\0';

    if (*p == '\n') p++;
    if (len == 0) continue;  // 空行跳过

    cmd_count++;
}

关键点:

  • strncmp 检测 END,不需要解析整行
  • \r 处理是为了兼容 Windows 换行(\r\n
  • 空行直接跳过,不计入统计

3.3 逐条执行

// Step 3:逐条执行
char result_buf[8192];
result_buf[0] = '\0';
char single_resp[512];

for (int i = 0; i < cmd_count; ++i) {
    char *sub_tokens[KVS_MAX_TOKENS] = {0};
    char line_copy[MAX_MULTI_LINE];

    // 复制一份,避免修改原数据
    strncpy(line_copy, cmds[i], sizeof(line_copy) - 1);
    line_copy[sizeof(line_copy) - 1] = '\0';

    // 分词
    int sub_count = kvs_split_token(line_copy, sub_tokens);
    if (sub_count <= 0) {
        snprintf(result_buf + strlen(result_buf),
                 sizeof(result_buf) - strlen(result_buf),
                 "[%d] ERR empty\r\n", i + 1);
        continue;
    }

    // 调用现有协议栈执行指令
    memset(single_resp, 0, sizeof(single_resp));
    kvs_filter_protocol(sub_tokens, sub_count, single_resp);

    // 追加结果
    snprintf(result_buf + strlen(result_buf),
             sizeof(result_buf) - strlen(result_buf),
             "[%d] %s", i + 1, single_resp);
}

核心思想:递归复用!

每一条子指令都调用 kvs_filter_protocol() —— 这就是 KVStore 原有单条指令的协议解析函数。它本身已经处理了 SET/GET/DEL/MOD… 各种引擎的各种情况,我们直接复用,零修改底层逻辑

kvs_handle_multi()
    ↓ 每行调用一次
kvs_filter_protocol()
    ↓ 原有逻辑
SET/GET/RSET/HGET/...  ← 不需要改动

3.4 入口集成

kvs_protocol() 里加一行检测:

int kvs_protocol(char *msg, int length, char *response) {
    if (msg == NULL || length <= 0 || response == NULL) return -1;

    // 先检查是不是 MULTI 开头
    if (strncmp(msg, "MULTI", 5) == 0) {
        return kvs_handle_multi(msg, response);
    }

    // 普通单条指令
    char *tokens[KVS_MAX_TOKENS] = {0};
    int count = kvs_split_token(msg, tokens);
    if (count == -1) return -1;

    return kvs_filter_protocol(tokens, count, response);
}

4. 代码改动点

4.1 kvstore.c — 完整改动

① command 数组加 MULTI 和 END

const char *command[] = {
    "HELP",
    // ... 现有命令 ...
     "FUSAVE", "FULOAD", "INLOAD", "EXIT",
    "MULTI", "END"    // ← 新增
};

② 枚举加两个新命令

enum {
    KVS_CMD_START = 0,
    // ... 现有枚举 ...
    KVS_CMD_EXIT,
    KVS_CMD_MULTI,   // ← 新增
    KVS_CMD_END,      // ← 新增
    KVS_CMD_COUNT,
};

③ kvs_protocol() 加 MULTI 检测

if (strncmp(msg, "MULTI", 5) == 0) {
    return kvs_handle_multi(msg, response);
}

④ HELP 加说明

"  MULTI                          - Batch mode (MULTI + cmds + END)\n"

⑤ MULTI 和 END 的 no-op case(防止 assert 0)

case KVS_CMD_MULTI:
    length = sprintf(response, "ERROR: MULTI must be first token\r\n");
    break;
case KVS_CMD_END:
    length = sprintf(response, "ERROR: END without MULTI\r\n");
    break;

⑥ kvs_handle_multi() 函数(完整代码)

#define MAX_MULTI_CMDS  128
#define MAX_MULTI_LINE  512

static int kvs_handle_multi(const char *msg, char *response) {
    if (!msg || !response) return -1;

    // 跳过 "MULTI\n"
    const char *p = msg;
    while (*p && *p != '\n') p++;
    if (*p == '\n') p++;

    char cmds[MAX_MULTI_CMDS][MAX_MULTI_LINE];
    int cmd_count = 0;

    // 逐行读取,直到 END 或结束
    while (*p && cmd_count < MAX_MULTI_CMDS) {
        if (strncmp(p, "END", 3) == 0) break;

        int len = 0;
        while (*p && *p != '\n' && len < MAX_MULTI_LINE - 1) {
            cmds[cmd_count][len++] = *p++;
        }

        while (len > 0 && (cmds[cmd_count][len - 1] == '\r' ||
                           cmds[cmd_count][len - 1] == ' ')) {
            len--;
        }

        cmds[cmd_count][len] = '\0';

        if (*p == '\n') p++;
        if (len == 0) continue;

        cmd_count++;
    }

    if (cmd_count == 0) {
        return sprintf(response, "ERROR empty MULTI\r\n");
    }

    char result_buf[8192];
    result_buf[0] = '\0';
    char single_resp[512];

    for (int i = 0; i < cmd_count; ++i) {
        char *sub_tokens[KVS_MAX_TOKENS] = {0};
        char line_copy[MAX_MULTI_LINE];
        strncpy(line_copy, cmds[i], sizeof(line_copy) - 1);
        line_copy[sizeof(line_copy) - 1] = '\0';

        int sub_count = kvs_split_token(line_copy, sub_tokens);
        if (sub_count <= 0) {
            snprintf(result_buf + strlen(result_buf),
                     sizeof(result_buf) - strlen(result_buf),
                     "[%d] ERR empty\r\n", i + 1);
            continue;
        }

        memset(single_resp, 0, sizeof(single_resp));
        kvs_filter_protocol(sub_tokens, sub_count, single_resp);

        snprintf(result_buf + strlen(result_buf),
                 sizeof(result_buf) - strlen(result_buf),
                 "[%d] %s", i + 1, single_resp);
    }

    snprintf(result_buf + strlen(result_buf),
             sizeof(result_buf) - strlen(result_buf),
             "MULTI DONE (%d cmds)\r\n", cmd_count);

    return snprintf(response, 8192, "%s", result_buf);
}

5. 测试示例

5.1 客户端输入

MULTI
SET name Bob
SET age 25
GET name
RSET score 99
GET score
HSET addr Nanjing
HGET addr
SSET rank 1
SGET rank
END

5.2 服务器返回

[1] OK
[2] OK
[3] Bob
[4] OK
[5] 99
[6] OK
[7] Nanjing
[8] OK
[9] 1
MULTI DONE (9 cmds)

5.3 异常处理

空块:

MULTI
END
ERROR empty MULTI

END 缺失(不会触发死循环,因为有 MAX_MULTI_CMDS 保护):

MULTI
SET k v
(客户端断开)
→ 处理到 MAX_MULTI_CMDS 条自动停止

6. 架构图解

客户端请求
    │
    ▼
kvs_protocol()
    │
    ├── strncmp(msg, "MULTI", 5) == 0 ──→ kvs_handle_multi()
    │                                         │
    │                                         ├── 逐行拆分 → cmds[]
    │                                         ├── for each cmd
    │                                         │     └── kvs_filter_protocol()
    │                                         │           ├── SET/GET/DEL → kvs_array_*
    │                                         │           ├── RSET/RGET → kvs_rbtree_*
    │                                         │           ├── HSET/HGET → kvs_hash_*
    │                                         │           └── SSET/SGET → kvs_skiplist_*
    │                                         └── 汇总返回 [n] result
    │
    └── 普通指令 ──→ kvs_filter_protocol() ──→ 原有逻辑

7. 总结

多命令批处理核心思想:
  → 不改底层引擎,只在协议层拦截 MULTI
  → 递归复用 kvs_filter_protocol(),零侵入
  → 一次 RTT 执行多条指令,减少网络往返
  → 为未来事务化/原子执行/流水线打好架构基础

技术特征:
  ✓ 无需新增线程模型
  ✓ 无需修改底层存储逻辑
  ✓ 兼容所有引擎(ARRAY/RBTREE/HASH/SKIPTABLE)
  ✓ 兼容持久化(FUSAVE/FULOAD/INLOAD 可混用)
  ✓ 返回值带序号,方便调试

参考https://blog.csdn.net/2301_76218177/article/details/154556729
根据零声教育教学写作https://github.com/0voice

Logo

AtomGit 是由开放原子开源基金会联合 CSDN 等生态伙伴共同推出的新一代开源与人工智能协作平台。平台坚持“开放、中立、公益”的理念,把代码托管、模型共享、数据集托管、智能体开发体验和算力服务整合在一起,为开发者提供从开发、训练到部署的一站式体验。

更多推荐