Run代表针对Assistant的一次调用,这样的调用可以绑定在一个Thread上,称为一轮对话中的一次问答,也可以是完全完全无状态的一次独立调用。RunsClient提供了如下三种调用方式:

  • stream:实时返回Agent的执行过程(如节点变化、Token 输出);
  • wait:采用同步阻塞的执行方式,一直等到任务完成后返回结果;
  • create (异步创建):以异步形式创建一个后台任务来完成调用;

1. 流式调用

RunsClient定义如下三个重载的stream方法利用创建的Run对象支持流式调用,其中前两个重载方法分别对应有无Thread绑定的两种调用方式。

class RunsClient:
    @overload
    def stream(
        self,
        thread_id: str,
        assistant_id: str,
        *,
        input: Input | None = None,
        command: Command | None = None,
        stream_mode: StreamMode | Sequence[StreamMode] = "values",
        stream_subgraphs: bool = False,
        stream_resumable: bool = False,
        metadata: Mapping[str, Any] | None = None,
        config: Config | None = None,
        context: Context | None = None,
        checkpoint: Checkpoint | None = None,
        checkpoint_id: str | None = None,
        checkpoint_during: bool | None = None,
        interrupt_before: All | Sequence[str] | None = None,
        interrupt_after: All | Sequence[str] | None = None,
        feedback_keys: Sequence[str] | None = None,
        on_disconnect: DisconnectMode | None = None,
        webhook: str | None = None,
        multitask_strategy: MultitaskStrategy | None = None,
        if_not_exists: IfNotExists | None = None,
        after_seconds: int | None = None,
        headers: Mapping[str, str] | None = None,
        params: QueryParamTypes | None = None,
        on_run_created: Callable[[RunCreateMetadata], None] | None = None,
    ) -> AsyncIterator[StreamPart]: ...

    @overload
    def stream(
        self,
        thread_id: None,
        assistant_id: str,
        *,
        input: Input | None = None,
        command: Command | None = None,
        stream_mode: StreamMode | Sequence[StreamMode] = "values",
        stream_subgraphs: bool = False,
        stream_resumable: bool = False,
        metadata: Mapping[str, Any] | None = None,
        config: Config | None = None,
        checkpoint_during: bool | None = None,
        interrupt_before: All | Sequence[str] | None = None,
        interrupt_after: All | Sequence[str] | None = None,
        feedback_keys: Sequence[str] | None = None,
        on_disconnect: DisconnectMode | None = None,
        on_completion: OnCompletionBehavior | None = None,
        if_not_exists: IfNotExists | None = None,
        webhook: str | None = None,
        after_seconds: int | None = None,
        headers: Mapping[str, str] | None = None,
        params: QueryParamTypes | None = None,
        on_run_created: Callable[[RunCreateMetadata], None] | None = None,
    ) -> AsyncIterator[StreamPart]: ...

    def stream(
        self,
        thread_id: str | None,
        assistant_id: str,
        *,
        input: Input | None = None,
        command: Command | None = None,
        stream_mode: StreamMode | Sequence[StreamMode] = "values",
        stream_subgraphs: bool = False,
        stream_resumable: bool = False,
        metadata: Mapping[str, Any] | None = None,
        config: Config | None = None,
        context: Context | None = None,
        checkpoint: Checkpoint | None = None,
        checkpoint_id: str | None = None,
        checkpoint_during: bool | None = None,  # deprecated
        interrupt_before: All | Sequence[str] | None = None,
        interrupt_after: All | Sequence[str] | None = None,
        feedback_keys: Sequence[str] | None = None,
        on_disconnect: DisconnectMode | None = None,
        on_completion: OnCompletionBehavior | None = None,
        webhook: str | None = None,
        multitask_strategy: MultitaskStrategy | None = None,
        if_not_exists: IfNotExists | None = None,
        after_seconds: int | None = None,
        headers: Mapping[str, str] | None = None,
        params: QueryParamTypes | None = None,
        on_run_created: Callable[[RunCreateMetadata], None] | None = None,
        durability: Durability | None = None,
    ) -> AsyncIterator[StreamPart]

