目录

一、开篇

二、向 AI 发请求长什么样

三、DeepSeekProvider 非流式调用8 步拆解

3.1 initModel — 初始化

3.2 sendMessage — 完整 8 步

3.3 这 8 步可以归纳为三个核心阶段

四、流式调用(sendMessageStream)— 核心中的核心

4.1 为什么要流式?

4.2 SSE 协议科普

4.3 SSE vs WebSocket

4.4 响应处理器 — response_handler

4.5 数据接收处理器 — content_receiver(核心!)

4.7 发送请求并兜底

五、content_receiver 的缓冲区设计

六、ChatGPTProvider 的差异点

6.1 API 端点不同

6.2 请求字段不同

6.3 响应结构不同

6.4 流式事件格式不同

七、问答专场


一、开篇

前面三篇我们把框架搭好了,这一篇进入真正的核心代码

你要实现的本质其实很简单:用 C++ 发一个 HTTP 请求给 AI 厂商的服务器,拿到回复。

但这里有两个难点:

  1. 每家厂商的 API 接口不一样 — 所以我们用策略模式解决了
  2. 流式输出 — 不是等 AI 全部说完再给你,而是边说边给你

这篇我们就以 DeepSeekProvider 为主线,完整拆解一个 AI 调用从开始到结束的全过程。然后对比 ChatGPTProvider 看看差异在哪。

二、向 AI 发请求长什么样

不管你用 DeepSeek、ChatGPT 还是 Gemini,本质都是一样的:

一个 HTTP POST 请求,请求体是 JSON,响应也是 JSON。

这个项目用的 HTTP 库是 cpp-httplib(只有头文件,不需要编译),用 jsoncpp 来构造和解析 JSON。

三、DeepSeekProvider 非流式调用8 步拆解

3.1 initModel — 初始化

bool DeepSeekProvider::initModel(const std::map<std::string, std::string>& config) {
    // 第1步:从配置中提取 API Key
    auto it = config.find("api_key");
    if (it == config.end()) {
        ERR("DeepSeekProvider initModel api_key not found");
        return false;
    }
    _apiKey = it->second;

    // 第2步:提取 Base URL,没传就用默认的
    it = config.find("endpoint");
    _endpoint = (it == config.end()) ? "https://api.deepseek.com" : it->second;

    // 第3步:标记初始化成功
    _isAvailable = true;
    INFO("DeepSeekProvider initModel success, endpoint: {}", _endpoint);
    return true;
}

注意endpoint 可以自定义。这意味着你可以把请求转发到任何兼容 OpenAI API 格式的服务端。

3.2 sendMessage — 完整 8 步

