第1章:让AI开口说话
理论
调通 LLM API,然后套上终端界面支持多轮对话 。
API 调用看着简单,但里面藏着 Agent 架构的核心概念。消息格式决定了对话怎么组织,流式响应决定了用户体验的下限,Token 计费决定了你的钱包能撑多久。
而多轮对话引出了消息管理、状态设计、格式转换这些 Agent 必备的基础设施。把这些搞明白,后面的路才走得稳。
API长什么样子
curl https://api.anthropic.com/v1/messages \ -H 'Content-Type: application/json' \ -H 'anthropic-version: 2023-06-01' \ -H "X-Api-Key: $ANTHROPIC_API_KEY" \ -d '{ "max_tokens": 1024, "messages": [ { "content": "Hello, world", "role": "user" } ], "model": "claude-sonnet-4-6" }'
{ "id": "msg_013Zva2CMHLNnXjNJJKqJ2EF", "type": "message", "role": "assistant", "content": [ { "type": "text", "text": "Hello! How can I help you today?" } ], "model": "claude-sonnet-4-6", "stop_reason": "end_turn", "usage": { "input_tokens": 10, "output_tokens": 12 } }
Messages 格式
先看请求里的 messages 。它是一个数组,每条消息有 role 和 content 两个字段。
role 只有两个值: user (用户说的话)和 assistant (模型的回复)。
LLM 返回一个工具调用请求,这算 assistant 的消息。你执行完工具拿到了结果,需要把结果发回去—— 这个结果要作为 user 消息发送 ,因为从 API 的视角看,所有你发给模型的东西都归 user,模型返回的都归 assistant。
流式响应
流式的意思是:模型一边生成一边推送给你,你一边收到一边显示。用户会看到文字一个一个蹦出来,就像有人在实时打字。
流式响应基于 SSE(Server-Sent Events)协议。
一个请求
system 参数放的是角色设定和环境信息。告诉模型你是谁、该怎么行为,也告诉它当前工作目录是什么、操作系统是什么。你总不希望模型在 Linux 上给你建议用 PowerShell 吧。这部分在一次会话内相对固定,不随对话变化。
messages 数组放的是对话历史和动态上下文。用户和模型之间你一句我一句的对话,以及后续会讲到的项目指令、动态提醒等信息,都放在这里。
tools 参数 放的是工具描述。你的 Agent 有哪些工具可以用、每个工具的参数是什么格式、返回什么结果。这就是 Function Calling,工具系统章节会详细展开。
Token
Token 是 LLM 的计费单位。粗略来说,英文每个单词大约 1-2 个 token,中文每个字大约 1-2 个 token。具体取决于模型使用的 tokenizer,你不需要精确计算,只需要知道它是衡量输入输出量和计费的基本单位。
input_tokens 是你发给模型的所有内容,包括 system prompt、messages 和 tools 描述。 output_tokens 是模型生成的回复。
Extended Thinking: 让模型先想再说
Extended Thinking,让模型在正式回复之前先进行一轮内部推理。开启后响应的 content 数组里会多一个 thinking 类型的内容块,排在 text 块之前。
对 Agent 开发来说,只需要记住两件事。第一,thinking 的 Token 算在 output_tokens 里,是有成本的,本质上是用钱换更准确的工具调用决策。Agent 场景下这通常值得,因为一次准确的工具调用可以省掉好几轮纠错的开销。第二, thinking 内容不能放进后续请求的 messages 里 ,维护对话历史时必须把 thinking 块过滤掉,只保留 text 和 tool_use 块发给 API,否则 API 会报错。
封装的核心原则
配置上只需要四个字段就能覆盖所有主流供应商: protocol 决定走哪家的 API 协议, model 指定模型, base_url 指定端点地址, api_key 做认证。
// 你自己定义的类型(上层代码只用这些) Message { role, content } StreamEvent { type, text?, usage?, error? } Usage { inputTokens, outputTokens } // 你的客户端(根据 protocol 分发到不同的后端实现) class LLMClient: constructor(protocol, model, baseURL, apiKey) function streamChat(systemPrompt, messages) -> Stream<StreamEvent>: // 1. 把自定义 Message 转成对应供应商的格式 // 2. 调用对应的流式 API // 3. 把供应商的事件转成自定义 StreamEvent // 4. 通过异步流返回给调用方
从单轮到多轮
多轮对话是怎么实现的?
每次调 API,把完整的对话历史发过去 。就这么简单。
消息模型
前面我们定义了面向 API 的消息结构: role + content 。但你有没有想过,光这两个字段够用吗?
想想 Agent 运行过程中会产生哪些信息:用户输入、模型回复、启动时的欢迎语、API 调用失败的错误信息,后面还会有工具调用记录。这些东西的角色各不相同,光 user 和 assistant 两种根本不够分。
再想想流式接收的场景。模型的回复是一个字一个字蹦出来的,这条 assistant 消息在接收过程中算什么状态?接收完了呢?中间断了呢?一条消息从创建到结束,其实是有生命周期的。
API 层那个简单的 role + content 根本表达不了这些。
API 层 还是那个 role + content ,专门用来跟 LLM 通信,保持简单干净。
内部层 则丰富得多。角色从两种扩展到四种: user 、 assistant 、 system 、 tool 。每条消息带一个唯一 ID,方便定位和更新。还有时间戳、Token 用量、响应耗时这些元数据。
最关键的是多了一个状态字段。一条 assistant 消息刚创建时是 streaming ,流式接收完毕变成 complete ,出错变成 error 。
对话管理器
有了消息模型,接下来想一个问题:谁来管这些消息?
你可能觉得,搞一个数组往里面 append 不就行了。但别忘了流式接收的场景:后台正在往一条 assistant 消息里追加文字,同时另一边正在读这个消息列表来渲染。两处同时操作同一个列表,不加保护就是数据竞争。
所以你需要一个对话管理器,把消息列表包起来,内部保证并发安全(不同语言做法不同,有的用锁,有的靠单线程异步天然避免竞争)。外部调用方只需要:添加消息的时候拿到一个唯一 ID,流式更新的时候根据 ID 追加内容,需要渲染的时候拿一份消息列表的快照。并发的事情全交给管理器。
格式转换
对话管理器里最关键的方法是 toAPIFormat() :把内部层的消息列表转换成 API 层的格式。
这个转换看似只是格式映射,但里面藏着不少坑。
首先是过滤。内部层有些消息不该发给 API。 system 角色的消息(比如欢迎语)是内部概念,API 有单独的 system prompt 参数,你再发一条 system 过去会让模型困惑。 error 状态的 assistant 消息也得过滤掉,你总不希望模型看到一条报错信息然后尝试接着它说。
然后是合并。虽然 Claude API 能自动合并相邻的同角色消息,但客户端主动合并是更好的做法,减少冗余 token,消息结构也更清晰,方便调试。
最后还得确保首条消息是 user,并且 user/assistant 交替出现。如果过滤掉 system 消息后第一条变成了 assistant,模型可能理解不了上下文。
流式更新与多轮协作
function sendMessage(userText): // 1. 用户消息加入对话管理器 conversation.addMessage({ role: "user", content: userText, status: "complete" }) // 2. 创建一条空的 assistant 消息,状态为 streaming assistantId = conversation.addMessage({ role: "assistant", content: "", status: "streaming" }) // 3. 把完整对话历史转换成 API 格式 apiMessages = conversation.toAPIFormat() // 4. 异步调用 LLM,流式更新 assistant 消息 stream = llmClient.streamChat(systemPrompt, apiMessages) for event in stream: if event.type == "text": conversation.updateMessage(assistantId, appendText(event.text)) if event.type == "done": conversation.updateMessage(assistantId, setComplete(event.usage)) if event.type == "error": conversation.updateMessage(assistantId, setError(event.error))
总结
这一章做了两件事:调通 LLM API,套上终端 UI 支持多轮对话。
Messages 里 user 和 assistant 交替出现的惯例,会深刻影响后面 Agent 循环里的消息管理逻辑。响应里的 content 永远是数组而不是字符串,这个设计到工具调用的时候会变得非常重要。
流式响应是基本要求。SSE 事件序列(message_ start → content_ block_ start → content_ block_ delta → content_ block_ stop → message_ delta → message_ stop)是处理流式响应的基本框架,后面每一章都会用到。
封装外部依赖的核心原则是暴露领域语义,隐藏实现细节。定义自己的消息和事件类型,把 SDK 藏在内部。将来换供应商,上层代码完全不受影响。
LLM API 是无状态的,多轮对话全靠客户端维护消息历史。消息模型分两层:API 层只有 role + content,内部层增加了状态、ID、元数据。
格式转换负责过滤、合并、交替校验,把内部状态转成 API 能用的干净格式。
实操
初步想法
# 我的初步想法 我要从零开始做一个命令行AI助手(Coding Agent),叫MewCode,类似ClaudeCode。用[你的语言]开发。 这一步的目标是:用户在终端启动MewCode后,进入一个交互式对话界面(TUI),可以输入问题,MewCode调用大模型API,把回复流式地 逐字打印出来。支持多轮对话,AI能记住之前说过的话。 技术要求: 支持AnthropicClaude和OpenAI两种API后端,通过配置文件切换流式用SSE,不是等全部生成完再返回 支持Claude的extended thinking Provider层要抽象成统一接口,以后方便加新的后端 这一步不做 tooluse、文件操作、代码编辑这些agent 功能,纯对话就行。 配置格式:用 YAML 配置文件管理 LLM 供应商信息,四个核心字段: protocol 决定走哪家协议 model 指定模型 base_url指定请求的地址 api_key 做认证。
生成的文档
# 多协议 LLM 终端对话客户端 Spec## 背景 mewcode 是一个从零构建的、Claude Code 风格的终端 AI Agent。本项目是其第一块基石: 在没有工具调用、权限、记忆等高级能力之前,先打通"人 ↔ LLM"的最小闭环—— 让用户能在终端里与大模型进行流畅的多轮对话。 ## 目标 - 打通 LLM API 调用:能向大模型发起请求并正确接收回复。 - 同时支持 Anthropic 与 OpenAI 两种协议,通过一份配置切换接入对象(含兼容端点)。 - 提供一个全功能终端界面(TUI),承载输入、流式输出与多轮对话。 - 回复以流式方式实时呈现,结束后以 markdown 形式美化展示。 - 在单次会话内维护完整对话上下文,支持连续多轮交流。 - 对调用失败有可恢复的错误反馈,不中断会话。 ## 功能需求 - F1: 配置加载 从项目内的 YAML 配置文件读取一个 providers 列表(可含多项)。每项包含:可读名称、 协议类型、可选的自定义端点地址、密钥、模型名、是否开启扩展思考。逐项校验必要项 (如密钥),缺失时给出清晰的启动期错误并终止。 - F2: provider 选择 若配置中仅有一个 provider,直接采用它进入对话;若有多个,启动后先呈现一个选择 界面(方向键列表,列出各 provider 的名称与模型),用户选定一项后再进入对话。 被选定者即本次会话的活动 provider。 - F3: 多协议适配 根据活动 provider 的"协议类型"选择对应的请求构造与响应解析方式,统一支持 Anthropic 与 OpenAI 两种协议。若配置了自定义端点地址,则覆盖该协议的默认端点 (从而可接入各类兼容服务)。对上层暴露与协议无关的统一对话接口。 - F4: 发起对话请求 将"内置系统提示词 + 当前完整对话历史"作为上下文,向活动 provider 发起一次对话 请求,并按配置决定是否开启扩展思考。 - F5: 流式接收 以流式方式接收回复,实时解析出正文文本增量并向界面输送。对扩展思考产生的思考 增量正确识别但不渲染(接收即丢弃),不得混入正文。 - F6: 多轮上下文 在单次会话内维护完整对话历史(用户与助手消息交替追加)。每一轮新请求都携带此前 全部上下文,实现连续多轮对话。程序退出后历史不保留。 - F7: 终端界面布局 启动后呈现一个全功能终端界面,自上而下包含: 到达前即可见;本轮结束后显示总耗时。 - AC13: (N1) 等待与流式期间,界面保持可响应(可滚动、不冻结)。
# 多协议 LLM 终端对话客户端 Plan## 技术栈 - 语言:Java 21(LTS;启用 virtual threads;使用 record、sealed interface、pattern matching、switch 模式匹配) - 构建:Maven(`pom.xml`,便于 CI;目标 JDK 21) - TUI:Lanterna(`com.googlecode.lanterna:lanterna`)—— `MultiWindowTextGUI` + `BasicWindow` + `Panel`(`LinearLayout` 纵向); 输入用 `TextBox`(多行模式),状态栏用 `Label`;动态区用 `Label` 持续更新文本; 完成块写入一个"scrollback Panel"中按顺序追加 `Label`(终端滚动历史由 Lanterna 主屏区呈现) - markdown 渲染:flexmark-java(`com.vladsch.flexmark:flexmark-all`)解析 AST,自行将 AST 转为 Lanterna 富文本 (代码块用灰底/不同前景色,列表加 `• ` 前缀,强调加粗,标题加色),宽度跟随终端列数(N6) - 配置:SnakeYAML Engine(`org.snakeyaml:snakeyaml-engine`)解析为 `Map<String,Object>`,手动绑定到 record - LLM 通信:官方 Java SDK —— `com.anthropic:anthropic-java`、`com.openai:openai-java` (两个 SDK 都提供 async 流式:`client.async().messages().createStreaming(params).subscribe(...)`、 `client.async().chat().completions().createStreaming(params).subscribe(...)`;内部已处理 SSE) - 并发:Java 21 virtual thread + `java.util.concurrent.SubmissionPublisher`(实现 `Flow.Publisher<StreamEvent>`), 零外部依赖;ctx 取消用 `volatile boolean cancelled` + `Thread.interrupt()` ## 架构概览(分层) 1. 入口层 `dev.mewcode.Main` —— 加载配置、打印 banner、启动 TUI。 2. 配置层 `dev.mewcode.config` —— 读取并校验 `.mewcode/config.yaml`,给出 providers 列表。 3. LLM 协议层 `dev.mewcode.llm` —— 定义协议无关的 `Provider` 接口与统一消息/流式事件类型; `AnthropicProvider`、`OpenAIProvider` 两个适配器各自封装官方 SDK、统一吐出文本增量(思考增量内部丢弃)。 4. 会话层 `dev.mewcode.conversation` —— 进程内维护多轮历史,提供完整上下文。 5. 提示词/资源 `dev.mewcode.prompt` —— 内置 system prompt 与启动 banner(ASCII 猫)。 6. 终端层 `dev.mewcode.tui` —— Lanterna 应用,含状态机(选择/空闲/流式)、输入框、对话区 Panel、 spinner+计时(Lanterna 定时刷新)、provider 选择列表;以"订阅 `Flow.Publisher`"的方式把 llm 流式事件 投递给 UI 线程(`textGUI.getGUIThread().invokeLater(...)`)。 ## 数据流(一轮对话) 用户输入 → tui 提交 → `Conversation` 追加 user 消息 → 调 `Provider.stream(messages)` → 得到 `Flow.Publisher<StreamEvent>` → tui 用 `Flow.Subscriber` 逐个收文本增量并 `invokeLater` 实时显示 (spinner 计时同步进行)→ 收到 `Done` 事件 → 用 flexmark 渲染整段 → `Conversation` 追加 assistant 消息 → 回到空闲。 ## 核心数据结构与接口 ```java // ───────── config 层 ───────── package dev.mewcode.config; public record ProviderConfig( String name, // 状态栏左侧显示 String protocol, // "anthropic" | "openai" String baseUrl, // 空则用 SDK 默认端点 String apiKey, String model, // 状态栏右侧显示 boolean thinking // 仅 anthropic 生效 ) {} public record Config(List<ProviderConfig> providers) {} public final class ConfigLoader { public static Config load(Path path) throws IOException; // 加载 + 校验 } // ───────── llm 层(协议无关)───────── package dev.mewcode.llm; | 错误处理 | 运行时错误经 `StreamEvent.Failed` 展示,不退出 | 满足 F11 | | 代码风格 | Spotless(google-java-format) | 等价 gofmt;CI 一致 |
# 多协议 LLM 终端对话客户端 Tasks > 顶层包:`dev.mewcode`(Java 21 / Maven)。构建产物在 `target/`。 ## 文件清单 | 操作 | 文件 | 职责 | |------|------|------| | 新建 | `pom.xml` | Maven 模块定义、JDK 21、依赖、main class、Spotless | | 新建 | `.mewcode/config.yaml.example` | 配置模板 | | 修改 | `.gitignore` | 忽略 `.mewcode/config.yaml`、`target/` | | 新建 | `src/main/java/dev/mewcode/config/Config.java` | record Config | | 新建 | `src/main/java/dev/mewcode/config/ProviderConfig.java` | record ProviderConfig | | 新建 | `src/main/java/dev/mewcode/config/ConfigLoader.java` | YAML 加载与校验 | | 新建 | `src/main/java/dev/mewcode/prompt/Prompt.java` | SYSTEM_PROMPT、CAT_BANNER、renderBanner | | 新建 | `src/main/java/dev/mewcode/llm/Message.java` | record Message | | 新建 | `src/main/java/dev/mewcode/llm/StreamEvent.java` | sealed StreamEvent | | 新建 | `src/main/java/dev/mewcode/llm/Provider.java` | Provider 接口 | | 新建 | `src/main/java/dev/mewcode/llm/ProviderFactory.java` | 按 protocol 构造 | | 新建 | `src/main/java/dev/mewcode/conversation/Conversation.java` | 单会话多轮历史 | | 新建 | `src/main/java/dev/mewcode/llm/AnthropicProvider.java` | anthropic 适配器 | | 新建 | `src/main/java/dev/mewcode/llm/OpenAIProvider.java` | openai 适配器 | | 新建 | `src/main/java/dev/mewcode/tui/TuiApp.java` | Lanterna 主应用、状态机、run() | | 新建 | `src/main/java/dev/mewcode/tui/StreamPump.java` | Flow.Subscriber、spinner 计时 | | 新建 | `src/main/java/dev/mewcode/tui/ProviderSelector.java` | provider 选择 ListBox | | 新建 | `src/main/java/dev/mewcode/tui/View.java` | View 拼装、状态栏、错误样式、markdown 定型 | | 新建 | `src/main/java/dev/mewcode/Main.java` | 入口装配 | | 新建 | `src/test/java/dev/mewcode/config/ConfigLoaderTest.java` | config 单测 | | 新建 | `src/test/java/dev/mewcode/conversation/ConversationTest.java` | conversation 单测 | --- ## T1: 初始化 Maven 工程与依赖 **文件:** `pom.xml`、`src/main/java/dev/mewcode/Main.java`(临时占位) **依赖:** 无 **步骤:** 1. 创建 `pom.xml`:`<groupId>dev.mewcode</groupId>`、`<artifactId>mewcode</artifactId>`、 `<maven.compiler.release>21</maven.compiler.release>`。 2. 在 `pom.xml` 加依赖(版本以 Maven Central 上最新稳定为准): - SDK:`com.anthropic:anthropic-java`、`com.openai:openai-java` - TUI:`com.googlecode.lanterna:lanterna` - markdown:`com.vladsch.flexmark:flexmark-all` - 配置:`org.snakeyaml:snakeyaml-engine` - 测试:`org.junit.jupiter:junit-jupiter`(scope=test) 3. 配 `maven-shade-plugin` 打 fat jar 或 `exec-maven-plugin`,`mainClass = dev.mewcode.Main`。 4. 可选:加 `com.diffplug.spotless:spotless-maven-plugin`(google-java-format),目标 `apply` / `check`。 ``` (T4 可与 T2/T5 并行;T7、T8 可并行;T10/T11/T12 在 T9 后可并行推进。)
# 多协议 LLM 终端对话客户端 Checklist > 每一项通过运行代码或观察行为来验证,聚焦系统行为;括号内为验证方式。 ## 实现完整性 - [ ] 配置加载:合法 `.mewcode/config.yaml` 能解析出 providers 列表(验证:单测 + 启动进入对话)。(AC1/F1) - [ ] 配置校验:缺密钥 / 非法 protocol / 文件缺失时给出可读错误并非零退出,无未捕获堆栈(验证:删字段 / 改 protocol / 删文件分别运行)。(AC1/N4) - [ ] 单 provider 直进:仅一条配置时启动直接进入对话(验证:单条配置运行)。(AC2/F2) - [ ] 多 provider 选择:多条配置时出现方向键列表,选定后进入对话(验证:两条配置运行、上下选择 + Enter)。(AC2/F2) - [ ] 内置 system prompt 与历史随请求发送(验证:问"你的角色 / 规则",回答体现内置 prompt;多轮见 AC6)。(AC4/F4) - [ ] thinking:anthropic 配 `thinking: true` 时启用,且界面不出现任何思考文本(验证:开启后观察仅最终回复)。(AC5/F5) - [ ] 流式逐字:回复以纯文本逐字出现(验证:长回复肉眼可见逐步输出)。(AC5/F8) - [ ] markdown 定型:回复结束后整段以 markdown 渲染(代码块 / 列表 / 强调正确)(验证:让模型输出含代码块与列表的内容)。(AC8/F8) - [ ] 多行输入:Alt+Enter(或实测组合键)换行、Enter 提交、提交后输入框清空(验证:输入两行后提交)。(AC9/F9) - [ ] 响应计时:自提交即显示 `Imagining… (Ns)` 且秒数递增,结束后显示总耗时(验证:发一条慢回复观察)。(AC12/F12) - [ ] 错误反馈:错误 key / 不存在模型时,错误在对话区可区分样式(红色)显示且不退出(验证:改坏 key 运行后再正常发一条)。(AC11/F11) - [ ] 退出:`/exit` 与 Ctrl+C 均能安全退出,终端恢复正常(验证:两种方式各试一次,观察无残留 / 错乱)。(AC10/F10/N7) - [ ] 界面布局:启动含猫 banner + 名称版本 + cwd + 就绪提示行 + `❯` 占位输入框 + 状态栏(左 name 右 model)(验证:启动截图比对)。(AC7/F7) ## 集成 - [ ] tui 通过统一 `Provider` 接口驱动两种协议,切换协议不改变上层交互(验证:分别用 anthropic / openai 配置跑同一组对话,行为一致)。(AC3/N3) - [ ] 多轮上下文携带:先告知信息、后追问,模型能正确引用前文;退出再启动后历史为空(验证:两轮对话 + 重启验证)。(AC6/F6) - [ ] 流式不阻塞:等待 / 流式期间界面仍响应、不冻结(验证:长回复期间界面持续刷新;virtual thread 跑 SDK 流,UI 线程通过 `invokeLater` 更新)。(AC13/N1) - [ ] scrollback 渲染(Claude Code 风格):完成的消息(用户输入 / 助手回复 / 错误)追加到 `scrollback` Panel,可滚动回看;动态区仅含输入框 + 正在流式的回复 + 状态栏(验证:tmux 多轮后回滚查看历史)。 - [ ] base_url 覆盖:为某 provider 配自定义 `base_url`(兼容端点)可正常收发(验证:配一个兼容端点跑通一轮)。(F3) - [ ] 窗口自适应:缩放终端宽度后输入框 / 对话区 / markdown 不错版(验证:运行中调整终端宽度;flexmark wrap 列数随 Lanterna `TerminalResize` 事件更新)。(N6) ## 编译与测试 - [ ] `mvn -q -DskipTests package` 无错误(fat jar 可启动)。 - [ ] `mvn test` 通过(`ConfigLoaderTest`、`ConversationTest`)。 - [ ] `mvn spotless:check` 通过(如启用 google-java-format)。 - [ ] 密钥不回显 / 不打印:对话区与任何输出均不出现 `api_key`(验证:通读运行输出、检索无明文 key)。(N5) ## 端到端场景 - [ ] 场景 1(anthropic 多轮):单条 anthropic 配置启动 → 连续两轮、第二轮引用第一轮 → 流式 + 计时 + markdown 定型 → `/exit` 退出。 - [ ] 场景 2(openai 流式):openai 协议配置 → 发一条含代码块的请求 → 流式逐字后 markdown 渲染正确。 - [ ] 场景 3(多 provider 选择):两条配置 → 启动出现列表 → 选第二条 → 状态栏显示其 name / model → 正常对话。 - [ ] 场景 4(错误恢复):错误 key 触发失败 → 对话区红色错误、程序不退出 → 修正后(重启)继续正常对话。
代码解析
llm 负责与大模型通信, conversation 负责管理对话历史。一共 12 个文件,每个文件职责单一,读起来不会迷路。
| 文件 | 职责 |
| llm/LlmClient.java | 核心接口 + 静态工厂,对外只暴露 stream () 和 setMaxOutputTokens () |
| llm/StreamEvent.java | sealed interface + 8 个 record,流式事件的「词汇表」 |
| llm/AnthropicClient.java | 基于 anthropic-java 官方 SDK 的流式客户端 |
| llm/OpenAiClient.java | 基于 openai-java 官方 SDK 的流式客户端 (Responses API) |
| llm/OpenAiCompatClient.java | 走 /chat/completions 协议的兼容客户端,适配 vLLM、Ollama 等 OpenAI 兼容服务 |
| llm/LlmException.java | 4 个异常子类,把 SDK 异常翻译成语义 |
| llm/ModelResolver.java | 模型别名解析,haiku、sonnet、opus 短名映射 |
| conversation/ConversationManager.java | 对话管理器,维护消息历史 |
| conversation/Message.java | 可变消息类,承载文本、思考块、工具调用 |
| conversation/ThinkingBlock.java | 思考块 record |
| conversation/ToolUseBlock.java | 工具调用块 record |
| conversation/ToolResultBlock.java | 工具结果块 record |
如果你只想抓住主线,重点看 LlmClient 、 StreamEvent 、 AnthropicClient 这三个就够了。OpenAI 那边结构几乎一样,只是 SDK 的 builder 长得不同。 OpenAiCompatClient 是给 vLLM、Ollama 这类 OpenAI 兼容服务准备的,走更通用的 /chat/completions 协议,本篇不展开。
核心类型
LlmClient:一个接口,两行契约
public interface LlmClient { BlockingQueue<StreamEvent> stream( ConversationManager conv, List<Map<String, Object>> tools); default void setMaxOutputTokens(int tokens) {} static LlmClient create(ProviderConfig cfg, String systemPrompt) { return switch (cfg.getProtocol()) { case "anthropic" -> new AnthropicClient(cfg, systemPrompt); case "openai" -> new OpenAiClient(cfg, systemPrompt); case "openai-compat" -> new OpenAiCompatClient(cfg, systemPrompt); default -> throw new IllegalArgumentException( "Unknown protocol: " + cfg.getProtocol()); }; } }
整个接口只有一个核心方法 stream() ,入参是对话管理器和工具列表,返回一个 BlockingQueue<StreamEvent> 。调用方不需要关心底层是 Anthropic 还是 OpenAI,拿到队列之后不停 take() 就行,直到收到 StreamEnd 或 Error 。
setMaxOutputTokens() 是个 default 方法,给了个空实现。这样上层想动态调整输出长度时可以调,不想管的话也不用实现。这种「可选配置」用 default 方法比加 setter 接口要轻量得多。
最有趣的是那个 create() 静态工厂方法。它直接写在接口里,用 switch 表达式根据协议字段分发到具体实现。调用方只需要 LlmClient.create(cfg, prompt) 一行,完全不碰具体类名。接口里写静态方法从 Java 8 起就支持,这里用它把工厂和契约放在同一个地方,调用方只需要认识一个类型。
StreamEvent:sealed interface 定义事件词汇表
public sealed interface StreamEvent { record TextDelta(String text) implements StreamEvent {} record ThinkingDelta(String text) implements StreamEvent {} record ThinkingComplete(String thinking, String signature) implements StreamEvent {} record ToolCallStart(String toolId, String toolName) implements StreamEvent {} record ToolCallDelta(String text) implements StreamEvent {} record ToolCallComplete(String toolId, String toolName, Map<String, Object> arguments) implements StreamEvent {} record StreamEnd(String stopReason, int inputTokens, int outputTokens) implements StreamEvent {} record Error(String message) implements StreamEvent {} }
8 个 record,全部密封在一个接口里。 sealed 的好处是编译器帮你检查:如果你在 switch 里漏掉了某个事件类型,编译会警告。而 record 本身就是不可变的值对象,不用写 getter、 equals() 、 hashCode() ,天然适合做事件载体。
从命名上可以看出事件的生命周期。文本流有 TextDelta ;思考流有 ThinkingDelta 和 ThinkingComplete (后者带签名,用于多轮传回);工具调用拆成 Start → Delta → Complete 三段;最后以 StreamEnd 或 Error 收尾。这套词汇表是 provider 无关的,不管后端是 Claude 还是 GPT,上层看到的都是同一组事件类型。
ConversationManager:对话历史的容器
public class ConversationManager { private final List<Message> history = new ArrayList<>(); private boolean ltmInjected = false; public void addUserMessage(String content) { history.add(new Message("user", content)); } public void addAssistantMessage(String content) { history.add(new Message("assistant", content)); } public void addAssistantFull(String text, List<ThinkingBlock> thinking, List<ToolUseBlock> toolUses) { ... } public void addToolResultsMessage( List<ToolResultBlock> results) { ... } public List<Message> getMessages() { return List.copyOf(history); } }
主流程走读
我们以 Anthropic 为例,从上层调 stream() 开始,一路走到收到最后一个事件。
第一步:虚拟线程 + 队列的生产者-消费者模型
public BlockingQueue<StreamEvent> stream( ConversationManager conv, List<Map<String, Object>> tools) { var queue = new LinkedBlockingQueue<StreamEvent>(64); Thread.startVirtualThread(() -> { try { doStream(conv, tools, queue); } catch (InterruptedException e) { Thread.currentThread().interrupt(); } catch (Exception e) { try { queue.put(new StreamEvent.Error( classifyError(e).getMessage())); } catch (InterruptedException ie) { Thread.currentThread().interrupt(); } } }); return queue; }
这段代码做的事情很简洁:创建一个容量 64 的阻塞队列,启动一个虚拟线程去执行真正的流式请求,然后立刻把队列返回给调用方。调用方可以马上开始 take() ,虚拟线程在后台不断往队列里 put() 事件。
为什么用虚拟线程而不是 CompletableFuture 或 Reactor?因为 SSE 流式读取本质上是一个阻塞循环:不断读下一个事件,直到流结束。用虚拟线程写阻塞代码,读起来像同步逻辑一样直白,但不会占用平台线程。队列容量设成 64 是一个背压机制:如果消费方处理不过来,生产方在 put() 时会自动阻塞,不会把内存撑爆。
外层的 catch 也值得注意:任何异常都不会悄悄丢失,而是被 classifyError() 翻译成业务异常后,包装成 StreamEvent.Error 塞进队列。这样消费方不需要额外的错误回调,只要在正常的事件循环里处理 Error 事件就行。一条通道,既走数据又走错误。
第二步:用 SDK builder 构建请求
var paramsBuilder = MessageCreateParams.builder() .model(model) .maxTokens(maxOutputTokens) .system(systemPrompt) .messages(buildMessages(conv.getMessages()));
以前的版本是手动拼 Map,key 写错了只有运行时才炸。现在用 SDK 的 builder,每个字段都有类型检查,IDE 还能自动补全。 model 是经过 ModelResolver.resolve() 之后的全名, messages 是通过 buildMessages() 从内部 Message 转出来的 List<MessageParam> 。
thinking 的配置根据模型能力分两种路径:
if (thinking) { if (ModelResolver.supportsAdaptiveThinking(model)) { paramsBuilder.thinking( ThinkingConfigAdaptive.builder().build()); } else { paramsBuilder.thinking(ThinkingConfigEnabled.builder() .budgetTokens(maxOutputTokens - 1) .build()); } }
Claude Opus 4.6 和 Sonnet 4.6 支持自适应 thinking,不需要手动指定 token 预算,SDK 提供了 ThinkingConfigAdaptive 。老一点的模型则要用 ThinkingConfigEnabled 并明确给出 budgetTokens 。这里设成 maxOutputTokens - 1 是因为 API 要求 thinking 预算必须严格小于 max_tokens。
第三步:SDK 流式迭代
请求构建好之后,用 SDK 的 createStreaming() 发出去,拿到一个 StreamResponse :
try (StreamResponse<RawMessageStreamEvent> streamResponse = sdkClient.messages() .createStreaming(paramsBuilder.build())) { var iterator = streamResponse.stream().iterator(); while (iterator.hasNext()) { var event = iterator.next(); // 处理每一个 SSE 事件 } }
StreamResponse 实现了 AutoCloseable ,放在 try-with-resources 里,流结束或出错时自动关闭底层连接。内部的 stream() 返回一个 Java Stream,我们取它的 iterator 来逐个处理事件。
事件处理的核心逻辑是一个 if-else 链,根据事件类型分发:
if (event.isContentBlockStart()) { var block = event.asContentBlockStart().contentBlock(); if (block.isThinking()) { inThinking = true; thinkingAccum.setLength(0); } else if (block.isToolUse()) { var tu = block.asToolUse(); currentToolName = tu.name(); currentToolId = tu.id(); queue.put(new StreamEvent.ToolCallStart( currentToolId, currentToolName)); } }
contentBlockStart 标志着一个新内容块的开始。如果是 thinking 块,重置累加器准备收集思考文本;如果是 tool_use 块,记下工具名和 ID,同时往队列里发一个 ToolCallStart 事件。
else if (event.isContentBlockDelta()) { var delta = event.asContentBlockDelta().delta(); if (delta.isThinking()) { queue.put(new StreamEvent.ThinkingDelta( delta.asThinking().thinking())); } else if (delta.isText()) { queue.put(new StreamEvent.TextDelta( delta.asText().text())); } else if (delta.isInputJson()) { jsonAccum.append(delta.asInputJson().partialJson()); queue.put(new StreamEvent.ToolCallDelta( delta.asInputJson().partialJson())); } }
delta 阶段是流式数据的主体。注意工具调用的参数是以 JSON 片段的形式一点一点传过来的,需要用 jsonAccum 累加起来,等到 contentBlockStop 的时候再完整解析。这也是为什么工具调用要拆成 Start、Delta、Complete 三个事件的原因:开始时知道调哪个工具,中间是参数的碎片流,结束时才能拿到完整参数。
两层消息模型与格式转换
这一章最关键的设计决策之一是「两层消息模型」。内部用统一的 Message 对象存储对话历史,发请求时再转成各家 SDK 要求的类型。
为什么不直接存 SDK 类型?
如果把 Anthropic 的 MessageParam 直接存进 ConversationManager ,那切换到 OpenAI 的时候整个历史就没法用了。而如果存原始的 Map,虽然灵活,但没有类型安全,字段名拼错了编译器不会提醒。
所以折中方案是: conversation 包里定义自己的 Message 、 ThinkingBlock 、 ToolUseBlock 、 ToolResultBlock ,它们是 provider 无关的。然后每个 Client 内部写一个转换方法,把这些类型映射到 SDK 类型。
AnthropicClient.buildMessages()
这个方法的核心任务是把 Message 列表转成 MessageParam 列表。直觉上觉得应该很简单,但实际上有不少细节。
if ("assistant".equals(msg.getRole()) && (hasThinking || hasToolUses)) { var content = new ArrayList<ContentBlockParam>(); if (hasThinking) { for (var tb : msg.getThinkingBlocks()) { content.add(ContentBlockParam.ofThinking( ThinkingBlockParam.builder() .thinking(tb.thinking()) .signature(tb.signature()) .build())); } } if (msg.getContent() != null && !msg.getContent().isEmpty()) { content.add(ContentBlockParam.ofText( TextBlockParam.builder() .text(msg.getContent()).build())); } // ... tool use blocks result.add(MessageParam.builder() .role(MessageParam.Role.ASSISTANT) .contentOfBlockParams(content) .build()); }
当一条 assistant 消息同时包含思考块、文本和工具调用时,需要把它们全部塞进一个 ContentBlockParam 列表里。SDK 的 MessageParam 对 content 字段有两种表达:纯文本可以直接传 String,混合内容则要用 contentOfBlockParams() 传一个列表。
思考块特别需要注意 signature 字段。Claude 的 API 要求多轮对话中,上一轮的 thinking 在回传时必须带上签名,用于验证这确实是模型自己产出的思考内容而不是用户伪造的。这就是为什么 ThinkingComplete 事件里有 signature,而 ThinkingBlock record 里也保存了它。
连续同角色消息合并
Anthropic 的 API 有一个硬性约束:消息列表必须严格交替 user → assistant → user → assistant。但实际对话中经常出现连续的 user 消息(比如用户发了一条文字又发了一个工具结果),这时就需要合并。
private List<MessageParam> mergeConsecutiveSameRole( List<MessageParam> messages) { var merged = new ArrayList<MessageParam>(); merged.add(messages.getFirst()); for (int i = 1; i < messages.size(); i++) { var prev = merged.getLast(); var curr = messages.get(i); if (prev.role().equals(curr.role())) { if (prevContent.isString() && currContent.isString()) { merged.set(merged.size() - 1, MessageParam.builder() .role(prev.role()) .content(prevContent.asString() + "\n\n" + currContent.asString()) .build()); } else { merged.add(curr); } } else { merged.add(curr); } } return merged; }
逻辑很直白:遍历列表,如果当前消息和前一条角色相同且都是纯文本,就拼到一起。如果其中有一条是复杂内容(block 列表),就不合并,直接保留。这种处理方式略显保守,但胜在安全:混合内容的合并容易出错,不如让 API 自己报错来得清楚。
OpenAI 那边怎么做?
OpenAI 的 buildInput() 结构上类似,但用的是 Responses API 的类型体系:
var role = "assistant".equals(msg.getRole()) ? EasyInputMessage.Role.ASSISTANT : EasyInputMessage.Role.USER; result.add(ResponseInputItem.ofEasyInputMessage( EasyInputMessage.builder() .role(role) .content(msg.getContent()) .build()));
工具调用在 OpenAI 侧是 ResponseFunctionToolCall ,工具结果是 FunctionCallOutput 。类名不同,但映射逻辑是对称的。这也印证了两层模型的价值:内部 Message 不变,只有最后一公里的转换代码不同。
错误分类与 SDK 异常映射
LLM API 的错误五花八门,但对上层来说只关心几种:认证失败、限流、上下文太长、网络中断。 LlmException 定义了这四个子类, classifyError() 负责把 SDK 抛出的异常翻译过来。
private LlmException classifyError(Exception e) { if (e instanceof LlmException le) return le; if (e instanceof com.anthropic.errors.UnauthorizedException ue) { return new LlmException.AuthenticationException( "Invalid API key: " + ue.getMessage()); } if (e instanceof com.anthropic.errors.RateLimitException) { return new LlmException.RateLimitException( "Rate limited. Please wait.", ""); } if (e instanceof com.anthropic.errors.BadRequestException bre) { String msg = bre.getMessage() != null ? bre.getMessage().toLowerCase() : ""; if (msg.contains("prompt is too long") || msg.contains("too many tokens")) { return new LlmException.ContextTooLongException( "Context too long: " + bre.getMessage()); } } if (e instanceof com.anthropic.errors.AnthropicIoException) { return new LlmException.NetworkException( "Network error: " + e.getMessage(), e); } return new LlmException( "Unexpected error: " + e.getMessage(), e); }
对比之前手动解析 HTTP 状态码的版本,现在直接 instanceof SDK 的异常类型,既准确又不容易遗漏。 BadRequestException 比较特殊:它涵盖了多种 400 错误,所以还需要检查消息内容来区分「上下文太长」和其他 bad request。
这个翻译层看着不起眼,但它让上层代码完全不需要知道底层用的是哪家 SDK。Agent Loop 里只需要 catch (LlmException e) 然后检查是不是 RateLimitException 、是不是 ContextTooLongException ,处理逻辑就明确了。
模型解析与能力探测
private static final Map<String, String> ALIASES = Map.of( "haiku", "claude-haiku-4-5-20251001", "sonnet", "claude-sonnet-4-6-20250514", "opus", "claude-opus-4-6-20250514"); public static String resolve(String model) { return ALIASES.getOrDefault(model, model); } public static boolean supportsAdaptiveThinking(String model) { String resolved = resolve(model); return resolved.contains("opus-4-6") || resolved.contains("sonnet-4-6"); }
ModelResolver 很小但很重要。用户在配置文件里写 model: sonnet ,代码里通过 resolve() 拿到完整的模型 ID。 supportsAdaptiveThinking() 则根据模型 ID 中的版本号判断是否支持自适应 thinking。这样当新模型发布时,只需要更新这个 Map 和判断逻辑,其他代码一行不用改。
你可能注意到 supportsAdaptiveThinking() 用的是 contains() 而不是精确匹配。这是个务实的选择:模型 ID 的格式可能随时间变化(比如日期后缀),用子串匹配更不容易误判。
总结
| 设计决策 | 具体做法 | 收益 |
| 接口 + 静态工厂 | LlmClient.create () 根据 protocol 分发 | 调用方只认一个类型,新增 provider 只加一个 case |
| sealed interface + record | StreamEvent 8 个密封 record | 编译期穷举检查,不可变,零样板代码 |
| 虚拟线程 + 阻塞队列 | stream () 返回 BlockingQueue,虚拟线程后台生产 | 同步写法、自动背压、异常也走队列 |
| 两层消息模型 | 内部 Message 与 SDK 类型(MessageParam/ResponseInputItem)分离 | provider 无关的历史存储,格式转换各管各的 |
| SDK builder 替代手拼 Map | MessageCreateParams.builder() / ResponseCreateParams.builder() | 类型安全、IDE 补全、少一类 runtime 拼写错误 |
| 同角色消息合并 | mergeConsecutiveSameRole () 在发送前处理 | 满足 Anthropic 严格交替约束,对上层透明 |
| SDK 异常映射 | classifyError () 用 instanceof 匹配 SDK 异常类 | 比解析 HTTP 状态码更准确,新异常类型加了就能 catch |
| 模型别名 + 能力探测 | ModelResolver 集中管理短名和能力标记 | 用户配置简洁,能力判断有单一出口 |
AtomGit 是由开放原子开源基金会联合 CSDN 等生态伙伴共同推出的新一代开源与人工智能协作平台。平台坚持“开放、中立、公益”的理念,把代码托管、模型共享、数据集托管、智能体开发体验和算力服务整合在一起,为开发者提供从开发、训练到部署的一站式体验。
更多推荐
所有评论(0)