class StreamPart(NamedTuple):
    event: str
    data: dict
    id: str | None = None

StreamMode = Literal[
    "values",
    "messages",
    "updates",
    "events",
    "tasks",
    "checkpoints",
    "debug",
    "custom",
    "messages-tuple",
]

DisconnectMode = Literal["cancel", "continue"]
OnCompletionBehavior = Literal["delete", "keep"]
MultitaskStrategy = Literal["reject", "interrupt", "rollback", "enqueue"]
IfNotExists = Literal["create", "reject"]
class RunCreateMetadata(TypedDict):
    run_id: str
    thread_id: str | None

stream方法对应CompiledStateGraphastream方法,RunsClientstream方法可以理解为以远程的方式调用CompiledStateGraphastream方法(本质上是Pregelastream方法)。stream方法的参数非常多,下面是一些重要参数的说明:

  • thread_id:绑定的Thread的ID,如果为None表示不绑定Thread。Pregelastream方法会将其放到RunnableConfig中;
  • assistant_id:作为调用目标的Assistant对象的ID。Pregelastream方法利用它提取绑定的配置;
  • input:输入数据,对应Pregelastream方法的同名参数;
  • command:在进行恢复调用时使用此对象,服务端用它来创建langgraph.types.Command对象,并作为Pregelastream方法的同名参数;
  • stream_mode:流模式,对应Pregelastream方法的同名参数。具体模式的区别可以参阅我们的文章“通过一个实例理解LangChain的所有的流模式
  • stream_subgraphs:是否要接收SubGraph的事件,默认值为False表示不接收,对应Pregelastream方法的subgraphs参数;
  • stream_resumable:是否允许流式输出在连接中断后从断点继续,而不是从头开始或直接丢失数据;
  • metadata:调用相关的任意元数据,Pregelastream方法会将其放到RunnableConfig中;
  • config:配置项,Agent Server将它合并到RunnableConfig中;
  • context:静态上下文,对应Pregelastream方法的同名参数;
  • checkpoint/checkpoint_id:从特定的历史快照启动,Pregelastream方法会将其放到RunnableConfig中;
  • interrupt_before:在执行之前触发中断的节点列表,对应Pregelastream方法的同名参数;
  • interrupt_after:在执行之后触发中断的节点列表,对应Pregelastream方法的同名参数;
  • feedback_keys: 指定允许该运行任务接收哪些类型的反馈标签;
  • on_disconnect:指定在连接断开时的处理方式
    • “cancel”:取消正在运行的任务;
    • “continue”:保持任务继续运行;
  • on_completion:指定在调用完成时的处理方式
    • “delete”:删除调用相关的所有数据;
    • “keep”:保留调用相关的所有数据;
  • webhook:指定一个URL,当调用完成时向该URL发送一个POST请求,包含调用的结果数据;
  • multitask_strategy:指定并发任务的处理策略:
    • “reject”:拒绝新的任务请求,直到当前任务完成;
    • “interrupt”:中断当前正在运行的任务,立即开始新的任务;
    • “rollback”:回滚当前正在运行的任务到上一个Checkpoint,然后开始新的任务;
    • “enqueue”:将新的任务请求加入队列,等待当前任务完成后按顺序执行;
  • if_not_exists:指定在调用的Thread不存在时的处理方式
    • “create”:创建一个新的Thread来绑定调用;
    • “reject”:拒绝调用请求;
  • after_seconds:延迟执行的等待时间;
  • on_run_created:一个可调用对象,在调用创建时被调用,接受一个RunCreateMetadata对象作为参数;-
  • durability:指定调用的持久化方式,对应Pregelastream方法的同名参数;

对于RunsClient的stream方法对应的API请求,根据是否绑定Thread,请求方法和路径分别为:

  • 绑定Thread:POST /threads/{thread_id}/runs/stream
  • 不绑定Thread:POST /runs/stream

