eino进阶
3.2 chain graph
在大模型应用中,Components 组件是提供 『原子能力』的最小单元,比如:
ChatModel提供了大模型的对话能力Embedding提供了基于语义的文本向量化能力Retriever提供了关联内容召回的能力ToolsNode提供了执行外部工具的能力
一个大模型应用,除了需要这些原子能力之外,还需要根据场景化的业务逻辑,对这些原子能力进行组合、串联,这就是 『编排』。
大模型应用的开发有其自身典型的特征: 自定义的业务逻辑本身不会很复杂,几乎主要都是对『原子能力』的组合串联。
传统代码开发过程中,业务逻辑用 “代码的执行逻辑” 来表达,迁移到大模型应用开发中时,最直接想到的方法就是 “自行调用组件,自行把结果作为下一组件的输入进行调用”。这样的结果,就是 代码杂乱、很难复用、没有切面能力……
于是,Eino 提供了 “基于 Graph 模型 (node + edge) 的,以组件为原子节点的,以上下游类型对齐为基础的编排” 的解决方案。
- 编排要成为在业务逻辑之上的清晰的一层,不能让业务逻辑融入到编排中。业务逻辑复杂度封装到组件内部,编排层拥有更全局的视角,让逻辑层次变得非常清晰。编排代码里,绝对不出现业务规则、业务判断、业务处理代码。
- 一切以 “组件” 为核心,规范了业务功能的封装方式,让职责划分变得清晰,让复用变成自然而然
- 抽象视角看编排:编排是在构建一张网络,数据则在这个网络中流动,网络的每个节点都对流动的数据有格式/内容的要求,一个能顺畅流动的数据网络,关键就是 “上下游节点间的数据格式是否对齐?”。提供了 “类型对齐” 的开发方式的强化,降低开发者心智负担,把 golang 的类型安全特性发挥出来
- 提供了切面能力,callback 机制支持了基于节点的统一治理能力
- 提供了 call option 的机制,扩展性是快速迭代中的系统最基本的诉求
- 提供了 “流的自动转换” 能力,让 “流” 在「编排系统的复杂性来源榜」中除名
设计理念
Graph 本身是强大且语义完备的,可以用这项底层几乎绘制出所有的 “数据流动网络”,比如 “分支”、“并行”、“循环”。
但 Graph 并不是没有缺点的,基于 “点” “边” 模型的 Graph 在使用时,要求开发者要使用
graph.AddXXXNode()和graph.AddEdge()两个接口来创建一个数据通道,强大但是略显复杂。而在现实的大多数业务场景中,往往仅需要 “按顺序串联” 即可,因此,Eino 封装了接口更易于使用的
Chain。Chain 是对 Graph 的封装,除了 “环” 之外,Chain 暴露了几乎所有 Graph 的能力。
编排对比
Chain = 一条流水线(线性)
Graph = 一张流程图(可分支 / 可循环 / 可并行)👉 Chain = 固定流程(你写死)
👉 Agent = 动态决策(模型决定怎么走)
Chain 很快会不够用,比如: 做不了这种 动态决策
如果用户问代码 → 调 code tool
如果问文档 → 调 rag
如果问天气 → 调 API
Graph(图编排)
节点 + 边(有向图)
┌── Tool A ──┐
User → LLM → LLM → 输出
└── Tool B ──┘
甚至可以:
┌─条件1─→ 节点A ─┐
输入 → 判断 → 输出
└─条件2─→ 节点B ─┘
-
支持动态路径
-
支持循环(Agent本质)
-
支持多工具调度
-
可扩展复杂逻辑
以上下游 类型对齐 为基本准则
大模型应用编排框架的主流语言是 python,这门语言以其灵活性著称,灵活性给 sdk 的开发带来便利,但同时也给 sdk 的使用者带来了心智负担。
基于 golang 的 eino 则是 静态类型 ,在 Compile 时做类型检查,避免了 python 等动态语言的运行时类型问题。
eino 的最基础编排方式为 graph,以及简化的封装 chain。不论是哪种编排方式,其本质都是
逻辑节点+上下游关系。在编排的产物运行时,都是从一个逻辑节点运行,然后下一步运行和这个节点相连的下一个节点。
这之间蕴含了一个基本假设:前一个运行节点的输出值,可以作为下一个节点的输入值。
在 golang 中,要实现这个假设,有两个基本方案:
-
把不同节点的输入输出都变成一种更泛化的类型,例如
any、map[string]any等。-
采用泛化成 any 的方案,但对应的代价是: 开发者在写代码时,需要显式转换成具体类型才能使用。这会极大增加开发者的心智负担,因此最终放弃此方案。
-
langchain 的方案可以看做是全程传递
map[string]any,各个逻辑节点根据自己的需要,用对应的 key 去取对应的 value。在 langchaingo 的实现中,即是按照这种方式实现,但同样,golang 中的 any 要被使用依然要使用类型断言才可使用。这种方案在开发者使用时依然有很大的心智负担。
-
-
每一个节点的输入输出类型保持开发者的预期,在 Compile 阶段保证上下游的类型是一致的。
方案 2 即是 eino 最终选定的方案。这种方案是编排时最容易被理解的,整个过程就像是 搭积木 一样,每一个积木突出的部分和凹陷的部分有各自的规格,仅有规格匹配了才能成为上下游关系。
就如下图:
对于一个编排而言,只有下游能识别和处理上游的输出,这个编排才能正常运行。 这个基本假设在 eino 中被清晰地表达了出来,让开发者在用 eino 做编排时,能够有十足的信心清楚编排的逻辑是如何运行和流转的,而不是从一系列的 any 中去猜测传过来的值是否正确。
graph 中的类型对齐
edge
在 graph 中,一个节点的输出将顺着 边(edge) 流向下一节点,因此,用边连接的节点间必须要类型对齐。
普通边(绿色)= 直接赋值 = 类型必须对齐带转换的边(黄色)= 先转再赋 = 可以不对齐
Graph 里的节点连接:不一定需要天然类型对齐(因为 Graph 支持自动转换、InputKey/OutputKey、黄色边转换)
如下图:
这是一个模拟 ① 直接和大模型对话 ② 使用 RAG 模式 的场景,最后结果可用于对比两种模式的效果
图中绿色的部分,就是普通的 Edge 连接,其要求上游的输出必须能 assign 给下游,可以接收的类型有:
① 上下游类型相同: 例如上游输出 *schema.Message 下游输入也是 *schema.Message
② 下游接收接口,上游实现了该接口: 例如上游结构体实现了 Format() 接口,下游接收的是一个 interface{ Format() }。特殊情况是下游是 any(空接口),上游一定实现了 any,因此一定可以连接。
③ 上游是 interface,下游是具体类型: 当下游具体类型 implements 上游的 interface 类型时,有可能可以,有可能不行,在 compile 时无法确定,只有在运行时,等上游的具体类型确定了,才能最终确定。时,详细描述可见: Eino: 编排的设计理念
图中黄色的部分,则是 eino 提供的另一个类型转换的机制,即: 若下游接收的类型是 map[string]any,但是上游输出的类型并不是 map[string]any,可以使用 graph.AddXXXNode(node_key, xxx, compose.WithOutputKey("outkey") 的方式将上游输出的类型转化为 map[string]any,其中 map 的 key 是 option 中指定的 OutputKey。 一般在多条边汇聚到某一个节点时,这种机制使用起来较为方便。
同理,若上游是 map[string]any ,但是下游输入的类型并不是 map[string]any,则可以使用 graph.AddXXXNode(node_key, xxx, compose.WithInputKey("inkey") 来获取上游输出的其中一个 key 的 value,作为下游的输入。
branch
如果一个节点后面连接了多个 edge,则每条 edge 的下游节点都会运行一次。
branch 则是另一种机制: 一个 branch 后接了 n 个节点,但仅会运行 condition 返回的那个 node key 对应的节点。同一个 branch 后的节点,必须要类型对齐。
如下图:
这是一个模拟 react agent 的运行逻辑
可以看到,一个 branch 本身拥有一个 condition, 这个 function 的输入必须和上游类型对齐。同时,一个 branch 后所接的各个节点,也必须和 condition 一样,要能接收上游的输出。
chain 中的类型对齐
chain
从抽象角度看,chain 就是一个 链条,如下所示:
逻辑节点的类型可以分为 3 类:
-
可编排组件 (例如 chat model、 chat template、 retriever、 lambda、graph 等等)
-
branch 节点
-
parallel 节点
可以看到,在 chain 的视角下,不论是简单的节点(eg: chat model) 还是复杂的节点 (eg: graph、branch、parallel),都是一样的,在运行过程中,一步的执行就是一个节点的运行。
也因此,chain 的上下游节点间,类型必须是对齐的,如下:
func TestChain() {
chain := compose.NewChain[map[string]interface,string]()
nodeTemplate := &fakeChatTemplate{} // input: map[string]any, output: []*schema.Message
nodeHistoryLambda := &fakeLambda{} // input: []*schema.Message, output: []*schema.Message
nodeChatModel := &fakeChatModel{} // input: []*schema.Message, output: *schema.Message
nodeConvertResLambda := &fakeLambda{} // input: *schema.Message, output: string
chain.
AppendChatTemplate(nodeTemplate).
AppendLambda(nodeHistoryLambda).
AppendChatModel(nodeChatModel).
AppendLambda(nodeConvertResLambda)
}
上面的逻辑用图来表示如下:
若上下游的类型没有对齐,chain 会在 chain.Compile() 时返回错误。而 graph 会在 graph.AddXXXNode() 时就报错。
parallel
parallel 在 chain 中是一类特殊的节点,从 chain 的角度看 parallel 和其他的节点没啥区别。在 parallel 内部,其基本拓扑结构如下:
graph 中的多 edge 形成的结构其中一种就是这个,这里的基本假设是: 一个 parallel 的每一条边上有且仅有一个节点。当然,这一个节点也可以是 graph。但注意,目前框架没有直接提供在 parallel 中嵌套 branch 或 parallel 的能力。
在 parallel 中的每个节点,由于其上游节点是同一个,因此他们都要和上游节点的输出类型对齐,比如图中上游节点输出了 *schema.Message ,则每个节点都要能接收这个类型。接收的方式和 graph 中的一致,通常可以用 相同类型 、接口定义 、any、input key option 的方式。
parallel 节点的输出一定是一个 map[string]any,其中的 key 则是在 parallel.AddXXX(output_key, xxx, opts...) 时指定的 output_key,value 是节点内部的实际输出。
一个 parallel 的构建例子如下:
func TestParallel() {
chain := compose.NewChain[map[string]any, map[string]*schema.Message]()
parallel := compose.NewParallel()
model01 := &fakeChatModel{} // input: []*schema.Message, output: *schema.Message
model02 := &fakeChatModel{} // input: []*schema.Message, output: *schema.Message
model03 := &fakeChatModel{} // input: []*schema.Message, output: *schema.Message
parallel.
AddChatModel("outkey_01", model01).
AddChatModel("outkey_02", model02).
AddChatModel("outkey_03", model03)
lambdaNode := &fakeLambdaNode{} // input: map[string]any, output: map[string]*schema.Message
chain.
AppendParallel(parallel).
AppendLambda(lambdaNode)
}
一个 parallel 在 chain 中的视角如下:
图中是模拟同一个提问,由不同的大模型去回答,结果可用于对比效果
需要注意的是,这个结构只是逻辑上的视角,由于 chain 本身也是用 graph 实现的,parallel 在底层 graph 中会平铺到图中。
branch
chain 的 branch 和 graph 中的 branch 类似,branch 中的所有节点都要和上游节点的类型对齐,此处不再赘述。chain branch 的特殊之处是,branch 的所有可能的分支节点,都会连到 chain 中的同一个节点,或者都会连到 END。
Workflow 中的类型对齐
Workflow 的类型对齐的维度,由整体的 Input & Output 改成了字段级别。具体可分为:
-
上游输出的整体,类型对齐到下游的某个具体字段。
-
上游输出的某个具体字段,类型对齐到下游的整体。
-
上游输出的某个具体字段,类型对齐到下游输入的某个具体字段。
原理和规则与整体的类型对齐相同。
StateHandler 的类型对齐
StatePreHandler: 输入类型需要对齐对应节点的非流式输入类型。
// input 类型为 []*schema.Message,对齐 ChatModel 的非流式输入类型
preHandler := func(ctx context.Context, input []*schema.Message, state *state) ([]*schema.Message, error) {
// your handler logic
}
AddChatModelNode("xxx", model, WithStatePreHandler(preHandler))
StatePostHandler: 输入类型需要对齐对应节点的非流式输出类型。
// input 类型为 *schema.Message,对齐 ChatModel 的非流式输出类型
postHandler := func(ctx context.Context, input *schema.Message, state *state) (*schema.Message, error) {
// your handler logic
}
AddChatModelNode("xxx", model, WithStatePostHandler(postHandler))
StreamStatePreHandler: 输入类型需要对齐对应节点的流式输入类型。
// input 类型为 *schema.StreamReader[[]*schema.Message],对齐 ChatModel 的流式输入类型
preHandler := func(ctx context.Context, input *schema.StreamReader[[]*schema.Message], state *state) (*schema.StreamReader[[]*schema.Message], error) {
// your handler logic
}
AddChatModelNode("xxx", model, WithStreamStatePreHandler(preHandler))
StreamStatePostHandler: 输入类型需要对齐对应节点的流式输出类型。
// input 类型为 *schema.StreamReader[*schema.Message],对齐 ChatModel 的流式输出类型
postHandler := func(ctx context.Context, input *schema.StreamReader[*schema.Message], state *state) (*schema.StreamReader[*schema.Message], error) {
// your handler logic
}
AddChatModelNode("xxx", model, WithStreamStatePostHandler(postHandler))
invoke 和 stream 下的类型对齐方式
在 Eino 中,编排的结果是 graph 或 chain,若要运行,则需要使用 Compile() 来生成一个 Runnable 接口。
Runnable 的一个重要作用就是提供了 「Invoke」、「Stream」、「Collect」、「Transform」 四种调用方式。
上述几种调用方式的介绍以及详细的 Runnable 介绍可以查看: Eino 流式编程要点
假设我们有一个 Graph[[]*schema.Message, []*schema.Message],里面有一个 ChatModel 节点,一个 Lambda 节点,Compile 之后是一个 Runnable[[]*schema.Message, []*schema.Message]。
package main
import (
"context"
"io"
"testing"
"github.com/cloudwego/eino/compose"
"github.com/cloudwego/eino/schema"
"github.com/stretchr/testify/assert"
)
func TestTypeMatch(t *testing.T) {
ctx := context.Background()
g1 := compose.NewGraph[[]*schema.Message, string]()
_ = g1.AddChatModelNode("model", &mockChatModel{})
_ = g1.AddLambdaNode("lambda", compose.InvokableLambda(func(_ context.Context, msg *schema.Message) (string, error) {
return msg.Content, nil
}))
_ = g1.AddEdge(compose.START, "model")
_ = g1.AddEdge("model", "lambda")
_ = g1.AddEdge("lambda", compose.END)
runner, err := g1.Compile(ctx)
assert.NoError(t, err)
c, err := runner.Invoke(ctx, []*schema.Message{
schema.UserMessage("what's the weather in beijing?"),
})
assert.NoError(t, err)
assert.Equal(t, "the weather is good", c)
s, err := runner.Stream(ctx, []*schema.Message{
schema.UserMessage("what's the weather in beijing?"),
})
assert.NoError(t, err)
var fullStr string
for {
chunk, err := s.Recv()
if err != nil {
if err == io.EOF {
break
}
panic(err)
}
fullStr += chunk
}
assert.Equal(t, c, fullStr)
}
当我们以 Stream 方式调用上面编译好的 Runnable 时,model 节点会输出 *schema.StreamReader[*Message],但是 lambda 节点是 InvokableLambda,只接收非流式的 *schema.Message 作为输入。这也符合类型对齐规则,因为 Eino 框架会自动把流式的 Message 拼接成完整的 Message。
在 stream 模式下,拼接帧 是一个非常常见的操作,拼接时,会先把 *StreamReader[T] 中的所有元素取出来转成 []T,再尝试把 []T 拼接成一个完整的 T。框架内已经内置支持了如下类型的拼接:
-
*schema.Message: 详情见schema.``ConcatMessages``() -
string: 实现逻辑等同于+= -
[]*schema.Message: 详情见compose.concatMessageArray() -
Map: 把相同 key 的 val 进行合并,合并逻辑同上,若存在无法合并的类型,则失败 (ps: 不是覆盖) -
其他 slice:只有当 slice 中只有一个元素是非零值时,才能合并。
对其他场景,或者当用户想用定制逻辑覆盖掉上面的默认行为时,开发者可自行实现 concat 方法,并使用 compose.RegisterStreamChunkConcatFunc() 注册到全局的拼接函数中。
示例如下:
// 假设我们自己的结构体如下
type tStreamConcatItemForTest struct {
s string
}
// 实现一个拼接的方法
func concatTStreamForTest(items []*tStreamConcatItemForTest) (*tStreamConcatItemForTest, error) {
var s string
for _, item := range items {
s += item.s
}
return &tStreamConcatItemForTest{s: s}, nil
}
func Init() {
// 注册到全局的拼接方法中
compose.RegisterStreamChunkConcatFunc(concatTStreamForTest)
}
类型对齐在运行时检查的场景
eino 的 Graph 类型对齐检查,会在 err = graph.AddEdge("node1", "node2") 时检查两个节点类型是否匹配,也就能在 构建 graph 的过程,或 Compile 的过程 发现类型不匹配的错误,这适用于 Eino: 编排的设计理念 中所列举的 ① ② ③ 条规则。
当上游节点的输出为 interface 时,若下游节点类型实现了该 interface,则上游有可能可以转成下游类型 (类型断言),但只能在 运行过程 才能清楚能否转换成功,该场景的类型检查移到了运行过程中。
其结构可见下图:
这种场景适用于开发者能自行处理好上下游类型对齐的情况,可根据不同类型选择下游执行节点。
带有明确倾向性的设计选择
外部变量只读原则
Eino 的 Graph 中的数据在 Node、Branch、Handler 间流转时,一律是变量赋值,不是 Copy。当 Input 是引用类型,如 Struct 指针、map、slice 时,在 Node、Branch、Handler 内部对 Input 的修改,会对外部有副作用,可能导致并发问题。因此,Eino 遵循外部变量只读原则:Node、Branch、Handler 内部不对 Input 做修改,如需修改,先自行 Copy。
这个原则对 StreamReader 中的 Chunk 同样生效。
扇入与合并
扇入:多个上游的数据汇入到下游,一起作为下游的输入。需要明确定义多个上游的输出,如何**合并(Merge)**起来。
默认情况下,首先要求多个上游输出的实际类型必须相同且为 Map,且相互间 key 不可重复。其次:
-
在非流式场景下,合并后成为一个 Map,包含所有上游的所有键值对。
-
在流式场景下,将类型相同的多个上游 StreamReader 合并为一个 StreamReader。实际 Recv 时效果为从多个上游 StreamReader 中公平读取。
在 AddNode 时,可以通过添加 WithOutputKey 这个 Option 来把节点的输出转成 Map:
// 这个节点的输出,会从 string 改成 map[string]any,
// 且 map 中只有一个元素,key 是 your_output_key,value 是实际的的节点输出的 string
graph.AddLambdaNode("your_node_key", compose.InvokableLambda(func(ctx context.Context, input []*schema.Message) (str string, err error) {
// your logic
return
}), compose.WithOutputKey("your_output_key"))
也可以通过注册 Merge 方法来实现任意类型的 merge:
// eino/compose/values_merge.go
func RegisterValuesMergeFunc[T any](fn func([]T) (T, error))
Workflow 可以做到多个上游的多个输出字段映射到下游节点的不同字段。这并不属于合并场景,而是点对点的字段映射。事实上,eino workflow 目前不支持“多个上游字段同时映射到相同的下游字段”。
流式处理
Eino 认为,组件应当只需要实现业务场景中真实的流式范式,比如 ChatModel 不需要实现 Collect。因此,在编排场景中,Eino 自动帮助所有的节点补全缺失的流式范式。
以 Invoke 方式运行 Graph,内部各节点均以 Invoke 范式运行,以 Stream, Collect 或 Transform 方式运行 Graph,内部各节点均以 Transform 范式运行。
自动拼接(Concatenate):Stream chunk 拼接为完整内容的场景,优先使用用户注册的自定义拼接函数,其次执行框架提供的默认行为,包括 Message, Message 数组,String,Map 和 Struct 及 Struct 指针。
自动流化(Box):需要将非流式的 T 变成 StreamReader[T] 的场景,框架自动执行。
自动合并(Merge):见上文“扇入与合并”环节。
自动复制(Copy):在需要做流的复制的场景自动进行流的复制,包括一个流扇出到多个下游节点,一个流进入一个或多个 callback handler。
最后,Eino 要求所有编排元素能够感知和处理流。包括 branch,state handler,callback handler,passthrough,lambda 等。
关于 Eino 对流的处理能力,详见 Eino 流式编程要点。
全局状态
State:在 NewGraph 时通过 compose.WithGenLocalState 传入 State 的创建方法。这个请求维度的全局状态在一次请求的各环节可读写使用。
Eino 推荐用 StatePreHandler 和 StatePostHandler,功能定位是:
-
StatePreHandler:在每个节点执行前读写 State,以及按需替换节点的 Input。输入需对齐节点的非流式输入类型。
-
StatePostHandler:在每个节点执行后读写 State,以及按需替换节点的 Output。输入需对齐节点的非流式输出类型。
针对流式场景,使用对应的 StreamStatePreHandler 和 StreamStatePostHandler,输入需分别对齐节点的流式输入和流式输出类型。
这些 state handlers 位于节点外部,通过对 Input 或 Output 的修改影响节点,从而保证了节点的“状态无关”特性。
如果需要在节点内部读写 State,Eino 提供了 ProcessState[S any](ctx context.Context**, handler func(context.Context, **S) error) error 函数。
Eino 框架会在所有读写 State 的位置加锁。
回调注入
Eino 编排框架认为,进入编排的组件,可能内部埋入了 Callback 切面,也可以没有。这个信息由组件是否实现了 Checker 接口,以及接口中 IsCallbacksEnabled 方法的返回值来判断。
-
当
IsCallbacksEnabled返回 true 时,Eino 编排框架使用组件实现内部的 Callback 切面。 -
否则,自动在组件实现外部包上 Callback 切面,(只能)上报 input 和 output。
无论哪种,都会自动推断出 RunInfo。
同时,对 Graph 整体,也一定会注入 Callback 切面,RunInfo 为 Graph 自身。
关于 Eino 的 Callback 能力完整说明,见 Eino: Callback 用户手册。
Option 分配
Eino 支持各种维度的 Call Option 分配方式:
-
默认全局,即分配到所有节点,包括嵌套的内部图。
-
可添加某个组件类型的 Option,这时默认分配到该类型的所有节点,比如 AddChatModelOption。定义了独有 Option 类型的 Lambda,也可以这样把 Option 指定到自身。
-
可指定任意个具体的节点,使用
DesignateNode(key ...string). -
可指定任意深度的嵌套图,或者其中的任意个具体的节点,使用
DesignateNodeWithPath(path ...*NodePath).
关于 Eino 的 Call Option 能力完整说明,见 Eino: CallOption 能力与规范。
图嵌套
图编排产物 Runnable 与 Lambda 的接口形式非常相似。因此编译好的图可以简单的封装为 Lambda,并以 Lambda 节点的形式嵌套进其他图中。
另一种方式,在编译前,Graph,Chain,Workflow 等都可以直接通过 AddGraph 的方式嵌套进其他图中。两个方式的差异是:
-
Lambda 的方式,在 trace 上会多一级 Lambda 节点。其他 Callback handler 视角看也会多一层。
-
Lambda 的方式,需要通过 Lambda 的 Option 来承接 CallOption,无法通过 DesignateNodeWithPath。
-
Lambda 的方式,内部图需事先编译。直接 AddGraph,则内部图随上级图一起编译。
内部机制
执行时序
以一个添加了 StatePreHandler、StatePostHandler、InputKey、OutputKey,且内部没有实现 Callback 切面的 InvokableLambda(输入为 string,输出为 int)为例,在图中的流式执行完整时序如下:
在 workflow 的场景中,字段映射发生在两个位置:
-
在节点执行后的 StatePostHandler 以及“流复制”步骤后,每个下游需要的字段会分别抽取出来。
-
在节点执行前的“合并”步骤之后、StatePreHandler 之前,会将抽取出来的上游字段值转换为当前节点的输入。
运行引擎
NodeTriggerMode == AnyPredecessor 时,图以 pregel 引擎执行,对应的拓扑结构是有向有环图。特点是:
-
当前执行中的一个或多个节点,所有的后序节点,作为一个 SuperStep,整体一起执行。这时,这些新的节点,会成为“当前”节点。
-
支持 Branch,支持图中有环,但是可能需要人为添加 passthrough 节点,来确保 SuperStep 中的节点符合预期,如下图:
上图中 Node 4 和 Node 5 按规则被放在一起执行,大概率不符合预期。需要改成:
NodeTriggerMode == AllPredecessor 时,图以 dag 引擎执行,对应的拓扑结构是有向无环图。特点是:
-
每个节点有确定的前序节点,当所有前序节点都完成后,本节点才具备运行条件。
-
不支持图中有环,因为会打破“每个节点有确定的前序节点”这一假定。
-
支持 Branch。在运行时,将 Branch 未选中的节点记为已跳过,不影响 AllPredecessor 的语义。
💡 设置 NodeTriggerMode = AllPredecessor 后,节点会在所有前驱就绪后执行,但并不是立即执行,而是依然遵循 SuperStep——在一批节点全部执行完成后再运行新的可运行节点。
如果在 Compile 时传入 compose.WithEagerExecution(),则就绪的节点会立刻运行。
在 Eino v0.4.0 版本及之后的版本中,设置 NodeTriggerMode = AllPredecessor 后会默认开启 EagerExecution。
总结起来,pregel 模式灵活强大但有额外的心智负担,dag 模式清晰简单但场景受限。在 Eino 框架中,Chain 是 pregel 模式,Workflow 是 dag 模式,Graph 则都支持,可由用户从 pregel 和 dag 中选择。
实现
Graph
package main
import (
"context"
"fmt"
"io"
"github.com/cloudwego/eino/components/model"
"github.com/cloudwego/eino/components/prompt"
"github.com/cloudwego/eino/compose"
"github.com/cloudwego/eino/schema"
)
const (
nodeOfModel = "model"
nodeOfPrompt = "prompt"
)
func main() {
ctx := context.Background()
g := compose.NewGraph[map[string]any, *schema.Message]()
pt := prompt.FromMessages(
schema.FString,
schema.UserMessage("what's the weather in {location}?"),
)
_ = g.AddChatTemplateNode(nodeOfPrompt, pt)
_ = g.AddChatModelNode(nodeOfModel, &mockChatModel{}, compose.WithNodeName("ChatModel"))
_ = g.AddEdge(compose.START, nodeOfPrompt)
_ = g.AddEdge(nodeOfPrompt, nodeOfModel)
_ = g.AddEdge(nodeOfModel, compose.END)
r, err := g.Compile(ctx)
if err != nil {
panic(err)
}
in := map[string]any{"location": "beijing"}
ret, err := r.Invoke(ctx, in)
fmt.Println("invoke result: ", ret)
// stream
s, err := r.Stream(ctx, in)
if err != nil {
panic(err)
}
defer s.Close()
for {
chunk, err := s.Recv()
if err != nil {
if err == io.EOF {
break
}
panic(err)
}
fmt.Println("stream chunk: ", chunk)
}
}
type mockChatModel struct{}
func (m *mockChatModel) Generate(ctx context.Context, input []*schema.Message, opts ...model.Option) (*schema.Message, error) {
return schema.AssistantMessage("the weather is good", nil), nil
}
func (m *mockChatModel) Stream(ctx context.Context, input []*schema.Message, opts ...model.Option) (*schema.StreamReader[*schema.Message], error) {
sr, sw := schema.Pipe[*schema.Message](0)
go func() {
defer sw.Close()
sw.Send(schema.AssistantMessage("the weather is", nil), nil)
sw.Send(schema.AssistantMessage("good", nil), nil)
}()
return sr, nil
}
func (m *mockChatModel) BindTools(tools []*schema.ToolInfo) error {
panic("implement me")
}
ToolCallAgent
go get github.com/cloudwego/eino-ext/components/model/openai@latest
go get github.com/cloudwego/eino@latest
package main
import (
"context"
"os"
"github.com/cloudwego/eino-ext/components/model/openai"
"github.com/cloudwego/eino/callbacks"
"github.com/cloudwego/eino/components/prompt"
"github.com/cloudwego/eino/components/tool"
"github.com/cloudwego/eino/components/tool/utils"
"github.com/cloudwego/eino/compose"
"github.com/cloudwego/eino/schema"
"github.com/cloudwego/eino-examples/internal/gptr"
"github.com/cloudwego/eino-examples/internal/logs"
)
func main() {
openAIBaseURL := os.Getenv("OPENAI_BASE_URL")
openAIAPIKey := os.Getenv("OPENAI_API_KEY")
modelName := os.Getenv("MODEL_NAME")
ctx := context.Background()
callbacks.AppendGlobalHandlers(&loggerCallbacks{})
// 1. create an instance of ChatTemplate as 1st Graph Node
systemTpl := `你是一名房产经纪人,结合用户的薪酬和工作,使用 user_info API,为其提供相关的房产信息。邮箱是必须的`
chatTpl := prompt.FromMessages(schema.FString,
schema.SystemMessage(systemTpl),
schema.MessagesPlaceholder("message_histories", true),
schema.UserMessage("{user_query}"),
)
modelConf := &openai.ChatModelConfig{
BaseURL: openAIBaseURL,
APIKey: openAIAPIKey,
ByAzure: true,
Model: modelName,
Temperature: gptr.Of(float32(0.7)),
APIVersion: "2024-06-01",
}
// 2. create an instance of ChatModel as 2nd Graph Node
chatModel, err := openai.NewChatModel(ctx, modelConf)
if err != nil {
logs.Errorf("NewChatModel failed, err=%v", err)
return
}
// 3. create an instance of tool.InvokableTool for Intent recognition and execution
userInfoTool := utils.NewTool(
&schema.ToolInfo{
Name: "user_info",
Desc: "根据用户的姓名和邮箱,查询用户的公司、职位、薪酬信息",
ParamsOneOf: schema.NewParamsOneOfByParams(map[string]*schema.ParameterInfo{
"name": {
Type: "string",
Desc: "用户的姓名",
},
"email": {
Type: "string",
Desc: "用户的邮箱",
},
}),
},
func(ctx context.Context, input *userInfoRequest) (output *userInfoResponse, err error) {
return &userInfoResponse{
Name: input.Name,
Email: input.Email,
Company: "Bytedance",
Position: "CEO",
Salary: "9999",
}, nil
})
info, err := userInfoTool.Info(ctx)
if err != nil {
logs.Errorf("Get ToolInfo failed, err=%v", err)
return
}
// 4. bind ToolInfo to ChatModel. ToolInfo will remain in effect until the next BindTools.
err = chatModel.BindForcedTools([]*schema.ToolInfo{info})
if err != nil {
logs.Errorf("BindForcedTools failed, err=%v", err)
return
}
// 5. create an instance of ToolsNode as 3rd Graph Node
toolsNode, err := compose.NewToolNode(ctx, &compose.ToolsNodeConfig{
Tools: []tool.BaseTool{userInfoTool},
})
if err != nil {
logs.Errorf("NewToolNode failed, err=%v", err)
return
}
const (
nodeKeyOfTemplate = "template"
nodeKeyOfChatModel = "chat_model"
nodeKeyOfTools = "tools"
)
// 6. create an instance of Graph
// input type is 1st Graph Node's input type, that is ChatTemplate's input type: map[string]any
// output type is last Graph Node's output type, that is ToolsNode's output type: []*schema.Message
g := compose.NewGraph[map[string]any, []*schema.Message]()
// 7. add ChatTemplate into graph
_ = g.AddChatTemplateNode(nodeKeyOfTemplate, chatTpl)
// 8. add ChatModel into graph
_ = g.AddChatModelNode(nodeKeyOfChatModel, chatModel)
// 9. add ToolsNode into graph
_ = g.AddToolsNode(nodeKeyOfTools, toolsNode)
// 10. add connection between nodes
_ = g.AddEdge(compose.START, nodeKeyOfTemplate)
_ = g.AddEdge(nodeKeyOfTemplate, nodeKeyOfChatModel)
_ = g.AddEdge(nodeKeyOfChatModel, nodeKeyOfTools)
_ = g.AddEdge(nodeKeyOfTools, compose.END)
// 9. compile Graph[I, O] to Runnable[I, O]
r, err := g.Compile(ctx)
if err != nil {
logs.Errorf("Compile failed, err=%v", err)
return
}
out, err := r.Invoke(ctx, map[string]any{
"message_histories": []*schema.Message{},
"user_query": "我叫 zhangsan, 邮箱是 zhangsan@bytedance.com, 帮我推荐一处房产",
})
if err != nil {
logs.Errorf("Invoke failed, err=%v", err)
return
}
logs.Infof("Generation: %v Messages", len(out))
for _, msg := range out {
logs.Infof(" %v", msg)
}
}
type userInfoRequest struct {
Name string `json:"name"`
Email string `json:"email"`
}
type userInfoResponse struct {
Name string `json:"name"`
Email string `json:"email"`
Company string `json:"company"`
Position string `json:"position"`
Salary string `json:"salary"`
}
type loggerCallbacks struct{}
func (l *loggerCallbacks) OnStart(ctx context.Context, info *callbacks.RunInfo, input callbacks.CallbackInput) context.Context {
logs.Infof("name: %v, type: %v, component: %v, input: %v", info.Name, info.Type, info.Component, input)
return ctx
}
func (l *loggerCallbacks) OnEnd(ctx context.Context, info *callbacks.RunInfo, output callbacks.CallbackOutput) context.Context {
logs.Infof("name: %v, type: %v, component: %v, output: %v", info.Name, info.Type, info.Component, output)
return ctx
}
func (l *loggerCallbacks) OnError(ctx context.Context, info *callbacks.RunInfo, err error) context.Context {
logs.Infof("name: %v, type: %v, component: %v, error: %v", info.Name, info.Type, info.Component, err)
return ctx
}
func (l *loggerCallbacks) OnStartWithStreamInput(ctx context.Context, info *callbacks.RunInfo, input *schema.StreamReader[callbacks.CallbackInput]) context.Context {
return ctx
}
func (l *loggerCallbacks) OnEndWithStreamOutput(ctx context.Context, info *callbacks.RunInfo, output *schema.StreamReader[callbacks.CallbackOutput]) context.Context {
return ctx
}
Graph with state
Graph 可以有 graph 自身的“全局”状态,在创建 Graph 时传入 WithGenLocalState Option 开启此功能:
// compose/generic_graph.go
// type GenLocalState[S any] func(ctx context.Context) (state S)
func WithGenLocalState[S any](gls GenLocalState[S]) NewGraphOption {
// --snip--
}
Add node 时添加 Pre/Post Handler 来处理 State:
// compose/graph_add_node_options.go
// type StatePreHandler[I, S any] func(ctx context.Context, in I, state S) (I, error)
// type StatePostHandler[O, S any] func(ctx context.Context, out O, state S) (O, error)
func WithStatePreHandler[I, S any](pre StatePreHandler[I, S]) GraphAddNodeOpt {
// --snip--
}
func WithStatePostHandler[O, S any](post StatePostHandler[O, S]) GraphAddNodeOpt {
// --snip--
}
在 Node 内部,用 ProcessState,传入一个读写 State 的 函数:
// flow/agent/react/react.go
var msg *schema.Message
err = compose.ProcessState[*state](ctx, func(_ context.Context, state *state) error {
for i := range msgs {
if msgs[i] != nil && msgs[i].ToolCallID == state.ReturnDirectlyToolCallID {
msg = msgs[i]
return nil
}
}
return nil
})
完整使用例子:
package main
import (
"context"
"errors"
"io"
"runtime/debug"
"strings"
"unicode/utf8"
"github.com/cloudwego/eino/compose"
"github.com/cloudwego/eino/schema"
"github.com/cloudwego/eino/utils/safe"
"github.com/cloudwego/eino-examples/internal/logs"
)
func main() {
ctx := context.Background()
const (
nodeOfL1 = "invokable"
nodeOfL2 = "streamable"
nodeOfL3 = "transformable"
)
type testState struct {
ms []string
}
gen := func(ctx context.Context) *testState {
return &testState{}
}
sg := compose.NewGraph[string, string](compose.WithGenLocalState(gen))
l1 := compose.InvokableLambda(func(ctx context.Context, in string) (out string, err error) {
return "InvokableLambda: " + in, nil
})
l1StateToInput := func(ctx context.Context, in string, state *testState) (string, error) {
state.ms = append(state.ms, in)
return in, nil
}
l1StateToOutput := func(ctx context.Context, out string, state *testState) (string, error) {
state.ms = append(state.ms, out)
return out, nil
}
_ = sg.AddLambdaNode(nodeOfL1, l1,
compose.WithStatePreHandler(l1StateToInput), compose.WithStatePostHandler(l1StateToOutput))
l2 := compose.StreamableLambda(func(ctx context.Context, input string) (output *schema.StreamReader[string], err error) {
outStr := "StreamableLambda: " + input
sr, sw := schema.Pipe[string](utf8.RuneCountInString(outStr))
// nolint: byted_goroutine_recover
go func() {
for _, field := range strings.Fields(outStr) {
sw.Send(field+" ", nil)
}
sw.Close()
}()
return sr, nil
})
l2StateToOutput := func(ctx context.Context, out string, state *testState) (string, error) {
state.ms = append(state.ms, out)
return out, nil
}
_ = sg.AddLambdaNode(nodeOfL2, l2, compose.WithStatePostHandler(l2StateToOutput))
l3 := compose.TransformableLambda(func(ctx context.Context, input *schema.StreamReader[string]) (
output *schema.StreamReader[string], err error) {
prefix := "TransformableLambda: "
sr, sw := schema.Pipe[string](20)
go func() {
defer func() {
panicErr := recover()
if panicErr != nil {
err := safe.NewPanicErr(panicErr, debug.Stack())
logs.Errorf("panic occurs: %v\n", err)
}
}()
for _, field := range strings.Fields(prefix) {
sw.Send(field+" ", nil)
}
for {
chunk, err := input.Recv()
if err != nil {
if err == io.EOF {
break
}
// TODO: how to trace this kind of error in the goroutine of processing sw
sw.Send(chunk, err)
break
}
sw.Send(chunk, nil)
}
sw.Close()
}()
return sr, nil
})
l3StateToOutput := func(ctx context.Context, out string, state *testState) (string, error) {
state.ms = append(state.ms, out)
logs.Infof("state result: ")
for idx, m := range state.ms {
logs.Infof(" %vth: %v", idx, m)
}
return out, nil
}
_ = sg.AddLambdaNode(nodeOfL3, l3, compose.WithStatePostHandler(l3StateToOutput))
_ = sg.AddEdge(compose.START, nodeOfL1)
_ = sg.AddEdge(nodeOfL1, nodeOfL2)
_ = sg.AddEdge(nodeOfL2, nodeOfL3)
_ = sg.AddEdge(nodeOfL3, compose.END)
run, err := sg.Compile(ctx)
if err != nil {
logs.Errorf("sg.Compile failed, err=%v", err)
return
}
out, err := run.Invoke(ctx, "how are you")
if err != nil {
logs.Errorf("run.Invoke failed, err=%v", err)
return
}
logs.Infof("invoke result: %v", out)
stream, err := run.Stream(ctx, "how are you")
if err != nil {
logs.Errorf("run.Stream failed, err=%v", err)
return
}
for {
chunk, err := stream.Recv()
if err != nil {
if errors.Is(err, io.EOF) {
break
}
logs.Infof("stream.Recv() failed, err=%v", err)
break
}
logs.Tokenf("%v", chunk)
}
stream.Close()
sr, sw := schema.Pipe[string](1)
sw.Send("how are you", nil)
sw.Close()
stream, err = run.Transform(ctx, sr)
if err != nil {
logs.Infof("run.Transform failed, err=%v", err)
return
}
for {
chunk, err := stream.Recv()
if err != nil {
if errors.Is(err, io.EOF) {
break
}
logs.Infof("stream.Recv() failed, err=%v", err)
break
}
logs.Infof("%v", chunk)
}
stream.Close()
}
Chain
Chain 可以视为是 Graph 的简化封装
package main
import (
"context"
"fmt"
"log"
"math/rand"
"os"
"github.com/cloudwego/eino-ext/components/model/openai"
"github.com/cloudwego/eino/components/prompt"
"github.com/cloudwego/eino/compose"
"github.com/cloudwego/eino/schema"
"github.com/cloudwego/eino-examples/internal/gptr"
"github.com/cloudwego/eino-examples/internal/logs"
)
func main() {
openAPIBaseURL := os.Getenv("OPENAI_BASE_URL")
openAPIAK := os.Getenv("OPENAI_API_KEY")
modelName := os.Getenv("MODEL_NAME")
ctx := context.Background()
// build branch func
const randLimit = 2
branchCond := func(ctx context.Context, input map[string]any) (string, error) { // nolint: byted_all_nil_return
if rand.Intn(randLimit) == 1 {
return "b1", nil
}
return "b2", nil
}
b1 := compose.InvokableLambda(func(ctx context.Context, kvs map[string]any) (map[string]any, error) {
logs.Infof("hello in branch lambda 01")
if kvs == nil {
return nil, fmt.Errorf("nil map")
}
kvs["role"] = "cat"
return kvs, nil
})
b2 := compose.InvokableLambda(func(ctx context.Context, kvs map[string]any) (map[string]any, error) {
logs.Infof("hello in branch lambda 02")
if kvs == nil {
return nil, fmt.Errorf("nil map")
}
kvs["role"] = "dog"
return kvs, nil
})
// build parallel node
parallel := compose.NewParallel()
parallel.
AddLambda("role", compose.InvokableLambda(func(ctx context.Context, kvs map[string]any) (string, error) {
// may be change role to others by input kvs, for example (dentist/doctor...)
role, ok := kvs["role"].(string)
if !ok || role == "" {
role = "bird"
}
return role, nil
})).
AddLambda("input", compose.InvokableLambda(func(ctx context.Context, kvs map[string]any) (string, error) {
return "你的叫声是怎样的?", nil
}))
modelConf := &openai.ChatModelConfig{
BaseURL: openAPIBaseURL,
APIKey: openAPIAK,
ByAzure: true,
Model: modelName,
Temperature: gptr.Of(float32(0.7)),
APIVersion: "2024-06-01",
}
// create chat model node
cm, err := openai.NewChatModel(context.Background(), modelConf)
if err != nil {
log.Panic(err)
return
}
rolePlayerChain := compose.NewChain[map[string]any, *schema.Message]()
rolePlayerChain.
AppendChatTemplate(prompt.FromMessages(schema.FString, schema.SystemMessage(`You are a {role}.`), schema.UserMessage(`{input}`))).
AppendChatModel(cm)
// =========== build chain ===========
chain := compose.NewChain[map[string]any, string]()
chain.
AppendLambda(compose.InvokableLambda(func(ctx context.Context, kvs map[string]any) (map[string]any, error) {
// do some logic to prepare kv as input val for next node
// just pass through
logs.Infof("in view lambda: %v", kvs)
return kvs, nil
})).
AppendBranch(compose.NewChainBranch(branchCond).AddLambda("b1", b1).AddLambda("b2", b2)). // nolint: byted_use_receiver_without_nilcheck
AppendPassthrough().
AppendParallel(parallel).
AppendGraph(rolePlayerChain).
AppendLambda(compose.InvokableLambda(func(ctx context.Context, m *schema.Message) (string, error) {
// do some logic to check the output or something
logs.Infof("in view of messages: %v", m.Content)
return m.Content, nil
}))
// compile
r, err := chain.Compile(ctx)
if err != nil {
log.Panic(err)
return
}
output, err := r.Invoke(context.Background(), map[string]any{})
if err != nil {
log.Panic(err)
return
}
logs.Infof("output is : %v", output)
}
workflow
是一套编排的 API,与 Graph API 在架构上处于同一层:

Eino compose engine
Graph API
Workflow API
Chain API
本质特点是:
-
与 Graph API 具有同等级别的能力,都是编排“围绕大模型的信息流”的合适框架工具。
-
在节点类型、流处理、callback、option、state、interrupt & checkpoint 等方面保持一致。
-
实现 AnyGraph 接口,可以在 AddGraphNode 时作为子图加入上级 Graph/Chain/Workflow。
-
也可以把其他 Graph/Chain/Workflow 添加为自己的子图。
-
-
字段级别映射能力:节点的输入可以由任意前驱节点的任意输出字段组合而成。
-
原生支持 struct,map 以及任意嵌套层级的 struct 和 map 之间的相互映射。
-
-
控制流与数据流分离:Graph 的 Edge 是既决定执行顺序,又决定数据传递。Workflow 中可以一起传递,也可以分开传递。
-
不支持环(即类似 react agent 的 chatmodel->toolsNode->chatmodel 的环路)。NodeTriggerMode 固定为 AllPredecessor。
灵活的输入输出类型
例如需要编排两个 lambda 节点,里面是两个“现存的业务函数 f1, f2”,输入输出类型都是符合业务场景的特定结构体,各自不一样:
Workflow 编排时,将 f1 的输出字段 F1,直接映射到 f2 的输入字段 F3,同时保留 f1,f2 的原始函数签名。达到的效果是:每个节点是“业务场景决定输入输出”,不需要考虑“谁给我输入,以及谁用我的输出”。
Graph 编排时,因为“类型对齐”的要求,如果 f1 -> f2,则 f1 的输出类型和 f2 的输入类型需要对齐,需要二选一:
-
定义一个新的 common struct,把 f1 的输出类型和 f2 的输入类型都改成这个 common struct。有成本,可能入侵业务逻辑。
-
f1 的输出类型和 f2 的输入类型都改成 map。丢失了强类型对齐的特性。
控制流和数据流分离
看下面这个场景:
节点 D 同时引用了 A、B、C 的某些输出字段。其中 A-D 的这条虚线,是单纯的“数据流”,不传递“控制”信息,即 A 执行完成与否,不决定 D 是否开始执行。
节点 D 到 E 之间的粗箭头,代表节点 E 不引用节点 D 的任何输出,是单纯的“控制流”,不传递“数据”。即 D 执行完成与否,决定 E 是否开始执行,但是 D 的输出不影响 E 的输入。
图中其他的线,是控制流与数据流合一的。
需要注意的是,数据流能传递的前提,是一定有一条控制流存在,比如 A->D 的数据流,依赖 A->branch->B->D 或者 A->branch->C->D 的控制流存在。即数据流只能引用前驱节点的输出。
例如这个“跨节点”传递特定数据的场景:
上图中,chat template 节点的输入可以是非常明确的:
map[string]any{"prompt": "prompt from START", "context": "retrieved context"}
相对的,如果使用 Graph 或者 Chain API,需要二选一:
-
用 OutputKey 转换节点输出类型(START 节点没法加,所以得额外加 passthrough 节点),ChatTemplate 节点的输入会包含 START 和 retriever 的全量输出(而不是真正需要的某几个字段).
-
START 节点的 prompt 放到 state 里面,ChatTemplate 从 state 中读。额外引入了 state。
如何使用 Workflow
最简单的 workflow
START -> node -> END
// creates and invokes a simple workflow with only a Lambda node.
// Since all field mappings are ALL to ALL mappings
// (by using AddInput without field mappings),
// this simple workflow is equivalent to a Graph: START -> lambda -> END.
func main() {
// create a Workflow, just like creating a Graph
wf := compose.NewWorkflow[int, string]()
// add a lambda node to the Workflow, just like adding the lambda to a Graph
wf.AddLambdaNode("lambda", compose.InvokableLambda(
func(ctx context.Context, in int) (string, error) {
return strconv.Itoa(in), nil
})).
// add an input to this lambda node from START.
// this means mapping all output of START to the input of the lambda.
// the effect of AddInput is to set both a control dependency
// and a data dependency.
AddInput(compose.START)
// obtain the compose.END of the workflow for method chaining
wf.End().
// add an input to compose.END,
// which means 'using ALL output of lambda node as output of END'.
AddInput("lambda")
// compile the Workflow, just like compiling a Graph
run, err := wf.Compile(context.Background())
if err != nil {
logs.Errorf("workflow compile error: %v", err)
return
}
// invoke the Workflow, just like invoking a Graph
result, err := run.Invoke(context.Background(), 1)
if err != nil {
logs.Errorf("workflow run err: %v", err)
return
}
logs.Infof("%v", result)
}
核心的几个 API:
-
func NewWorkflow[I, O any](opts ...NewGraphOption) *Workflow[I, O]-
构建一个新的 Workflow。
-
与
NewGraph签名完全一致。
-
-
func (wf *Workflow[I, O]) AddChatModelNode(key string, chatModel model.BaseChatModel, opts ...GraphAddNodeOpt) *WorkflowNode-
向 Workflow 中添加一个新的节点。
-
可添加的节点类型与 Graph 完全一致。
-
与 Graph 的 AddXXXNode 的差异是,Workflow 不会立刻返回 error,而是在最终 Compile 的时候统一处理和返回 error。
-
AddXXXNode 拿到的是一个 WorkflowNode,后续向 Node 上添加字段映射等操作,直接用 Method Chaining 来做
-
-
func (n *WorkflowNode) AddInput(fromNodeKey string, inputs ...*FieldMapping) *WorkflowNode-
给一个 WorkflowNode 添加输入字段映射
-
返回 WorkflowNode,可继续 Method Chaining。
-
-
(wf *Workflow[I, O]) Compile(ctx context.Context, opts ...GraphCompileOption) (Runnable[I, O], error)-
Compile 一个 Workflow。
-
与 Compile Graph 的签名完全一致。
-
字段映射
START(输入 struct)-> [并行 lambda1, lambda2] -> END(输出 map)。
我们举一个“计算 string 中字符出现次数的”例子。workflow 整体输入一个 eino 的 Message 和一个 sub string,将 Message.Content 给一个计数器 c1,将 Message.ReasoningContent 给另一个计数器 c2,并行分别计算 sub string 的出现次数,再分别映射到 END:
上图中,workflow 整体的输入是 message 结构体,c1, c2 两个 lambda 的输入都是 counter 结构体,输出都是 int,workflow 整体输出是 map[string]any. 代码如下:
// demonstrates the field mapping ability of eino workflow.
func main() {
type counter struct {
FullStr string // exported because we will do field mapping for this field
SubStr string // exported because we will do field mapping for this field
}
// wordCounter is a lambda function that count occurrences of SubStr within FullStr
wordCounter := func(ctx context.Context, c counter) (int, error) {
return strings.Count(c.FullStr, c.SubStr), nil
}
type message struct {
*schema.Message // exported because we will do field mapping for this field
SubStr string // exported because we will do field mapping for this field
}
// create a workflow just like a Graph
wf := compose.NewWorkflow[message, map[string]any]()
// add lambda c1 just like in Graph
wf.AddLambdaNode("c1", compose.InvokableLambda(wordCounter)).
AddInput(compose.START, // add an input from START, specifying 2 field mappings
// map START's SubStr field to lambda c1's SubStr field
compose.MapFields("SubStr", "SubStr"),
// map START's Message's Content field to lambda c1's FullStr field
compose.MapFieldPaths([]string{"Message", "Content"}, []string{"FullStr"}))
// add lambda c2 just like in Graph
wf.AddLambdaNode("c2", compose.InvokableLambda(wordCounter)).
AddInput(compose.START, // add an input from START, specifying 2 field mappings
// map START's SubStr field to lambda c1's SubStr field
compose.MapFields("SubStr", "SubStr"),
// map START's Message's ReasoningContent field to lambda c1's FullStr field
compose.MapFieldPaths([]string{"Message", "ReasoningContent"}, []string{"FullStr"}))
wf.End(). // Obtain the compose.END for method chaining
// add an input from c1,
// mapping full output of c1 to the map key 'content_count'
AddInput("c1", compose.ToField("content_count")).
// also add an input from c2,
// mapping full output of c2 to the map key 'reasoning_content_count'
AddInput("c2", compose.ToField("reasoning_content_count"))
// compile the workflow just like compiling a Graph
run, err := wf.Compile(context.Background())
if err != nil {
logs.Errorf("workflow compile error: %v", err)
return
}
// invoke the workflow just like invoking a Graph
result, err := run.Invoke(context.Background(), message{
Message: &schema.Message{
Role: schema.Assistant,
Content: "Hello world!",
ReasoningContent: "I need to say something meaningful",
},
SubStr: "o", // would like to count the occurrences of 'o'
})
if err != nil {
logs.Errorf("workflow run err: %v", err)
return
}
logs.Infof("%v", result)
}
这个例子的主要信息是 AddInput 方法可以传递 0-n 个字段映射规则,同时可以多次调用 AddInput。这意味着:
-
节点可以从一个前驱节点的输出中引用任意多个字段。
-
节点可以从任意多个前驱节点中引用字段。
-
一个映射,可以是“整体到字段”,可以是“字段到整体”,也可以是“整体到整体”,也可以是嵌套字段间的映射。
-
上面不同的类型,有不同的 API 来表达这个映射:
-
顶层字段到顶层字段:
MapFields(string, string) -
全部输出到顶层字段:
ToField(string) -
顶层字段到全部输入:
FromField(string) -
嵌套字段到嵌套字段:
MapFieldPaths(FieldPath, FieldPath),只要上游或下游有一方是嵌套的,就需要用 -
全部输出到嵌套字段:
ToFieldPath(FieldPath) -
嵌套字段到全部输入:
FromFieldPath(FieldPath) -
全部输出到全部输入:直接使用
AddInput,不需要传FieldMapping
-
进阶功能
只有数据流,没有控制流
想象一个简单的场景:START -> 加法节点 -> 乘法节点 -> END。其中“乘法节点”是将 START 的一个字段和加法节点的结果相乘:
上图中,乘法节点在加法节点之后执行,即“乘法节点”被“加法节点”控制。但 START 节点不直接控制“乘法节点”,仅仅把数据传了过去。在代码中通过 AddInputWithOptions(fromNode, fieldMappings, WithNoDirectDependency) 来指定纯数据流:
func main() {
type calculator struct {
Add []int
Multiply int
}
adder := func(ctx context.Context, in []int) (out int, err error) {
for _, i := range in {
out += i
}
return out, nil
}
type mul struct {
A int
B int
}
multiplier := func(ctx context.Context, m mul) (int, error) {
return m.A * m.B, nil
}
wf := compose.NewWorkflow[calculator, int]()
wf.AddLambdaNode("adder", compose.InvokableLambda(adder)).
AddInput(compose.START, compose.FromField("Add"))
wf.AddLambdaNode("mul", compose.InvokableLambda(multiplier)).
AddInput("adder", compose.ToField("A")).
AddInputWithOptions(compose.START, []*compose.FieldMapping{compose.MapFields("Multiply", "B")},
// use WithNoDirectDependency to declare a 'data-only' dependency,
// in this case, START node's execution status will not determine whether 'mul' node can execute.
// START node only passes one field of its output to 'mul' node.
compose.WithNoDirectDependency())
wf.End().AddInput("mul")
runner, err := wf.Compile(context.Background())
if err != nil {
logs.Errorf("workflow compile error: %v", err)
return
}
result, err := runner.Invoke(context.Background(), calculator{
Add: []int{2, 5},
Multiply: 3,
})
if err != nil {
logs.Errorf("workflow run err: %v", err)
return
}
logs.Infof("%d", result)
}
这个例子中新引入的 API:
func (n *WorkflowNode) AddInputWithOptions(fromNodeKey string, inputs []*FieldMapping, opts ...WorkflowAddInputOpt) *WorkflowNode {
return n.addDependencyRelation(fromNodeKey, inputs, getAddInputOpts(opts))
}
以及新的 Option:
func WithNoDirectDependency() WorkflowAddInputOpt {
return func(opt *workflowAddInputOpts) {
opt.noDirectDependency = true
}
}
组合起来,可以给节点添加纯“数据依赖关系”。
只有控制流,没有数据流
想象一个“依次竞拍,但报价保密”的场景:START -> 竞拍者 1 -> 是否达标 -> 竞拍者 2 -> END:
在上图中,普通连线是“控制 + 数据”,虚线是“只有数据”,加粗线是“只有控制”。逻辑是:输入一个初始价格,竞拍者 1 给出报价 1,分支判断是否足够高,如果足够高则直接结束,否则把初始价格再给到竞拍者 2,给出报价 2,最后将报价 1、2 汇总输出。
当竞拍者 1 给出报价后,发布公告”竞拍者完成竞拍“。注意 bidder1->announcer 是粗实线,“只有控制”,因为发布公告的时候需要对金额保密!
分支出来的两条加粗线,都是“只有控制”,因为无论 bidder2 还是 END,都不依赖分支给出数据。在代码中通过 AddDependency(fromNode) 来指定纯控制流:
func main() {
bidder1 := func(ctx context.Context, in float64) (float64, error) {
return in + 1.0, nil
}
bidder2 := func(ctx context.Context, in float64) (float64, error) {
return in + 2.0, nil
}
announcer := func(ctx context.Context, in any) (any, error) {
logs.Infof("bidder1 had lodged his bid!")
return nil, nil
}
wf := compose.NewWorkflow[float64, map[string]float64]()
wf.AddLambdaNode("b1", compose.InvokableLambda(bidder1)).
AddInput(compose.START)
// just add a node to announce bidder1 had lodged his bid!
// It should be executed strictly after bidder1, so we use `AddDependency("b1")`.
// Note that `AddDependency()` will only form control relationship,
// but not data passing relationship.
wf.AddLambdaNode("announcer", compose.InvokableLambda(announcer)).
AddDependency("b1")
// add a branch just like adding branch in Graph.
wf.AddBranch("b1", compose.NewGraphBranch(func(ctx context.Context, in float64) (string, error) {
if in > 5.0 {
return compose.END, nil
}
return "b2", nil
}, map[string]bool{compose.END: true, "b2": true}))
wf.AddLambdaNode("b2", compose.InvokableLambda(bidder2)).
// b2 executes strictly after b1 (through branch dependency),
// but does not rely on b1's output,
// which means b2 depends on b1 conditionally,
// but no data passing between them.
AddInputWithOptions(compose.START, nil, compose.WithNoDirectDependency())
wf.End().AddInput("b1", compose.ToField("bidder1")).
AddInput("b2", compose.ToField("bidder2"))
runner, err := wf.Compile(context.Background())
if err != nil {
logs.Errorf("workflow compile error: %v", err)
return
}
result, err := runner.Invoke(context.Background(), 3.0)
if err != nil {
logs.Errorf("workflow run err: %v", err)
return
}
logs.Infof("%v", result)
}
这个例子中引入的新 API:
func (n *WorkflowNode) AddDependency(fromNodeKey string) *WorkflowNode {
return n.addDependencyRelation(fromNodeKey, nil, &workflowAddInputOpts{dependencyWithoutInput: _true_})
}
可以通过 AddDependency 来给节点指定纯“控制依赖关系”。
分支(Branch)
在上面的例子中,我们用与 Graph API 几乎完全相同的方式添加了一个 branch:
// add a branch just like adding branch in Graph.
wf.AddBranch("b1", compose.NewGraphBranch(func(ctx context.Context, in float64) (string, error) {
if in > 5.0 {
return compose.END, nil
}
return "b2", nil
}, map[string]bool{compose.END: true, "b2": true}))
branch 语义与 Graph 的 AllPredecessor 模式下的 branch 语义相同:
-
有且只有一个’fromNode’,即一个 branch 的前置控制节点只能有一个。
-
可单选(NewGraphBranch),可多选(NewGraphMultiBranch)。
-
Branch 选中的分支,可执行。未选中的分支,标记为 skip。
-
一个节点,只有在所有入边都完成(成功或 skip),且至少有一条边成功时,这个节点才可以执行。(如上面例子中的 END)
-
如果一个节点的所有入边都是 skip,则这个节点的所有出边自动标为 skip。
同时,workflow branch 与 graph branch 有一个核心差异:
-
Graph branch 始终是“控制和数据合一的”,branch 下游节点的输入,一定是 branch fromNode 的输出。
-
Workflow branch 始终是“只有控制的”,branch 下游节点的输入,自行通过 AddInputWithOptions 的方式指定。
涉及到的新 API:
func (wf *Workflow[I, O]) AddBranch(fromNodeKey string, branch *GraphBranch) *WorkflowBranch {
wb := &WorkflowBranch{
fromNodeKey: fromNodeKey,
GraphBranch: branch,
}
wf.workflowBranches = append(wf.workflowBranches, wb)
return wb
}
与 Graph.AddBranch 签名几乎完全相同,可以给 workflow 添加一个分支。
静态值(Static Values)
让我们修改下上面的“竞拍”例子,给竞拍者 1 和竞拍者 2 分别给一个“预算”的静态配置:
budget1 和 budget2 会分别以“静态值”的形式注入到 bidder1 和 bidder2 的 input 中。使用 SetStaticValue 方法给 workflow 节点配置静态值:
func main() {
type bidInput struct {
Price float64
Budget float64
}
bidder := func(ctx context.Context, in bidInput) (float64, error) {
if in.Price >= in.Budget {
return in.Budget, nil
}
return in.Price + rand.Float64()*in.Budget, nil
}
wf := compose.NewWorkflow[float64, map[string]float64]()
wf.AddLambdaNode("b1", compose.InvokableLambda(bidder)).
AddInput(compose.START, compose.ToField("Price")).
// set 'Budget' field to 3.0 for b1
SetStaticValue([]string{"Budget"}, 3.0)
// add a branch just like adding branch in Graph.
wf.AddBranch("b1", compose.NewGraphBranch(func(ctx context.Context, in float64) (string, error) {
if in > 5.0 {
return compose.END, nil
}
return "b2", nil
}, map[string]bool{compose.END: true, "b2": true}))
wf.AddLambdaNode("b2", compose.InvokableLambda(bidder)).
// b2 executes strictly after b1, but does not rely on b1's output,
// which means b2 depends on b1, but no data passing between them.
AddDependency("b1").
AddInputWithOptions(compose.START, []*compose.FieldMapping{compose.ToField("Price")}, compose.WithNoDirectDependency()).
// set 'Budget' field to 4.0 for b2
SetStaticValue([]string{"Budget"}, 4.0)
wf.End().AddInput("b1", compose.ToField("bidder1")).
AddInput("b2", compose.ToField("bidder2"))
runner, err := wf.Compile(context.Background())
if err != nil {
logs.Errorf("workflow compile error: %v", err)
return
}
result, err := runner.Invoke(context.Background(), 3.0)
if err != nil {
logs.Errorf("workflow run err: %v", err)
return
}
logs.Infof("%v", result)
}
这里涉及到的新 API:
func (n *WorkflowNode) SetStaticValue(path FieldPath, value any) *WorkflowNode {
n.staticValues[path.join()] = value
return n
}
通过这个方法给 Workflow 节点的指定字段上设置静态值。
流式效果
回到之前的“字符计数”例子,如果我们的 workflow 的输入不再是单个 message,而是一个 message 流,并且我们的计数函数可以对流中的每个 message chunk 分别计数并返回“计数流”:
我们对之前的例子做一些修改:
-
InvokableLambda 改成 TransformableLambda,从而可以消费流,并产生流。
-
把输入里面的 SubStr 改成静态值,注入到 c1 和 c2 中。
-
Workflow 的整体输入改成 *schema.Message。
-
以 Transform 方式来调用 workflow,并传入包含 2 个 *schema.Message 的流。
完成后的代码:
// demonstrates the stream field mapping ability of eino workflow.
// It's modified from 2_field_mapping.
func main() {
type counter struct {
FullStr string // exported because we will do field mapping for this field
SubStr string // exported because we will do field mapping for this field
}
// wordCounter is a transformable lambda function that
// count occurrences of SubStr within FullStr, for each trunk.
wordCounter := func(ctx context.Context, c *schema.StreamReader[counter]) (
*schema.StreamReader[int], error) {
var subStr, cachedStr string
return schema.StreamReaderWithConvert(c, func(co counter) (int, error) {
if len(co.SubStr) > 0 {
// static values will not always come in the first chunk,
// so before the static value (SubStr) comes in,
// we need to cache the full string
subStr = co.SubStr
fullStr := cachedStr + co.FullStr
cachedStr = ""
return strings.Count(fullStr, subStr), nil
}
if len(subStr) > 0 {
return strings.Count(co.FullStr, subStr), nil
}
cachedStr += co.FullStr
return 0, schema.ErrNoValue
}), nil
}
// create a workflow just like a Graph
wf := compose.NewWorkflow[*schema.Message, map[string]int]()
// add lambda c1 just like in Graph
wf.AddLambdaNode("c1", compose.TransformableLambda(wordCounter)).
AddInput(compose.START, // add an input from START, specifying 2 field mappings
// map START's Message's Content field to lambda c1's FullStr field
compose.MapFields("Content", "FullStr")).
// we can set static values even if the input will be stream
SetStaticValue([]string{"SubStr"}, "o")
// add lambda c2 just like in Graph
wf.AddLambdaNode("c2", compose.TransformableLambda(wordCounter)).
AddInput(compose.START, // add an input from START, specifying 2 field mappings
// map START's Message's ReasoningContent field to lambda c1's FullStr field
compose.MapFields("ReasoningContent", "FullStr")).
SetStaticValue([]string{"SubStr"}, "o")
wf.End(). // Obtain the compose.END for method chaining
// add an input from c1,
// mapping full output of c1 to the map key 'content_count'
AddInput("c1", compose.ToField("content_count")).
// also add an input from c2,
// mapping full output of c2 to the map key 'reasoning_content_count'
AddInput("c2", compose.ToField("reasoning_content_count"))
// compile the workflow just like compiling a Graph
run, err := wf.Compile(context.Background())
if err != nil {
logs.Errorf("workflow compile error: %v", err)
return
}
// call the workflow using Transform just like calling a Graph with Transform
result, err := run.Transform(context.Background(),
schema.StreamReaderFromArray([]*schema.Message{
{
Role: schema.Assistant,
ReasoningContent: "I need to say something meaningful",
},
{
Role: schema.Assistant,
Content: "Hello world!",
},
}))
if err != nil {
logs.Errorf("workflow run err: %v", err)
return
}
var contentCount, reasoningCount int
for {
chunk, err := result.Recv()
if err != nil {
if err == io.EOF {
result.Close()
break
}
logs.Errorf("workflow receive err: %v", err)
return
}
logs.Infof("%v", chunk)
contentCount += chunk["content_count"]
reasoningCount += chunk["reasoning_content_count"]
}
logs.Infof("content count: %d", contentCount)
logs.Infof("reasoning count: %d", reasoningCount)
}
基于上面这个例子,我们总结出 workflow 流式的一些特点:
-
依然是 100% 的 Eino stream:四种范式(invoke, stream, collect, transform),由 Eino 框架自动转换、复制、拼接、合并。
-
字段映射的配置,不需要特殊处理流:无论实际的输入输出是不是流,AddInput 的写法都一样,Eino 框架负责处理基于流的映射。
-
静态值,不需要特殊处理流:即使实际输入是个流,也可以一样的方式 SetStaticValue。Eino 框架会把静态值放在 input stream 中,但不一定是第一个读到的 chunk。
字段映射各场景
类型对齐
Workflow 遵循与 Graph 同一套类型对齐规则,只是对齐的粒度由完整的输入输出对齐,变为了映射成对的字段间的类型对齐。具体为:
-
类型完全相同,在 Compile 时会校验通过,一定能对齐。
-
类型不同,但上游可以 Assign 到下游(比如上游具体类型,下游 Any),在 Compile 时会校验通过,一定能对齐。
-
上游无法 Assign 到下游(比如上游 int,下游 string),在 Compile 时会报错。
-
上游可能能 Assign 到下游(比如上游 Any,下游 int),在 Compile 时无法确定,会推迟到执行时,取出上游的实际类型,再判断。此时如果判断上游不能 Assign 到下游,则会抛出 error。
Merge 的各场景
Merge 是指一个节点的输入映射自多个 FieldMapping 的情况。
-
映射到多个不同的字段:支持
-
映射到一个相同的字段:不支持
-
映射到整体,同时也有映射到字段:冲突,不支持
嵌套的 map[string]any
比如这个映射:ToFieldPath([]string{"a","b"}),目标节点的输入类型是 map[string]any,映射时的顺序是:
-
第一级“a”,此时的结果是
map[string]any{"a": nil} -
第二级“b”,此时的结果是
map[string]any{"a": map[string]any{"b": x}}
可以看到,在第二级的时候,Eino 框架自动把 any 替换为了实际的 map[string]any
CustomExtractor
有些场景,标准的字段映射语义无法支持,比如上游是 []int,想取出第一个元素映射到下游,此时我们用 WithCustomExtractor :
t.Run("custom extract from array element", func(t *testing.T) {
wf := NewWorkflow[[]int, map[string]int]()
wf.End().AddInput(_START_, ToField("a", WithCustomExtractor(func(input any) (any, error) {
return input.([]int)[0], nil
})))
r, err := wf.Compile(context.Background())
assert.NoError(t, err)
result, err := r.Invoke(context.Background(), []int{1, 2})
assert.NoError(t, err)
assert.Equal(t, map[string]int{"a": 1}, result)
})
当使用 WithCustomExtractor 时,一切 Compile 时的类型对齐校验都无法进行,只能推迟到执行时校验。
一些约束
-
Map Key 的限制:只支持 string,或者 string alias(能 convert 到 string 的类型)。
-
不支持的 CompileOption:
-
WithNodeTriggerMode,因为固定为AllPredecessor。 -
WithMaxRunSteps,因为不会有环。
-
-
如果映射来源是 Map Key,要求 Map 中必须有这个 key。但如果映射来源是 Stream,Eino 无法判断 stream 中的所有帧中是否至少有一次出现这个 key,因此 Stream 时无法校验。
-
如果映射来源字段或者目标字段属于 struct ,则要求这些字段必须是导出的,因为内部使用了反射。
-
映射来源是 nil:一般情况下支持,只有当映射目标不可能是 nil 时报错,比如目标是基础类型(int 等)。
实际应用
Coze-Studio 工作流
Coze-Studio 开源版的工作流引擎是基于 Eino Workflow 编排框架。参见:11. 新增工作流节点类型(后端)
流式编程编排
编排流式概述
编排流式的 Graph 时,需要考虑的几个关键要素:
-
组件/Lambda 中包含哪几种 Lambda 算子: 从 Invoke、Stream、Collect、Transform 中任选
-
编排拓扑图中,上下游节点的输入、输出是否同为流或同为非流。
-
如果上下游节点的流类型无法匹配。 需要借助 流化、合包 两个操作
-
流化(Streaming):将 T 流化成单 Chunk 的 Stream[T]
-
合包(Concat):将 Stream[T] 合并成一个完整的 T。Stream[T] 中的每一“帧”是这个完整 T 的一部分。
-
Eino 流式编程的内涵
有的组件,天然支持分“帧”来输出,每次输出一个完整出参的一部分,即“流式”输出。流式输出完成后,需要下游把这些“帧”拼接(concat)成完整的出参。典型的例子,是 LLM。
有的组件,天然支持分“帧”来输入,接收到不完整的入参时,就能开始有意义的业务处理,甚至完成业务处理的过程。比如 react agent 中用来判断是调 tool 还是结束运行的 branch 里面,拿到 LLM 的流式输出,从第一个帧里面就可以通过判断 message 是否包含 tool call 来做出决策。
因此,一个组件,从入参角度看,有“非流式”入参和“流式”入参两种,从出参角度看,有“非流式”出参和“流式”出参两种。
组合起来,有四种可能的流式编程范式
|
函数名 |
模式说明 |
交互模式名称 |
Lambda 构造方法 |
说明 |
|
Invoke |
输入非流式、输出非流式 |
Ping-Pong 模式 |
compose.InvokableLambda() |
|
|
Stream |
输入非流式、输出流式 |
Server-Streaming 模式 |
compose.StreamableLambda() |
|
|
Collect |
输入流式、输出非流式 |
Client-Streaming |
compose.CollectableLambda() |
|
|
Transform |
输入流式、输出流式 |
Bidirectional-Streaming |
compose.TransformableLambda() |
单个组件角度的流式
Eino 是个 “component first” 的框架,组件可以独立使用。定组件接口的时候,需要考虑流式编程的问题吗?简单的答案是不需要。复杂的答案是“以业务真实场景为准”。
组件自身的业务范式
一个典型的组件,比如 Chat Model,Retriever 等,根据实际的业务语义定接口就行,如果实际上支持某种流式的范式,就实现那一种流式范式,如果实际上某种流式范式没有真正的业务场景,那就不需要实现。比如
-
Chat Model,除了 Invoke 这种非流式的范式外,还天然支持 Stream 这种流式范式,因此 Chat Model 的接口中,实现了 Generate 和 Stream 两个接口。但是 Collect 和 Transform 没有对应的真实业务场景,所以就没有实现相应的接口:
type ChatModel interface {
Generate(ctx context.Context, input []*schema.Message, opts ...Option) (*schema.Message, error)
Stream(ctx context.Context, input []*schema.Message, opts ...Option) (
*schema.StreamReader[*schema.Message], error)
// other methods omitted...
}
-
Retriever,除了 Invoke 这种非流式的范式外,另外三种流式范式都没有真实的业务场景,因此只实现了 Retrieve 一个接口:
type Retriever interface {
Retrieve(ctx context.Context, query string, opts ...Option) ([]*schema.Document, error)
}
组件具体支持的范式
|
组件名称 |
是否实现 Invoke |
是否实现 Stream |
是否实现 Collect |
是否实现 Transform |
|
Chat model |
yes |
yes |
no |
no |
|
Chat template |
yes |
no |
no |
no |
|
Retriever |
yes |
no |
no |
no |
|
Indexer |
yes |
no |
no |
no |
|
Embedder |
yes |
no |
no |
no |
|
Document Loader |
yes |
no |
no |
no |
|
Document Transformer |
yes |
no |
no |
no |
|
Tool |
yes |
yes |
no |
no |
Eino 官方组件中,除了 Chat Model 和 Tool 额外支持 stream 外,其他所有组件都只支持 invoke。组件具体介绍参见:[更新中]Eino: Components 抽象&实现
Collect 和 Transform 两种流式范式,目前只在编排场景有用到。
多个组件编排角度的流式
组件在编排中的流式范式
一个组件,单独使用时,入参和出参的流式范式是框定的,不可能超出组件定义的接口范围。
-
比如 Chat Model,入参只可能是非流式的 []Message,出参则可能是非流式的 Message 或者流式的 StreamReader[Message],因为 Chat Model 只实现了 Invoke 和 Stream 两个范式。
但是,一个组件,一旦处在多个组件组合使用的“编排”场景中,它的入参和出参就没那么固定了,而是取决于这个组件在编排场景中的“上游输出”和“下游输入”。比如 React Agent 的典型编排示意图:
上图中,如果 Tool 是个 StreamableTool,也就是输出是 StreamReader[Message],则 Tool -> ChatModel 就可能是流式的输出。但是 Chat Model 并没有接收流式输入的业务场景,也没有对应的接口。这时 Eino 框架会自动帮助 ChatModel 补足接收流式输入的能力:
上面的 Concat message stream 是 Eino 框架自动提供的能力,即使不是 message,是任意的 T,只要满足特定的条件,Eino 框架都会自动去做这个 StreamReader[T] 到 T 的转化,这个条件是:在编排中,当一个组件的上游输出是 StreamReader[T],但是组件只提供了 T 作为输入的业务接口时,框架会自动将 StreamReader[T] concat 成 T,再输入给这个组件。
💡 框架自动将 StreamReader[T] concat 成 T 的过程,可能需要用户提供一个 Concat function。详见 Eino: 编排的设计理念 中关于“合并帧”的章节。
另一方面,考虑一个相反的例子。还是 React Agent,这次是一个更完整的编排示意图:
在上图中,branch 接收 chat model 输出的 message,并根据 message 中是否包含 tool call,来选择直接结束 agent 本次运行并将 message 输出,还是调用 Tool 并将调用结果再次给 Chat Model 循环处理。由于这个 Branch 可以通过 message stream 的首个帧就完成逻辑判断,因此我们给这个 Branch 定义的是 Collect 接口,即流式输入,非流式输出:
compose.NewStreamGraphBranch(func(ctx context.Context, sr *schema.StreamReader[*schema.Message]) (endNode string, err error) {
msg, err := sr.Recv()
if err != nil {
return "", err
}
defer sr.Close()
if len(msg.ToolCalls) == 0 {
return compose._END_, nil
}
return nodeKeyTools, nil
}
ReactAgent 有两个接口,Generate 和 Stream,分别实现了 Invoke 和 Stream 的流式编程范式。当一个 ReactAgent 以 Stream 的方式被调用时,Chat Model 的输出是 StreamReader[Message],因此 Branch 的输入是 StreamReader[Message],符合这个 Branch condition 的函数签名定义,不需要做任何的转换就可以运行。
但是,当这个 ReactAgent 以 Generate 的方式被调用时,Chat Model 的输出是 Message,因此 Branch 的输入也会是 Message,不符合 Branch Condition 的 StreamReader[Message] 的函数签名定义。这时,Eino 框架会自动将 Message 装箱成 StreamReader[Message],再传给 Branch,而这个 StreamReader 里面只会有一个帧。
💡 这种只有一个帧的流,俗称“假流”,因为它并没有带来流式的实际好处即“首包延迟低”,而是仅仅为了满足流式出入参接口签名的要求而做的简单装箱。
总结起来,就是:在编排中,当一个组件的上游输出是 T,但是组件只提供了 StreamReader[T] 作为输入的业务接口时,框架会自动将 T 装箱成 StreamReader[T] 的单帧流,再输入给这个组件。
编排辅助元素的流式范式
上面提到的 Branch,并不是一个可单独使用的组件,而是只在编排场景中才有意义的“编排辅助元素”,类似的仅编排场景有意义的“组件”,还有一些,详见下图:
|
组件名称 |
使用场景 |
是否实现 Invoke |
是否实现 Stream |
是否实现 Collect |
是否实现 Transform |
|
Branch |
根据上游输出,在一组下游 Node 中动态选择一个
|
yes |
no |
yes |
no |
|
StatePreHandler |
Graph中,进入 Node 前修改 State 或/与 Input。可支持流式。 |
yes |
no |
no |
yes |
|
StatePostHandler |
Graph中,Node 完成后修改 State 或/与 Output。可支持流式 |
yes |
no |
no |
yes |
|
Passthrough |
在并行情况下,为了打平每个并行分支的 Node 个数,可以给 Node 个数少的分支加 Passthrough 节点。Passthrough 节点的输入输出相同,跟随上游节点的输出或跟随下游节点的输入(预期应当相同)。 |
yes |
no |
no |
yes |
|
Lambda |
封装官方组件未定义的业务逻辑。业务逻辑是哪种范式,就选择对应的那种流式范式来实现。 |
yes |
yes |
yes |
yes |
另外还有一种只有编排场景才有意义的“组件”,就是把编排产物作为一个整体来看待,比如编排后的 Chain,Graph。这些整体的编排产物,既可以作为“组件”来单独调用,也可以作为节点加入到更上级的编排产物中。
编排整体角度的流式
编排产物的“业务”范式
既然整体的编排产物,可以被看做一个“组件”,那从组件的视角可以提出问题:编排产物这个“组件”,有没有像 Chat Model 等组件那样的,符合“业务场景”的接口范式?答案是既“有”也“没有”。
-
“没有”:整体而言,Graph,Chain 等编排产物,自身是没有业务属性的,只为抽象的编排服务的,因此也就没有符合业务场景的接口范式。同时,编排需要支持各种范式的业务场景。所以,Eino 中代表编排产物的 Runnable[I, O] 接口,不做选择也无法选择,提供了所有流式范式的方法:
type Runnable[I, O any] interface {
Invoke(ctx context.Context, input I, opts ...Option) (output O, err error)
Stream(ctx context.Context, input I, opts ...Option) (output *schema.StreamReader[O], err error)
Collect(ctx context.Context, input *schema.StreamReader[I], opts ...Option) (output O, err error)
Transform(ctx context.Context, input *schema.StreamReader[I], opts ...Option) (output *schema.StreamReader[O], err error)
}
-
“有”:具体而言,某一个具体的 Graph、Chain,一定是承载了具体的业务逻辑的,因此也就一定有适合那个特定业务场景的流式范式。比如类似 React Agent 的 Graph,匹配的业务场景是 Invoke 和 Stream,因此这个 Graph 在调用时,符合逻辑的调用方式是 Invoke 和 Stream。虽然编排产物本身接口 Runnable[I, O] 中有 Collect 和 Transform 的方法,但是正常的业务场景不需要使用。
编排产物内部各组件在运行时的范式
从另一个角度看,既然编排产物整体可以被看做“组件”,那“组件”必然有自己的内部实现,比如 ChatModel 的内部实现逻辑,可能是把入参的 []Message 转化成各个模型的 API request,之后调用模型的 API,获取 response 后再转化成出参的 Message。那么类比的话,Graph 这个“组件”的内部实现是什么?是数据在 Graph 内部各个组件间以用户指定的流转方向和流式范式来流转。其中,“流转方向”不在当前讨论范围内,而各组件运行时的流式范式,则由 Graph 整体的触发方式决定,具体来说:
如果用户通过 Invoke 来调用 Graph,则 Graph 内部所有组件都以 Invoke 范式来调用。如果某个组件,没有实现 Invoke 范式,则 Eino 框架自动根据组件实现了的流式范式,封装出 Invoke 调用范式,优先顺位如下:
-
若组件实现了 Stream,则将 Stream 封装成 Invoke,即自动 concat 输出流。
-
否则,若组件实现了 Collect,则将 Collect 封装成 Invoke,即非流式入参转单帧流。
-
如果都没实现,则必须实现 Transform,将 Transform 封装成 Invoke,即入参转单帧流,出参 concat。
如果用户通过 Stream/Collect/Transform 来调用 Graph,则 Graph 内部所有组件都以 Transform 范式来调用。如果某个组件,没有实现 Transform 范式,则 Eino 框架自动根据组件实现了的流式范式,封装出 Transform 调用范式,优先顺位如下:
-
若组件实现了 Stream,则将 Stream 封装成 Transform,即自动 concat 输入流。
-
否则,若组件实现了 Collect,则将 Collect 封装成 Transform,即非流式出参转单帧流。
-
如果都没实现,则必须实现 Invoke,将 Invoke 封装成 Transform,即入参流 concat,出参转单帧流
结合上面穷举的各种案例,Eino 框架对 T 和 Stream[T] 的自动转换,可以总结为:
-
T -> Stream[T]: 将完整的 T 装箱为单帧的 Stream[T]。非流式变假流式。
-
Stream[T] -> T: 将 Stream[T] Concat 为完整的 T。当 Stream[T] 不是单帧流时,可能需要提供针对 T 的 Concat 方法。
看了上面的实现原理,可能会有疑问,为什么对 graph 的 Invoke,会要求所有内部组件都以 Invoke 调用?以及为什么对 graph 的 Stream/Collect/Transform,会要求所有内部组件都以 Transform 调用?毕竟,可以举出一些反例:
-
A, B 两个组件编排为一个 Chain,以 Invoke 调用。其中 A 的业务接口实现了 Stream,B 的业务接口实现了 Collect。这时 graph 内部组件的调用范式有两个选择:
-
A 以 stream 调用,B 以 collect 调用,整体的 Chain 依然是 Invoke 语义,同时保留了真流式的内部语义。即 A 的输出流不需要做 Concat,可以实时的输入到 B 中。
-
目前 Eino 的实现,A、B 都以 Invoke 调用,需要把 A 的输出流 Concat,并把 B 的输入做成假流式。失去了真流式的内部语义。
-
-
A,B 两个组件编排为一个 Chain,以 Collect 调用。其中 A 实现了 Transform 和 Collect,B 实现了 Invoke。两个选择:
-
A 以 Collect 调用,B 以 Invoke 调用:整体还是 Collect 的语义,不需要框架做任何的自动转化和装箱操作。
-
目前 Eino 的实现,A、B 都以 Transform 调用,由于 A 的业务接口里实现了 Transform,因此 A 的输出和 B 的输入都可能是真流式,而 B 的业务接口里只实现了 Invoke,根据上面的分析,B 的入参会需要由真流式 concat 成非流式。这时就需要用户额外提供 B 的入参的 concat 函数,这本可以避免。
-
上面两个例子,都可以找到一个明确的、与 Eino 的约定不同的,但却更优的流式调用路径。但是,当泛化到任意的编排场景时,很难找到一个明确定义的、与 Eino 的约定不同的、却总是更优的普适的规则。比如,A->B->C,以 Collect 语义调用,是 A->B 的时候 Collect,还是 B->C 的时候 Collect?潜在的因素有 A、B、C 具体实现的业务接口,可能还有“尽量多的使用真流式”的判断,也许还有哪个参数实现了 Concat,哪个没有实现。如果是更复杂的 Graph,需要考虑的因素会快速增加。在这种情况下,即使框架能定义出一套明确的、更优的普适规则,也很难解释清楚,理解和使用成本会很高,很可能已经超过了这个新规则实际带来的好处。
综上,我们可以说,Eino 编排产物内部各组件在运行时的范式,是 By Design 的,明确如下:
-
整体以 Invoke 调用,内部各组件均以 Invoke 调用,不存在任何流式的过程。
-
整体以 Stream/Collect/Transform 调用,内部各组件均以 Transform 调用,当出现 Stream[T] -> T 的 concat 过程时,可能需要额外提供 T 的 concat function。
callback&& calloption
callback治理
解决的问题
Component(包括 Lambda)、Graph 编排共同解决“把业务逻辑定义出来”的问题。而 logging, tracing, metrics, 上屏展示等横切面性质的功能,需要有机制把功能注入到 Component(包括 Lambda)、Graph 中。
另一方面,用户可能想拿到某个具体 Component 实现的执行过程中的中间信息,比如 VikingDBRetriever 额外给出查询的 DB Name,ArkChatModel 额外给出请求的 temperature 等参数。需要有机制把中间状态透出。
Callbacks 支持“横切面功能注入”和“中间状态透出”,具体是:用户提供、注册“function”(Callback Handler),Component 和 Graph 在固定的“时机”(或者说切面、位点)回调这些 function,给出对应的信息。
核心概念
核心概念串起来,就是:Eino 中的 Component 和 Graph 等实体,在固定的时机 (Callback Timing),回调用户提供的 function (Callback Handler),并把自己是谁 (RunInfo),以及当时发生了什么 (Callback Input & Output) 传出去。
触发实体
Component(包括官方定义的组件类型和 Lambda),Graph Node(以及 Chain/Workflow Node),Graph 自身(以及 Chain/Workflow)。这三类实体,都有横切面功能注入、中间状态透出的需求,因此都会触发 callback。具体见下面的“触发方式”一节。
触发时机
// CallbackTiming enumerates all the timing of callback aspects.
type CallbackTiming = callbacks.CallbackTiming
const (
TimingOnStart CallbackTiming = iota // 进入并开始执行
TimingOnEnd // 成功完成即将 return
TimingOnError // 失败并即将 return err
TimingOnStartWithStreamInput // OnStart,但是输入是 StreamReader
TimingOnEndWithStreamOutput // OnEnd,但是输出是 StreamReader
)
不同的触发实体,在不同场景下,是触发 OnStart 还是 OnStartWithStreamInput (OnEnd/OnEndWithStreamOutput 同理),具体的规则,详见下面的“触发方式”一节。
Callback Handler
type Handler interface {
OnStart(ctx context.Context, info *RunInfo, input CallbackInput) context.Context
OnEnd(ctx context.Context, info *RunInfo, output CallbackOutput) context.Context
OnError(ctx context.Context, info *RunInfo, err error) context.Context
OnStartWithStreamInput(ctx context.Context, info *RunInfo,
input *schema.StreamReader[CallbackInput]) context.Context
OnEndWithStreamOutput(ctx context.Context, info *RunInfo,
output *schema.StreamReader[CallbackOutput]) context.Context
}
一个 Handler 是一个实现了上面 5 个方法(对应 5 个触发时机)的结构体。每个方法都会接收三个信息:
-
Context: 用于接收同一个 Handler 的前序触发时机可能设置的定制信息。
-
RunInfo: 触发回调的实体元信息。
-
Input/Output/InputStream/OutputStream: 触发回调时的业务信息。
并都会返回新的 Context:用于同一个 Handler 的不同触发时机之间传递信息。
如果一个 Handler,不想关注所有的 5 个触发时机,只想关注一部分,比如只关注 OnStart,建议使用 NewHandlerBuilder().OnStartFn(...).Build()。如果不想关注所有的组件类型,只想关注特定组件,比如 ChatModel,建议使用 NewHandlerHelper().ChatModel(...).Handler(),可以只接收 ChatModel 的回调并拿到一个具体类型的 CallbackInput/CallbackOutput。具体见“Handler 实现方式”一节。
不同 Handler 之间,触发顺序没有保证。
RunInfo
描述了触发 Callback 的实体自身的元信息。
// RunInfo contains information about the running component that triggers callbacks.
type RunInfo struct {
Name string // the 'Name' with semantic meaning for the running component, specified by end-user
Type string // the specific implementation 'Type' of the component, e.g. 'OpenAI'
Component components.Component // the component abstract type, e.g. 'ChatModel'
}
-
Name:有业务含义的名称,需用户指定,不指定就是空字符串。对不同的触发实体:
-
Component:在 Graph 中时,用 Node Name。在 Graph 外单独的使用时,用户手动设置。详见“注入 RunInfo” 和 “单独使用 Component”
-
Graph Node:用 Node Name
func WithNodeName(n string) GraphAddNodeOpt -
Graph 自身:
-
顶层图用 Graph Name
func WithGraphName(graphName string) GraphCompileOption -
内部嵌套图,会用加入到上级图时添加的 Node Name
-
-
-
Type:组件具体实现来规定:
-
有接口的 Component:如果实现了 Typer 接口,用 GetType() 方法的结果。否则用反射获取 Struct/Func 名。
-
Lambda:如果用
func WithLambdaType(t string) LambdaOpt指定了 Type,用这个,否则是空字符串。 -
Graph Node:用内部 Component/Lambda/Graph 的值。
-
Graph 自身:空字符串。
-
-
Component:
-
有接口的 Component:是啥接口,就是啥
-
Lambda:固定值 Lambda
-
Graph Node: 用内部的 Component/Lambda/Graph 的值。
-
Graph 自身:固定值 Graph / Chain / Workflow. (之前曾有 StateGraph / StateChain ,现已整合到 Graph / Chain 中)
-
Callback Input & Output
本质是任意类型,因为不同的 Component 的输入输出、内部状态完全不同。
type CallbackInput any
type CallbackOutput any
具体到某个组件,有更具体的类型,比如 Chat Model
// CallbackInput is the input for the model callback.
type CallbackInput struct {
// Messages is the messages to be sent to the model.
Messages []*schema.Message
// Tools is the tools to be used in the model.
Tools []*schema.ToolInfo
// Config is the config for the model.
Config *Config
// Extra is the extra information for the callback.
Extra map[string]any
}
// CallbackOutput is the output for the model callback.
type CallbackOutput struct {
// Message is the message generated by the model.
Message *schema.Message
// Config is the config for the model.
Config *Config
// TokenUsage is the token usage of this request.
TokenUsage *TokenUsage
// Extra is the extra information for the callback.
Extra map[string]any
}
在 Chat Model 的具体实现,比如 OpenAI Chat Model 中,建议组件作者向 Callback Handler 中传入具体的 Input/Output 类型,而不是 Any。这样可以透出更具体的、定制化的中间状态信息。
如果是 Graph Node 来触发 Callback,因为 Node 拿不到组件内部中间状态信息,只能拿到组件接口中规定的输入和输出,所以给 Callback Handler 的只能是这些。对 Chat Model,就是 []*schema.Message 和 *schema.Message。
Graph 自身触发 Callback 时,输入输出就是 Graph 整体的输入和输出。
注入 Handler
Handler 需要注入到 Context 中才能被触发。
全局注入 Handler
通过 callbacks.AppendGlobalHandlers 注入全局的 Handler。注入后,所有的触发回调行为,都会自动触发这些全局的 Handler。典型的场景是 tracing,logging 等全局一致、业务场景无关的功能。
不是并发安全的。建议在服务初始化时注入一次。
向 Graph 中注入 Handler
通过 compose.WithCallbacks 在 graph 运行时注入 Handler,这些 Handler 会在 graph 的本次运行整体上生效,包括 Graph 内各 Node 和 Graph 自身(以及各内嵌的 graph)。
通过 compose.WithCallbacks(...).DesignateNode(...) 向顶层 Graph 的某个 Node 注入 Handler。当这个 Node 自身是个内嵌的 Graph 时,会注入到这个内嵌 Graph 自身和其内部的各 Node。
通过 compose.WithCallbacks(...).DesignateNodeWithPath(...) 向内部嵌套的 Graph 的某个 Node 注入 Handler。
在 Graph 外注入 Handler
不想使用 Graph,但却想使用 Callback,则:
通过 InitCallbacks(ctx context.Context, info *RunInfo, handlers ...Handler) 获取一个新的 Context 并注入 Handlers 以及 RunInfo。
Handler 继承
与子 Context 继承父 Context 中的所有 Values 相同,子 Context 也会继承父 Context 中的所有 Handlers。举个例子,Graph 运行时传入的 Context 中如果已经有了 Handler,则这些 Handlers 都会被整个 Graph 的这次运行继承和生效。
注入 RunInfo
RunInfo 也需要注入到 Context 中,才会在触发回调时给到 Handler。
Graph 托管 RunInfo
Graph 会为内部所有的 Node 自动注入 RunInfo。机制是每个 Node 的运行,都是一个新的子 Context,Graph 向这个新的 Context 中注入对应 Node 的 RunInfo。
在 Graph 外注入 RunInfo
不想使用 Graph,但却想使用 Callback,则:
通过 InitCallbacks(ctx context.Context, info *RunInfo, handlers ...Handler) 获取一个新的 Context 并注入 Handlers 以及 RunInfo。
通过 ReuseHandlers(ctx context.Context, info *RunInfo) 来获取一个新的 Context,复用之前 Context 中的 Handler,并设置新的 RunInfo。
触发方式
组件实现内部触发(Component Callback)
在组件实现的代码中,调用 callbacks 包中的 OnStart(), OnEnd(), OnError(), OnStartWithStreamInput(), ``OnEndWithStreamOutput``()。以 Ark 的 ChatModel 实现为例,在 Generate 方法中:
func (cm *ChatModel) Generate(ctx context.Context, in []*schema.Message, opts ...fmodel.Option) (
outMsg *schema.Message, err error) {
defer func() {
if err != nil {
_ = callbacks.OnError(ctx, err)
}
}()
// omit multiple lines... instantiate req conf
ctx = callbacks.OnStart(ctx, &fmodel.CallbackInput{
Messages: in,
Tools: append(cm.rawTools), // join tool info from call options
ToolChoice: nil, // not support in api
Config: reqConf,
})
// omit multiple lines... invoke Ark chat API and get the response
_ = callbacks.OnEnd(ctx, &fmodel.CallbackOutput{
Message: outMsg,
Config: reqConf,
TokenUsage: toModelCallbackUsage(outMsg.ResponseMeta),
})
return outMsg, nil
}
在 Stream 方法中:
func (cm *ChatModel) Stream(ctx context.Context, in []*schema.Message, opts ...fmodel.Option) ( // byted_s_too_many_lines_in_func
outStream *schema.StreamReader[*schema.Message], err error) {
defer func() {
if err != nil {
_ = callbacks.OnError(ctx, err)
}
}()
// omit multiple lines... instantiate req conf
ctx = callbacks.OnStart(ctx, &fmodel.CallbackInput{
Messages: in,
Tools: append(cm.rawTools), // join tool info from call options
ToolChoice: nil, // not support in api
Config: reqConf,
})
// omit multiple lines... make request to Ark API and convert response stream to StreamReader[model.*CallbackOutput]
_, sr = callbacks.OnEndWithStreamOutput(ctx, sr)
return schema.StreamReaderWithConvert(sr,
func(src *fmodel.CallbackOutput) (*schema.Message, error) {
if src.Message == nil {
return nil, schema.ErrNoValue
}
return src.Message, nil
},
), nil
}
可以看到 Generate 调用时,触发的是 OnEnd,而 Stream 调用时,触发的是 OneEndWithStreamOutput:
组件实现内部触发 Callbacks 时:
-
当组件输入为 StreamReader 时,触发 OnStartWithStreamInput,否则触发 OnStart
-
当组件输出为 StreamReader 时,触发 OnEndWithStreamOutput,否则触发 OnEnd
内部实现了 callback 触发的组件,应当实现 Checker 接口,IsCallbacksEnabled 返回 true,向外部传达“我内部实现了 callback 触发”的信息:
// Checker tells callback aspect status of component's implementation
// When the Checker interface is implemented and returns true, the framework will not start the default aspect.
// Instead, the component will decide the callback execution location and the information to be injected.
type Checker interface {
IsCallbacksEnabled() bool
}
如果一个组件实现,没有实现 Checker 接口,或者 IsCallbacksEnabled 返回 false,可以认为该组件内部没有触发回调,需要 Graph Node 来负责注入和触发(在 Graph 内使用时)。
Graph Node 触发(Node Callback)
当一个 Component 被编排入 Graph 时,成为一个 Node。这时,如果 Component 自身会触发 callback,Node 就复用 Component 的 callback 处理。否则,Node 会在 Component 外面埋上 callback handler 触发点位。这些点位与 Component 自身的流式范式对应。比如一个 ChatModelNode,会在 Generate 方法外面埋上 OnStart/OnEnd/OnError,同时会在 Stream 方法外面埋上 OnStart/OnEndWithStreamOutput/OnError。
在 Graph 运行时,各组件会以 Invoke 或 Transform 范式运行,又会根据组件具体实现的业务流式范式,调用对应的组件方法。比如 Graph 以 Invoke 运行,Chat Model Node 会以 Invoke 运行,调用 Generate 方法。而当 Graph 以 Stream 运行,Chat Model Node 会以 Transform 运行,但 Chat Model 的业务流式范式中没有 Transform,会自动降级成调用 Stream 方法。因此:
Graph Node 具体触发哪个位点(OnStart 还是 OnStartWithStreamInput),取决于组件实现的业务流式范式和 Graph 运行方式两个因素。
关于 Eino 流式编程的详细介绍,参见 Eino 流式编程要点
Graph 自身触发(Graph Callback)
Graph 在自身的开始、结束、err 的时机触发 Callback Handler。如果 Graph 以 Invoke 形式调用,触发 OnStart/OnEnd/OnError。如果以 Stream/Collect/Transform 形式调用,触发 OnStartWithStreamInput/OnEndWithStreamOutput/OnError。这是因为 Graph 内部会始终以 Invoke 或 Transform 执行。参见 Eino 流式编程要点
值得注意的是:graph 也是 component 的一种,因此 graph callback 也是 component callback 的一种特殊形式。根据 Node Callback 的定义,当 Node 内部的 component 实现了对触发时机的感知和处理时,Node 会直接复用 Component 的实现,不会再实现 Node Callback。这意味着当一个 graph 通过 AddGraphNode 的方式加入到另外一个 Graph 中作为一个 Node 时,这个 Node 会复用内部 graph 的 graph callback。
解析 Callback Input & Output
从上文得知,Callback Input & Output 的底层是 Any,只是不同组件类型在具体触发回调时,可能会传入自己特定的类型。并且 Callback Handler 的接口定义中,各方法的入参也是 Any 类型的 Callback Input & Output。
因此,具体的 Handler 实现中,需要做两个事情:
-
根据 RunInfo 判断当前触发回调的是哪个组件类型,比如 RunInfo.Component == “ChatModel”,或者 RunInfo.Type == “xxx Chat Model”。
-
把 any 类型的 Callback Input & Output 转成对应的具体类型,以 RunInfo.Component == “ChatModel” 为例:
// ConvCallbackInput converts the callback input to the model callback input.
func ConvCallbackInput(src callbacks.CallbackInput) *CallbackInput {
switch t := src.(type) {
case *CallbackInput: // when callback is triggered within component implementation, the input is usually already a typed *model.CallbackInput
return t
case []*schema.Message: // when callback is injected by graph node, not the component implementation itself, the input is the input of Chat Model interface, which is []*schema.Message
return &CallbackInput{
Messages: t,
}
default:
return nil
}
}
// ConvCallbackOutput converts the callback output to the model callback output.
func ConvCallbackOutput(src callbacks.CallbackOutput) *CallbackOutput {
switch t := src.(type) {
case *CallbackOutput: // when callback is triggered within component implementation, the output is usually already a typed *model.CallbackOutput
return t
case *schema.Message: // when callback is injected by graph node, not the component implementation itself, the output is the output of Chat Model interface, which is *schema.Message
return &CallbackOutput{
Message: t,
}
default:
return nil
}
}
如果 Handler 里面需要增加 switch case 来判断 RunInfo.Component,并且对每一个 case,需要调对应的转换函数把 Any 转成具体类型,确实有些复杂。为了减少写胶水代码的重复劳动,我们提供了两种实现 Handler 的便捷工具函数。
Handler 实现方式
除了直接实现 Handler 接口外,Eino 提供了两种 Handler 的便捷实现工具。
HandlerHelper
如果用户的 Handler 只关注特定类型的组件,比如 ReactAgent 的场景,只关注 ChatModel 和 Tool,建议使用 HandlerHelper 来快速创建具体类型的 Callback Handler:
import ucb "github.com/cloudwego/eino/utils/callbacks"
handler := ucb.NewHandlerHelper().ChatModel(modelHandler).Tool(toolHandler).Handler()
其中 modelHandler 是 Chat Model 组件对 callback handler 的进一步封装:
// from package utils/callbacks
// ModelCallbackHandler is the handler for the model callback.
type ModelCallbackHandler struct {
OnStart func(ctx context.Context, runInfo *callbacks.RunInfo, input *model.CallbackInput) context.Context
OnEnd func(ctx context.Context, runInfo *callbacks.RunInfo, output *model.CallbackOutput) context.Context
OnEndWithStreamOutput func(ctx context.Context, runInfo *callbacks.RunInfo, output *schema.StreamReader[*model.CallbackOutput]) context.Context
OnError func(ctx context.Context, runInfo *callbacks.RunInfo, err error) context.Context
}
上面的 ModelCallbackHandler,封装了三个操作:
-
不再需要判断 RunInfo.Component 来选择属于 ChatModel 触发的回调,而是已经自动做了过滤。
-
只要求实现 Chat Model 这个组件支持的触发时机,这里去掉了不支持的 OnStartWithStreamInput。同时,如果用户只关注 Chat Model 支持的四个时机的某几个,比如只有 OnStart,也可以只实现 OnStart。
-
Input / Output 不再是 Any 类型,而是已经转化好的 model.CallbackInput, model.CallbackOutput。
HandlerHelper 支持全部的官方组件,目前的列表是:ChatModel, ChatTemplate, Retriever, Indexer, Embedding, Document.Loader, Document.Transformer, Tool, ToolsNode.
针对 Lambda,Graph,Chain 这些输入输出类型不确定的“组件”,也可以使用 HandlerHelper,但是只能做到上面的第 1 点,即按照组件类型做自动的过滤,2、3 点依然需要用户自己实现:
import ucb "github.com/cloudwego/eino/utils/callbacks"
handler := ucb.NewHandlerHelper().Lambda(callbacks.Handler).Graph(callbacks.Handler)...Handler()
这时,NewHandlerHelper().Lambda() 需要传入 callbacks.Handler 可以用下面的 HandlerBuilder 来实现。
HandlerBuilder
如果用户的 Handler 需要关注多个组件类型,但却只需要关注部分的触发时机,可以使用 HandlerBuilder:
import "github.com/cloudwego/eino/callbacks"
handler := callbacks.NewHandlerBuilder().OnStartFn(fn)...Build()
最佳实践
在 Graph 中使用
-
积极使用 Global Handlers,注册始终生效的 Handlers。
package main
import (
"context"
"log"
"github.com/cloudwego/eino/callbacks"
"github.com/cloudwego/eino/compose"
)
func main() {
// Build a simple global handler
handler := callbacks.NewHandlerBuilder().
OnStartFn(func(ctx context.Context, info *callbacks.RunInfo, input callbacks.CallbackInput) context.Context {
log.Printf("[Global Start] component=%s name=%s input=%T", info.Component, info.Name, input)
return ctx
}).
OnEndFn(func(ctx context.Context, info *callbacks.RunInfo, output callbacks.CallbackOutput) context.Context {
log.Printf("[Global End] component=%s name=%s output=%T", info.Component, info.Name, output)
return ctx
}).
OnErrorFn(func(ctx context.Context, info *callbacks.RunInfo, err error) context.Context {
log.Printf("[Global Error] component=%s name=%s err=%v", info.Component, info.Name, err)
return ctx
}).
Build()
// Register as global callbacks (applies to all subsequent runs)
callbacks.AppendGlobalHandlers(handler)
// Example graph usage; the global handler will be invoked automatically
g := compose.NewGraph[string, string]()
// ... add nodes/edges ...
r, _ := g.Compile(context.Background())
_, _ = r.Invoke(context.Background(), "hello") // triggers global callbacks
}
-
通过 WithHandlers option 在运行时注入 Handler,通过 DesignateNode 或 DesignateNodeByPath 指定生效的 Node / 嵌套的内部 Graph / 内部 Graph 的 Node。
package main
import (
"context"
"github.com/cloudwego/eino/callbacks"
"github.com/cloudwego/eino/compose"
"github.com/cloudwego/eino/components/prompt"
"github.com/cloudwego/eino/schema"
)
func main() {
ctx := context.Background()
top := compose.NewGraph[map[string]any, []*schema.Message]()
sub := compose.NewGraph[map[string]any, []*schema.Message]()
_ = sub.AddChatTemplateNode("tmpl_nested", prompt.FromMessages(schema.FString, schema.UserMessage("Hello, {name}!")))
_ = sub.AddEdge(compose.START, "tmpl_nested")
_ = sub.AddEdge("tmpl_nested", compose.END)
_ = top.AddGraphNode("sub_graph", sub)
_ = top.AddEdge(compose.START, "sub_graph")
_ = top.AddEdge("sub_graph", compose.END)
r, _ := top.Compile(ctx)
optGlobal := compose.WithCallbacks(
callbacks.NewHandlerBuilder().OnEndFn(func(ctx context.Context, _ *callbacks.RunInfo, _ callbacks.CallbackOutput) context.Context { return ctx }).Build(),
)
optNode := compose.WithCallbacks(
callbacks.NewHandlerBuilder().OnStartFn(func(ctx context.Context, _ *callbacks.RunInfo, _ callbacks.CallbackInput) context.Context { return ctx }).Build(),
).DesignateNode("sub_graph")
optNested := compose.WithChatTemplateOption(
prompt.WrapImplSpecificOptFn(func(_ *struct{}) {}),
).DesignateNodeWithPath(
compose.NewNodePath("sub_graph", "tmpl_nested"),
)
_, _ = r.Invoke(ctx, map[string]any{"name": "Alice"}, optGlobal, optNode, optNested)
}
在 Graph 外使用
这个场景是:不使用 Graph/Chain/Workflow 等编排能力,单独用代码去调用 ChatModel/Tool/Lambda 等各种组件,且希望这些组件能成功触发 Callback Handlers。
此场景需要用户解决的问题是:手动设置正确的 RunInfo 和 Handlers,因为没有 Graph 来帮助用户自动设置 RunInfo 和 Handlers 了。
完整示例:
package main
import (
"context"
"github.com/cloudwego/eino/callbacks"
"github.com/cloudwego/eino/compose"
)
func innerLambda(ctx context.Context, input string) (string, error) {
// 作为 ComponentB 的实现方:进入组件时补默认 RunInfo(Name 无法给默认值)
ctx = callbacks.EnsureRunInfo(ctx, "Lambda", compose.ComponentOfLambda)
ctx = callbacks.OnStart(ctx, input)
out := "inner:" + input
ctx = callbacks.OnEnd(ctx, out)
return out, nil
}
func outerLambda(ctx context.Context, input string) (string, error) {
// 作为 ComponentA 的实现方:进入组件时补默认 RunInfo
ctx = callbacks.EnsureRunInfo(ctx, "Lambda", compose.ComponentOfLambda)
ctx = callbacks.OnStart(ctx, input)
// 推荐:调用前替换 RunInfo,确保内层组件拿到正确的 name/type/component
ctxInner := callbacks.ReuseHandlers(ctx,
&callbacks.RunInfo{Name: "ComponentB", Type: "Lambda", Component: compose.ComponentOfLambda},
)
out1, _ := innerLambda(ctxInner, input) // 内层 RunInfo.Name = "ComponentB"
// 未替换:框架清空 RunInfo,只能靠 EnsureRunInfo 补默认值(Name 为空)
out2, _ := innerLambda(ctx, input) // 内层 RunInfo.Name == ""
final := out1 + "|" + out2
ctx = callbacks.OnEnd(ctx, final)
return final, nil
}
func main() {
// 在 graph 外单独使用组件:初始化 RunInfo 与 Handlers
h := callbacks.NewHandlerBuilder().Build()
ctx := callbacks.InitCallbacks(context.Background(),
&callbacks.RunInfo{Name: "ComponentA", Type: "Lambda", Component: compose.ComponentOfLambda},
h,
)
_, _ = outerLambda(ctx, "ping")
}
对上面的样例代码做下说明:
-
初始化:在 graph/chain 外使用组件时,用 InitCallbacks 设置首个 RunInfo 与 Handlers ,让后续组件执行能拿到完整回调上下文。
-
内部调用:在组件 A 内部调用组件 B 前,用 ReuseHandlers 替换 RunInfo (保留原有 handlers),确保 B 的回调拿到正确的 Type/Component/Name 。
-
不替换的后果:Eino 在一组 Callbacks 完整触发后,会清空当前 ctx 中的 RunInfo,此时因为 RunInfo 为空,Eino 就不再会触发 Callbacks;组件 B 的开发者只能在自身实现里用 EnsureRunInfo 补 Type/Component 的默认值,来确保 RunInfo 非空且大致正确,从而能成功触发 Callbacks。但无法给出合理 Name ,因此 RunInfo.Name 会是空字符串。
组件嵌套使用
场景:在一个组件,比如 Lambda 内,手动调用另外一个组件,比如 ChatModel。
这时,如果外层的组件的 ctx 中有 callback handler,因为这个 ctx 也会传入内部的组件,所以内部的组件也会收到同样的 callback handler。
按“是否希望内部组件触发 callback”区分:
-
希望触发:基本等同于上面一小节的情况,建议通过
ReuseHandlers来手动为内部组件设置RunInfo。
package main
import (
"context"
"github.com/cloudwego/eino/callbacks"
"github.com/cloudwego/eino/components"
"github.com/cloudwego/eino/components/model"
"github.com/cloudwego/eino/compose"
"github.com/cloudwego/eino/schema"
)
// 外层 Lambda,在内部手动调用 ChatModel
func OuterLambdaCallsChatModel(cm model.BaseChatModel) *compose.Lambda {
return compose.InvokableLambda(func(ctx context.Context, input string) (string, error) {
// 1) 复用外层 handlers,并为内部组件显式设置 RunInfo
innerCtx := callbacks.ReuseHandlers(ctx, &callbacks.RunInfo{
Type: "InnerCM", // 可自定义
Component: components.ComponentOfChatModel, // 标注组件类型
Name: "inner-chat-model", // 可自定义
})
// 2) 构造输入消息
msgs := []*schema.Message{{Role: schema.User, Content: input}}
// 3) 调用 ChatModel(内部会触发相应的回调)
out, err := cm.Generate(innerCtx, msgs)
if err != nil {
return "", err
}
return out.Content, nil
})
}
上面的代码假设了“内部的 ChatModel 的 Generate 方法内部,已经调用了 OnStart,OnEnd,OnError 这些方法”。如果没有,则需要在外部组件内部“替内部组件”调用这些方法:
func OuterLambdaCallsChatModel(cm model.BaseChatModel) *compose.Lambda {
return compose.InvokableLambda(func(ctx context.Context, input string) (string, error) {
// 复用外层 handlers,并为内部组件显式设置 RunInfo
ctx = callbacks.ReuseHandlers(ctx, &callbacks.RunInfo{
Type: "InnerCM",
Component: components.ComponentOfChatModel,
Name: "inner-chat-model",
})
// 构造输入消息
msgs := []*schema.Message{{Role: schema.User, Content: input}}
// 显式触发 OnStart
ctx = callbacks.OnStart(ctx, msgs)
// 调用 ChatModel
resp, err := cm.Generate(ctx, msgs)
if err != nil {
// 显式触发 OnError
ctx = callbacks.OnError(ctx, err)
return "", err
}
// 显式触发 OnEnd
ctx = callbacks.OnEnd(ctx, resp)
return resp.Content, nil
})
}
-
不希望触发:这里假定内部组件实现了
IsCallbacksEnabled()且返回 true,并且在内部调用了EnsureRunInfo。这时默认内部 callbacks 会触发。如不希望触发,最简单的办法是去掉 ctx 中的 handler,比如为内部组件传一个新的 ctx:package main import ( "context" "github.com/cloudwego/eino/components/model" "github.com/cloudwego/eino/compose" "github.com/cloudwego/eino/schema" ) func OuterLambdaNoCallbacks(cm model.BaseChatModel) *compose.Lambda { return compose.InvokableLambda(func(ctx context.Context, input string) (string, error) { // 使用一个全新的 ctx,不复用外层的 handlers innerCtx := context.Background() msgs := []*schema.Message{{Role: schema.User, Content: input}} out, err := cm.Generate(innerCtx, msgs) if err != nil { return "", err } return out.Content, nil }) }-
但有时用户可能希望“只不触发某个特定的 callback handlers,但是还触发其他的 callback handlers”。建议的使用姿势是在这个 callback handler 中加代码,按 RunInfo 过滤掉内部组件:
-
package main
import (
"context"
"log"
"github.com/cloudwego/eino/callbacks"
"github.com/cloudwego/eino/components"
"github.com/cloudwego/eino/compose"
)
// 一个按 RunInfo 过滤的 handler:对内部 ChatModel(Type=InnerCM,Name=inner-chat-model)不做任何处理
func newSelectiveHandler() callbacks.Handler {
return callbacks.
NewHandlerBuilder().
OnStartFn(func(ctx context.Context, info *callbacks.RunInfo, input callbacks.CallbackInput) context.Context {
if info != nil && info.Component == components.ComponentOfChatModel &&
info.Type == "InnerCM" && info.Name == "inner-chat-model" {
// 过滤目标:内部 ChatModel,直接返回,不做处理
return ctx
}
log.Printf("[OnStart] %s/%s (%s)", info.Type, info.Name, info.Component)
return ctx
}).
OnEndFn(func(ctx context.Context, info *callbacks.RunInfo, output callbacks.CallbackOutput) context.Context {
if info != nil && info.Component == components.ComponentOfChatModel &&
info.Type == "InnerCM" && info.Name == "inner-chat-model" {
// 过滤目标:内部 ChatModel,直接返回,不做处理
return ctx
}
log.Printf("[OnEnd] %s/%s (%s)", info.Type, info.Name, info.Component)
return ctx
}).
Build()
}
// 组合示例:外层调用希望触发,特定 handler 通过 RunInfo 过滤掉内部 ChatModel
func Example(cm model.BaseChatModel) (compose.Runnable[string, string], error) {
handler := newSelectiveHandler()
chain := compose.NewChain[string, string]().
AppendLambda(OuterLambdaCallsChatModel(cm)) // 内部会 ReuseHandlers + RunInfo
return chain.Compile(
context.Background(),
// 挂载 handler(也可结合全局 handlers)
compose.WithCallbacks(handler),
)
}
Handler 内读写 input & output
Input & output 在 graph 中流转时,是直接变量赋值。如下图所示,NodeA.Output, NodeB.Input, NodeC.Input, 以及各个 Handler 中拿到的 input & output,如果是结构体指针或 Map 等引用类型,则都是同一份数据。因此,无论在 Node 内还是 Handler 内,都不建议修改 Input & Output,会产生并发问题:即使同步情况下,Node B 和 Node C 有并发,导致内部的 handler1 和 handler2 有并发。存在异步处理逻辑时,并发的可能场景更多。
在流传递的场景,所有下游节点和 handler 中的输入流,都是 StreamReader.Copy(n) 得到的流,可相互独立的读取流。但是,流中的每个 chunk,是直接变量赋值,如果 chunk 是结构体指针或 Map 等引用类型,各个 Copy 后的流读到的是同一份数据。因此,在 Node 和 Handler 内,同样不建议修改流的 chunk,有并发问题。
Handler 间传递信息
同一个 Handler 的不同时机之间,可通过 ctx 传递信息,如 OnStart 中通过 context.WithValue 返回一个新的 context,在 OnEnd 中从 context 中再取出这个 value。
不同 Handler 之间,没有执行顺序的保证,因此不建议通过上面的机制在不同 Handler 间传递信息。本质上是无法保证某一个 Handler 返回的 context,一定会进入下一个 Handler 的函数执行中。
如果需要在不同 Handler 之间传递信息,建议的方式是在最外层的 context(如 graph 执行时传入的 context)中,设置一个全局的、请求维度的变量作为公共信息的存取空间,在各个 Handler 中按需读取和更新这个公共变量。用户需要自行保证这个公共变量的并发安全。
流切记要 Close
以存在 ChatModel 这种具有真流输出的节点为例,当存在 Callback 切面时,ChatModel 的输出流:
-
既要被下游节点作为输入来消费,又要被 Callback 切面来消费
-
一个流中的一个帧(Chunk),只能被一个消费方消费到,即流不是广播模型
所以此时需要将流进行复制,其复制关系如下:
-
如果其中一个 Callback n 没有 Close 对应的流,可能导致原始 Stream 无法 Close 和释放资源。
calloption扩展
CallOption: 对 Graph 编译产物进行调用时,直接传递数据给特定的一组节点(Component、Implementation、Node)的渠道
-
和 节点 Config 的区别: 节点 Config 是实例粒度的配置,也就是从实例创建到实例消除,Config 中的值一旦确定就不需要改变了
-
CallOption:是请求粒度的配置,不同的请求,其中的值是不一样的。更像是节点入参,但是这个入参是直接由 Graph 的入口直接传入,而不是上游节点传入。
-
举例:给一个 ChatModel 节点传入 Temperature 配置;给一个 Lambda 节点传入自定义 option。
-
组件 CallOption 形态
组件 CallOption 配置,有两个粒度:
-
组件的抽象(Abstract/Interface)统一定义的 CallOption 配置【组件抽象 CallOption】
-
组件的实现(Type/Implementation)定义的该类型组件专用的 CallOption 配置【组件实现 CallOption】
以 ChatModel 这个 Component 为例,介绍 CallOption 的形态
Model 抽象与实现的目录
// 抽象所在代码位置
eino/components/model
├── interface.go
├── option.go // Component 抽象粒度的 CallOption 入参
// 抽象实现所在代码位置
eino-ext/components/model
├── claude
│ ├── option.go // Component 的一种实现的 CallOption 入参
│ └── chatmodel.go
├── ollama
│ ├── call_option.go // Component 的一种实现的 CallOption 入参
│ ├── chatmodel.go
Model 抽象
如上所述,在定义组件的 CallOption 时,需要区分【组件抽象 CallOption】、【组件实现 CallOption】两种场景。 而是否要提供 【组件实现 CallOption】,则是由 组件抽象 来决定的。
组件抽象提供的 CallOption 扩展能力如下(以 Model 为例,其他组件类似):
package model
type ChatModel interface {
Generate(ctx context.Context, input []*schema.Message, opts ...Option) (*schema.Message, error)
Stream(ctx context.Context, input []*schema.Message, opts ...Option) (
*schema.StreamReader[*schema.Message], error)
// BindTools bind tools to the model.
// BindTools before requesting ChatModel generally.
// notice the non-atomic problem of BindTools and Generate.
BindTools(tools []*schema.ToolInfo) error
}
// 此结构体是【组件抽象CallOption】的统一定义。 组件的实现可根据自己的需要取用【组件抽象CallOption】的信息
// Options is the common options for the model.
type Options struct {
// Temperature is the temperature for the model, which controls the randomness of the model.
Temperature *float32
// MaxTokens is the max number of tokens, if reached the max tokens, the model will stop generating, and mostly return an finish reason of "length".
MaxTokens *int
// Model is the model name.
Model *string
// TopP is the top p for the model, which controls the diversity of the model.
TopP *float32
// Stop is the stop words for the model, which controls the stopping condition of the model.
Stop []string
}
// Option is the call option for ChatModel component.
type Option struct {
// 此字段是为【组件抽象CallOption】服务的 apply 方法,例如 WithTemperature
// 如果组件抽象不想提供【组件抽象CallOption】,可不提供此字段,同时不提供 GetCommonOptions() 方法
apply func(opts *Options)
// 此字段是为【组件实现CallOption】服务的 apply 方法。并假设 apply 方法为:func(*T)
// 如果组件抽象不想提供【组件实现CallOption】,可不提供此字段,同时不提供 GetImplSpecificOptions() 方法
implSpecificOptFn any
}
// WithTemperature is the option to set the temperature for the model.
func WithTemperature(temperature float32) Option {
return Option{
apply: func(opts *Options) {
opts.Temperature = &temperature
},
}
}
// WithMaxTokens is the option to set the max tokens for the model.
func WithMaxTokens(maxTokens int) Option {
return Option{
apply: func(opts *Options) {
opts.MaxTokens = &maxTokens
},
}
}
// WithModel is the option to set the model name.
func WithModel(name string) Option {
return Option{
apply: func(opts *Options) {
opts.Model = &name
},
}
}
// WithTopP is the option to set the top p for the model.
func WithTopP(topP float32) Option {
return Option{
apply: func(opts *Options) {
opts.TopP = &topP
},
}
}
// WithStop is the option to set the stop words for the model.
func WithStop(stop []string) Option {
return Option{
apply: func(opts *Options) {
opts.Stop = stop
},
}
}
// GetCommonOptions extract model Options from Option list, optionally providing a base Options with default values.
func GetCommonOptions(base *Options, opts ...Option) *Options {
if base == nil {
base = &Options{}
}
for i := range opts {
opt := opts[i]
if opt.apply != nil {
opt.apply(base)
}
}
return base
}
// 组件实现方基于此方法,封装自己的Option函数:func WithXXX(xxx string) Option{}
func WrapImplSpecificOptFn[T any](optFn func(*T)) Option {
return Option{
implSpecificOptFn: optFn,
}
}
// GetImplSpecificOptions provides tool author the ability to extract their own custom options from the unified Option type.
// T: the type of the impl specific options struct.
// This function should be used within the tool implementation's InvokableRun or StreamableRun functions.
// It is recommended to provide a base T as the first argument, within which the tool author can provide default values for the impl specific options.
func GetImplSpecificOptions[T any](base *T, opts ...Option) *T {
if base == nil {
base = new(T)
}
for i := range opts {
opt := opts[i]
if opt.implSpecificOptFn != nil {
optFn, ok := opt.implSpecificOptFn.(func(*T))
if ok {
optFn(base)
}
}
}
return base
}
Claude 实现
https://github.com/cloudwego/eino-ext/blob/main/components/model/claude/option.go
package claude
import (
"github.com/cloudwego/eino/components/model"
)
type options struct {
TopK *int32
}
func WithTopK(k int32) model.Option {
return model.WrapImplSpecificOptFn(func(o *options) {
o.TopK = &k
})
}
https://github.com/cloudwego/eino-ext/blob/main/components/model/claude/claude.go
func (c *claude) genMessageNewParams(input []*schema.Message, opts ...model.Option) (anthropic.MessageNewParams, error) {
if len(input) == 0 {
return anthropic.MessageNewParams{}, fmt.Errorf("input is empty")
}
commonOptions := model.GetCommonOptions(&model.Options{
Model: &c.model,
Temperature: c.temperature,
MaxTokens: &c.maxTokens,
TopP: c.topP,
Stop: c.stopSequences,
}, opts...)
claudeOptions := model.GetImplSpecificOptions(&options{TopK: c.topK}, opts...)
// omit mulple lines...
return nil, nil
}
编排中的 CallOption
https://github.com/cloudwego/eino/blob/main/compose/runnable.go
Graph 编译产物是 Runnable
type Runnable[I, O any] interface {
Invoke(ctx context.Context, input I, opts ...Option) (output O, err error)
Stream(ctx context.Context, input I, opts ...Option) (output *schema.StreamReader[O], err error)
Collect(ctx context.Context, input *schema.StreamReader[I], opts ...Option) (output O, err error)
Transform(ctx context.Context, input *schema.StreamReader[I], opts ...Option) (output *schema.StreamReader[O], err error)
}
Runnable 各方法均接收 compose.Option 列表。
https://github.com/cloudwego/eino/blob/main/compose/graph_call_options.go
包括 graph run 整体的配置,各类组件的配置,特定 Lambda 的配置等。
// Option is a functional option type for calling a graph.
type Option struct {
options []any
handler []callbacks.Handler
paths []*NodePath
maxRunSteps int
}
// DesignateNode set the key of the node which will the option be applied to.
// notice: only effective at the top graph.
// e.g.
//
// embeddingOption := compose.WithEmbeddingOption(embedding.WithModel("text-embedding-3-small"))
// runnable.Invoke(ctx, "input", embeddingOption.DesignateNode("my_embedding_node"))
func (o Option) DesignateNode(key ...string) Option {
nKeys := make([]*NodePath, len(key))
for i, k := range key {
nKeys[i] = NewNodePath(k)
}
return o.DesignateNodeWithPath(nKeys...)
}
// DesignateNodeWithPath sets the path of the node(s) to which the option will be applied to.
// You can make the option take effect in the subgraph by specifying the key of the subgraph.
// e.g.
// DesignateNodeWithPath({"sub graph node key", "node key within sub graph"})
func (o Option) DesignateNodeWithPath(path ...*NodePath) Option {
o.paths = append(o.paths, path...)
return o
}
// WithEmbeddingOption is a functional option type for embedding component.
// e.g.
//
// embeddingOption := compose.WithEmbeddingOption(embedding.WithModel("text-embedding-3-small"))
// runnable.Invoke(ctx, "input", embeddingOption)
func WithEmbeddingOption(opts ...embedding.Option) Option {
return withComponentOption(opts...)
}
compose.Option 可以按需分配给 Graph 中不同的节点。
// 所有节点都生效的 call option
compiledGraph.Invoke(ctx, input, WithCallbacks(handler))
// 只对特定类型节点生效的 call option
compiledGraph.Invoke(ctx, input, WithChatModelOption(WithTemperature(0.5))
compiledGraph.Invoke(ctx, input, WithToolOption(WithXXX("xxx"))
// 只对特定节点生效的 call option
compiledGraph.Invoke(ctx, input, WithCallbacks(handler).DesignateNode("node_1"))
// 只对特定内部嵌套图或其中节点生效的 Call option
compiledGraph.Invoke(ctx, input, WithCallbacks(handler).DesignateNodeWithPath(NewNodePath("1", "2"))
interrupt和断点
使用 Interrupt & CheckPoint 功能,可以实现在指定位置暂停 Graph 执行并在之后断点续传,如果是 StateGraph,还可以在断点续传前修改 State。
💡 断点续传仅能复原输入和运行时各节点产生的数据,需要确保 Graph 编排完全相同,以及重新完整传入 CallOption(没有特殊情况应当保持一致,除非依赖 CallOption 在 Resume 时传递数据等)。
使用静态 Interrupt
静态 Interrupt 支持在指定 Node 执行前或执行后暂停 Graph,Compile 时传入 WithInterruptAfterNodes 与 WithInterruptBeforeNodes Option 来设置 Interrupt:
import (
"github.com/cloudwego/eino/compose"
)
func main() {
g := NewGraph[string, string]()
err := g.AddLambdaNode("node1", compose.InvokableLambda(func(ctx **context**._Context_, input string) (output string, err error) {/*invokable func*/})
if err != nil {/* error handle */}
err = g.AddLambdaNode("node2", compose.InvokableLambda(func(ctx **context**._Context_, input string) (output string, err error) {/*invokable func*/})
if err != nil {/* error handle */}
/** other graph composed code
xxx
*/
err = g.Compile(ctx, compose.WithInterruptAfterNodes([]string{"node1"}), compose.WithInterruptBeforeNodes([]string{"node2"}))
if err != nil {/* error handle */}
}
💡 目前仅支持 Compile 时设置静态断点,如果需要请求时设置,欢迎提出~
可以从运行返回的 error 中获得本次运行是否 Interrupt 以及 Interrupt 信息:
// compose/checkpoint.go
**type **InterruptInfo **struct **{
State any
BeforeNodes []string
AfterNodes []string
RerunNodes []string
RerunNodesExtra **map**[string]any
SubGraphs **map**[string]*InterruptInfo
InterruptContexts []*InterruptCtx
}
func ExtractInterruptInfo(err error) (info *InterruptInfo, existed bool) {}
例如:
import "github.com/cloudwego/eino/compse"
/***graph compose code
* g := NewGraph
* xxx
* runner := g.Compile
*/
result, err := runner.Invoke(ctx, input)
if info, ok := ExtractInterruptInfo(err); ok {
// handler info
}
if err != nil {
// handle error
}
💡 Interrupt 时 output 为空值,没有意义。
使用 CheckPoint
CheckPoint 记录 Graph 运行状态,使用 CheckPoint 可以在 Interrupt 后恢复运行。
实现 CheckPointerStore
CheckPointStore 是一个 key 类型为 string、value 类型为[]byte 的 KV 存储接口,我们没有提供封装和默认实现,需要用户自行实现,用来存储 checkpoint。
// compose/checkpoint.go
type CheckpointStore interface {
Get(ctx **context**._Context_, key string) (value []byte, existed bool,err error)
Set(ctx **context**._Context_, key string, value []byte) (err error)
}
注册序列化方法
CheckPoint 的保存和读取涉及对 Graph 节点输入输出以及 State 的序列化和反序列化,在仅使用简单类型或 eino 内置类型(比如 Message 或 Document)时,用户无需额外操作;当引入自定义 struct 时,需要提前注册类型,Eino 提供了注册方法 schema.``RegisterName:
package main
import "github.com/cloudwego/eino/schema"
type MyState struct {
Counter int
Note string
}
func init() {
// Register the type with a stable name for serialization/persistence.
// Use the pointer form if you persist pointers to this type.
// It's recommended to register types within the `init()` function
// within the same file your type is declared.
schema.RegisterName[*MyState]("my_state_v1")
}
注册后的类型在序列化时将被额外记录类型信息,因此在反序列化时,即使不指明类型(比如反序列化到 interface{}),Eino 也可以反序列化出正确的类型。注册方法中的 key 唯一标识了这个类型,一旦确定了 key 需要保证其不能改变,否则已持久化的 checkpoint 将不能被正确恢复。
💡 结构体的未导出字段无法访问,因此不会被存储/恢复
默认情况下,会使用 eino 内置的序列化功能,此时,如果注册的类型实现了 json Marshaler 和 Unmarshaler,此类型的序列化和反序列化会使用自定义方法。
// encoding/json
type Marshaler interface {
MarshalJSON() ([]byte, error)
}
type Unmarshaler interface {
UnmarshalJSON([]byte) error
}
Eino 同时提供了将序列化方式改为 gob 的选项:
r, err := compose.NewChain[*AgentInput, Message]().
AppendLambda(compose.InvokableLambda(func(ctx context.Context, input *AgentInput) ([]Message, error) {
return a.genModelInput(ctx, instruction, input)
})).
AppendChatModel(a.model).
Compile(ctx, compose.WithGraphName(a.name),
compose.WithCheckPointStore(store),
compose.WithSerializer(&gobSerializer{}))
用户可以按偏好选择,选择后不建议轻易变更,历史数据不兼容。
开启 CheckPoint
创建 CheckPointStore 后在 Compile Graph 时作为 Option 传入,把 CheckPointer 绑定到 Graph:
import (
"github.com/cloudwego/eino/compose"
)
func main() {
/** graph composed code
xxx
*/
err = g.Compile(ctx, compose.WithCheckPointStore(store), compose.WithInterruptBeforeNodes([]string{"node2"}))
if err != nil {/* error handle */}
}
之后可以在请求时通过 CallOption 引入 CheckPoint:
// compose/checkpoint.go
func WithCheckPointID(checkPointID string) Option
Checkpoint id 会被作为 CheckPointStore 的 key 使用,graph 运行时会检查 CheckPointStore 是否存在此 id,如果存在则从 checkpoint 中恢复运行;interrupt 是会把 graph 状态保存到此 id 中。
动态 Interrupt
节点返回特殊错误可以动态地触发 Interrupt:
在 eino v0.7.0 之前
// eino/compose/interrupt.go
// emit a plain interrupt signal
var InterruptAndRerun = errors.New("interrupt and rerun")
// emit an interrupt signal with extra info
**func **NewInterruptAndRerunErr(extra any) error
Eino Graph 接收到节点返回此错误后会发生 interrupt,恢复运行时,会再次运行此节点,再次运行前会调用 StateModifier 修改 state(如果已配置)。
这种情况下,再次运行节点时输入会替换为空值,而不是原本的输入,如果再次运行时需要仍需要原本输入,需要提前保存到 State 中。
在 eino v0.7.0 及之后
增加了对“保存本地状态”、“透出内部中断信号”、“并行中断”的支持:
// eino/compose/interrupt.go
// emit an interrupt signal with user-facing info
func Interrupt(ctx context.Context, info any) error
// emit an interrupt signal with user-facing info AS WELL AS
// persistent LOCALLY-DEFINED state
func StatefulInterrupt(ctx context.Context, info any, state any) error
// emit an interrupt signal WRAPPING other interrupt signals
// emitted from inner processes,
// such as ToolsNode wrapping Tools.
func CompositeInterrupt(ctx context.Context, info any, state any, errs ...error)
详细设计参见:Eino human-in-the-loop 框架:技术架构指南
外部主动 Interrupt
有时,我们希望能在 Graph 外部主动触发中断,保存现场,之后择机恢复。这些场景可能包括实例优雅退出等。这时,可以通过调用 WithGraphInterrupt 获取一个 ctx 和一个 interrupt function。其中 ctx 用于传递给 graph.Invoke() 等运行方法,interrupt function 用于在用户希望主动中断时调用:
// from compose/graph_call_options.go
_// WithGraphInterrupt creates a context with graph cancellation support._
_// When the returned context is used to invoke a graph or workflow, calling the interrupt function will trigger an interrupt._
_// The graph will wait for current tasks to complete by default._
**func **WithGraphInterrupt(parent context.Context) (ctx context.Context, interrupt **func**(opts ...GraphInterruptOption)) {}
在主动调用 interrupt function 时,可以传递超时等参数:
// from compose/graph_call_options.go
_// WithGraphInterruptTimeout specifies the max waiting time before generating an interrupt._
_// After the max waiting time, the graph will force an interrupt. Any unfinished tasks will be re-run when the graph is resumed._
**func **WithGraphInterruptTimeout(timeout time.Duration) GraphInterruptOption {
**return func**(o *graphInterruptOptions) {
o.timeout = &timeout
}
}
当外部触发中断时,节点内部没有机会保存局部状态(包括节点的 input),所以 eino 会自动保存被外部中断的节点的 input,在下次执行时自动恢复。非外部触发中断的场景,节点内部发起中断时,保存 input 是每个节点的职责,可通过保存到 graph state 中或使用 compose.StatefulInterrupt 保存局部状态。
流式传输中的 CheckPoint
流式传输在保存 CheckPoint 时需要拼接数据流,因此需要注册拼接方法:
// compose/stream_concat.go
func RegisterStreamChunkConcatFunc[T any](fn func([]T) (T, error))
// example
type TestStruct struct {
Body string
}
// RegisterStreamChunkConcatFunc非线程安全,需要在初始化阶段使用
RegisterStreamChunkConcatFunc(func(ss []TestStruct)(TestStruct, error){
ret := TestStruct{Body:""}
for i := range ss {
ret.Body += ss[i].Body
}
return ret, nil
})
eino 默认提供了*schema.Message、[]*schema.Message 和 string 的 concat 方法。
嵌套图中的 Interrupt&CheckPoint
父图传入 CheckPointer 的前提下,AddGraphNode 时使用 WithGraphCompileOptions 传入 InterruptNodes 可以开启子图的 Interrupt&CheckPoint,父图未设置 CheckPointer 时会在 Compile 时报错。
/* graph compose code
xxx
*/
g.AddGraphNode("node1", subGraph, WithGraphCompileOptions(
WithInterruptAfterNodes([]string{"node2"}),
))
g.Compile(ctx, WithCheckPointStore(cp))
如果在子图中 interrupt,resume 时修改的 state 应为子图 state。TODO,说明下 StateModifier 中 Path 使用
恢复
恢复:Interrupt 并保存 checkpoint 后,后续的 graph 运行。
在 eino v0.7.0 之前
通过修改 State 来影响恢复时的行为。
// compose/checkpoint.go
type StateModifier func(ctx context.Context, path NodePath, state any) error
func WithStateModifier(sm StateModifier) GraphCompileOption
StateModifier 在 Graph 恢复运行时生效,可以在运行前修改 State,path 在嵌套图中生效,非嵌套视为空数组。
/* graph compose and compile
xxx
*/
// first run interrupt
id := GenUUID()
_, err := runner.Invoke(ctx, input, WithCheckPointID(id))
// resume from id
_, err = runner.Invoke(ctx, input/*unused*/,
WithCheckPointID(id),
WithStateModifier(func(ctx context.Context, path NodePath, state any) error{
state.(*testState).Field1 = "hello"
return nil
}),
)
💡 Resume 时 input 不会被读取,此时 input 传空即可。
在 eino v0.7.0 及之后
除了 StateModifier 之外,还可以选择性的恢复某个中断点,以及直接给指定的“中断点位”传递“恢复数据”:
// specifically resume particular interrupt point(s),
// without specifying resume data
func Resume(ctx context.Context, interruptIDs ...string) context.Context
// specifically resume one interrupt point, with custom resume data
func ResumeWithData(ctx context.Context, interruptID string, data any) context.Context
// specifically resume multiple interrupt points, each with custom resume data
func BatchResumeWithData(ctx context.Context, resumeData map[string]any) context.Context
其中,InterruptID 是从 interrupt error 中获取的:
interruptInfo, isInterrupt := ExtractInterruptInfo(err)
if isInterrupt {
// maybe multiple interrupt points exist here,
// we only take the first one for illustration purpose
interruptID = interruptInfo.InterruptContexts[0].ID
}
resumeData 是发生中断的点位定义的类型,比如一个 Tool 发生了中断并要求用户“审批”是否执行这个 Tool,自定义了一个 ApprovalResult 作为 resumeData:
func (i InvokableApprovableTool) InvokableRun(ctx context.Context, argumentsInJSON string,
opts ...tool.Option) (string, error) {
toolInfo, err := i.Info(ctx)
if err != nil {
return "", err
}
wasInterrupted, _, storedArguments := compose.GetInterruptState[string](ctx)
if !wasInterrupted { // initial invocation, interrupt and wait for approval
return "", compose.StatefulInterrupt(ctx, &ApprovalInfo{
ToolName: toolInfo.Name,
ArgumentsInJSON: argumentsInJSON,
ToolCallID: compose.GetToolCallID(ctx),
}, argumentsInJSON)
}
isResumeTarget, hasData, data := compose.GetResumeContext[*ApprovalResult](ctx)
if !isResumeTarget { // was interrupted but not explicitly resumed, reinterrupt and wait for approval again
return "", compose.StatefulInterrupt(ctx, &ApprovalInfo{
ToolName: toolInfo.Name,
ArgumentsInJSON: storedArguments,
ToolCallID: compose.GetToolCallID(ctx),
}, storedArguments)
}
if !hasData {
return "", fmt.Errorf("tool '%s' resumed with no data", toolInfo.Name)
}
if data.Approved {
return i.InvokableTool.InvokableRun(ctx, storedArguments, opts...)
}
if data.DisapproveReason != nil {
return fmt.Sprintf("tool '%s' disapproved, reason: %s", toolInfo.Name, *data.DisapproveReason), nil
}
return fmt.Sprintf("tool '%s' disapproved", toolInfo.Name), nil
}
例子
在 eino v0.7.0 之前
https://github.com/cloudwego/eino-examples/tree/main/compose/graph/react_with_interrupt
在 eino v0.7.0 之后
https://github.com/cloudwego/eino/blob/main/compose/resume_test.go
其中
TestInterruptStateAndResumeForRootGraph: 简单动态中断
TestInterruptStateAndResumeForSubGraph: 子图中断
TestInterruptStateAndResumeForToolInNestedSubGraph: 嵌套子图内部 tool 中断
TestMultipleInterruptsAndResumes: 并行中断
TestReentryForResumedTools: ReAct Agent 内 tool 中断,恢复后多次循环执行
TestGraphInterruptWithinLambda: Lambda 节点内包含独立 Graph 且内部中断
3.3 flow集成
一句话定义
集成组件 = 被标准化封装的、可被编排系统直接使用的 “原子能力单元”。
它到底是什么?
它就是一个黑盒功能块:
-
有固定的输入
-
有固定的输出
-
内部实现具体业务逻辑 / 工具能力 / 模型调用 / 数据处理
-
对外不暴露实现细节
-
能直接被编排系统(Graph/Chain)使用
它是 Eino 编排体系里的最小执行单元。
Flow 进编排
Flow 集成组件自身一般是由一个或多个 graph 编排而成。同时,这些 flow 也可以作为节点进入其他 graph 的编排之中,方式有三种:
-
如果一个 flow 实现了某个组件的 interface,可用该组件对应的 AddXXXNode 等方法加入编排,如 multiquery retriever:
// instantiate the flow: multiquery.NewRetriever vk, err := newVikingDBRetriever(ctx, vikingDBHost, vikingDBRegion, vikingDBAK, vikingDBSK) if err != nil { logs.Errorf("newVikingDBRetriever failed, err=%v", err) return } llm, err := newChatModel(ctx, openAIBaseURL, openAIAPIKey, openAIModelName) if err != nil { logs.Errorf("newChatModel failed, err=%v", err) return } // rewrite query by llm mqr, err := multiquery.NewRetriever(ctx, &multiquery.Config{ RewriteLLM: llm, RewriteTemplate: nil, // use default QueryVar: "", // use default LLMOutputParser: nil, // use default MaxQueriesNum: 3, OrigRetriever: vk, FusionFunc: nil, // use default fusion, just deduplicate by doc id }) if err != nil { logs.Errorf("NewMultiQueryRetriever failed, err=%v", err) return } // add the flow to graph graph := compose.NewGraph[string, *schema.Message]() _ = graph.AddRetrieverNode("multi_query_retriever", mqr, compose.WithOutputKey("context")) _ = graph.AddEdge(compose._START_, "multi_query_retriever") _ = graph.AddChatTemplateNode("template", prompt.FromMessages(schema._FString_, schema.UserMessage("{context}"))) // ... -
如果一个 flow 内部是由单个 graph 编排而成,且 flow 的功能可完全等价于这个 graph 的运行(没有不能转化成 graph run 的定制逻辑),则可以将该 flow 的 graph 导出,通过 AddGraphNode 等方法加入编排,如 ReAct Agent 和 Host Multi-Agent:
// instantiate the host multi-agent hostMA, err := NewMultiAgent(ctx, &MultiAgentConfig{ Host: Host{ ChatModel: mockHostLLM, }, Specialists: []*Specialist{ specialist1, specialist2, }, }) assert.Nil(t, err) // export graph and []GraphAddNodeOption from host multi-agent maGraph, opts := hostMA.ExportGraph() // add to another graph fullGraph, err := compose.NewChain[map[string]any, *schema.Message](). AppendChatTemplate(prompt.FromMessages(schema._FString_, schema.UserMessage("what's the capital city of {country_name}"))). AppendGraph(maGraph, append(opts, compose.WithNodeKey("host_ma_node"))...). Compile(ctx) assert.Nil(t, err) // invoke the other graph // convert the flow's own option to compose.Option if needed // assign options to flow's nodes if needed out, err := fullGraph.Invoke(ctx, map[string]any{"country_name": "China"}, compose.WithCallbacks(ConvertCallbackHandlers(mockCallback)). DesignateNodeWithPath(compose.NewNodePath("host_ma_node", hostMA.HostNodeKey()))) -
所有 flow 应当都可以封装成 Lambda,通过 AddLambdaNode 等方法加入编排。目前所有的 flow 都可以通过 1 或 2 加入编排,所以不需要降级到使用 Lambda。如果要用,使用姿势是:
// instantiate the flow a, err := NewAgent(ctx, &AgentConfig{ Model: cm, ToolsConfig: compose.ToolsNodeConfig{ Tools: []tool.BaseTool{fakeTool, &fakeStreamToolGreetForTest{}}, }, MaxStep: 40, }) assert.Nil(t, err) chain := compose.NewChain[[]*schema.Message, string]() // convert the flow to Lambda agentLambda, err := compose.AnyLambda(a.Generate, a.Stream, nil, nil) assert.Nil(t, err) // add lambda to another graph chain. AppendLambda(agentLambda). AppendLambda(compose.InvokableLambda(func(ctx context.Context, input *schema.Message) (string, error) { t.Log("got agent response: ", input.Content) return input.Content, nil })) r, err := chain.Compile(ctx) assert.Nil(t, err) // invoke the graph res, err := r.Invoke(ctx, []*schema.Message{{Role: schema._User_, Content: "hello"}}, compose.WithCallbacks(callbackForTest))
三个方法的对比如下:
| 方式 | 适用场景 | 优势 |
| 作为组件 | 需实现组件的 interface | 简单直接,语义清晰 |
| 作为 Graph | 由单个 graph 编排而成,功能不超出这个 graph 的范围 | graph 内节点对外层 graph 暴露,可统一分配运行时 option,相比 Lambda 少一层转化,可通过 GraphCompileCallback 获取上下级 graph 关系 |
| 作为 Lambda | 所有 | 普适 |
ReAct Agent 使用手册
简介
Eino React Agent 是实现了 React 逻辑 的智能体框架,用户可以用来快速灵活地构建并调用 React Agent.
💡 代码实现详见:实现代码目录
节点拓扑&数据流图
react agent 底层使用 compose.Graph 作为编排方案,一般来说有 2 个节点: ChatModel、Tools,中间运行过程中的所有历史消息都会放入 state 中,在将所有历史消息传递给 ChatModel 之前,会 copy 消息交由 MessageModifier 进行处理,处理的结果再传递给 ChatModel。直到 ChatModel 返回的消息中不再有 tool call,则返回最终消息。
当 Tools 列表中至少有一个 Tool 配置了 ReturnDirectly 时,ReAct Agent 结构会更复杂:在 ToolsNode 之后会增加一个 Branch,判断是否调用了一个 ReturnDirectly 的 Tool,如果是,直接 END,否则照旧进入 ChatModel。
初始化
提供了 ReactAgent 初始化函数,必填参数为 Model 和 ToolsConfig,选填参数为 MessageModifier, MaxStep, ToolReturnDirectly 和 StreamToolCallChecker.
go get github.com/cloudwego/eino-ext/components/model/openai@latest
go get github.com/cloudwego/eino@latest
import (
"github.com/cloudwego/eino-ext/components/model/openai"
"github.com/cloudwego/eino/components/model"
"github.com/cloudwego/eino/components/tool"
"github.com/cloudwego/eino/compose"
"github.com/cloudwego/eino/flow/agent/react"
"github.com/cloudwego/eino/schema"
)
func main() {
// 先初始化所需的 chatModel
toolableChatModel, err := openai.NewChatModel(...)
// 初始化所需的 tools
tools := compose.ToolsNodeConfig{
InvokableTools: []tool.InvokableTool{mytool},
StreamableTools: []tool.StreamableTool{myStreamTool},
}
// 创建 agent
agent, err := react.NewAgent(ctx, &react.AgentConfig{
ToolCallingModel: toolableChatModel,
ToolsConfig: tools,
...
}
}
Model
由于 ReAct Agent 需要进行工具调用,Model 需要拥有 ToolCall 的能力,因此需要配置一个 ToolCallingChatModel。
在 Agent 内部,会调用 WithTools 接口向模型注册 Agent 的工具列表,定义为:
// BaseChatModel defines the basic interface for chat models.
// It provides methods for generating complete outputs and streaming outputs.
// This interface serves as the foundation for all chat model implementations.
//
//go:generate mockgen -destination ../../internal/mock/components/model/ChatModel_mock.go --package model -source interface.go
type BaseChatModel interface {
Generate(ctx context.Context, input []*schema.Message, opts ...Option) (*schema.Message, error)
Stream(ctx context.Context, input []*schema.Message, opts ...Option) (
*schema.StreamReader[*schema.Message], error)
}
// ToolCallingChatModel extends BaseChatModel with tool calling capabilities.
// It provides a WithTools method that returns a new instance with
// the specified tools bound, avoiding state mutation and concurrency issues.
type ToolCallingChatModel interface {
BaseChatModel
// WithTools returns a new ToolCallingChatModel instance with the specified tools bound.
// This method does not modify the current instance, making it safer for concurrent use.
WithTools(tools []*schema.ToolInfo) (ToolCallingChatModel, error)
}
目前,eino 提供了 openai, ark 等实现,只要底层模型支持 tool call 即可。
go get github.com/cloudwego/eino-ext/components/model/openai@latest
go get github.com/cloudwego/eino-ext/components/model/ark@latest
import (
"github.com/cloudwego/eino-ext/components/model/openai"
"github.com/cloudwego/eino-ext/components/model/ark"
)
func openaiExample() {
chatModel, err := openai.NewChatModel(ctx, &openai.ChatModelConfig{
BaseURL: os.Getenv("OPENAI_BASE_URL"),
Key: os.Getenv("OPENAI_ACCESS_KEY"),
ByAzure: true,
Model: "{{model name which support tool call}}",
})
agent, err := react.NewAgent(ctx, react.AgentConfig{
ToolCallingModel: chatModel,
ToolsConfig: ...,
})
}
func arkExample() {
arkModel, err := ark.NewChatModel(context.Background(), ark.ChatModelConfig{
APIKey: os.Getenv("ARK_API_KEY"),
Model: os.Getenv("ARK_MODEL"),
})
agent, err := react.NewAgent(ctx, react.AgentConfig{
ToolCallingModel: arkModel,
ToolsConfig: ...,
})
}
ToolsConfig
toolsConfig 类型为 compose.ToolsNodeConfig, 在 eino 中,若要构建一个 Tool 节点,则需要提供 Tool 的信息,以及调用 Tool 的 function。tool 的接口定义如下:
type InvokableRun func(ctx context.Context, arguments string, opts ...Option) (content string, err error)
type StreamableRun func(ctx context.Context, arguments string, opts ...Option) (content *schema.StreamReader[string], err error)
type BaseTool interface {
Info() *schema.ToolInfo
}
// InvokableTool the tool for ChatModel intent recognition and ToolsNode execution.
type InvokableTool interface {
BaseTool
Run() InvokableRun
}
// StreamableTool the stream tool for ChatModel intent recognition and ToolsNode execution.
type StreamableTool interface {
BaseTool
Run() StreamableRun
}
用户可以根据 tool 的接口定义自行实现所需的 tool,同时框架也提供了更简便的构建 tool 的方法:
userInfoTool := utils.NewTool(
&schema.ToolInfo{
Name: "user_info",
Desc: "根据用户的姓名和邮箱,查询用户的公司、职位、薪酬信息",
ParamsOneOf: schema.NewParamsOneOfByParams(map[string]*schema.ParameterInfo{
"name": {
Type: "string",
Desc: "用户的姓名",
},
"email": {
Type: "string",
Desc: "用户的邮箱",
},
}),
},
func(ctx context.Context, input *userInfoRequest) (output *userInfoResponse, err error) {
return &userInfoResponse{
Name: input.Name,
Email: input.Email,
Company: "Cool Company LLC.",
Position: "CEO",
Salary: "9999",
}, nil
})
toolConfig := &compose.ToolsNodeConfig{
InvokableTools: []tool.InvokableTool{invokeTool},
}
MessageModifier
MessageModifier 会在每次把所有历史消息传递给 ChatModel 之前执行,定义为:
// modify the input messages before the model is called.
type MessageModifier func(ctx context.Context, input []*schema.Message) []*schema.Message
在 Agent 中配置 MessageModifier 可以修改传入模型的 messages,常用于添加前置的 system message:
import (
"github.com/cloudwego/eino/flow/agent/react"
"github.com/cloudwego/eino/schema"
)
func main() {
agent, err := react.NewAgent(ctx, &react.AgentConfig{
Model: toolableChatModel,
ToolsConfig: tools,
MessageModifier: func(ctx context.Context, input []*schema.Message) []*schema.Message {
res := make([]*schema.Message, 0, len(input)+1)
res = append(res, schema.SystemMessage("你是一个 golang 开发专家."))
res = append(res, input...)
return res
},
})
agent.Generate(ctx, []*schema.Message{schema.UserMessage("写一个 hello world 的代码")})
// 模型得到的实际输入为:
// []*schema.Message{
// {Role: schema.System, Content:"你是一个 golang 开发专家."},
// {Role: schema.Human, Content: "写一个 hello world 的代码"}
//}
}
MessageRewriter
MessageRewriter 在每次 ChatModel 之前执行,会修改并更新保存全局状态中的历史消息:
// MessageRewriter modifies message in the state, before the ChatModel is called.
// It takes the messages stored accumulated in state, modify them, and put the modified version back into state.
// Useful for compressing message history to fit the model context window,
// or if you want to make changes to messages that take effect across multiple model calls.
// NOTE: if both MessageModifier and MessageRewriter are set, MessageRewriter will be called before MessageModifier.
MessageRewriter MessageModifier
常用于上下文压缩这种在多轮 ReAct 循环中需要一直生效的消息变更。
对比 MessageModifier(只变更不持久,因此适合 system prompt),MessageRewriter 的变更在后续的 ReAct 循环也可见。
MaxStep
指定 Agent 最大运行步长,每次从一个节点转移到下一个节点为一步,默认值为 node 个数 + 2。
由于 Agent 中一次循环为 ChatModel + Tools,即为 2 步,因此默认值 12 最多可运行 6 个循环。但由于最后一步必须为 ChatModel 返回 (因为 ChatModel 结束后判断无须运行 tool 才能返回最终结果),因此最多运行 5 次 tool。
同理,若希望最多可运行 10 个循环 (10 次 ChatModel + 9 次 Tools),则需要设置 MaxStep 为 20。若希望最多运行 20 个循环,则 MaxStep 需为 40。
func main() {
agent, err := react.NewAgent(ctx, &react.AgentConfig{
ToolCallingModel: toolableChatModel,
ToolsConfig: tools,
MaxStep: 20,
}
}
ToolReturnDirectly
如果希望当 ChatModel 选择了特定的 Tool 并执行后,Agent 直接把 Tool 的 Response ToolMessage 返回去,则可以在 ToolReturnDirectly 中配置这个 Tool。
a, err = NewAgent(ctx, &AgentConfig{
Model: cm,
ToolsConfig: compose.ToolsNodeConfig{
Tools: []tool.BaseTool{fakeTool, fakeStreamTool},
},
MaxStep: 40,
ToolReturnDirectly: map[string]struct{}{fakeToolName: {}}, // one of the two tools is return directly
})
StreamToolCallChecker
不同的模型在流式模式下输出工具调用的方式可能不同: 某些模型(如 OpenAI) 会直接输出工具调用;某些模型 (如 Claude) 会先输出文本,然后再输出工具调用。因此需要使用不同的方法来判断,这个字段用来指定判断模型流式输出中是否包含工具调用的函数。
可选填写,未填写时使用“非空包”是否包含工具调用判断:
func firstChunkStreamToolCallChecker(_ context.Context, sr *schema.StreamReader[*schema.Message]) (bool, error) {
defer sr.Close()
for {
msg, err := sr.Recv()
if err == io.EOF {
return false, nil
}
if err != nil {
return false, err
}
if len(msg.ToolCalls) > 0 {
return true, nil
}
if len(msg.Content) == 0 { // skip empty chunks at the front
continue
}
return false, nil
}
}
上述默认实现适用于:模型输出的 Tool Call Message 中只有 Tool Call。
默认实现不适用的情况:在输出 Tool Call 前,有非空的 content chunk。此时,需要自定义 tool Call checker 如下:
toolCallChecker := func(ctx context.Context, sr *schema.StreamReader[*schema.Message]) (bool, error) {
defer sr.Close()
for {
msg, err := sr.Recv()
if err != nil {
if errors.Is(err, io.EOF) {
// finish
break
}
return false, err
}
if len(msg.ToolCalls) > 0 {
return true, nil
}
}
return false, nil
}
上面这个自定义 StreamToolCallChecker,在极端情况下可能需要判断所有包是否包含 ToolCall,从而导致“流式判断”的效果丢失。如果希望尽可能保留“流式判断”效果,解决这一问题的建议是:
💡 尝试添加 prompt 来约束模型在工具调用时不额外输出文本,例如:“如果需要调用 tool,直接输出 tool,不要输出文本”。
不同模型受 prompt 影响可能不同,实际使用时需要自行调整 prompt 并验证效果。
调用
Generate
agent, _ := react.NewAgent(...)
var outMessage *schema.Message
outMessage, err = agent.Generate(ctx, []*schema.Message{
schema.UserMessage("写一个 golang 的 hello world 程序"),
})
Stream
agent, _ := react.NewAgent(...)
var msgReader *schema.StreamReader[*schema.Message]
msgReader, err = agent.Stream(ctx, []*schema.Message{
schema.UserMessage("写一个 golang 的 hello world 程序"),
})
for {
// msg type is *schema.Message
msg, err := msgReader.Recv()
if err != nil {
if errors.Is(err, io.EOF) {
// finish
break
}
// error
log.Printf("failed to recv: %v\n", err)
return
}
fmt.Print(msg.Content)
}
WithCallbacks
Callback 是在 Agent 运行时特定时机执行的回调,由于 Agent 这个 Graph 里面只有 ChatModel 和 ToolsNode,因此 Agent 的 Callback 就是 ChatModel 和 Tool 的 Callback。react 包中提供了一个 helper function 来帮助用户快速构建针对这两个组件类型的 Callback Handler。
import (
template "github.com/cloudwego/eino/utils/callbacks"
)
// BuildAgentCallback builds a callback handler for agent.
// e.g.
//
// callback := BuildAgentCallback(modelHandler, toolHandler)
// agent, err := react.NewAgent(ctx, &AgentConfig{})
// agent.Generate(ctx, input, agent.WithComposeOptions(compose.WithCallbacks(callback)))
func BuildAgentCallback(modelHandler *template.ModelCallbackHandler, toolHandler *template.ToolCallbackHandler) callbacks.Handler {
return template.NewHandlerHelper().ChatModel(modelHandler).Tool(toolHandler).Handler()
}
Options
React agent 支持通过运行时 Option 动态修改
场景 1:运行时修改 Agent 中的 Model 配置,通过:
// WithChatModelOptions returns an agent option that specifies model.Option for the chat model in agent.
func WithChatModelOptions(opts ...model.Option) agent.AgentOption {
return agent.WithComposeOptions(compose.WithChatModelOption(opts...))
}
场景 2:运行时修改 Tool 列表,通过:
// WithToolList returns an agent option that specifies the list of tools can be called which are BaseTool but must implement InvokableTool or StreamableTool.
func WithToolList(tools ...tool.BaseTool) agent.AgentOption {
return agent.WithComposeOptions(compose.WithToolsNodeOption(compose.WithToolList(tools...)))
}
另外,也需要修改 ChatModel 中绑定的 tool: WithChatModelOptions(model.WithTools(...))
场景 3:运行时修改某个 Tool 的 option,通过:
// WithToolOptions returns an agent option that specifies tool.Option for the tools in agent.
func WithToolOptions(opts ...tool.Option) agent.AgentOption {
return agent.WithComposeOptions(compose.WithToolsNodeOption(compose.WithToolOption(opts...)))
}
Prompt
运行时修改 prompt,其实就是在 Generate 或者 Stream 的时候,传入不同的 Message 列表。
获取中间结果
如果希望实时拿到 React Agent 执行过程中产生的 *schema.Message,可以先通过 WithMessageFuture 获取一个运行时 Option 和一个 MessageFuture:
// WithMessageFuture returns an agent option and a MessageFuture interface instance.
// The option configures the agent to collect messages generated during execution,
// while the MessageFuture interface allows users to asynchronously retrieve these messages.
func WithMessageFuture() (agent.AgentOption, MessageFuture) {
h := &cbHandler{started: make(chan struct{})}
cmHandler := &ub.ModelCallbackHandler{
OnEnd: h.onChatModelEnd,
OnEndWithStreamOutput: h.onChatModelEndWithStreamOutput,
}
toolHandler := &ub.ToolCallbackHandler{
OnEnd: h.onToolEnd,
OnEndWithStreamOutput: h.onToolEndWithStreamOutput,
}
graphHandler := callbacks.NewHandlerBuilder().
OnStartFn(h.onGraphStart).
OnStartWithStreamInputFn(h.onGraphStartWithStreamInput).
OnEndFn(h.onGraphEnd).
OnEndWithStreamOutputFn(h.onGraphEndWithStreamOutput).
OnErrorFn(h.onGraphError).Build()
cb := ub.NewHandlerHelper().ChatModel(cmHandler).Tool(toolHandler).Graph(graphHandler).Handler()
option := agent.WithComposeOptions(compose.WithCallbacks(cb))
return option, h
}
这个运行时 Option 就正常传递给 Generate 或者 Stream 方法。这个 MessageFuture 可以 GetMessages 或者 GetMessageStreams 来获取各中间状态的 Message。
💡 传入 MessageFuture 的 Option 后,Agent 仍然会阻塞运行,通过 MessageFuture 接收中间结果需要和 Agent 运行异步(在 goroutine 中读 MessageFuture 或在 goroutine 中运行 Agent)
Agent In Graph/Chain
Agent 可作为 Lambda 嵌入到其他的 Graph 中:
agent, _ := NewAgent(ctx, &AgentConfig{
ToolCallingModel: cm,
ToolsConfig: compose.ToolsNodeConfig{
Tools: []tool.BaseTool{fakeTool, &fakeStreamToolGreetForTest{}},
},
MaxStep: 40,
})
chain := compose.NewChain[[]*schema.Message, string]()
agentLambda, _ := compose.AnyLambda(agent.Generate, agent.Stream, nil, nil)
chain.
AppendLambda(agentLambda).
AppendLambda(compose.InvokableLambda(func(ctx context.Context, input *schema.Message) (string, error) {
t.Log("got agent response: ", input.Content)
return input.Content, nil
}))
r, _ := chain.Compile(ctx)
res, _ := r.Invoke(ctx, []*schema.Message{{Role: schema.User, Content: "hello"}},
compose.WithCallbacks(callbackForTest))
Demo
基本信息
简介:这是一个拥有两个 tool (query_restaurants 和 query_dishes ) 的 美食推荐官
地址:eino-examples/flow/agent/react
使用方式:
-
clone eino-examples repo,并 cd 到根目录
-
提供一个
OPENAI_API_KEY:export OPENAI_API_KEY=xxxxxxx -
运行 demo:
go run flow/agent/react/react.go
运行过程解释
-
模拟用户输入了
我在海淀区,给我推荐一些菜,需要有口味辣一点的菜,至少推荐有 2 家餐厅 -
agent 运行第一个节点
ChatModel,大模型判断出需要做一次 ToolCall 调用来查询餐厅,并且给出的参数为:
"function": {
"name": "query_restaurants",
"arguments": "{\"location\":\"海淀区\",\"topn\":2}"
}
-
进入
Tools节点,调用 查询餐厅 的 tool,并且得到结果,结果返回了 2 家海淀区的餐厅信息:
[{"id":"1001","name":"老地方餐厅","place":"北京老胡同 5F, 左转进入","desc":"","score":3},{"id":"1002","name":"人间味道餐厅","place":"北京大世界商城-1F","desc":"","score":5}]
-
得到 tool 的结果后,此时对话的 history 中包含了 tool 的结果,再次运行
ChatModel,大模型判断出需要再次调用另一个 ToolCall,用来查询餐厅有哪些菜品,注意,由于有两家餐厅,因此大模型返回了 2 个 ToolCall,如下:
"Message": {
"role": "ai",
"content": "",
"tool_calls": [ // <= 这里有 2 个 tool call
{
"index": 1,
"id": "call_wV7zA3vGGJBhuN7r9guhhAfF",
"function": {
"name": "query_dishes",
"arguments": "{\"restaurant_id\": \"1002\", \"topn\": 5}"
}
},
{
"index": 0,
"id": "call_UOsp0jRtzEbfxixNjP5501MF",
"function": {
"name": "query_dishes",
"arguments": "{\"restaurant_id\": \"1001\", \"topn\": 5}"
}
}
]
}
-
再次进入到
Tools节点,由于有 2 个 tool call,Tools 节点内部并发执行这两个调用,并且均加入到对话的 history 中,从 callback 的调试日志中可以看到结果如下:
=========[OnToolStart]=========
{"restaurant_id": "1001", "topn": 5}
=========[OnToolEnd]=========
[{"name":"红烧肉","desc":"一块红烧肉","price":20,"score":8},{"name":"清泉牛肉","desc":"很多的水煮牛肉","price":50,"score":8},{"name":"清炒小南瓜","desc":"炒的糊糊的南瓜","price":5,"score":5},{"name":"韩式辣白菜","desc":"这可是开过光的辣白菜,好吃得很","price":20,"score":9},{"name":"酸辣土豆丝","desc":"酸酸辣辣的土豆丝","price":10,"score":9}]
=========[OnToolStart]=========
{"restaurant_id": "1002", "topn": 5}
=========[OnToolEnd]=========
[{"name":"红烧排骨","desc":"一块一块的排骨","price":43,"score":7},{"name":"大刀回锅肉","desc":"经典的回锅肉, 肉很大","price":40,"score":8},{"name":"火辣辣的吻","desc":"凉拌猪嘴,口味辣而不腻","price":60,"score":9},{"name":"辣椒拌皮蛋","desc":"擂椒皮蛋,下饭的神器","price":15,"score":8}]
-
得到所有 tool call 返回的结果后,再次进入
ChatModel节点,这次大模型发现已经拥有了回答用户提问的所有信息,因此整合信息后输出结论,由于调用时使用的Stream方法,因此流式返回的大模型结果
Host Multi-Agent
是一个 Host 做意图识别后,跳转到某个专家 agent 做实际的生成。只转发,不生成子任务。
以一个简单的“日记助手”做例子:可以写日记、读日记、根据日记回答问题。
完整样例参见:https://github.com/cloudwego/eino-examples/tree/main/flow/agent/multiagent/host/journal
Host:
func newHost(ctx context.Context, baseURL, apiKey, modelName string) (*host.Host, error) {
chatModel, err := openai.NewChatModel(ctx, &openai.ChatModelConfig{
BaseURL: baseURL,
Model: modelName,
ByAzure: true,
APIKey: apiKey,
})
if err != nil {
return nil, err
}
return &host.Host{
ChatModel: chatModel,
SystemPrompt: "You can read and write journal on behalf of the user. When user asks a question, always answer with journal content.",
}, nil
}
写日记的“专家”:host 识别出用户意图是写日记后,会跳转到这里,把用户想要写的内容写到文件里。
func newWriteJournalSpecialist(ctx context.Context) (*host.Specialist, error) {
chatModel, err := ollama.NewChatModel(ctx, &ollama.ChatModelConfig{
BaseURL: "http://localhost:11434",
Model: "llama3-groq-tool-use",
Options: &api.Options{
Temperature: 0.000001,
},
})
if err != nil {
return nil, err
}
// use a chat model to rewrite user query to journal entry
// for example, the user query might be:
//
// write: I got up at 7:00 in the morning.
//
// should be rewritten to:
//
// I got up at 7:00 in the morning.
chain := compose.NewChain[[]*schema.Message, *schema.Message]()
chain.AppendLambda(compose.InvokableLambda(func(ctx context.Context, input []*schema.Message) ([]*schema.Message, error) {
systemMsg := &schema.Message{
Role: schema._System_,
Content: "You are responsible for preparing the user query for insertion into journal. The user's query is expected to contain the actual text the user want to write to journal, as well as convey the intention that this query should be written to journal. You job is to remove that intention from the user query, while preserving as much as possible the user's original query, and output ONLY the text to be written into journal",
}
return append([]*schema.Message{systemMsg}, input...), nil
})).
AppendChatModel(chatModel).
AppendLambda(compose.InvokableLambda(func(ctx context.Context, input *schema.Message) (*schema.Message, error) {
err := appendJournal(input.Content)
if err != nil {
return nil, err
}
return &schema.Message{
Role: schema._Assistant_,
Content: "Journal written successfully: " + input.Content,
}, nil
}))
r, err := chain.Compile(ctx)
if err != nil {
return nil, err
}
return &host.Specialist{
AgentMeta: host.AgentMeta{
Name: "write_journal",
IntendedUse: "treat the user query as a sentence of a journal entry, append it to the right journal file",
},
Invokable: func(ctx context.Context, input []*schema.Message, opts ...agent.AgentOption) (*schema.Message, error) {
return r.Invoke(ctx, input, agent.GetComposeOptions(opts...)...)
},
}, nil
}
读日记的“专家”:host 识别出用户意图是读日记后,会跳转到这里,读日记文件内容并一行行的输出。就是一个本地的 function。
func newReadJournalSpecialist(ctx context.Context) (*host.Specialist, error) {
// create a new read journal specialist
return &host.Specialist{
AgentMeta: host.AgentMeta{
Name: "view_journal_content",
IntendedUse: "let another agent view the content of the journal",
},
Streamable: func(ctx context.Context, input []*schema.Message, opts ...agent.AgentOption) (*schema.StreamReader[*schema.Message], error) {
now := time.Now()
dateStr := now.Format("2006-01-02")
journal, err := readJournal(dateStr)
if err != nil {
return nil, err
}
reader, writer := schema.Pipe[*schema.Message](0)
go func() {
scanner := bufio.NewScanner(journal)
scanner.Split(bufio.ScanLines)
for scanner.Scan() {
line := scanner.Text()
message := &schema.Message{
Role: schema._Assistant_,
Content: line + "\n",
}
writer.Send(message, nil)
}
if err := scanner.Err(); err != nil {
writer.Send(nil, err)
}
writer.Close()
}()
return reader, nil
},
}, nil
}
根据日记回答问题的"专家":
func newAnswerWithJournalSpecialist(ctx context.Context) (*host.Specialist, error) {
chatModel, err := ollama.NewChatModel(ctx, &ollama.ChatModelConfig{
BaseURL: "http://localhost:11434",
Model: "llama3-groq-tool-use",
Options: &api.Options{
Temperature: 0.000001,
},
})
if err != nil {
return nil, err
}
// create a graph: load journal and user query -> chat template -> chat model -> answer
graph := compose.NewGraph[[]*schema.Message, *schema.Message]()
if err = graph.AddLambdaNode("journal_loader", compose.InvokableLambda(func(ctx context.Context, input []*schema.Message) (string, error) {
now := time.Now()
dateStr := now.Format("2006-01-02")
return loadJournal(dateStr)
}), compose.WithOutputKey("journal")); err != nil {
return nil, err
}
if err = graph.AddLambdaNode("query_extractor", compose.InvokableLambda(func(ctx context.Context, input []*schema.Message) (string, error) {
return input[len(input)-1].Content, nil
}), compose.WithOutputKey("query")); err != nil {
return nil, err
}
systemTpl := `Answer user's query based on journal content: {journal}'`
chatTpl := prompt.FromMessages(schema._FString_,
schema.SystemMessage(systemTpl),
schema.UserMessage("{query}"),
)
if err = graph.AddChatTemplateNode("template", chatTpl); err != nil {
return nil, err
}
if err = graph.AddChatModelNode("model", chatModel); err != nil {
return nil, err
}
if err = graph.AddEdge("journal_loader", "template"); err != nil {
return nil, err
}
if err = graph.AddEdge("query_extractor", "template"); err != nil {
return nil, err
}
if err = graph.AddEdge("template", "model"); err != nil {
return nil, err
}
if err = graph.AddEdge(compose._START_, "journal_loader"); err != nil {
return nil, err
}
if err = graph.AddEdge(compose._START_, "query_extractor"); err != nil {
return nil, err
}
if err = graph.AddEdge("model", compose._END_); err != nil {
return nil, err
}
r, err := graph.Compile(ctx)
if err != nil {
return nil, err
}
return &host.Specialist{
AgentMeta: host.AgentMeta{
Name: "answer_with_journal",
IntendedUse: "load journal content and answer user's question with it",
},
Invokable: func(ctx context.Context, input []*schema.Message, opts ...agent.AgentOption) (*schema.Message, error) {
return r.Invoke(ctx, input, agent.GetComposeOptions(opts...)...)
},
}, nil
}
编排成 host multi agent 并在命令行启动:
func main() {
ctx := context.Background()
h, err := newHost(ctx)
if err != nil {
panic(err)
}
writer, err := newWriteJournalSpecialist(ctx)
if err != nil {
panic(err)
}
reader, err := newReadJournalSpecialist(ctx)
if err != nil {
panic(err)
}
answerer, err := newAnswerWithJournalSpecialist(ctx)
if err!= nil {
panic(err)
}
hostMA, err := host.NewMultiAgent(ctx, &host.MultiAgentConfig{
Host: *h,
Specialists: []*host.Specialist{
writer,
reader,
answerer,
},
})
if err != nil {
panic(err)
}
cb := &logCallback{}
for { // 多轮对话,除非用户输入了 "exit",否则一直循环
println("\n\nYou: ") // 提示轮到用户输入了
var message string
scanner := bufio.NewScanner(os.Stdin) // 获取用户在命令行的输入
for scanner.Scan() {
message += scanner.Text()
break
}
if err := scanner.Err(); err != nil {
panic(err)
}
if message == "exit" {
return
}
msg := &schema.Message{
Role: schema._User_,
Content: message,
}
out, err := hostMA.Stream(ctx, []*schema.Message{msg}, host.WithAgentCallbacks(cb))
if err != nil {
panic(err)
}
defer out.Close()
println("\nAnswer:")
for {
msg, err := out.Recv()
if err != nil {
if err == io.EOF {
break
}
}
print(msg.Content)
}
}
}
运行 console 输出:
You:
write journal: I got up at 7:00 in the morning
HandOff to write_journal with argument {"reason":"I got up at 7:00 in the morning"}
Answer:
Journal written successfully: I got up at 7:00 in the morning
You:
read journal
HandOff to view_journal_content with argument {"reason":"User wants to read the journal content."}
Answer:
I got up at 7:00 in the morning
You:
when did I get up in the morning?
HandOff to answer_with_journal with argument {"reason":"To find out the user's morning wake-up times"}
Answer:
You got up at 7:00 in the morning.
FAQ
Host 直接输出时没有流式
Host Multi-Agent 提供了一个 StreamToolCallChecker 的配置,用于判断 Host 是否直接输出。
不同的模型在流式模式下输出工具调用的方式可能不同: 某些模型(如 OpenAI) 会直接输出工具调用;某些模型 (如 Claude) 会先输出文本,然后再输出工具调用。因此需要使用不同的方法来判断,这个字段用来指定判断模型流式输出中是否包含工具调用的函数。
可选填写,未填写时使用“非空包”是否包含工具调用判断:
func firstChunkStreamToolCallChecker(_ context.Context, sr *schema.StreamReader[*schema.Message]) (bool, error) {
defer sr.Close()
for {
msg, err := sr.Recv()
if err == io.EOF {
return false, nil
}
if err != nil {
return false, err
}
if len(msg.ToolCalls) > 0 {
return true, nil
}
if len(msg.Content) == 0 { // skip empty chunks at the front
continue
}
return false, nil
}
}
上述默认实现适用于:模型输出的 Tool Call Message 中只有 Tool Call。
默认实现不适用的情况:在输出 Tool Call 前,有非空的 content chunk。此时,需要自定义 tool Call checker 如下:
toolCallChecker := func(ctx context.Context, sr *schema.StreamReader[*schema.Message]) (bool, error) {
defer sr.Close()
for {
msg, err := sr.Recv()
if err != nil {
if errors.Is(err, io.EOF) {
// finish
break
}
return false, err
}
if len(msg.ToolCalls) > 0 {
return true, nil
}
}
return false, nil
}
上面这个自定义 StreamToolCallChecker,在极端情况下可能需要判断所有包是否包含 ToolCall,从而导致“流式判断”的效果丢失。如果希望尽可能保留“流式判断”效果,解决这一问题的建议是:
💡 尝试添加 prompt 来约束模型在工具调用时不额外输出文本,例如:“如果需要调用 tool,直接输出 tool,不要输出文本”。
不同模型受 prompt 影响可能不同,实际使用时需要自行调整 prompt 并验证效果。
Host 同时选择多个 Specialist
Host 以 Tool Call 的形式给出对 Specialist 的选择,因此可能以 Tool Call 列表的形式同时选中多个 Specialist。此时 Host Multi-Agent 会同时将请求路由到这多个 Specialist,并在多个 Specialist 完成后,通过 Summarizer 节点总结多条 Message 为一条 Message,作为 Host Multi-Agent 的最终输出。
用户可通过配置 Summarizer,指定一个 ChatModel 以及 SystemPrompt,来定制化 Summarizer 的行为。如未指定,Host Multi-Agent 会将多个 Specialist 的输出 Message Content 拼接后返回。
3.4 agent adk
Component 和 Agent 的关系:
- Component 不构成完整的 AI 应用:它只是能力单元,需要被组织、编排、执行
- Agent 是完整的 AI 应用:它封装了完整的业务逻辑,可以直接运行
- Agent 内部使用 Component:最核心的是
ChatModel(对话能力)和Tool(执行能力)
为什么需要 Agent?
如果只有 Component,你需要自己:
- 管理对话历史
- 编排调用流程(何时调用模型、何时调用工具)
- 处理流式输出
- 实现中断恢复
ChatModel vs ChatModelAgent:本质区别
|
维度 |
ChatModel |
ChatModelAgent |
|
定位 |
Component(组件) |
Agent(智能体) |
|
接口 |
Generate() / Stream() |
Run() -> AsyncIterator[*AgentEvent] |
|
输出 |
直接返回消息内容 |
返回事件流(包含消息、控制动作等) |
|
能力 |
单纯的模型调用 |
可扩展 tools、middleware、interrupt 等 |
|
适用场景 |
简单的对话场景 |
复杂的智能体应用 |
-
ChatModel 就像"数据库驱动":负责与数据库通信,屏蔽 MySQL/PostgreSQL 的差异
-
ChatModelAgent 就像"业务逻辑层":基于数据库驱动构建,但还包含业务规则、事务管理等
快速开始
Agent
什么是 Eino ADK
Eino ADK 参考 Google-ADK 的设计,提供了 Go 语言 的 Agents 开发的灵活组合框架,即 Agent、Multi-Agent 开发框架,并为多 Agent 交互场景沉淀了通用的上下文传递、事件流分发和转换、任务控制权转让、中断与恢复、通用切面等能力。
什么是 Agent
Agent 是 Eino ADK 的核心,它代表一个独立的、可执行的智能任务单元。你可以把它想象成一个能够理解指令、执行任务并给出回应的“智能体”。每个 Agent 都有明确的名称和描述,使其可以被其他 Agent 发现和调用。
任何需要与大语言模型(LLM)交互的场景都可以抽象为一个 Agent。例如:
-
一个用于查询天气信息的 Agent。
-
一个用于预定会议的 Agent。
-
一个能够回答特定领域知识的 Agent。
Eino ADK 中的 Agent
Eino ADK 中的所有功能设计均围绕 Agent 抽象设计展开:
type Agent interface {
Name(ctx context.Context) string
Description(ctx context.Context) string
Run(ctx context.Context, input *AgentInput) *AsyncIterator[*AgentEvent]
}
基于 Agent 抽象,ADK 提供了三大类基础拓展:
-
ChatModel Agent: 应用程序的“思考”部分,利用 LLM 作为核心,理解自然语言,进行推理、规划、生成响应,并动态决定如何执行或使用哪些工具。 -
Workflow Agents:应用程序的协调管理部分,基于预定义的逻辑,按照自身类型(顺序 / 并发 / 循环)控制子 Agent 执行流程。Workflow Agents 产生确定性的,可预测的执行模式,不同于 ChatModel Agent 生成的动态随机的决策。-
顺序 (Sequential Agent):按顺序依次执行子 Agents
-
循环 (Loop Agent):重复执行子 Agents,直至满足特定的终止条件
-
并行 (Parallel Agent):并行执行多个子 Agents
-
-
Custom Agent:通过接口实现自己的 Agent,允许定义高度定制的复杂 Agent
基于基础扩展,您可以针对自己的需求排列组合这些基础 Agents,构建所需要的 Multi-Agent 系统。另外,Eino 从日常实践经验出发,内置提供了几种开箱即用的 Multi-Agent 最佳范式:
-
Supervisor: 监督者模式,监督者 Agent 控制所有通信流程和任务委托,并根据当前上下文和任务需求决定调用哪个 Agent。
-
Plan-Execute:计划-执行模式,Plan Agent 生成含多个步骤的计划,Execute Agent 根据用户 query 和计划来完成任务。Execute 后会再次调用 Plan,决定完成任务 / 重新进行规划。
下方表格和图提供了这些基础拓展与封装的特点,区别,与关系。后续章节中将展开介绍每种类型的原理与细节:
|
类别 |
ChatModel Agent |
Workflow Agents |
Custom Logic |
EinoBuiltInAgent(supervisor, plan-execute) |
|
功能 |
思考,生成,工具调用 |
控制 Agent 之间的执行流程 |
运行自定义逻辑 |
开箱即用的 Multi-agent 模式封装 |
|
核心 |
LLM |
预确定的执行流程(顺序,并发,循环) |
自定义代码 |
基于 Eino 实践积累的经验,对前三者的高度封装 |
|
用途 |
生成,动态决策 |
结构化处理,编排 |
定制需求 |
特定场景内的开箱即用 |
ADK Examples
Eino-examples 项目中提供了多种 ADK 的实施样例,您可以参考样例代码与简介,对 adk 能力构建初步的认知:
|
项目路径 |
简介 |
结构图 |
|
该示例代码展示了基于 eino adk 的 Workflow 模式构建的一个顺序执行的多智能体工作流。
|
||
|
该示例代码基于 eino adk 的 Workflow 模式中的 LoopAgent,构建了一个反思迭代型智能体框架。
|
||
|
该示例代码基于 eino adk 的 Workflow 模式中的 ParallelAgent,构建了一个并发信息搜集框架:
|
||
|
该用例采用单层 Supervisor 管理两个功能较为综合的子 Agent:Research Agent 负责检索任务,Math Agent 负责多种数学运算(加、乘、除),但所有数学运算均由同一个 Math Agent 内部统一处理,而非拆分为多个子 Agent。此设计简化了代理层级,适合任务较为集中且不需要过度拆解的场景,便于快速部署和维护。 |
||
|
该用例实现了多层级智能体监督体系,顶层 Supervisor 管理 Research Agent 和 Math Agent,Math Agent 又进一步细分为 Subtract、Multiply、Divide 三个子 Agent。顶层 Supervisor 负责将研究任务和数学任务分配给下级 Agent,Math Agent 作为中层监督者再将具体数学运算任务分派给其子 Agent。
|
||
|
本示例基于 eino adk 实现 plan-execute-replan 模式的多 Agent 旅行规划系统,核心功能是处理用户复杂旅行请求(如 “3 天北京游,需从纽约出发的航班、酒店推荐、必去景点”),通过 “计划 - 执行 - 重新计划” 循环完成任务:1. 计划(Plan): Planner Agent 基于大模型生成分步执行计划(如 “第一步查北京天气,第二步搜纽约到北京航班”);2. 执行(Execute): Executor Agent 调用 ** 天气(get_weather)、航班(search_flights)、酒店(search_hotels)、景点(search_attractions)** 等 Mock 工具执行每一步,若用户输入信息缺失(如未说明预算),则调用 ask_for_clarification 工具追问;3. 重新计划(Replan): Replanner Agent 根据工具执行结果评估是否需要调整计划(如航班无票则重新选日期)。Execute 和 Replan 不断循环运行,直至完成计划中的所有步骤;4. 支持会话轨迹跟踪(CozeLoop 回调)和状态管理,最终输出完整旅行方案。从结构上看,plan-execute-replan 分为两层:
|
||
|
书籍推荐 agent(运行中断与恢复) |
该代码展示了基于 eino adk 框架构建的一个书籍推荐聊天智能体实现,体现了 Agent 运行中断与恢复功能。
|
What’s Next
经过 Quickstart 概览,您应该对 Eino ADK 与 Agent 有了基础的认知。
接下来的文章将深入介绍 ADK 的核心概念,助您理解 Eino ADK 的工作原理并更好的使用它:
什么是 Eino ADK?
Eino ADK 参考 Google-ADK 的设计,提供了 Go 语言 的 Agents 开发的灵活组合框架,即 Agent、Multi-Agent 开发框架。Eino ADK 为多 Agent 交互时,沉淀了通用的 上下文传递、事件流分发和转换、任务控制权转让、中断与恢复、通用切面等能力。 适用场景广泛、模型无关、部署无关,让 Agent、Multi-Agent 开发更加简单、便利,并提供完善的生产级应用的治理能力。
Eino ADK 旨在帮助开发者开发、管理 Agent 应用。提供灵活且鲁棒的开发环境,助力开发者搭建 对话智能体、非对话智能体、复杂任务、工作流等多种多样的 Agent 应用。
ADK 框架
Eino ADK 的整体模块构成,如下图所示:
Agent Interface
Eino ADK 的核心是 Agent 抽象(Agent Interface),ADK 的所有功能设计均围绕 Agent 抽象展开。详解请见 Eino ADK: Agent 抽象 [New]
type Agent interface {
Name(ctx context.Context) string
Description(ctx context.Context) string
// Run runs the agent.
// The returned AgentEvent within the AsyncIterator must be safe to modify.
// If the returned AgentEvent within the AsyncIterator contains MessageStream,
// the MessageStream MUST be exclusive and safe to be received directly.
// NOTE: it's recommended to use SetAutomaticClose() on the MessageStream of AgentEvents emitted by AsyncIterator,
// so that even the events are not processed, the MessageStream can still be closed.
Run(ctx context.Context, input *AgentInput, options ...AgentRunOption) *AsyncIterator[*AgentEvent]
}
Agent.Run 的定义为:
-
从入参 AgentInput、AgentRunOption 和可选的 Context Session 中获取任务详情及相关数据
-
执行任务,并将执行过程、执行结果写入到 AgentEvent Iterator
Agent.Run 要求 Agent 的实现以 Future 模式异步执行,核心分成三步,具体可参考 ChatModelAgent 中 Run 方法的实现:
-
创建一对 Iterator、Generator
-
启动 Agent 的异步任务,并传入 Generator,处理 AgentInput。Agent 在这个异步任务执行核心逻辑(例如 ChatModelAgent 调用 LLM),并在产生新的事件时写入到 Generator 中,供 Agent 调用方在 Iterator 中消费
-
启动 2 中的任务后立即返回 Iterator
多 Agent 协作
围绕 Agent 抽象,Eino ADK 提供多种简单易用、场景丰富的组合原语,可支撑开发丰富多样的 Multi-Agent 协同策略,比如 Supervisor、Plan-Execute、Group-Chat 等 Multi-Agent 场景。从而实现不同的 Agent 分工合作模式,处理更复杂的任务。详解请见 Eino ADK: Agent 组合
Eino ADK 定义的 Agent 协作过程中的协作原语如下:
-
Agent 间协作方式
|
协助方式 |
描述 |
|
Transfer |
直接将任务转让给另外一个 Agent,本 Agent 则执行结束后退出,不关心转让 Agent 的任务执行状态 |
|
ToolCall(AgentAsTool) |
将 Agent 当成 ToolCall 调用,等待 Agent 的响应,并可获取被调用Agent 的输出结果,进行下一轮处理 |
-
AgentInput 的上下文策略
|
上下文策略 |
描述 |
|
上游 Agent 全对话 |
获取本 Agent 的上游 Agent 的完整对话记录 |
|
全新任务描述 |
忽略掉上游 Agent 的完整对话记录,给出一个全新的任务总结,作为子 Agent 的 AgentInput 输入 |
-
决策自主性
|
决策自主性 |
描述 |
|
自主决策 |
在 Agent 内部,基于其可选的下游 Agent, 如需协助时,自主选择下游 Agent 进行协助。 一般来说,Agent 内部是基于 LLM 进行决策,不过即使是基于预设逻辑进行选择,从 Agent 外部看依然视为自主决策 |
|
预设决策 |
事先预设好一个Agent 执行任务后的下一个 Agent。 Agent 的执行顺序是事先确定、可预测的 |
围绕协作原语,Eino ADK 提供了如下的几种 Agent 组合原语:
|
类型 |
描述 |
运行模式 |
协作方式 |
上下文策略 |
决策自主性 |
|
SubAgents |
将用户提供的 agent 作为 父Agent,用户提供的 subAgents 列表作为 子Agents,组合而成可自主决策的 Agent,其中的 Name 和 Description 作为该 Agent 的名称标识和描述。
|
Transfer |
上游 Agent 全对话 |
自主决策 |
|
|
Sequential |
将用户提供的 SubAgents 列表,组合成按照顺序依次执行的 Sequential Agent,其中的 Name 和 Description 作为 Sequential Agent 的名称标识和描述。Sequential Agent 执行时,将 SubAgents 列表,按照顺序依次执行,直至将所有 Agent 执行一遍后结束。 |
Transfer |
上游 Agent 全对话 |
预设决策 |
|
|
Parallel |
将用户提供的 SubAgents 列表,组合成基于相同上下文,并发执行的 Parallel Agent,其中的 Name 和 Description 作为 Parallel Agent 的名称标识和描述。Parallel Agent 执行时,将 SubAgents 列表,并发执行,待所有 Agent 执行完成后结束。 |
Transfer |
上游 Agent 全对话 |
预设决策 |
|
|
Loop |
将用户提供的 SubAgents 列表,按照数组顺序依次执行,循环往复,组合成 Loop Agent,其中的 Name 和 Description 作为 Loop Agent 的名称标识和描述。Loop Agent 执行时,将 SubAgents 列表,顺序执行,待所有 Agent 执行完成后结束。 |
Transfer |
上游 Agent 全对话 |
预设决策 |
|
|
AgentAsTool |
将一个 Agent 转换成 Tool,被其他的 Agent 当成普通的 Tool 使用。一个 Agent 能否将其他 Agent 当成 Tool 进行调用,取决于自身的实现。Eino ADK 中提供的 ChatModelAgent 支持 AgentAsTool 的功能 |
ToolCall |
全新任务描述 |
自主决策 |
ChatModelAgent
ChatModelAgent 是 Eino ADK 对 Agent 的关键实现,它封装了与大语言模型的交互逻辑,实现了 ReAct 范式的 Agent,基于 Eino 中的 Graph 编排出 ReAct Agent 控制流,通过 callbacks.Handler 导出 ReAct Agent 运行过程中产生的事件,转换成 AgentEvent 返回。
想要进一步了解 ChatModelAgent,请见:Eino ADK: ChatModelAgent [New]
type ChatModelAgentConfig struct {
// Name of the agent. Better be unique across all agents.
Name string
// Description of the agent's capabilities.
// Helps other agents determine whether to transfer tasks to this agent.
Description string
// Instruction used as the system prompt for this agent.
// Optional. If empty, no system prompt will be used.
// Supports f-string placeholders for session values in default GenModelInput, for example:
// "You are a helpful assistant. The current time is {Time}. The current user is {User}."
// These placeholders will be replaced with session values for "Time" and "User".
Instruction string
Model model.ToolCallingChatModel
ToolsConfig ToolsConfig
// GenModelInput transforms instructions and input messages into the model's input format.
// Optional. Defaults to defaultGenModelInput which combines instruction and messages.
GenModelInput GenModelInput
// Exit defines the tool used to terminate the agent process.
// Optional. If nil, no Exit Action will be generated.
// You can use the provided 'ExitTool' implementation directly.
Exit tool.BaseTool
// OutputKey stores the agent's response in the session.
// Optional. When set, stores output via AddSessionValue(ctx, outputKey, msg.Content).
OutputKey string
// MaxIterations defines the upper limit of ChatModel generation cycles.
// The agent will terminate with an error if this limit is exceeded.
// Optional. Defaults to 20.
MaxIterations int
}
func NewChatModelAgent(_ context.Context, config *ChatModelAgentConfig) (*ChatModelAgent, error) {
// omit code
}
AgentRunner
AgentRunner 是 Agent 的执行器,为 Agent 运行所需要的拓展功能加以支持,详解请见:Eino ADK: Agent 扩展
只有通过 Runner 执行 agent 时,才可以使用 ADK 的如下功能:
-
Interrupt & Resume
-
切面机制
-
Context 环境的预处理
type RunnerConfig struct { Agent Agent EnableStreaming bool CheckPointStore compose.CheckPointStore } func NewRunner(_ context.Context, conf RunnerConfig) *Runner { // omit code }
agent抽象
https://www.cloudwego.io/zh/docs/eino/core_modules/eino_adk/agent_interface/
agent协作
https://www.cloudwego.io/zh/docs/eino/core_modules/eino_adk/agent_collaboration/
agent实现
https://www.cloudwego.io/zh/docs/eino/core_modules/eino_adk/agent_implementation/
Agent Runner 与扩展
Agent Runner
定义
Runner 是 Eino ADK 中负责执行 Agent 的核心引擎。它的主要作用是管理和控制 Agent 的整个生命周期,如处理多 Agent 协作,保存传递上下文等,interrupt、callback 等切面能力也均依赖 Runner 实现。任何 Agent 都应通过 Runner 来运行。
Interrupt & Resume
Agent Runner 提供运行时中断与恢复的功能,该功能允许一个正在运行的 Agent 主动中断其执行并保存当前状态,支持从中断点恢复执行。该功能常用于 Agent 处理流程中需要外部输入、长时间等待或可暂停等场景。
下面将对一次中断到恢复过程中的三个关键点进行介绍:
-
Interrupted Action:由 Agent 抛出中断事件,Agent Runner 拦截
-
Checkpoint:Agent Runner 拦截事件后保存当前运行状态
-
Resume:运行条件重新 ready 后,由 Agent Runner 从断点恢复运行
Interrupted Action
在 Agent 的执行过程中,可以通过产生包含 Interrupted Action 的 AgentEvent 来主动中断 Runner 的运行。
当 Event 中的 Interrupted 不为空时,Agent Runner 便会认为发生中断:
// github.com/cloudwego/eino/adk/interface.go
type AgentAction struct {
// other actions
Interrupted *InterruptInfo
// other actions
}
// github.com/cloudwego/eino/adk/interrupt.go
type InterruptInfo struct {
Data any
}
当中断发生时,可以通过 InterruptInfo 结构体附带自定义的中断信息。此信息:
-
会被传递给调用者,可以通过该信息向调用者说明中断原因等
-
如果后续需要恢复 Agent 运行,InterruptInfo 会在恢复时重新传递给中断的 Agent,Agent 可以依据该信息恢复运行
// 例如 ChatModelAgent 中断时,会发送如下的 AgentEvent:
h.Send(&AgentEvent{AgentName: h.agentName, Action: &AgentAction{
Interrupted: &InterruptInfo{
Data: &ChatModelAgentInterruptInfo{Data: data, Info: info},
},
}})
状态持久化 (Checkpoint)
当 Runner 捕获到这个带有 Interrupted Action 的 Event 时,会立即终止当前的执行流程。 如果:
-
Runner 中设置了 CheckPointStore
// github.com/cloudwego/eino/adk/runner.go
type RunnerConfig struct {
// other fields
CheckPointStore CheckPointStore
}
// github.com/cloudwego/eino/adk/interrupt.go
type CheckPointStore interface {
Set(ctx context.Context, key string, value []byte) error
Get(ctx context.Context, key string) ([]byte, bool, error)
}
-
调用 Runner 时通过 AgentRunOption WithCheckPointID 传入 CheckPointID
// github.com/cloudwego/eino/adk/interrupt.go
func WithCheckPointID(id string) _AgentRunOption_
Runner 在终止运行后会将当前运行状态(原始输入、对话历史等)以及 Agent 抛出的 InterruptInfo 以 CheckPointID 为 key 持久化到 CheckPointStore 中。
💡 为了保存 interface 中数据的原本类型,Eino ADK 使用 gob(https://pkg.go.dev/encoding/gob)序列化运行状态。因此在使用自定义类型时需要提前使用 gob.Register 或 gob.RegisterName 注册类型(更推荐后者,前者使用路径加类型名作为默认名字,因此类型的位置和名字均不能发生变更)。Eino 会自动注册框架内置的类型。
Resume
运行中断,调用 Runner 的 Resume 接口传入中断时的 CheckPointID 可以恢复运行:
// github.com/cloudwego/eino/adk/runner.go
func (r *Runner) Resume(ctx context.Context, checkPointID string, opts ...AgentRunOption) (*AsyncIterator[*AgentEvent], error)
恢复 Agent 运行需要发生中断的 Agent 实现了 ResumableAgent 接口, Runner 从 CheckPointerStore 读取运行状态并恢复运行,其中 InterruptInfo 和上次运行配置的 EnableStreaming 会作为输入提供给 Agent:
// github.com/cloudwego/eino/adk/interface.go
type ResumableAgent interface {
Agent
Resume(ctx context.Context, info *ResumeInfo, opts ...AgentRunOption) *AsyncIterator[*AgentEvent]
}
// github.com/cloudwego/eino/adk/interrupt.go
type ResumeInfo struct {
EnableStreaming bool
*_InterruptInfo_
}
Resume 如果向 Agent 传入新信息,可以定义 AgentRunOption,在调用 Runner.Resume 时传入。
Eino human-in-the-loop框架
https://www.cloudwego.io/zh/docs/eino/core_modules/eino_adk/agent_hitl/
chatmodelagentmiddleware
https://www.cloudwego.io/zh/docs/eino/core_modules/eino_adk/eino_adk_chatmodelagentmiddleware/
Agent Callback
此功能为 ADK Agent 添加了回调(Callback)支持,类似于 compose 包中的回调机制。通过回调,用户可以观测 Agent 的执行生命周期,实现日志记录、追踪、监控等功能。
💡 提示:cozeloop 的 adk trace 版本见 https://github.com/cloudwego/eino-ext/releases/tag/callbacks%2Fcozeloop%2Fv0.2.0
务必同时使用支持 v0.8 的 trace callback handler 实现,才能正常使用 Agent trace 功能
概述
ADK Agent Callback 机制与 Eino compose 中的回调系统共享相同的基础设施:
-
使用相同的
callbacks.Handler接口 -
使用相同的
callbacks.RunInfo结构 -
可以与其他组件回调(如 ChatModel、Tool 等)组合使用
💡 通过 Agent Callback,你可以在 Agent 执行的关键节点介入,实现 tracing、logging、metrics 等可观测性能力。本能力在 v0.8.0 版本引入。
核心类型
ComponentOfAgent
组件类型标识符,用于在回调中识别 Agent 相关事件:
const ComponentOfAgent components.Component = "Agent"
在 callbacks.RunInfo.Component 中使用,用于过滤仅与 Agent 相关的回调事件。
AgentCallbackInput
Agent 回调的输入类型,在 OnStart 回调中传递:
type AgentCallbackInput struct {
// Input 包含新运行的 Agent 输入。恢复执行时为 nil。
Input *AgentInput
// ResumeInfo 包含从中断恢复时的信息。新运行时为 nil。
ResumeInfo *ResumeInfo
}
|
调用方式 |
字段值 |
|
Agent.Run() |
Input 字段有值, ResumeInfo 为 nil |
|
Agent.Resume() |
ResumeInfo 字段有值, Input 为 nil |
AgentCallbackOutput
Agent 回调的输出类型,在 OnEnd 回调中传递:
type AgentCallbackOutput struct {
// Events 提供 Agent 事件流。每个 handler 接收独立的副本。
Events *AsyncIterator[*AgentEvent]
}
💡 重要:
Events迭代器应异步消费,以避免阻塞 Agent 执行。每个回调 handler 接收独立的事件流副本,互不干扰。
API 使用
WithCallbacks
添加回调 handler 以接收 Agent 生命周期事件的运行选项:
func WithCallbacks(handlers ...callbacks.Handler) AgentRunOption
类型转换函数
将通用回调类型转换为 Agent 专用类型:
// 转换输入类型
func ConvAgentCallbackInput(input callbacks.CallbackInput) *AgentCallbackInput
// 转换输出类型
func ConvAgentCallbackOutput(output callbacks.CallbackOutput) *AgentCallbackOutput
如果类型不匹配,函数返回 nil。
使用示例
方式一:使用 HandlerBuilder
使用 callbacks.NewHandlerBuilder() 构建通用的 callback handler:
import (
"github.com/cloudwego/eino/adk"
"github.com/cloudwego/eino/callbacks"
)
handler := callbacks.NewHandlerBuilder().
OnStartFn(func(ctx context.Context, info *callbacks.RunInfo, input callbacks.CallbackInput) context.Context {
if info.Component == adk.ComponentOfAgent {
agentInput := adk.ConvAgentCallbackInput(input)
if agentInput.Input != nil {
fmt.Printf("Agent %s started with new run\n", info.Name)
} else {
fmt.Printf("Agent %s resumed from interrupt\n", info.Name)
}
}
return ctx
}).
OnEndFn(func(ctx context.Context, info *callbacks.RunInfo, output callbacks.CallbackOutput) context.Context {
if info.Component == adk.ComponentOfAgent {
agentOutput := adk.ConvAgentCallbackOutput(output)
// 异步消费事件流
go func() {
for {
event, ok := agentOutput.Events.Next()
if !ok {
break
}
// 处理事件...
fmt.Printf("Event from %s: %+v\n", event.AgentName, event)
}
}()
}
return ctx
}).
Build()
// 创建 Runner - 必须通过 Runner 执行 Agent,callback 才会生效
runner := adk.NewRunner(ctx, adk.RunnerConfig{
Agent: agent,
EnableStreaming: input.EnableStreaming,
})
iter := runner.Run(ctx, input.Messages, adk.WithCallbacks(handler))
💡 重要提示:上面的示例展示了正确的使用方式。必须通过 Runner 执行 Agent,AgentCallback 才会生效。直接使用
agent.Run()时,callback 不会被触发。
方式二:使用 HandlerHelper(推荐)
使用 template.HandlerHelper 可以更方便地处理类型转换:
import (
"github.com/cloudwego/eino/adk"
"github.com/cloudwego/eino/callbacks"
template "github.com/cloudwego/eino/utils/callbacks"
)
helper := template.NewHandlerHelper().
Agent(&template.AgentCallbackHandler{
OnStart: func(ctx context.Context, info *callbacks.RunInfo, input *adk.AgentCallbackInput) context.Context {
if input.Input != nil {
fmt.Printf("Agent %s started with input\n", info.Name)
} else {
fmt.Printf("Agent %s resumed\n", info.Name)
}
return ctx
},
OnEnd: func(ctx context.Context, info *callbacks.RunInfo, output *adk.AgentCallbackOutput) context.Context {
// 异步消费事件
go func() {
for {
event, ok := output.Events.Next()
if !ok {
break
}
// 处理事件...
}
}()
return ctx
},
}).
Handler()
// 创建 Runner - 必须通过 Runner 执行 Agent,callback 才会生效
runner := adk.NewRunner(ctx, adk.RunnerConfig{
Agent: agent,
EnableStreaming: input.EnableStreaming,
})
iter := runner.Run(ctx, input.Messages, adk.WithCallbacks(helper))
💡 重要提示:必须通过 Runner 执行 Agent,AgentCallback 才会生效。直接使用
agent.Run()时,callback 不会被触发。
💡
HandlerHelper会自动进行类型转换,代码更简洁。同时支持组合多种组件的回调处理器。
Tracing 场景应用
💡 重要提示:AgentCallback 必须通过 Runner 来执行才会生效。直接使用 Agent.Run() 时,callback 不会被触发,因为 callback 机制是在 flowAgent 层面实现的。请使用 adk.NewRunner() 创建 Runner 后,通过 Runner.Run() 或 Runner.Query() 来执行 Agent。
Agent Callback 最常见的应用场景是实现分布式追踪(Tracing)。以下是使用 OpenTelemetry 实现 tracing 的示例:
import (
"go.opentelemetry.io/otel"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/codes"
"go.opentelemetry.io/otel/trace"
"github.com/cloudwego/eino/adk"
"github.com/cloudwego/eino/callbacks"
)
// 创建 Agent(以 ChatModelAgent 为例)
agent, _ := adk.NewChatModelAgent(ctx, &adk.ChatModelAgentConfig{
Name: "my_agent",
Description: "A helpful assistant",
Model: chatModel,
})
// 创建 Runner - 必须通过 Runner 执行 Agent,callback 才会生效
runner := adk.NewRunner(ctx, adk.RunnerConfig{
Agent: agent,
EnableStreaming: true,
})
tracer := otel.Tracer("my-agent-tracer")
handler := callbacks.NewHandlerBuilder().
OnStartFn(func(ctx context.Context, info *callbacks.RunInfo, input callbacks.CallbackInput) context.Context {
// 创建 span
ctx, span := tracer.Start(ctx, info.Name,
trace.WithAttributes(
attribute.String("component", string(info.Component)),
attribute.String("type", info.Type),
))
// Agent 特定的属性
if info.Component == adk.ComponentOfAgent {
agentInput := adk.ConvAgentCallbackInput(input)
if agentInput != nil && agentInput.Input != nil {
span.SetAttributes(attribute.Bool("is_new_run", true))
} else {
span.SetAttributes(attribute.Bool("is_resume", true))
}
}
return ctx
}).
OnEndFn(func(ctx context.Context, info *callbacks.RunInfo, output callbacks.CallbackOutput) context.Context {
span := trace.SpanFromContext(ctx)
span.End()
return ctx
}).
OnErrorFn(func(ctx context.Context, info *callbacks.RunInfo, err error) context.Context {
span := trace.SpanFromContext(ctx)
span.RecordError(err)
span.SetStatus(codes.Error, err.Error())
span.End()
return ctx
}).
Build()
// 使用 Runner 执行 Agent,并传入 callback handler
iter := runner.Query(ctx, "Hello, agent!", adk.WithCallbacks(handler))
// 处理事件流
for {
event, ok := iter.Next()
if !ok {
break
}
if event.Err != nil {
log.Error(event.Err)
break
}
// 处理事件...
}
💡 再次提醒:必须通过 Runner 执行 Agent,callback 才会生效。直接使用
agent.Run()时,即使传入了adk.WithCallbacks(handler),Agent 级别的 callback 也不会被触发。
💡 提示:cozeloop 的 adk trace 版本见 https://github.com/cloudwego/eino-ext/releases/tag/callbacks%2Fcozeloop%2Fv0.2.0
Agent 类型标识
内置 Agent 实现了 components.Typer 接口,返回其类型标识,该信息会填充到 callbacks.RunInfo.Type 字段中:
|
Agent 类型 |
GetType() 返回值 |
|
ChatModelAgent |
"ChatModel" |
|
workflowAgent (Sequential) |
"Sequential" |
|
workflowAgent (Parallel) |
"Parallel" |
|
workflowAgent (Loop) |
"Loop" |
|
DeterministicTransfer Agent |
"DeterministicTransfer" |
回调行为说明
回调调用时机
|
Run 方法1. 初始化回调上下文2. 处理输入3. 调用 OnStart 4. 执行 Agent 逻辑5. 注册 OnEnd (在事件流创建时) |
Resume 方法1. 构建 ResumeInfo2. 初始化回调上下文3. 调用 OnStart 4. 恢复 Agent 执行5. 注册 OnEnd (在事件流创建时) |
OnEnd 调用时机
OnEnd 回调在迭代器创建时注册,而非在生成器关闭时。这允许 handler 在事件流式传输时消费事件。
注意事项
1. 异步消费事件流
回调 handler 中的 AgentCallbackOutput.Events 必须异步消费,否则会阻塞 Agent 执行:
// ✅ 正确
OnEnd: func(ctx context.Context, info *callbacks.RunInfo, output *adk.AgentCallbackOutput) context.Context {
go func() {
for {
event, ok := output.Events.Next()
if !ok {
break
}
// 处理事件
}
}()
return ctx
}
// ❌ 错误 - 会导致死锁
OnEnd: func(ctx context.Context, info *callbacks.RunInfo, output *adk.AgentCallbackOutput) context.Context {
for {
event, ok := output.Events.Next()
if !ok {
break
}
// 处理事件
}
return ctx
}
2. 无 OnError 回调
由于 Agent.Run() 和 Agent.Resume() 方法签名不返回 error,Agent 回调不支持 OnError。错误信息通过 AgentEvent.Err 字段在事件流中传递。
3. 事件流复制机制
当有多个回调 handler 时,每个 handler 接收独立的事件流副本,互不干扰。最后一个 handler 接收原始事件以减少内存分配。
3.5 einodev ==toolchain
🚀 Eino 是 Go AI 集成组件的研发框架,提供了 AI 应用相关的常用组件以及集成组件编排能力,为了更好的辅助开发者使用 Eino,我们提供了 「Eino Dev 插件」 ,现在就安装插件 ( EinoDev 插件安装指南),助你高效开发
背景 & 简介
Eino 是 Go AI 集成组件的研发框架,提供常用的 AI 组件以及集成组件编排能力。为了更好的辅助开发者使用 Eino,我们提供了「Eino Dev」插件,助力 AI 应用高效开发 🚀。
如何安装
版本安装依赖
|
Plugin Version |
GoLand IDE Version |
VS Code Version |
eino-ext/devops Version |
|
1.1.0 |
2023.2+ |
1.97.x |
0.1.0 |
|
1.0.7 |
2023.2+ |
- |
0.1.0 |
|
1.0.6 |
2023.2+ |
- |
0.1.0 |
|
1.0.5 |
2023.2+ |
- |
0.1.0 |
|
1.0.4 |
2023.2+ |
- |
0.1.0 |
Plugin Version:插件版本信息
Goland IDE Version: Goland IDE 可支持的最小版本
VS Code Version: VS Code 可支持的最小版本
Eino-Ext/devops Version: eino-ext/devops 调试模块对应的合适版本
安装
GoLand
|
|
VS Code
-
在 VS Code 中点击「Extension 图标」,进入插件市场,搜索 Eino Dev,安装即可
功能简介
💡 插件安装完毕 ✅,接下来就可以体验插件提供的调试与编排能力了 ~
Graph 编排
详情 👉:Eino Dev 可视化编排插件功能指南
Graph 调试
详情 👉:Eino Dev 可视化调试插件功能指南
Eino Dev 可视化编排插件功能指南
简介
💡 Goland 提供的 Eino 可视化编排插件, 在 GoLand 中可以通过组件拖拽实现 Graph 的编排生成代码,并支持导入导出
初认插件
插件功能介绍
编排组件介绍
图 ( Graph )
-
与 Eino 中的 Graph 概念一致,指最终由插件侧生成的 Graph,可在以下界面添加 Graph。
-
点击添加插件,则弹出创建对话框,根据字段说明补充配置信息,即可生成一个 Graph 编排对象。
节点 ( Node )
-
与 Eino 中的 Node 一致,创建 Graph 完成后,通过界面右上角 AddNodes ,添加不同类型 Node 到画布。
-
添加到 Graph 中 Node 插件会默认填写 NodeKey ,此外可展开 More config 为 Node 配置可选配置。
组件 ( Component )
-
Component 是组成 Node 的必要信息,不同的 Component 对应不同的 Node 类型,并且提供了内置的官方 Official Components 与 Custom Components 。
-
完成添加 Node 操作后,可按需配置组件的 Runtime Config 信息。
插槽 ( Slot )
-
不同类型的 Component 的生成会依赖其他组件,将其作为自身配置依赖的一部分,这部分依赖被称作插槽( Slot )。
-
比如官方提供的 volc_vikingDB 组件,其依赖了 Embedding Component 作为插槽;再比如官方提供的 ToolsNode 组件,其依赖了多个 Tool Component。
开始编排
初始化插件
点击进入 Eino Dev 插件,会展示如下界面,可点击图中圈选框进入编排。
创建并编排 Graph
-
界面左下角新增 Graph,在弹窗对话框填写 Graph 相关配置,生成 Graph 画布。
-
按需从 AddNodes 选择合适的 Node 组件,添加的画布。
-
依据业务编排逻辑将 Node 组件连接,完成 Graph 业务编排逻辑。
-
点击 “Generate as code” 并选择合适文件夹,将编排的 Graph 生成代码并保存到指定路径。
-
特别的当添加的 Component 为 Graph 类型时,添加的 嵌套 Graph 可展开做 Node 组件的配置,配置完成后,通过顶层面包屑路径跳回首页界面。
Eino Dev 可视化调试插件功能指南
简介
💡 使用该插件可以对使用 Eino 框架编写的编排产物(Graph,Chain)进行可视化调试,包括:
编排产物可视化渲染;
从可操作的任意节点开始,mock 输入进行调试。
快速开始
下载 eino-example
# HTTPS
git clone https://github.com/cloudwego/eino-examples.git
# SSH
git clone git@github.com:cloudwego/eino-examples.git
安装依赖
在项目目录下依次执行以下指令
# 1. Pull latest devops repository
go get github.com/cloudwego/eino-ext/devops@latest
# 2. Cleans and updates go.mod and go.sum
go mod tidy
运行 Demo
进入 eino-examples/devops/debug/main.go,运行 main.go。因为插件会同时在本地启动一个 HTTP 服务用于连接用户服务进程,所以会弹出接入网络警告,点击允许。
配置调试地址
| 1.点击左侧或正中间调试功能进入调试配置 | 2.点击配置调试地址 |
| 3.填入 127.0.0.1:52538 | 4.点击确认进入调试界面,选择要调试的Graph |
开始调试
| 1.点击「Test Run」从 start 节点开始执行 | 2.输入 "hello eino",点击确认 |
| 3.在调试区域展示有各个节点的输入和输出 | 4.点击 Input 和 Output 切换查看节点信息 |
功能一览
本地或远程调试
目标调试编排产物无论是运行在本地电脑还是在远程服务器,都可以通过配置 IP:Port ,主动连接到目标调试对象所在的服务器。
编排拓扑可视化
支持 Graph 和 Chain 编排拓扑可视化。
从任意节点开始调试
查看节点执行结果
每个节点执行结果都会按执行顺序展示在调试区域,包括:输入、输出、执行耗时
从零开始调试
使用 Eino 进行编排
插件支持对 Graph 和 Chain 的编排产物进行调试,假设你已经有编排代码如下
func RegisterSimpleGraph(ctx context.Context) {
g := compose.NewGraph[string, string]()
_ = g.AddLambdaNode("node_1", compose.InvokableLambda(func(ctx context.Context, input string) (output string, err error) {
return input + " process by node_1,", nil
}))
_ = g.AddLambdaNode("node_2", compose.InvokableLambda(func(ctx context.Context, input string) (output string, err error) {
return input + " process by node_2,", nil
}))
_ = g.AddLambdaNode("node_3", compose.InvokableLambda(func(ctx context.Context, input string) (output string, err error) {
return input + " process by node_3,", nil
}))
_ = g.AddEdge(compose.START, "node_1")
_ = g.AddEdge("node_1", "node_2")
_ = g.AddEdge("node_2", "node_3")
_ = g.AddEdge("node_3", compose.END)
_, err := g.Compile(ctx)
if err != nil {
logs.Errorf("compile graph failed, err=%v", err)
return
}
}
安装依赖
在项目目录下依次执行以下指令
# 1. Pull latest devops repository
go get github.com/cloudwego/eino-ext/devops@latest
# 2. Cleans and updates go.mod and go.sum
go mod tidy
调用调试初始化函数
因为调试需要在用户主进程中启动一个 HTTP 服务,以用作与本地调试插件交互,所以用户需要主动调用一次 github.com/cloudwego/eino-ext/devops 中的 Init() 来启动调试服务。
💡 注意事项
确保目标调试的编排产物至少执行过一次
Compile()。
devops.Init()的执行必须要在调用Compile()之前。用户需要保证
devops.Init()执行后主进程不能退出。
如在 main() 函数中增加调试服务启动代码
// 1.调用调试服务初始化函数
err := devops.Init(ctx)
if err != nil {
logs.Errorf("[eino dev] init failed, err=%v", err)
return
}
// 2.编译目标调试的编排产物
RegisterSimpleGraph(ctx)
运行用户进程
在本地电脑或者远程环境中运行你的进程,并保证主进程不会退出。
在 github.com/cloudwego/eino-examples/devops/debug/main.go 中,main() 代码如下
func main() {
ctx := context.Background()
// Init eino devops server
err := devops.Init(ctx)
if err != nil {
logs.Errorf("[eino dev] init failed, err=%v", err)
return
}
// Register chain, graph and state_graph for demo use
chain.RegisterSimpleChain(ctx)
graph.RegisterSimpleGraph(ctx)
graph.RegisterSimpleStateGraph(ctx)
// Blocking process exits
sigs := make(chan os.Signal, 1)
signal.Notify(sigs, syscall.SIGINT, syscall.SIGTERM)
<-sigs
// Exit
logs.Infof("[eino dev] shutting down\n")
}
配置调试地址
-
IP:用户进程所在服务器的 IP 地址。
-
用户进程运行在本地电脑,则填写
127.0.0.1; -
用户进程运行在远程服务器上,则填写远程服务器的 IP 地址,兼容 IPv4 和 IPv6 。
-
-
Port:调试服务监听的端口,默认是
52538,可通过 「WithDevServerPort」 这一 option 方法进行修改
💡 注意事项
本地电脑调试:系统可能会弹出网络接入警告,允许接入即可。
远程服务器调试:需要你保证端口可访问。
IP 和 Port 配置完成后,点击确认,调试插件会自动连接到目标调试服务器。如果成功连接,连接状态指示器会变成绿色。
选择目标调试编排产物
确保你目标调试的编排产物至少执行过一次 Compile()。因为调试设计是面向编排产物实例,所以如果多次执行 Compile(),会在调试服务中注册多个编排产物,继而在选择列表中看到多个可调试目标。
开始调试
调试支持从任意节点开始调试,包括 start 节点和其他中间节点。
-
从 START 节点开始调试:直接点击 「Test Run」,然后输入 mock 的 input(如果 input 是复杂结构的话,会自动对 input 的结构进行推断)然后点击确定,开始执行你的 graph,每个 node 的结果会在下方显示。
-
从任意的可操作节点开始调试:比如,从第二个节点开始执行。
查看执行结果
从 START 节点开始调试,点击 Test Run 后,在插件下方查看调试结果。
从任意的可操作节点进行调试,在插件下方查看调试结果。
高阶功能
指定 interface 字段的实现类型
对于 interface 类型的字段,会被默认渲染为 {} 。在 {} 中输入空格可唤出 interface 实现类型的列表,选中某个类型后,系统会生成一个特殊的结构体以表达 interface 的信息;该特殊结构体定义如下:
{
"_value": {} // 按具体类型生成的 json value
"_eino_go_type": "*model.MyConcreteType" // Go 类型名
}
💡 系统内已经内置了一些常见的 interface 类型,如
string、schema.Message等,可直接选择使用。如果需要自定义 interface 实现类型,可通过devops提供的AppendType方法进行注册。
-
假设你已经有编排代码如下,其中,graph 的输入定义为
any,node_1的输入定义为*NodeInfo;type NodeInfo struct { Message string } func RegisterGraphOfInterfaceType(ctx context.Context) { // Define a graph that input parameter is any. g := compose.NewGraph[any, string]() _ = g.AddLambdaNode("node_1", compose.InvokableLambda(func(ctx context.Context, input *NodeInfo) (output string, err error) { if input == nil { return "", nil } return input.Message + " process by node_1,", nil })) _ = g.AddLambdaNode("node_2", compose.InvokableLambda(func(ctx context.Context, input string) (output string, err error) { return input + " process by node_2,", nil })) _ = g.AddLambdaNode("node_3", compose.InvokableLambda(func(ctx context.Context, input string) (output string, err error) { return input + " process by node_3,", nil })) _ = g.AddEdge(compose._START_, "node_1") _ = g.AddEdge("node_1", "node_2") _ = g.AddEdge("node_2", "node_3") _ = g.AddEdge("node_3", compose._END_) r, err := g.Compile(ctx) if err != nil { logs.Errorf("compile graph failed, err=%v", err) return } } -
调试前,通过
AppendType方法在Init()时注册自定义的*NodeInfo类型:err := devops.Init(ctx, devops.AppendType(&graph.NodeInfo{})) -
调试过程中,在 Test Run 的 Json 输入框中,对于 interface 类型的字段,默认会呈现为
{}。可以通过在{}中键入一个空格,来查看所有内置的以及自定义注册的数据类型,并选择该 interface 的具体实现类型。
-
在
_value字段中补全调试节点输入。
-
点击确认,查看调试结果。
map[string]any 调试
这里再解释下输入类型为 map[string]any 时如何调试;如果某个节点的输入类型为 map[string]any,如下所示:
func RegisterAnyInputGraph(ctx context.Context) {
g := compose.NewGraph[map[string]any, string]()
_ = g.AddLambdaNode("node_1", compose.InvokableLambda(func(ctx context.Context, input map[string]any) (output string, err error) {
for k, v := range input {
switch v.(type) {
case string:
output += k + ":" + v.(string) + ","
case int:
output += k + ":" + fmt.Sprintf("%d", v.(int))
default:
return "", fmt.Errorf("unsupported type: %T", v)
}
}
return output, nil
}))
_ = g.AddLambdaNode("node_2", compose.InvokableLambda(func(ctx context.Context, input string) (output string, err error) {
return input + " process by node_2,", nil
}))
_ = g.AddEdge(compose.START, "node_1")
_ = g.AddEdge("node_1", "node_2")
_ = g.AddEdge("node_2", compose.END)
r, err := g.Compile(ctx)
if err != nil {
logs.Errorf("compile graph failed, err=%v", err)
return
}
message, err := r.Invoke(ctx, map[string]any{"name": "bob", "score": 100})
if err != nil {
logs.Errorf("invoke graph failed, err=%v", err)
return
}
logs.Infof("eino any input graph output is: %v", message)
}
调试过程中,在 Test Run 的 Json 输入框中,你需要输入以下格式的内容:
{
"name": {
"_value": "alice",
"_eino_go_type": "string"
},
"score": {
"_value": "99",
"_eino_go_type": "int"
}
}
AtomGit 是由开放原子开源基金会联合 CSDN 等生态伙伴共同推出的新一代开源与人工智能协作平台。平台坚持“开放、中立、公益”的理念,把代码托管、模型共享、数据集托管、智能体开发体验和算力服务整合在一起,为开发者提供从开发、训练到部署的一站式体验。
更多推荐












































































































所有评论(0)