前面主要围绕协议层面对A2A涉及的所有操作,以及这些操作的输入输出消息类型进行了详细的介绍,我想很多人可能会觉得这些内容过于抽象和理论化了。接下来我们将介绍A2A官方提供Python SDK实现,看看它是如何构建一个A2A Server的。A2A默认支持JSON-RPCgRPCHTTP+JSON/REST三种标准的Binding,这里只介绍JSON-RPC的实现,其他两种Binding的实现方式在原理上是类似的。在介绍A2A SDK服务端架构设计之前,我们先通过一个实例演示如何利用A2A SDK服务端框架来构建一个Agent Server,并利用它来部署一个通过LangChain创建的Agent。我们最终会利用A2A客户端组件来调用这个Agent Server,以验证它的功能和效果。

1. 构建Agent Server

如下面的代码片段所示,我们调用LangChain的create_agent方法创建了一个简单的Agent,它使用ChatOpenAI作为模型。我们旨在利用此Agent提供一个根据天气提供着装建议,为此我们为它注册了一个用来提供天气信息的工具get_weather。然后我们基于这个Agent创建了一个代表A2A Agent Server的LangChainA2AServer实例,并调用run方法启动它。

from typing import Callable, Callable, cast
from dotenv import load_dotenv
from starlette.applications import Starlette
from langchain_core.messages import AnyMessage, AIMessage, ToolMessage
from langchain.tools import tool
from langgraph.graph.state import CompiledStateGraph
from langchain_core.runnables import RunnableConfig
from langchain.agents import create_agent
from langchain_openai import ChatOpenAI
from a2a.types import Message
from a2a.server.agent_execution import AgentExecutor, RequestContext
from a2a.server.events import EventQueue
from a2a.server.tasks import (
    InMemoryPushNotificationConfigStore,
    TaskUpdater,
    BasePushNotificationSender,
    InMemoryPushNotificationConfigStore,
    InMemoryTaskStore,
)
from a2a.types import (
    Part,
    TaskState,
    TextPart,
    UnsupportedOperationError,
    AgentCapabilities,
    AgentCard,
    AgentSkill,
)
from a2a.utils import (
    new_task,
    new_agent_text_message,
    new_agent_parts_message,
)
from a2a.utils.errors import ServerError
from a2a.server.apps import A2AStarletteApplication
from a2a.server.request_handlers import DefaultRequestHandler, DefaultRequestHandler
import uvicorn, httpx

load_dotenv()

@tool
def get_weather(location: str) -> str:
    """获取指定位置的天气信息"""
    return f"天气预报:{location}今天晴,最高温度25摄氏度,最低温度15摄氏度。"


agent = create_agent(
    model=ChatOpenAI(model="gpt-5.2-chat"),
    tools=[get_weather],
    system_prompt="你是一个精通养生之道的时尚顾问,请根据天气提供着装建议。",
)
agent_card_factory = lambda host, port: AgentCard(
    name="clothing_assistant",
    description="提供根据天气的着装建议",
    url=f"http://{host}:{port}/",
    version="1.0.0",
    default_input_modes=["text", "text/plain"],
    default_output_modes=["text", "text/plain"],
    capabilities=AgentCapabilities(streaming=True, push_notifications=True),
    skills=[
        AgentSkill(
            id="clothing_assistant",
            name="clothing_assistant",
            description="获取指定位置的天气信息",
            tags=["weather", "clothing", "advice"],
        )
    ],
)
server = LangChainA2AServer(agent=agent, agent_card_factory=agent_card_factory, port= 3721)
server.run()

1.1 LangChainA2AServer

Agent Server的本质上是一个由Uvicorn构建的Web Server,作为ASGIApplicationStarlette对象由LangChainA2AServer提供。如下所示的是LangChainA2AServer的完整定义。