在如下的演示程序中,我们调用RunsClientstream方法来调用test_agent默认Assistant对象。我们指定了出events(输出的内容太多)之外的所有流模式。我们将收集到的事件按照事件类型进行分类,并在最后按照事件类型输出每类事件的内容。

from langgraph_sdk import get_client
from langgraph_sdk.schema import StreamPart
import asyncio

async def main():
    async with get_client(url="http://localhost:2024") as client:
        events: dict[str, list[StreamPart]] = {}
        async for part in client.runs.stream(
            thread_id=None,
            assistant_id="test_agent",
            stream_mode=[
                "values",
                "messages",
                "updates",
                "tasks",
                "checkpoints",
                "debug",
                "custom",
                "messages-tuple",
            ],
            input={
                "foo": None,
                "bar": "checked",
                "baz": None,
            },
        ):
            events[part.event] = events.get(part.event, []) + [part]
            if part.data.get("status") == "run_done":
                break

        for event, parts in events.items():
            print(f"\n{event}:")
            for part in parts:
                print(part.data)

asyncio.run(main())

输出:

metadata:
{'run_id': '019dd289-8b20-7ff0-be7d-f94da9c280fc', 'attempt': 1}

values:
{'foo': None, 'bar': 'checked', 'baz': None}
{'foo': 'checked', 'bar': 'checked', 'baz': None}
{'foo': 'checked', 'bar': 'checked', 'baz': 'checked'}

tasks:
{'id': 'e9512746-2886-5438-bcf8-5e31aa68ace7', 'name': 'foo', 'input': {'foo': None, 'bar': 'checked', 'baz': None}, 'triggers': ['branch:to:foo']}
{'id': 'e9512746-2886-5438-bcf8-5e31aa68ace7', 'name': 'foo', 'error': None, 'result': {'foo': 'checked'}, 'interrupts': []}
{'id': '6cc10677-b7c1-a6b7-72f5-c8795e245421', 'name': 'bar', 'input': {'foo': 'checked', 'bar': 'checked', 'baz': None}, 'triggers': ['branch:to:bar']}
{'id': '6cc10677-b7c1-a6b7-72f5-c8795e245421', 'name': 'bar', 'error': None, 'result': {}, 'interrupts': []}
{'id': 'fef044cc-e53c-d40b-d6e7-c6f10b20d7ab', 'name': 'baz', 'input': {'foo': 'checked', 'bar': 'checked', 'baz': None}, 'triggers': ['branch:to:baz']}
{'id': 'fef044cc-e53c-d40b-d6e7-c6f10b20d7ab', 'name': 'baz', 'error': None, 'result': {'baz': 'checked'}, 'interrupts': []}

debug:
{'step': 1, 'timestamp': '2026-04-28T05:22:03.640827+00:00', 'type': 'task', 'payload': {'id': 'e9512746-2886-5438-bcf8-5e31aa68ace7', 'name': 'foo', 'input': {'foo': None, 'bar': 'checked', 'baz': None}, 'triggers': ['branch:to:foo']}}
{'step': 1, 'timestamp': '2026-04-28T05:22:03.646046+00:00', 'type': 'task_result', 'payload': {'id': 'e9512746-2886-5438-bcf8-5e31aa68ace7', 'name': 'foo', 'error': None, 'result': {'foo': 'checked'}, 'interrupts': []}}
{'step': 2, 'timestamp': '2026-04-28T05:22:03.648935+00:00', 'type': 'task', 'payload': {'id': '6cc10677-b7c1-a6b7-72f5-c8795e245421', 'name': 'bar', 'input': {'foo': 'checked', 'bar': 'checked', 'baz': None}, 'triggers': ['branch:to:bar']}}
{'step': 2, 'timestamp': '2026-04-28T05:22:03.652776+00:00', 'type': 'task_result', 'payload': {'id': '6cc10677-b7c1-a6b7-72f5-c8795e245421', 'name': 'bar', 'error': None, 'result': {}, 'interrupts': []}}
{'step': 3, 'timestamp': '2026-04-28T05:22:03.655148+00:00', 'type': 'task', 'payload': {'id': 'fef044cc-e53c-d40b-d6e7-c6f10b20d7ab', 'name': 'baz', 'input': {'foo': 'checked', 'bar': 'checked', 'baz': None}, 'triggers': ['branch:to:baz']}}
{'step': 3, 'timestamp': '2026-04-28T05:22:03.658642+00:00', 'type': 'task_result', 'payload': {'id': 'fef044cc-e53c-d40b-d6e7-c6f10b20d7ab', 'name': 'baz', 'error': None, 'result': {'baz': 'checked'}, 'interrupts': []}}

