[利用LangGraph SDK调用部署的Agent-05]Run: Agent逻辑在特定任务下的“单次运算实例”
Run代表针对Assistant的一次调用,这样的调用可以绑定在一个Thread上,称为一轮对话中的一次问答,也可以是完全完全无状态的一次独立调用。RunsClient提供了如下三种调用方式:
- stream:实时返回Agent的执行过程(如节点变化、Token 输出);
- wait:采用同步阻塞的执行方式,一直等到任务完成后返回结果;
- create (异步创建):以异步形式创建一个后台任务来完成调用;
1. 流式调用
RunsClient定义如下三个重载的stream方法利用创建的Run对象支持流式调用,其中前两个重载方法分别对应有无Thread绑定的两种调用方式。
class RunsClient:
@overload
def stream(
self,
thread_id: str,
assistant_id: str,
*,
input: Input | None = None,
command: Command | None = None,
stream_mode: StreamMode | Sequence[StreamMode] = "values",
stream_subgraphs: bool = False,
stream_resumable: bool = False,
metadata: Mapping[str, Any] | None = None,
config: Config | None = None,
context: Context | None = None,
checkpoint: Checkpoint | None = None,
checkpoint_id: str | None = None,
checkpoint_during: bool | None = None,
interrupt_before: All | Sequence[str] | None = None,
interrupt_after: All | Sequence[str] | None = None,
feedback_keys: Sequence[str] | None = None,
on_disconnect: DisconnectMode | None = None,
webhook: str | None = None,
multitask_strategy: MultitaskStrategy | None = None,
if_not_exists: IfNotExists | None = None,
after_seconds: int | None = None,
headers: Mapping[str, str] | None = None,
params: QueryParamTypes | None = None,
on_run_created: Callable[[RunCreateMetadata], None] | None = None,
) -> AsyncIterator[StreamPart]: ...
@overload
def stream(
self,
thread_id: None,
assistant_id: str,
*,
input: Input | None = None,
command: Command | None = None,
stream_mode: StreamMode | Sequence[StreamMode] = "values",
stream_subgraphs: bool = False,
stream_resumable: bool = False,
metadata: Mapping[str, Any] | None = None,
config: Config | None = None,
checkpoint_during: bool | None = None,
interrupt_before: All | Sequence[str] | None = None,
interrupt_after: All | Sequence[str] | None = None,
feedback_keys: Sequence[str] | None = None,
on_disconnect: DisconnectMode | None = None,
on_completion: OnCompletionBehavior | None = None,
if_not_exists: IfNotExists | None = None,
webhook: str | None = None,
after_seconds: int | None = None,
headers: Mapping[str, str] | None = None,
params: QueryParamTypes | None = None,
on_run_created: Callable[[RunCreateMetadata], None] | None = None,
) -> AsyncIterator[StreamPart]: ...
def stream(
self,
thread_id: str | None,
assistant_id: str,
*,
input: Input | None = None,
command: Command | None = None,
stream_mode: StreamMode | Sequence[StreamMode] = "values",
stream_subgraphs: bool = False,
stream_resumable: bool = False,
metadata: Mapping[str, Any] | None = None,
config: Config | None = None,
context: Context | None = None,
checkpoint: Checkpoint | None = None,
checkpoint_id: str | None = None,
checkpoint_during: bool | None = None, # deprecated
interrupt_before: All | Sequence[str] | None = None,
interrupt_after: All | Sequence[str] | None = None,
feedback_keys: Sequence[str] | None = None,
on_disconnect: DisconnectMode | None = None,
on_completion: OnCompletionBehavior | None = None,
webhook: str | None = None,
multitask_strategy: MultitaskStrategy | None = None,
if_not_exists: IfNotExists | None = None,
after_seconds: int | None = None,
headers: Mapping[str, str] | None = None,
params: QueryParamTypes | None = None,
on_run_created: Callable[[RunCreateMetadata], None] | None = None,
durability: Durability | None = None,
) -> AsyncIterator[StreamPart]
class StreamPart(NamedTuple):
event: str
data: dict
id: str | None = None
StreamMode = Literal[
"values",
"messages",
"updates",
"events",
"tasks",
"checkpoints",
"debug",
"custom",
"messages-tuple",
]
DisconnectMode = Literal["cancel", "continue"]
OnCompletionBehavior = Literal["delete", "keep"]
MultitaskStrategy = Literal["reject", "interrupt", "rollback", "enqueue"]
IfNotExists = Literal["create", "reject"]
class RunCreateMetadata(TypedDict):
run_id: str
thread_id: str | None
stream方法对应CompiledStateGraph的astream方法,RunsClient的stream方法可以理解为以远程的方式调用CompiledStateGraph的astream方法(本质上是Pregel的astream方法)。stream方法的参数非常多,下面是一些重要参数的说明:
- thread_id:绑定的Thread的ID,如果为None表示不绑定Thread。
Pregel的astream方法会将其放到RunnableConfig中; - assistant_id:作为调用目标的Assistant对象的ID。
Pregel的astream方法利用它提取绑定的配置; - input:输入数据,对应
Pregel的astream方法的同名参数; - command:在进行恢复调用时使用此对象,服务端用它来创建
langgraph.types.Command对象,并作为Pregel的astream方法的同名参数; - stream_mode:流模式,对应
Pregel的astream方法的同名参数。具体模式的区别可以参阅我们的文章“通过一个实例理解LangChain的所有的流模式” - stream_subgraphs:是否要接收SubGraph的事件,默认值为False表示不接收,对应
Pregel的astream方法的subgraphs参数; - stream_resumable:是否允许流式输出在连接中断后从断点继续,而不是从头开始或直接丢失数据;
- metadata:调用相关的任意元数据,
Pregel的astream方法会将其放到RunnableConfig中; - config:配置项,Agent Server将它合并到
RunnableConfig中; - context:静态上下文,对应
Pregel的astream方法的同名参数; - checkpoint/checkpoint_id:从特定的历史快照启动,
Pregel的astream方法会将其放到RunnableConfig中; - interrupt_before:在执行之前触发中断的节点列表,对应
Pregel的astream方法的同名参数; - interrupt_after:在执行之后触发中断的节点列表,对应
Pregel的astream方法的同名参数; - feedback_keys: 指定允许该运行任务接收哪些类型的反馈标签;
- on_disconnect:指定在连接断开时的处理方式
- “cancel”:取消正在运行的任务;
- “continue”:保持任务继续运行;
- on_completion:指定在调用完成时的处理方式
- “delete”:删除调用相关的所有数据;
- “keep”:保留调用相关的所有数据;
- webhook:指定一个URL,当调用完成时向该URL发送一个POST请求,包含调用的结果数据;
- multitask_strategy:指定并发任务的处理策略:
- “reject”:拒绝新的任务请求,直到当前任务完成;
- “interrupt”:中断当前正在运行的任务,立即开始新的任务;
- “rollback”:回滚当前正在运行的任务到上一个
Checkpoint,然后开始新的任务; - “enqueue”:将新的任务请求加入队列,等待当前任务完成后按顺序执行;
- if_not_exists:指定在调用的Thread不存在时的处理方式
- “create”:创建一个新的Thread来绑定调用;
- “reject”:拒绝调用请求;
- after_seconds:延迟执行的等待时间;
- on_run_created:一个可调用对象,在调用创建时被调用,接受一个
RunCreateMetadata对象作为参数;- - durability:指定调用的持久化方式,对应
Pregel的astream方法的同名参数;
对于RunsClient的stream方法对应的API请求,根据是否绑定Thread,请求方法和路径分别为:
- 绑定Thread:POST /threads/{thread_id}/runs/stream
- 不绑定Thread:POST /runs/stream
在如下的演示程序中,我们调用RunsClient的stream方法来调用test_agent默认Assistant对象。我们指定了出events(输出的内容太多)之外的所有流模式。我们将收集到的事件按照事件类型进行分类,并在最后按照事件类型输出每类事件的内容。
from langgraph_sdk import get_client
from langgraph_sdk.schema import StreamPart
import asyncio
async def main():
async with get_client(url="http://localhost:2024") as client:
events: dict[str, list[StreamPart]] = {}
async for part in client.runs.stream(
thread_id=None,
assistant_id="test_agent",
stream_mode=[
"values",
"messages",
"updates",
"tasks",
"checkpoints",
"debug",
"custom",
"messages-tuple",
],
input={
"foo": None,
"bar": "checked",
"baz": None,
},
):
events[part.event] = events.get(part.event, []) + [part]
if part.data.get("status") == "run_done":
break
for event, parts in events.items():
print(f"\n{event}:")
for part in parts:
print(part.data)
asyncio.run(main())
输出:
metadata:
{'run_id': '019dd289-8b20-7ff0-be7d-f94da9c280fc', 'attempt': 1}
values:
{'foo': None, 'bar': 'checked', 'baz': None}
{'foo': 'checked', 'bar': 'checked', 'baz': None}
{'foo': 'checked', 'bar': 'checked', 'baz': 'checked'}
tasks:
{'id': 'e9512746-2886-5438-bcf8-5e31aa68ace7', 'name': 'foo', 'input': {'foo': None, 'bar': 'checked', 'baz': None}, 'triggers': ['branch:to:foo']}
{'id': 'e9512746-2886-5438-bcf8-5e31aa68ace7', 'name': 'foo', 'error': None, 'result': {'foo': 'checked'}, 'interrupts': []}
{'id': '6cc10677-b7c1-a6b7-72f5-c8795e245421', 'name': 'bar', 'input': {'foo': 'checked', 'bar': 'checked', 'baz': None}, 'triggers': ['branch:to:bar']}
{'id': '6cc10677-b7c1-a6b7-72f5-c8795e245421', 'name': 'bar', 'error': None, 'result': {}, 'interrupts': []}
{'id': 'fef044cc-e53c-d40b-d6e7-c6f10b20d7ab', 'name': 'baz', 'input': {'foo': 'checked', 'bar': 'checked', 'baz': None}, 'triggers': ['branch:to:baz']}
{'id': 'fef044cc-e53c-d40b-d6e7-c6f10b20d7ab', 'name': 'baz', 'error': None, 'result': {'baz': 'checked'}, 'interrupts': []}
debug:
{'step': 1, 'timestamp': '2026-04-28T05:22:03.640827+00:00', 'type': 'task', 'payload': {'id': 'e9512746-2886-5438-bcf8-5e31aa68ace7', 'name': 'foo', 'input': {'foo': None, 'bar': 'checked', 'baz': None}, 'triggers': ['branch:to:foo']}}
{'step': 1, 'timestamp': '2026-04-28T05:22:03.646046+00:00', 'type': 'task_result', 'payload': {'id': 'e9512746-2886-5438-bcf8-5e31aa68ace7', 'name': 'foo', 'error': None, 'result': {'foo': 'checked'}, 'interrupts': []}}
{'step': 2, 'timestamp': '2026-04-28T05:22:03.648935+00:00', 'type': 'task', 'payload': {'id': '6cc10677-b7c1-a6b7-72f5-c8795e245421', 'name': 'bar', 'input': {'foo': 'checked', 'bar': 'checked', 'baz': None}, 'triggers': ['branch:to:bar']}}
{'step': 2, 'timestamp': '2026-04-28T05:22:03.652776+00:00', 'type': 'task_result', 'payload': {'id': '6cc10677-b7c1-a6b7-72f5-c8795e245421', 'name': 'bar', 'error': None, 'result': {}, 'interrupts': []}}
{'step': 3, 'timestamp': '2026-04-28T05:22:03.655148+00:00', 'type': 'task', 'payload': {'id': 'fef044cc-e53c-d40b-d6e7-c6f10b20d7ab', 'name': 'baz', 'input': {'foo': 'checked', 'bar': 'checked', 'baz': None}, 'triggers': ['branch:to:baz']}}
{'step': 3, 'timestamp': '2026-04-28T05:22:03.658642+00:00', 'type': 'task_result', 'payload': {'id': 'fef044cc-e53c-d40b-d6e7-c6f10b20d7ab', 'name': 'baz', 'error': None, 'result': {'baz': 'checked'}, 'interrupts': []}}
updates:
{'foo': {'foo': 'checked'}}
{'bar': None}
{'baz': {'baz': 'checked'}}
2. 同步阻塞调用
如果说RunsClient的stream方法利用创建的Run对象实现了远程调用CompiledStateGraph的astream方法,那么RunsClient的wait方法就可以理解为利用创建的Run对象远程调用CompiledStateGraph(Pregel)的ainvoke方法。与stream方法类似,wait方法也定义了三个重载来支持有无Thread绑定的两种调用方式。wait方法的参数与stream方法基本相同,区别在于wait方法没有stream_mode、stream_subgraphs和stream_resumable这三个参数(因为它不是流式调用了),而是增加了一个raise_error参数来指定在调用过程中如果发生错误是否要抛出异常。
class RunsClient:
@overload
async def wait(
self,
thread_id: str,
assistant_id: str,
*,
input: Input | None = None,
command: Command | None = None,
metadata: Mapping[str, Any] | None = None,
config: Config | None = None,
context: Context | None = None,
checkpoint: Checkpoint | None = None,
checkpoint_id: str | None = None,
checkpoint_during: bool | None = None,
interrupt_before: All | Sequence[str] | None = None,
interrupt_after: All | Sequence[str] | None = None,
webhook: str | None = None,
on_disconnect: DisconnectMode | None = None,
multitask_strategy: MultitaskStrategy | None = None,
if_not_exists: IfNotExists | None = None,
after_seconds: int | None = None,
raise_error: bool = True,
headers: Mapping[str, str] | None = None,
params: QueryParamTypes | None = None,
on_run_created: Callable[[RunCreateMetadata], None] | None = None,
) -> list[dict] | dict[str, Any]: ...
@overload
async def wait(
self,
thread_id: None,
assistant_id: str,
*,
input: Input | None = None,
command: Command | None = None,
metadata: Mapping[str, Any] | None = None,
config: Config | None = None,
context: Context | None = None,
checkpoint_during: bool | None = None,
interrupt_before: All | Sequence[str] | None = None,
interrupt_after: All | Sequence[str] | None = None,
webhook: str | None = None,
on_disconnect: DisconnectMode | None = None,
on_completion: OnCompletionBehavior | None = None,
if_not_exists: IfNotExists | None = None,
after_seconds: int | None = None,
raise_error: bool = True,
headers: Mapping[str, str] | None = None,
params: QueryParamTypes | None = None,
on_run_created: Callable[[RunCreateMetadata], None] | None = None,
) -> list[dict] | dict[str, Any]: ...
async def wait(
self,
thread_id: str | None,
assistant_id: str,
*,
input: Input | None = None,
command: Command | None = None,
metadata: Mapping[str, Any] | None = None,
config: Config | None = None,
context: Context | None = None,
checkpoint: Checkpoint | None = None,
checkpoint_id: str | None = None,
checkpoint_during: bool | None = None, # deprecated
interrupt_before: All | Sequence[str] | None = None,
interrupt_after: All | Sequence[str] | None = None,
webhook: str | None = None,
on_disconnect: DisconnectMode | None = None,
on_completion: OnCompletionBehavior | None = None,
multitask_strategy: MultitaskStrategy | None = None,
if_not_exists: IfNotExists | None = None,
after_seconds: int | None = None,
raise_error: bool = True,
headers: Mapping[str, str] | None = None,
params: QueryParamTypes | None = None,
on_run_created: Callable[[RunCreateMetadata], None] | None = None,
durability: Durability | None = None,
) -> list[dict] | dict[str, Any]
对于RunsClient的wait方法对应的API请求,根据是否绑定Thread,会分别采用如下的请求方法和路径。调用之后的状态会JSON格式的响应体中返回。
- 绑定Thread:POST /threads/{thread_id}/runs/wait
- 不绑定Thread:POST /runs/wait
3. 异步创建后台调用任务
与RunsClient的stream和wait方法直接执行创建的Run对象不同,create方法会直接返回的Run对象。RunsClient同样定义三个重载的create方法来支持有无Thread绑定的两种调用方式。create方法创建的Run方法采用流的方式调用Agent,所以它与stream方法的参数基本一致。
class RunsClient:
@overload
async def create(
self,
thread_id: None,
assistant_id: str,
*,
input: Input | None = None,
command: Command | None = None,
stream_mode: StreamMode | Sequence[StreamMode] = "values",
stream_subgraphs: bool = False,
stream_resumable: bool = False,
metadata: Mapping[str, Any] | None = None,
checkpoint_during: bool | None = None,
config: Config | None = None,
context: Context | None = None,
interrupt_before: All | Sequence[str] | None = None,
interrupt_after: All | Sequence[str] | None = None,
webhook: str | None = None,
on_completion: OnCompletionBehavior | None = None,
if_not_exists: IfNotExists | None = None,
after_seconds: int | None = None,
headers: Mapping[str, str] | None = None,
params: QueryParamTypes | None = None,
on_run_created: Callable[[RunCreateMetadata], None] | None = None,
) -> Run: ...
@overload
async def create(
self,
thread_id: str,
assistant_id: str,
*,
input: Input | None = None,
command: Command | None = None,
stream_mode: StreamMode | Sequence[StreamMode] = "values",
stream_subgraphs: bool = False,
stream_resumable: bool = False,
metadata: Mapping[str, Any] | None = None,
config: Config | None = None,
context: Context | None = None,
checkpoint: Checkpoint | None = None,
checkpoint_id: str | None = None,
checkpoint_during: bool | None = None,
interrupt_before: All | Sequence[str] | None = None,
interrupt_after: All | Sequence[str] | None = None,
webhook: str | None = None,
multitask_strategy: MultitaskStrategy | None = None,
if_not_exists: IfNotExists | None = None,
after_seconds: int | None = None,
headers: Mapping[str, str] | None = None,
params: QueryParamTypes | None = None,
on_run_created: Callable[[RunCreateMetadata], None] | None = None,
) -> Run: ...
async def create(
self,
thread_id: str | None,
assistant_id: str,
*,
input: Input | None = None,
command: Command | None = None,
stream_mode: StreamMode | Sequence[StreamMode] = "values",
stream_subgraphs: bool = False,
stream_resumable: bool = False,
metadata: Mapping[str, Any] | None = None,
config: Config | None = None,
context: Context | None = None,
checkpoint: Checkpoint | None = None,
checkpoint_id: str | None = None,
checkpoint_during: bool | None = None, # deprecated
interrupt_before: All | Sequence[str] | None = None,
interrupt_after: All | Sequence[str] | None = None,
webhook: str | None = None,
multitask_strategy: MultitaskStrategy | None = None,
if_not_exists: IfNotExists | None = None,
on_completion: OnCompletionBehavior | None = None,
after_seconds: int | None = None,
headers: Mapping[str, str] | None = None,
params: QueryParamTypes | None = None,
on_run_created: Callable[[RunCreateMetadata], None] | None = None,
durability: Durability | None = None,
) -> Run
class Run(TypedDict):
run_id: str
thread_id: str
assistant_id: str
created_at: datetime
updated_at: datetime
status: RunStatus
metadata: Json
multitask_strategy: MultitaskStrategy
RunStatus = Literal["pending", "running", "error", "success", "timeout", "interrupted"]
LangGraph SDK为Run定义了同名的类型,包含了Run的基本信息。可以看出Run的thread_id字段的类型为str而不是str|None,这意味着即使是通过不绑定Thread的方式创建的Run对象,系统也会生成一个UUID作为thread_id。这一个规则同样应用于前面利用stream和wait方法创建的Run对象。之所以这个thread_id是必须的,因为Run对象时基于thread_id进行存储的,所以针对Run对象提取和兼容都需要指定thread_id。
对于RunsClient的create方法对应的API请求,根据是否绑定Thread,会分别采用如下的请求方法和路径。创建的Run对象会以JSON格式的响应体返回。
- 绑定Thread:POST /threads/{thread_id}/runs
- 不绑定Thread:POST /runs
除了调用create方法创建一个Run对象完成单次调用之外,RunsClient还提供了一个create_batch方法,可以一次性创建多个Run对象来完成批量调用。create_batch方法接受一个RunCreate类型的列表作为参数,RunCreate类型包含了创建Run对象所需的所有信息。create_batch方法会返回一个Run对象的列表。该方法对应的API请求的方法和路径分别为POST和/runs/batch,创建的Run对象列表会以JSON格式的响应体返回。
class RunsClient:
async def create_batch(
self,
payloads: list[RunCreate],
*,
headers: Mapping[str, str] | None = None,
params: QueryParamTypes | None = None,
) -> list[Run]
class RunCreate(TypedDict):
thread_id: str | None
assistant_id: str
input: dict | None
metadata: dict | None
config: Config | None
context: Context | None
checkpoint_id: str | None
interrupt_before: list[str] | None
interrupt_after: list[str] | None
webhook: str | None
multitask_strategy: MultitaskStrategy | None
下面的演示程序通过调用RunsClient的create方法来创建一个延迟5秒执行Run对象。我们在创建Run对象之后立即输出它的状态,然后分别在5秒和10秒之后获取Run对象的最新状态并输出。你会看到Run对象的状态是从pending变成success(由于Agent执行时间太短,无法捕捉到running状态)。上面介绍的自动为Run对象生成thread_id的规则也在这个程序中得到了体现。对于以这种方式创建的Run对象,在默认情况下,当执行完成后就被会删除,所以我们需要指定on_completion参数为"keep"来保留它。
from langgraph_sdk import get_client
import asyncio
async def main():
async with get_client(url="http://localhost:2024") as client:
run = await client.runs.create(
thread_id=None,
assistant_id="test_agent",
input={
"foo": None,
"bar": "checked",
"baz": None,
},
after_seconds=5,
on_completion= "keep"
)
thread_id = run["thread_id"]
print(f"initial status: {run['status']}")
await asyncio.sleep(5)
run = await client.runs.get(thread_id=thread_id, run_id=run["run_id"])
print(f"status after 5 seconds: {run['status']}")
await asyncio.sleep(6)
run = await client.runs.get(thread_id=thread_id, run_id=run["run_id"])
print(f"status after 10 seconds: {run['status']}")
asyncio.run(main())
输出:
initial status: pending
status after 5 seconds: pending
status after 10 seconds: success
4. Run检索、取消和删除
RunsClient提供了list方法根据照不同的条件来检索与指定Thread绑定的Run对象,如果知道run_id,我们可以调用get方法来获取对应Run对象的描述信息和最新状态。对于正在运行的Run对象,我们可以调用cancel方法来取消它。如果不再需要某个Run对象了,我们可以调用delete方法来删除它。我们在上面讨论过,Run对象是基于thread_id进行存储的,所以调用这些方法都需要指定thread_id。
class RunsClient:
async def list(
self,
thread_id: str,
*,
limit: int = 10,
offset: int = 0,
status: RunStatus | None = None,
select: list[RunSelectField] | None = None,
headers: Mapping[str, str] | None = None,
params: QueryParamTypes | None = None,
) -> list[Run]
async def get(
self,
thread_id: str,
run_id: str,
*,
headers: Mapping[str, str] | None = None,
params: QueryParamTypes | None = None,
) -> Run
async def cancel(
self,
thread_id: str,
run_id: str,
*,
wait: bool = False,
action: CancelAction = "interrupt",
headers: Mapping[str, str] | None = None,
params: QueryParamTypes | None = None,
) -> None
async def delete(
self,
thread_id: str,
run_id: str,
*,
headers: Mapping[str, str] | None = None,
params: QueryParamTypes | None = None,
) -> None
如所示的是这些方法对应的API请求方法和路径,其中list方法出thread_id之外的其他参数会置于请求URL的查询字符串中。
- list方法:GET /threads/{thread_id}/runs
- get方法:GET /threads/{thread_id}/runs/{run_id}
- cancel方法:POST /threads/{thread_id}/runs/{run_id}/cancel
- delete方法:DELETE /threads/{thread_id}/runs/{run_id}
5. Run对象的接入
对于通过RunsClient的create方法创建的Run对象,我们可以调用它的join方法来以流式的方式获取它的执行过程,或者调用它的join_stream方法来获取一个异步迭代器来迭代它的执行过程。
class RunsClient:
async def join(
self,
thread_id: str,
run_id: str,
*,
headers: Mapping[str, str] | None = None,
params: QueryParamTypes | None = None,
) -> dict
def join_stream(
self,
thread_id: str,
run_id: str,
*,
cancel_on_disconnect: bool = False,
stream_mode: StreamMode | Sequence[StreamMode] | None = None,
headers: Mapping[str, str] | None = None,
params: QueryParamTypes | None = None,
last_event_id: str | None = None,
) -> AsyncIterator[StreamPart]
join方法会在Run对象执行完成后返回一个包含执行结果的字典,而join_stream方法会返回一个异步迭代器来迭代执行过程中的事件。下面的演示程序展示了如何调用join和join_stream方法来获取Run对象的执行过程。
from langgraph_sdk import get_client
import asyncio,json
async def main():
async with get_client(url="http://localhost:2024") as client:
run = await client.runs.create(
thread_id=None,
assistant_id="test_agent",
input={
"foo": None,
"bar": "checked",
"baz": None,
},
)
result = await client.runs.join(run["thread_id"], run["run_id"])
print(json.dumps(result, indent=2))
run = await client.runs.create(
thread_id=None,
assistant_id="test_agent",
input={
"foo": None,
"bar": "checked",
"baz": None,
},
)
async for part in client.runs.join_stream(run["thread_id"], run["run_id"]):
print(f"[{part.event}] {part.data}")
if part.data.get("status") == "run_done":
break
asyncio.run(main())
输出:
{
"foo": "checked",
"bar": "checked",
"baz": "checked"
}
[metadata] {'run_id': '019dd3f3-8b37-74e0-b173-f3d974645926', 'attempt': 1}
[values] {'foo': None, 'bar': 'checked', 'baz': None}
[values] {'foo': 'checked', 'bar': 'checked', 'baz': None}
[values] {'foo': 'checked', 'bar': 'checked', 'baz': 'checked'}
AtomGit 是由开放原子开源基金会联合 CSDN 等生态伙伴共同推出的新一代开源与人工智能协作平台。平台坚持“开放、中立、公益”的理念,把代码托管、模型共享、数据集托管、智能体开发体验和算力服务整合在一起,为开发者提供从开发、训练到部署的一站式体验。
更多推荐

所有评论(0)