class LangChainA2AServer:
    host: str
    port: int
    starlette: Starlette

    def __init__(
        self,
        agent: CompiledStateGraph,
        agent_card_factory: Callable[[str, int], AgentCard],
        host: str = "localhost",
        port: int = 8000,
    ) -> None:
        self.host = host
        self.port = port
        config_store = InMemoryPushNotificationConfigStore()
        request_handler = DefaultRequestHandler(
            agent_executor=LangChainAgentExecutor(agent=agent),
            task_store=InMemoryTaskStore(),
            push_config_store=config_store,
            push_sender=BasePushNotificationSender(
                httpx_client=httpx.AsyncClient(), config_store=config_store
            ),
        )
        self.starlette = A2AStarletteApplication(
            agent_card=agent_card_factory(self.host, self.port),
            http_handler=request_handler,
        ).build()

    def run(self):
        uvicorn.run(self.starlette, host=self.host, port=self.port)

LangChainA2AServer__init__方法定义了如下的参数:

  • agent:一个CompiledStateGraph对象,表示要绑定的LangChain Agent;
  • agent_card_factory:一个函数,接受主机地址和端口号作为输入,返回一个AgentCard对象用于在A2A协议中展示Agent的身份信息和功能描述;
  • host:一个字符串,表示服务器的主机地址,默认为"localhost";
  • port:一个整数,表示服务器的端口号,默认为8000;

作为ASGIApplicationStarlette对象是在__init__方法中利用A2AStarletteApplication构建的。具体来说,我们在构建A2AStarletteApplication时提供了两个对象,一个是通过agent_card_factory生成的AgentCard对象,另一个是DefaultRequestHandler对象。顾名思义,DefaultRequestHandler是A2A SDK提供的默认的请求处理器,它负责处理A2A协议定义的各种请求。为了创建此对象,我们提供了:

  • LangChainAgentExecutor对象,它是A2A SDK中AgentExecutor的一个实现,负责执行与LangChain Agent交互的核心逻辑;
  • InMemoryTaskStore对象,用于存储和管理任务;
  • InMemoryPushNotificationConfigStore对象,用于存储和管理推送通知的配置;
  • BasePushNotificationSender对象,用于发送推送通知;

1.2 LangChainAgentExecutor

作为一个开发协议的实现框架,A2A SDK并为与特定的Agent开发框架绑定,而是提供了抽象的AgentExecutor类与具体的框架进行适配。LangChainAgentExecutor就是我们自定义的AgentExecutor类型,用于适配通过LangChain创建的Agent。如下所示的是LangChainAgentExecutor的完整定义。

class LangChainAgentExecutor(AgentExecutor):
    agent: CompiledStateGraph

    def __init__(self, agent: CompiledStateGraph):
        self.agent = agent

    async def execute(self, context: RequestContext, event_queue: EventQueue) -> None:
        context_id = cast(str, context.context_id)
        task = context.current_task

        # enqueue a newly created task
        if task is None:
            task = new_task(cast(Message, context.message))
            await event_queue.enqueue_event(task)

        updater = TaskUpdater(
            event_queue=event_queue, task_id=task.id, context_id=context_id
        )
        config: RunnableConfig = {
            "configurable": {
                "thread_id": context_id,
            }
        }
        input: dict = {
            "messages": [{"role": "user", "content": context.get_user_input()}]
        }
        async for chunk in self.agent.astream(
            input=input, config=config, stream_mode="values"
        ):
            message: AnyMessage = chunk["messages"][-1]
            if isinstance(message, AIMessage):

                # ai message with tool calls.
                if message.tool_calls and len(message.tool_calls) > 0:
                    parts = []
                    for tool_call in message.tool_calls:
                        part = TextPart(text = f"{tool_call['name']}({tool_call['id']})")
                        part.metadata = {
                            "kind": "tool_call",
                            "tool_name": tool_call["name"],
                            "tool_call_id": tool_call["id"],
                            "args": tool_call["args"]}
                        parts.append(part)
                    await updater.update_status(
                        state=TaskState.working,
                        message=new_agent_parts_message(
                            parts=parts, context_id=context_id, task_id=task.id
                        ),
                    )
                else:
                    # ai message without tool calls, treat the content as final response and complete the task.
                    await updater.add_artifact(
                        parts=[Part(root=TextPart(text=str(message.content)))],
                        name="result",
                    )
                    await updater.complete()
            elif isinstance(message, ToolMessage):
                # tool message, update the task status but keep the task open.
                await updater.update_status(
                    state=TaskState.working,
                    message=new_agent_text_message(
                        text=str(message.content),
                        context_id=context_id,
                        task_id=task.id,
                    ),
                )

    async def cancel(self, context: RequestContext, event_queue: EventQueue) -> None:
        raise ServerError(error=UnsupportedOperationError())