updates:
{'foo': {'foo': 'checked'}}
{'bar': None}
{'baz': {'baz': 'checked'}}

2. 同步阻塞调用

如果说RunsClientstream方法利用创建的Run对象实现了远程调用CompiledStateGraphastream方法,那么RunsClientwait方法就可以理解为利用创建的Run对象远程调用CompiledStateGraph(Pregel)的ainvoke方法。与stream方法类似,wait方法也定义了三个重载来支持有无Thread绑定的两种调用方式。wait方法的参数与stream方法基本相同,区别在于wait方法没有stream_modestream_subgraphsstream_resumable这三个参数(因为它不是流式调用了),而是增加了一个raise_error参数来指定在调用过程中如果发生错误是否要抛出异常。

class RunsClient:
    @overload
    async def wait(
        self,
        thread_id: str,
        assistant_id: str,
        *,
        input: Input | None = None,
        command: Command | None = None,
        metadata: Mapping[str, Any] | None = None,
        config: Config | None = None,
        context: Context | None = None,
        checkpoint: Checkpoint | None = None,
        checkpoint_id: str | None = None,
        checkpoint_during: bool | None = None,
        interrupt_before: All | Sequence[str] | None = None,
        interrupt_after: All | Sequence[str] | None = None,
        webhook: str | None = None,
        on_disconnect: DisconnectMode | None = None,
        multitask_strategy: MultitaskStrategy | None = None,
        if_not_exists: IfNotExists | None = None,
        after_seconds: int | None = None,
        raise_error: bool = True,
        headers: Mapping[str, str] | None = None,
        params: QueryParamTypes | None = None,
        on_run_created: Callable[[RunCreateMetadata], None] | None = None,
    ) -> list[dict] | dict[str, Any]: ...

    @overload
    async def wait(
        self,
        thread_id: None,
        assistant_id: str,
        *,
        input: Input | None = None,
        command: Command | None = None,
        metadata: Mapping[str, Any] | None = None,
        config: Config | None = None,
        context: Context | None = None,
        checkpoint_during: bool | None = None,
        interrupt_before: All | Sequence[str] | None = None,
        interrupt_after: All | Sequence[str] | None = None,
        webhook: str | None = None,
        on_disconnect: DisconnectMode | None = None,
        on_completion: OnCompletionBehavior | None = None,
        if_not_exists: IfNotExists | None = None,
        after_seconds: int | None = None,
        raise_error: bool = True,
        headers: Mapping[str, str] | None = None,
        params: QueryParamTypes | None = None,
        on_run_created: Callable[[RunCreateMetadata], None] | None = None,
    ) -> list[dict] | dict[str, Any]: ...

    async def wait(
        self,
        thread_id: str | None,
        assistant_id: str,
        *,
        input: Input | None = None,
        command: Command | None = None,
        metadata: Mapping[str, Any] | None = None,
        config: Config | None = None,
        context: Context | None = None,
        checkpoint: Checkpoint | None = None,
        checkpoint_id: str | None = None,
        checkpoint_during: bool | None = None,  # deprecated
        interrupt_before: All | Sequence[str] | None = None,
        interrupt_after: All | Sequence[str] | None = None,
        webhook: str | None = None,
        on_disconnect: DisconnectMode | None = None,
        on_completion: OnCompletionBehavior | None = None,
        multitask_strategy: MultitaskStrategy | None = None,
        if_not_exists: IfNotExists | None = None,
        after_seconds: int | None = None,
        raise_error: bool = True,
        headers: Mapping[str, str] | None = None,
        params: QueryParamTypes | None = None,
        on_run_created: Callable[[RunCreateMetadata], None] | None = None,
        durability: Durability | None = None,
    ) -> list[dict] | dict[str, Any]