std::string DeepSeekProvider::sendMessage(
    const std::vector<Message>& messages,
    const std::map<std::string, std::string>& requestParam)
{
    // ========== 第1步:防御性检查 ==========
    if (!isAvailable()) {
        ERR("DeepSeekProvider sendMessage model not available");
        return "";
    }

    // ========== 第2步:解析请求参数 ==========
    double temperature = 0.7;
    int maxTokens = 2048;
    auto it = requestParam.find("temperature");
    if (it != requestParam.end()) temperature = std::stod(it->second);
    it = requestParam.find("max_tokens");
    if (it != requestParam.end()) maxTokens = std::stoi(it->second);

    // ========== 第3步:Message 列表 → JSON 数组 ==========
    Json::Value messagesArray(Json::arrayValue);
    for (const auto& msg : messages) {
        Json::Value msgObj;
        msgObj["role"] = msg._role;
        msgObj["content"] = msg._content;
        messagesArray.append(msgObj);
    }

    // ========== 第4步:构建完整请求体 ==========
    Json::Value requestBody;
    requestBody["model"] = getModelName();
    requestBody["messages"] = messagesArray;
    requestBody["temperature"] = temperature;
    requestBody["max_tokens"] = maxTokens;

    // ========== 第5步:JSON 序列化(对象 → 字符串) ==========
    Json::StreamWriterBuilder writerBuilder;
    writerBuilder["indentation"] = "";     // 压缩 JSON,不带缩进
    std::string requestBodyStr = Json::writeString(writerBuilder, requestBody);
    INFO("DeepSeekProvider requestBody: {}", requestBodyStr);

    // ========== 第6步:创建 HTTP Client 并发送请求 ==========
    httplib::Client client(_endpoint.c_str());
    client.set_connection_timeout(30, 0);    // 连接超时 30 秒
    client.set_read_timeout(60, 0);          // 读取超时 60 秒

    httplib::Headers headers = {
        {"Authorization", "Bearer " + _apiKey},
        {"Content-Type", "application/json"}
    };

    auto response = client.Post("/v1/chat/completions", headers,
                                requestBodyStr, "application/json");

    // ========== 第7步:检查 HTTP 响应状态 ==========
    if (!response) {
        ERR("DeepSeekProvider POST request failed");
        return "";
    }
    if (response->status != 200) {
        ERR("DeepSeekProvider API returned status: {}", response->status);
        return "";
    }
    INFO("DeepSeekProvider response body: {}", response->body);

    // ========== 第8步:解析响应 JSON,提取 AI 回复 ==========
    Json::Value responseBody;
    Json::CharReaderBuilder readerBuilder;
    std::string parseError;
    std::istringstream responseStream(response->body);

    if (!Json::parseFromStream(readerBuilder, responseStream,
                               &responseBody, &parseError)) {
        ERR("DeepSeekProvider parse response failed: {}", parseError);
        return "deepseek response json parse failed";
    }

    // 从 choices[0].message.content 取出回复
    if (responseBody.isMember("choices") &&
        responseBody["choices"].isArray() &&
        !responseBody["choices"].empty() &&
        responseBody["choices"][0].isMember("message") &&
        responseBody["choices"][0]["message"].isMember("content")) {

        std::string reply = responseBody["choices"][0]["message"]["content"].asString();
        INFO("DeepSeekProvider reply: {}", reply);
        return reply;
    }

    ERR("DeepSeekProvider response format invalid");
    return "deepseek response format invalid";
}

3.3 这 8 步可以归纳为三个核心阶段

阶段一:构造请求(第1-5步)
  Message → JSON对象 → JSON字符串

阶段二:发出请求(第6-7步)
  HTTP Client → 设置超时 → 设置请求头 → POST → 检查响应

阶段三:解析响应(第8步)
  JSON字符串 → JSON对象 → 提取 choices[0].message.content

四、流式调用(sendMessageStream)— 核心中的核心

4.1 为什么要流式?

非流式的问题:如果 AI 生成回复需要 10 秒,用户就要干等 10 秒,然后突然看到一整段文字。体验很差。

流式:AI 开始生成后,每生成一个词就推送给客户端。用户看到的是字一个一个蹦出来,体验好得多。

4.2 SSE 协议科普

SSE(服务器推送事件) 是一种基于 HTTP 的实时通信协议。

SSE 的数据格式:

data: {"choices":[{"delta":{"content":"你"}}]}

data: {"choices":[{"delta":{"content":"好"}}]}

data: {"choices":[{"delta":{"content":"!"}}]}

data: [DONE]

关键规则:

  • 每个事件以 data: 开头
  • 事件之间用两个换行符 \n\n 分隔
  • 数据可以是任意文本,AI 厂商返回的是 JSON
  • 用 data: [DONE] 表示流结束

4.3 SSE vs WebSocket