AgentExecutor的核心方法execute定义了contextevent_queue两个参数,前者提供的RequestContext对象包含了与当前请求相关的上下文信息;后者提供的EventQueue队列,我们利用此对象实时将事件推送到客户端,这里的事件不仅包含任务状态更新、产出物更新事件,还包括任务和消息本身。在execute方法中,我们首先检查当前上下文中是否已经存在一个任务,如果没有,我们就创建一个新的任务并将其加入事件队列。我们创建的TaskUpdater用于更新任务的状态和Artifact。

我们调用Agentastream方法来执行Agent的逻辑,作为输入的提示词通过调用RequestContextget_user_input方法来获取。我们将RequestContext提供的的context_id作为Agentthread_id置于RunnableConfig中。由于我们采用的流模式为values,所以每一个涉及状态更新的推理步骤都会触发一次迭代。对于每次迭代,我们会提取对话历史的最新生成的消息,并根据消息的类型进行不同的处理:

  • 如果这个消息是一个AIMessage
    • 如果有工具调用,我们将工具调用的信息(工具名称、调用ID、参数等)转换成一组TextPart,然后利用new_agent_parts_message函数将它们封装成一个A2A消息,最后通过调用TaskUpdaterupdate_status来更新任务的状态(working);
    • 如果没有工具调用,意味着整个推理过程结束,此时我们通过调用TaskUpdateradd_artifact方法将消息的内容作为Artifact,并调用complete方法将任务状态设置为completed
  • 如果这个消息是一个ToolMessage,我们调用new_agent_text_message函数将消息内容转换成A2A消息,并通过调用TaskUpdaterupdate_status来更新任务的状态(working);

2. 使用A2A SDK调用Agent