对于RunsClientwait方法对应的API请求,根据是否绑定Thread,会分别采用如下的请求方法和路径。调用之后的状态会JSON格式的响应体中返回。

  • 绑定Thread:POST /threads/{thread_id}/runs/wait
  • 不绑定Thread:POST /runs/wait

3. 异步创建后台调用任务

RunsClientstreamwait方法直接执行创建的Run对象不同,create方法会直接返回的Run对象。RunsClient同样定义三个重载的create方法来支持有无Thread绑定的两种调用方式。create方法创建的Run方法采用流的方式调用Agent,所以它与stream方法的参数基本一致。

class RunsClient:
    @overload
    async def create(
        self,
        thread_id: None,
        assistant_id: str,
        *,
        input: Input | None = None,
        command: Command | None = None,
        stream_mode: StreamMode | Sequence[StreamMode] = "values",
        stream_subgraphs: bool = False,
        stream_resumable: bool = False,
        metadata: Mapping[str, Any] | None = None,
        checkpoint_during: bool | None = None,
        config: Config | None = None,
        context: Context | None = None,
        interrupt_before: All | Sequence[str] | None = None,
        interrupt_after: All | Sequence[str] | None = None,
        webhook: str | None = None,
        on_completion: OnCompletionBehavior | None = None,
        if_not_exists: IfNotExists | None = None,
        after_seconds: int | None = None,
        headers: Mapping[str, str] | None = None,
        params: QueryParamTypes | None = None,
        on_run_created: Callable[[RunCreateMetadata], None] | None = None,
    ) -> Run: ...

    @overload
    async def create(
        self,
        thread_id: str,
        assistant_id: str,
        *,
        input: Input | None = None,
        command: Command | None = None,
        stream_mode: StreamMode | Sequence[StreamMode] = "values",
        stream_subgraphs: bool = False,
        stream_resumable: bool = False,
        metadata: Mapping[str, Any] | None = None,
        config: Config | None = None,
        context: Context | None = None,
        checkpoint: Checkpoint | None = None,
        checkpoint_id: str | None = None,
        checkpoint_during: bool | None = None,
        interrupt_before: All | Sequence[str] | None = None,
        interrupt_after: All | Sequence[str] | None = None,
        webhook: str | None = None,
        multitask_strategy: MultitaskStrategy | None = None,
        if_not_exists: IfNotExists | None = None,
        after_seconds: int | None = None,
        headers: Mapping[str, str] | None = None,
        params: QueryParamTypes | None = None,
        on_run_created: Callable[[RunCreateMetadata], None] | None = None,
    ) -> Run: ...

    async def create(
        self,
        thread_id: str | None,
        assistant_id: str,
        *,
        input: Input | None = None,
        command: Command | None = None,
        stream_mode: StreamMode | Sequence[StreamMode] = "values",
        stream_subgraphs: bool = False,
        stream_resumable: bool = False,
        metadata: Mapping[str, Any] | None = None,
        config: Config | None = None,
        context: Context | None = None,
        checkpoint: Checkpoint | None = None,
        checkpoint_id: str | None = None,
        checkpoint_during: bool | None = None,  # deprecated
        interrupt_before: All | Sequence[str] | None = None,
        interrupt_after: All | Sequence[str] | None = None,
        webhook: str | None = None,
        multitask_strategy: MultitaskStrategy | None = None,
        if_not_exists: IfNotExists | None = None,
        on_completion: OnCompletionBehavior | None = None,
        after_seconds: int | None = None,
        headers: Mapping[str, str] | None = None,
        params: QueryParamTypes | None = None,
        on_run_created: Callable[[RunCreateMetadata], None] | None = None,
        durability: Durability | None = None,
    ) -> Run