对比项 SSE WebSocket
方向 服务器→客户端单向 双向通信
协议 基于 HTTP 独立协议(ws://)
实现复杂度 极简 较复杂
自动重连 浏览器原生支持 要自己实现
适用场景 AI 流式输出、股票行情推送 在线游戏、实时协作编辑

为什么 AI 厂商选 SSE 而不是 WebSocket?

因为 AI 对话是单向数据流(客户端请求 → 服务端流式返回),不需要双向通信。SSE 实现简单,基于标准 HTTP 协议,兼容性好。

4.4 响应处理器 — response_handler

这一步在 HTTP 响应头到达时触发。用来检查状态码:

req.response_handler = [&](const httplib::Response& res) {
    if (res.status != 200) {
        gotError = true;
        errorMsg = "HTTP status code: " + std::to_string(res.status);
        return false;   // 返回 false 终止请求
    }
    return true;        // 返回 true 继续接收数据
};

4.5 数据接收处理器 — content_receiver(核心!)

这是整个项目最重要的 50 行代码。每当 HTTP 收到一块 chunk 数据,就会触发这个回调:

req.content_receiver = [&](const char* data, size_t len,
                            size_t offset, size_t totalLength) {
    // 如果响应头已经报错,不再接收后续数据
    if (gotError) return false;

    //把新数据追加到缓冲区 
    buffer.append(data, len);
    INFO("DeepSeekProvider buffer: {}", buffer);

    //从缓冲区中按 \n\n 切割出完整事件
    size_t pos = 0;
    while ((pos = buffer.find("\n\n")) != std::string::npos) {
        // 取出一条完整的事件
        std::string chunk = buffer.substr(0, pos);
        buffer.erase(0, pos + 2);    // 移除以处理的数据

        // 跳过空行和注释行(以 : 开头)
        if (chunk.empty() || chunk[0] == ':') continue;

        //提取 "data: " 之后的真实数据
        if (chunk.compare(0, 6, "data: ") == 0) {
            std::string modelData = chunk.substr(6);

            //检测结束标记
            if (modelData == "[DONE]") {
                callback("", true);       // 通知上层:结束了
                streamFinish = true;
                return true;
            }

            // 解析 JSON,提取增量内容
            Json::Value modelDataJson;
            Json::CharReaderBuilder reader;
            std::string errors;
            std::istringstream modelDataStream(modelData);

            if (Json::parseFromStream(reader, modelDataStream,
                                      &modelDataJson, &errors)) {
                // 提取 choices[0].delta.content
                if (modelDataJson.isMember("choices") &&
                    modelDataJson["choices"].isArray() &&
                    !modelDataJson["choices"].empty() &&
                    modelDataJson["choices"][0].isMember("delta") &&
                    modelDataJson["choices"][0]["delta"].isMember("content")) {

                    std::string content =
                        modelDataJson["choices"][0]["delta"]["content"].asString();

                    fullResponse += content;          // 累积完整回复
                    callback(content, false);         // 回调给上层
                }
            }
        }
    }
    return true;    // 继续接收后续数据
};

4.7 发送请求并兜底

    // 发送请求
    auto result = client.send(req);
    if (!result) {
        ERR("Network error: {}", to_string(result.error()));
        return "";
    }

    //确保流式操作正确结束
    if (!streamFinish) {
        WARN("stream ended without [DONE] marker");
        callback("", true);
    }

    return fullResponse;
}

五、content_receiver 的缓冲区设计

这个缓冲区(std::string buffer)是流式处理中最精妙的设计。

场景:网络传输是分块的,一个完整的事件可能被拆成两半发送。

举例

第1次收到: "data: {\"choices\":[{\"delta\":{\"con"
第2次收到: "tent\":\"你好\"}}]}\n\n"

如果单次处理,第一次收到的数据不完整,解析 JSON 会报错。

解决方案:用 buffer 累积所有收到数据,每次新数据到达时,在 buffer 中搜索完整的 \n\n 分隔符,只处理完整的行,不完整的留在 buffer 里等下次。

buffer 初始为空

第1次:收到 "data: {""",追加到 buffer
       buffer = "data: {"""
       找不到 \n\n → 继续等

第2次:收到 "data内容"}\n\n"
       buffer = "data: {""data内容"}\n\n"
       找到 \n\n!→ 取出事件,处理,清空 buffer

六、ChatGPTProvider 的差异点

ChatGPTProvider 的接口跟 DeepSeekProvider 完全一样(因为都继承 LLMProvider),但内部实现有三个关键差异。