然后我们编写了如下的程序利用A2A SDK提供的客户端组件来调用我们刚才创建的A2A Agent Server。如下面的代码片段所示,我们调用ClientFactoryconnect方法连接到Agent Server(“http://localhost:3721”)来创建代表A2A客户端组件的Client对象。然后调用它的get_card方法获取AgentCard,并将其打印出来。

from typing import Any, cast
from a2a.client import ClientFactory, ClientConfig
from a2a.types import (Message,Part,Role, TextPart,Task,TaskStatusUpdateEvent, TaskArtifactUpdateEvent)
import asyncio,json,uuid, httpx

async def main():
    http_client = httpx.AsyncClient(timeout=120)
    client = await ClientFactory.connect(agent= "http://localhost:3721", client_config= ClientConfig(httpx_client=http_client))
    agent_card = await client.get_card()
    print(f"Agent Card:\n{json.dumps(agent_card.model_dump(), indent=2)}\n")
    
    request = Message(
        message_id =uuid.uuid4().hex,
        role= Role.user,
        parts=[Part(root= TextPart(text="出差去苏州,请问现在苏州的天气如何?我该穿什么?"))]
    )    
    processed_messages = set()
    def pretty_print_message(message:Message|None):
        if message is None or message.message_id in processed_messages:
            return
        processed_messages.add(message.message_id)
        tool_calls = [part for part in message.parts if part.root.metadata and part.root.metadata.get("kind") == "tool_call"]
        if len(tool_calls) > 0:
            print("tool calls:")
            for part in tool_calls:
                print(f"  {part.root.text}")  # type: ignore
                metadata = part.root.metadata or {}  
                print(f"    name: {metadata.get('tool_name')}")
                print(f"    id: {metadata.get('tool_call_id')}")
                print(f"    args: {metadata.get('args')}")
        for part in (part for part in message.parts if part not in tool_calls):
            if isinstance(part.root, TextPart):
                print(f"content: {part.root.text}")

    async for chunk in client.send_message(request=request):
        if isinstance(chunk,Message):
            pretty_print_message(chunk)
        else:
            task, event = cast(tuple[Task, Any], chunk)
            
            if event is None:
                print(f"\n{'='*30} Task created: {'='*30}")
                last_message = task.history[-1] if task.history else None
                pretty_print_message(last_message)

            if isinstance(event, TaskStatusUpdateEvent):
                print(f"\n{'='*30} Status updated: {'='*30}")
                print(f"Task status: {event.status.state}")
                last_message = task.history[-1] if task.history else None
                pretty_print_message(last_message)
            elif isinstance(event, TaskArtifactUpdateEvent):
                print(f"\n{'='*30} Artifact updated: {'='*30}")
                for part in event.artifact.parts:
                    if isinstance(part.root, TextPart):
                        print(f"content: {part.root.text}")   
                        
asyncio.run(main())

接下里通过调用Clientsend_message方法以A2A协议远程调用我们部署的Agent。我们将提示词疯转到一个A2A消息中,并将其作为输入传递给send_message方法。由于send_message方法返回一个异步生成器,我们可以在一个异步循环中处理它的输出。每次迭代,我们都会收到一个块(chunk),它可能是一个Message对象,也可能是一个包含Task和事件的元组。我们在循环中处理每个收到的响应块,如果块是一个Message对象,我们就调用pretty_print_message函数来格式化输出消息内容;如果块是一个包含Task和事件的元组,我们根据事件类型(TaskStatusUpdateEventTaskArtifactUpdateEvent)来打印相应的信息,并调用pretty_print_message函数来输出相关消息内容。

整个客户端程序会生成如下两端输出,前面一段是作为Agent身份信息的AgentCard的内容;后面一段则是我们以流的方式远程调用Agent实时收到的响应内容。

{
  "additionalInterfaces": null,
  "capabilities": {
    "extensions": null,
    "pushNotifications": true,
    "stateTransitionHistory": null,
    "streaming": true
  },
  "defaultInputModes": [
    "text",
    "text/plain"
  ],
  "defaultOutputModes": [
    "text",
    "text/plain"
  ],
  "description": "\u63d0\u4f9b\u6839\u636e\u5929\u6c14\u7684\u7740\u88c5\u5efa\u8bae",
  "documentationUrl": null,
  "iconUrl": null,
  "name": "clothing_assistant",
  "preferredTransport": "JSONRPC",
  "protocolVersion": "0.3.0",
  "provider": null,
  "security": null,
  "securitySchemes": null,
  "signatures": null,
  "skills": [
    {
      "description": "\u83b7\u53d6\u6307\u5b9a\u4f4d\u7f6e\u7684\u5929\u6c14\u4fe1\u606f",
      "examples": null,
      "id": "clothing_assistant",
      "inputModes": null,
      "name": "clothing_assistant",
      "outputModes": null,
      "security": null,
      "tags": [
        "weather",
        "clothing",
        "advice"
      ]
    }
  ],
  "supportsAuthenticatedExtendedCard": null,
  "url": "http://localhost:3721/",
  "version": "1.0.0"
}
============================== Task created: ==============================
content: 出差去苏州,请问现在苏州的天气如何?我该穿什么?

============================== Status updated: ==============================
Task status: TaskState.working
tool calls:
  get_weather(call_ySD5swYhhGA98COU6R1e7MYc)
    name: get_weather
    id: call_ySD5swYhhGA98COU6R1e7MYc
    args: {'location': '苏州'}

============================== Status updated: ==============================
Task status: TaskState.working
content: 天气预报:苏州今天晴,最高温度25摄氏度,最低温度15摄氏度。

============================== Artifact updated: ==============================
content: 好的,我结合**苏州当前天气**和**养生+时尚**两个角度给你一些贴心建议 🌿👔

### 🌤 苏州今天天气概况
- **天气**:晴朗  
- **气温**:15℃ ~ 25℃  
- **特点**:早晚偏凉,中午温暖,春季昼夜温差较明显

---

## 👗 出差着装建议(养生 × 得体)

### ✅ 白天 / 外出洽谈
- **上身**:  
  - 薄西装外套 / 针织开衫 / 风衣(不厚重)
  - 内搭棉质衬衫或薄长袖,透气又稳重  
- **下身**:  
  - 西裤 / 休闲西裤 / 及踝半裙  
- **鞋子**:  
  - 皮鞋、乐福鞋或舒适的低跟鞋,适合走路  

👉 中午25℃左右,**外套可随时脱下**,避免出汗后受凉,符合养生“春不捂过头”的原则。

---

### 🌙 早晚 & 空调环境
- **必备一件**:薄外套或披肩  
- 苏州湿度相对偏高,**注意护住颈肩和腹部**,防止受凉引起疲劳或肠胃不适。

---

## 🌿 养生小提醒
- 春季宜“**轻暖不燥**”:  
  - 面料选 **棉、麻、天丝**,少穿闷热化纤  
- 可选择**浅色系**(米白、浅灰、雾蓝),顺应春气,也显得清爽专业  
- 随身带点温水,少喝冰饮,护脾胃

---

如果你能告诉我:  
👉 **出差偏商务还是偏休闲?**  
👉 **男性 / 女性?**  

我可以帮你搭配一套更具体、更好看的出差穿搭方案 😊

============================== Status updated: ==============================
Task status: TaskState.completed

3. 直接以HTTP请求的方式调用Agent

既然我们构建的Agent Server是一个基于Uvicorn的Web Server,那么理论上我们也可以直接以HTTP请求的方式来调用它,而不需要通过A2A SDK提供的客户端组件。我们可以使用任何支持HTTP请求的工具或库来发送请求,例如curlPostman或者httpx等。下面是一个使用httpx库发送HTTP请求的示例代码片段。

from a2a.types import (
    Message,
    SendMessageRequest,
    MessageSendParams,
    MessageSendConfiguration,
    PushNotificationConfig,
    Role,
    Part,
    TextPart,
)
from starlette.applications import Starlette
from starlette.requests import Request
from starlette.responses import Response
import asyncio, json, uuid, httpx, uvicorn
from concurrent.futures import ThreadPoolExecutor


async def handle_notification(request: Request) -> Response:
    body = await request.body()
    print(
        f"Get Result:\n{json.dumps(json.loads(body.decode('utf-8')), indent=2, ensure_ascii=False)}"
    )
    return Response(status_code=204)


def run_listener():
    starlette = Starlette()
    starlette.add_route(path="/webhook", route=handle_notification, methods=["POST"])
    ThreadPoolExecutor().submit(
        lambda: uvicorn.run(starlette, host="0.0.0.0", port=9527)
    )

async def main():
    run_listener()
    await asyncio.sleep(5)  # 等待服务器启动

    request = Message(
        message_id=uuid.uuid4().hex,
        role=Role.user,
        parts=[
            Part(
                root=TextPart(text="出差去苏州,请问现在苏州的天气如何?我该穿什么?")
            )
        ],
    )
    http_client = httpx.AsyncClient(timeout=120)
    config = PushNotificationConfig(url="http://localhost:9527/webhook")
    request = SendMessageRequest(
        id=uuid.uuid4().hex,
        params=MessageSendParams(
            configuration=MessageSendConfiguration(push_notification_config=config),
            message=request,
        ),
    )
    await http_client.post(
        "http://localhost:3721", json=request.model_dump(mode="json", exclude_none=True)
    )
    while True:
        await asyncio.sleep(1)

asyncio.run(main())

为了演示从服务端推送通知的功能,我们利用uvicorn构建了一个简单的Web Server来接收推送通知。这个Web Server监听在9527端口,并且定义了一个路由"/webhook"来处理POST请求。当收到推送通知时,它会将通知的内容打印出来。我们在main函数中首先启动这个Web Server,然后构造一个SendMessageRequest对象,其中包含了一个Message对象作为输入,以及一个PushNotificationConfig对象来配置推送通知的URL。最后我们使用httpx库发送HTTP POST请求到Agent Server的地址(http://localhost:3721),并将SendMessageRequest对象作为请求体发送出去。

当Agent Server处理这个请求并完成任务后,它会根据PushNotificationConfig中配置的URL将结果推送到我们定义的Webhook上,我们就可以在控制台看到推送通知的内容。

{
  "artifacts": [
    {
      "artifactId": "181734b6-96ca-483c-b3ef-c27720d8f4bd",
      "name": "result",
      "parts": [
        {
          "kind": "text",
          "text": "苏州今日**晴朗,15–25℃,早晚偏凉、白天温暖**,很适合走“**商务得体 + 养生舒适**”路线。给你一套分场景的穿搭建议:\n\n### 🌿 商务正式(会议/拜访客户)\n**上装**\n- 轻薄西装外套或挺括风衣(深蓝、灰、米色皆宜)\n- 内搭透气衬衫或薄款针织衫(棉、真丝或天丝材质,利于养气)\n\n**下装**\n- 西装裤或垂感好的直筒裤  \n- 避免过厚面料,苏州湿润,透气很重要\n\n**鞋履**\n- 皮质商务鞋或简约乐福鞋  \n- 建议软底,护足养肾,长时间走路不累\n\n---\n\n### 🌤 商务+出行(白天外出、考察)\n- 可脱卸的**薄西装/针织外套**(应对早晚温差)\n- 内搭**POLO衫或轻商务衬衫**\n- **防晒但透气**,晴天紫外线不低\n\n---\n\n### 🌙 晚间/非正式应酬\n- 轻薄针织衫 + 休闲西裤  \n- 若去园林或水边,**带一条薄围巾**,防凉护颈\n\n---\n\n### 🌱 养生小贴士(很重要)\n- 苏州湿气偏重:  \n  👉 少穿完全不透气的化纤面料  \n  👉 选择**棉、麻、真丝混纺**\n- 鞋袜保持干爽,避免湿气入体\n- 颜色上推荐**浅色系**,顺应春末初夏养阳之道\n\n如果你愿意告诉我**性别、是否偏正式、是否需要拍照/上镜**,我可以直接给你一套“拎包即走”的完整搭配。"
        }
      ]
    }
  ],
  "context_id": "83618cbd-4ef1-4410-bd92-ccf0712a25bf",
  "history": [
    {
      "context_id": "83618cbd-4ef1-4410-bd92-ccf0712a25bf",
      "kind": "message",
      "messageId": "d0d7e2f441ea4522b3e4e2c7fcb3e827",
      "parts": [
        {
          "kind": "text",
          "text": "What should I wear for a business trip to Suzhou?"
        }
      ],
      "role": "user",
      "taskId": "ab2d2177-7513-499d-a1e8-37b7d31d6003"
    },
    {
      "context_id": "83618cbd-4ef1-4410-bd92-ccf0712a25bf",
      "kind": "message",
      "messageId": "f2128450-51b0-4595-b2bc-ff285bf6dbef",
      "parts": [
        {
          "kind": "text",
          "metadata": {
            "kind": "tool_call",
            "tool_name": "get_weather",
            "tool_call_id": "call_WlCFfor9lkWm37FDTt7OUc89",
            "args": {
              "location": "Suzhou, China"
            }
          },
          "text": "get_weather(call_WlCFfor9lkWm37FDTt7OUc89)"
        }
      ],
      "role": "agent",
      "taskId": "ab2d2177-7513-499d-a1e8-37b7d31d6003"
    },
    {
      "context_id": "83618cbd-4ef1-4410-bd92-ccf0712a25bf",
      "kind": "message",
      "messageId": "c87a6de9-4ee0-40da-9515-87c83dd85687",
      "parts": [
        {
          "kind": "text",
          "text": "天气预报:苏州今天晴,最高温度25摄氏度,最低温度15摄氏度。"
        }
      ],
      "role": "agent",
      "taskId": "ab2d2177-7513-499d-a1e8-37b7d31d6003"
    }
  ],
  "id": "ab2d2177-7513-499d-a1e8-37b7d31d6003",
  "kind": "task",
  "status": {
    "state": "completed",
    "timestamp": "2026-05-03T10:55:20.583916+00:00"
  }
}
Logo

AtomGit 是由开放原子开源基金会联合 CSDN 等生态伙伴共同推出的新一代开源与人工智能协作平台。平台坚持“开放、中立、公益”的理念,把代码托管、模型共享、数据集托管、智能体开发体验和算力服务整合在一起,为开发者提供从开发、训练到部署的一站式体验。

更多推荐