class Run(TypedDict):
    run_id: str
    thread_id: str
    assistant_id: str
    created_at: datetime
    updated_at: datetime
    status: RunStatus
    metadata: Json
    multitask_strategy: MultitaskStrategy

RunStatus = Literal["pending", "running", "error", "success", "timeout", "interrupted"]

LangGraph SDK为Run定义了同名的类型,包含了Run的基本信息。可以看出Run的thread_id字段的类型为str而不是str|None,这意味着即使是通过不绑定Thread的方式创建的Run对象,系统也会生成一个UUID作为thread_id。这一个规则同样应用于前面利用streamwait方法创建的Run对象。之所以这个thread_id是必须的,因为Run对象时基于thread_id进行存储的,所以针对Run对象提取和兼容都需要指定thread_id

对于RunsClientcreate方法对应的API请求,根据是否绑定Thread,会分别采用如下的请求方法和路径。创建的Run对象会以JSON格式的响应体返回。

  • 绑定Thread:POST /threads/{thread_id}/runs
  • 不绑定Thread:POST /runs

除了调用create方法创建一个Run对象完成单次调用之外,RunsClient还提供了一个create_batch方法,可以一次性创建多个Run对象来完成批量调用。create_batch方法接受一个RunCreate类型的列表作为参数,RunCreate类型包含了创建Run对象所需的所有信息。create_batch方法会返回一个Run对象的列表。该方法对应的API请求的方法和路径分别为POST和/runs/batch,创建的Run对象列表会以JSON格式的响应体返回。

class RunsClient:
    async def create_batch(
        self,
        payloads: list[RunCreate],
        *,
        headers: Mapping[str, str] | None = None,
        params: QueryParamTypes | None = None,
    ) -> list[Run]

class RunCreate(TypedDict):
    thread_id: str | None
    assistant_id: str
    input: dict | None
    metadata: dict | None
    config: Config | None
    context: Context | None
    checkpoint_id: str | None
    interrupt_before: list[str] | None
    interrupt_after: list[str] | None
    webhook: str | None
    multitask_strategy: MultitaskStrategy | None

下面的演示程序通过调用RunsClientcreate方法来创建一个延迟5秒执行Run对象。我们在创建Run对象之后立即输出它的状态,然后分别在5秒和10秒之后获取Run对象的最新状态并输出。你会看到Run对象的状态是从pending变成success(由于Agent执行时间太短,无法捕捉到running状态)。上面介绍的自动为Run对象生成thread_id的规则也在这个程序中得到了体现。对于以这种方式创建的Run对象,在默认情况下,当执行完成后就被会删除,所以我们需要指定on_completion参数为"keep"来保留它。

from langgraph_sdk import get_client
import asyncio

async def main():
    async with get_client(url="http://localhost:2024") as client:
        run = await client.runs.create(
            thread_id=None,
            assistant_id="test_agent",
            input={
                "foo": None,
                "bar": "checked",
                "baz": None,
            },
            after_seconds=5,
            on_completion= "keep"
        )
        thread_id = run["thread_id"]
        print(f"initial status: {run['status']}")

        await asyncio.sleep(5)
        run = await client.runs.get(thread_id=thread_id, run_id=run["run_id"])
        print(f"status after 5 seconds: {run['status']}")

        await asyncio.sleep(6)
        run = await client.runs.get(thread_id=thread_id, run_id=run["run_id"])
        print(f"status after 10 seconds: {run['status']}")

asyncio.run(main())

输出:

initial status: pending
status after 5 seconds: pending
status after 10 seconds: success

4. Run检索、取消和删除

RunsClient提供了list方法根据照不同的条件来检索与指定Thread绑定的Run对象,如果知道run_id,我们可以调用get方法来获取对应Run对象的描述信息和最新状态。对于正在运行的Run对象,我们可以调用cancel方法来取消它。如果不再需要某个Run对象了,我们可以调用delete方法来删除它。我们在上面讨论过,Run对象是基于thread_id进行存储的,所以调用这些方法都需要指定thread_id