6.1 API 端点不同

// DeepSeek 用的是 Chat Completions API
response = client.Post("/v1/chat/completions", headers, body, "application/json");

// ChatGPT 用的是 Responses API(OpenAI 新版)
response = client.Post("/v1/responses", headers, body, "application/json");

6.2 请求字段不同

// DeepSeek 的请求体
requestBody["model"] = "deepseek-chat";
requestBody["messages"] = messagesArray;
requestBody["max_tokens"] = maxTokens;

// ChatGPT 的请求体(Responses API)
requestBody["model"] = "gpt-4o-mini";
requestBody["input"] = messagesArray;            // 不是 "messages"!
requestBody["max_output_tokens"] = maxTokens;    // 不是 "max_tokens"!

6.3 响应结构不同

// DeepSeek 解析方式
string reply = responseJson["choices"][0]["message"]["content"].asString();

// ChatGPT(Responses API)解析方式
string reply = responseJson["output"][0]["content"][0]["text"].asString();

6.4 流式事件格式不同

DeepSeek 的事件(简单):

data: {"choices":[{"delta":{"content":"好"}}]}

ChatGPT 的事件(带 event type):

event: response.output_text.delta
data: {"type":"response.output_text.delta","delta":"好"}

event: response.output_item.done
data: {"type":"response.output_item.done","item":{...}}

event: response.completed
data: {"type":"response.completed"}

七、问答专场

Q1:非流式和流式的核心区别是什么?

对比 非流式 流式
请求体 无 stream 字段 加 "stream": true
请求头 不需要 加 Accept: text/event-stream
超时设置 60 秒 300 秒
响应解析 client.Post() 一次返回完整 JSON content_receiver 逐块接收
用户体验 等待全部生成才看到 边生成边看到
实现复杂度 简单 较复杂(缓冲+切割+JSON解析+回调)

Q2:content_receiver 中的缓冲区为什么要用 string 而不是 vector<char>

std::string 有丰富的成员函数:

  • find("\n\n") — 查找分隔符
  • substr(0, pos) — 截取子串
  • erase(0, pos + 2) — 删除已处理数据
  • append(data, len) — 追加新数据

如果用 vector<char>,这些操作全要手写,既容易出错又浪费时间。

Q3:如果发请求后网络断了会怎样?

client.send(req) 返回一个 Result 对象,可以通过 result.error() 获取具体错误:

auto result = client.send(req);
if (!result) {
    auto err = result.error();
    // err 可能的值:
    //   httplib::Error::Connection     — 连接失败
    //   httplib::Error::Read           — 读取超时
    //   httplib::Error::SSLConnection  — SSL 错误
    //   httplib::Error::Canceled       — 请求被取消
    ERR("Network error: {}", to_string(err));
    return "";
}

Q4:Json::StreamWriterBuilder 中的 indentation 是什么意思?

Json::StreamWriterBuilder writer;
writer["indentation"] = "";     // 压缩模式:不缩进
// 输出:{"model":"deepseek-chat","messages":[...]}

writer["indentation"] = "  ";   // 美化模式:2空格缩进
// 输出:
// {
//   "model": "deepseek-chat",
//   "messages": [...]
// }

调试时用美化模式方便阅读,生产环境用压缩模式减少网络传输量。

Q5:[DONE] 标记如果一直收不到会怎样?

代码中有兜底逻辑:

if (!streamFinish) {
    WARN("stream ended without [DONE] marker");
    callback("", true);     // 强制结束
}

加上 read_timeout(300, 0) 超时保护,300 秒后 HTTP 连接会自动断开,触发这个兜底逻辑。

Logo

AtomGit 是由开放原子开源基金会联合 CSDN 等生态伙伴共同推出的新一代开源与人工智能协作平台。平台坚持“开放、中立、公益”的理念,把代码托管、模型共享、数据集托管、智能体开发体验和算力服务整合在一起,为开发者提供从开发、训练到部署的一站式体验。

更多推荐