class RunsClient:
    async def list(
        self,
        thread_id: str,
        *,
        limit: int = 10,
        offset: int = 0,
        status: RunStatus | None = None,
        select: list[RunSelectField] | None = None,
        headers: Mapping[str, str] | None = None,
        params: QueryParamTypes | None = None,
    ) -> list[Run]

    async def get(
        self,
        thread_id: str,
        run_id: str,
        *,
        headers: Mapping[str, str] | None = None,
        params: QueryParamTypes | None = None,
    ) -> Run

    async def cancel(
        self,
        thread_id: str,
        run_id: str,
        *,
        wait: bool = False,
        action: CancelAction = "interrupt",
        headers: Mapping[str, str] | None = None,
        params: QueryParamTypes | None = None,
    ) -> None

    async def delete(
        self,
        thread_id: str,
        run_id: str,
        *,
        headers: Mapping[str, str] | None = None,
        params: QueryParamTypes | None = None,
    ) -> None

如所示的是这些方法对应的API请求方法和路径,其中list方法出thread_id之外的其他参数会置于请求URL的查询字符串中。

  • list方法:GET /threads/{thread_id}/runs
  • get方法:GET /threads/{thread_id}/runs/{run_id}
  • cancel方法:POST /threads/{thread_id}/runs/{run_id}/cancel
  • delete方法:DELETE /threads/{thread_id}/runs/{run_id}

5. Run对象的接入

对于通过RunsClientcreate方法创建的Run对象,我们可以调用它的join方法来以流式的方式获取它的执行过程,或者调用它的join_stream方法来获取一个异步迭代器来迭代它的执行过程。

class RunsClient:
    async def join(
        self,
        thread_id: str,
        run_id: str,
        *,
        headers: Mapping[str, str] | None = None,
        params: QueryParamTypes | None = None,
    ) -> dict

    def join_stream(
        self,
        thread_id: str,
        run_id: str,
        *,
        cancel_on_disconnect: bool = False,
        stream_mode: StreamMode | Sequence[StreamMode] | None = None,
        headers: Mapping[str, str] | None = None,
        params: QueryParamTypes | None = None,
        last_event_id: str | None = None,
    ) -> AsyncIterator[StreamPart]

join方法会在Run对象执行完成后返回一个包含执行结果的字典,而join_stream方法会返回一个异步迭代器来迭代执行过程中的事件。下面的演示程序展示了如何调用joinjoin_stream方法来获取Run对象的执行过程。

from langgraph_sdk import get_client
import asyncio,json

async def main():
    async with get_client(url="http://localhost:2024") as client:
        run = await client.runs.create(
            thread_id=None,
            assistant_id="test_agent",
            input={
                "foo": None,
                "bar": "checked",
                "baz": None,
            },
        )

        result = await client.runs.join(run["thread_id"], run["run_id"])
        print(json.dumps(result, indent=2))

        run = await client.runs.create(
            thread_id=None,
            assistant_id="test_agent",
            input={
                "foo": None,
                "bar": "checked",
                "baz": None,
            },
        )
        async for part in client.runs.join_stream(run["thread_id"], run["run_id"]):
            print(f"[{part.event}] {part.data}")
            if part.data.get("status") == "run_done":
                break
asyncio.run(main())

输出:

{
  "foo": "checked",
  "bar": "checked",
  "baz": "checked"
}

[metadata] {'run_id': '019dd3f3-8b37-74e0-b173-f3d974645926', 'attempt': 1}
[values] {'foo': None, 'bar': 'checked', 'baz': None}
[values] {'foo': 'checked', 'bar': 'checked', 'baz': None}
[values] {'foo': 'checked', 'bar': 'checked', 'baz': 'checked'}
Logo

AtomGit 是由开放原子开源基金会联合 CSDN 等生态伙伴共同推出的新一代开源与人工智能协作平台。平台坚持“开放、中立、公益”的理念,把代码托管、模型共享、数据集托管、智能体开发体验和算力服务整合在一起,为开发者提供从开发、训练到部署的一站式体验。

更多推荐