[A2A协议与实现-04]如何使用SDK构建一个A2A Server
前面主要围绕协议层面对A2A涉及的所有操作,以及这些操作的输入输出消息类型进行了详细的介绍,我想很多人可能会觉得这些内容过于抽象和理论化了。接下来我们将介绍A2A官方提供Python SDK实现,看看它是如何构建一个A2A Server的。A2A默认支持JSON-RPC、gRPC和HTTP+JSON/REST三种标准的Binding,这里只介绍JSON-RPC的实现,其他两种Binding的实现方式在原理上是类似的。在介绍A2A SDK服务端架构设计之前,我们先通过一个实例演示如何利用A2A SDK服务端框架来构建一个Agent Server,并利用它来部署一个通过LangChain创建的Agent。我们最终会利用A2A客户端组件来调用这个Agent Server,以验证它的功能和效果。
1. 构建Agent Server
如下面的代码片段所示,我们调用LangChain的create_agent方法创建了一个简单的Agent,它使用ChatOpenAI作为模型。我们旨在利用此Agent提供一个根据天气提供着装建议,为此我们为它注册了一个用来提供天气信息的工具get_weather。然后我们基于这个Agent创建了一个代表A2A Agent Server的LangChainA2AServer实例,并调用run方法启动它。
from typing import Callable, Callable, cast
from dotenv import load_dotenv
from starlette.applications import Starlette
from langchain_core.messages import AnyMessage, AIMessage, ToolMessage
from langchain.tools import tool
from langgraph.graph.state import CompiledStateGraph
from langchain_core.runnables import RunnableConfig
from langchain.agents import create_agent
from langchain_openai import ChatOpenAI
from a2a.types import Message
from a2a.server.agent_execution import AgentExecutor, RequestContext
from a2a.server.events import EventQueue
from a2a.server.tasks import (
InMemoryPushNotificationConfigStore,
TaskUpdater,
BasePushNotificationSender,
InMemoryPushNotificationConfigStore,
InMemoryTaskStore,
)
from a2a.types import (
Part,
TaskState,
TextPart,
UnsupportedOperationError,
AgentCapabilities,
AgentCard,
AgentSkill,
)
from a2a.utils import (
new_task,
new_agent_text_message,
new_agent_parts_message,
)
from a2a.utils.errors import ServerError
from a2a.server.apps import A2AStarletteApplication
from a2a.server.request_handlers import DefaultRequestHandler, DefaultRequestHandler
import uvicorn, httpx
load_dotenv()
@tool
def get_weather(location: str) -> str:
"""获取指定位置的天气信息"""
return f"天气预报:{location}今天晴,最高温度25摄氏度,最低温度15摄氏度。"
agent = create_agent(
model=ChatOpenAI(model="gpt-5.2-chat"),
tools=[get_weather],
system_prompt="你是一个精通养生之道的时尚顾问,请根据天气提供着装建议。",
)
agent_card_factory = lambda host, port: AgentCard(
name="clothing_assistant",
description="提供根据天气的着装建议",
url=f"http://{host}:{port}/",
version="1.0.0",
default_input_modes=["text", "text/plain"],
default_output_modes=["text", "text/plain"],
capabilities=AgentCapabilities(streaming=True, push_notifications=True),
skills=[
AgentSkill(
id="clothing_assistant",
name="clothing_assistant",
description="获取指定位置的天气信息",
tags=["weather", "clothing", "advice"],
)
],
)
server = LangChainA2AServer(agent=agent, agent_card_factory=agent_card_factory, port= 3721)
server.run()
1.1 LangChainA2AServer
Agent Server的本质上是一个由Uvicorn构建的Web Server,作为ASGIApplication的Starlette对象由LangChainA2AServer提供。如下所示的是LangChainA2AServer的完整定义。
class LangChainA2AServer:
host: str
port: int
starlette: Starlette
def __init__(
self,
agent: CompiledStateGraph,
agent_card_factory: Callable[[str, int], AgentCard],
host: str = "localhost",
port: int = 8000,
) -> None:
self.host = host
self.port = port
config_store = InMemoryPushNotificationConfigStore()
request_handler = DefaultRequestHandler(
agent_executor=LangChainAgentExecutor(agent=agent),
task_store=InMemoryTaskStore(),
push_config_store=config_store,
push_sender=BasePushNotificationSender(
httpx_client=httpx.AsyncClient(), config_store=config_store
),
)
self.starlette = A2AStarletteApplication(
agent_card=agent_card_factory(self.host, self.port),
http_handler=request_handler,
).build()
def run(self):
uvicorn.run(self.starlette, host=self.host, port=self.port)
LangChainA2AServer的__init__方法定义了如下的参数:
- agent:一个CompiledStateGraph对象,表示要绑定的LangChain Agent;
- agent_card_factory:一个函数,接受主机地址和端口号作为输入,返回一个AgentCard对象用于在A2A协议中展示Agent的身份信息和功能描述;
- host:一个字符串,表示服务器的主机地址,默认为"localhost";
- port:一个整数,表示服务器的端口号,默认为8000;
作为ASGIApplication的Starlette对象是在__init__方法中利用A2AStarletteApplication构建的。具体来说,我们在构建A2AStarletteApplication时提供了两个对象,一个是通过agent_card_factory生成的AgentCard对象,另一个是DefaultRequestHandler对象。顾名思义,DefaultRequestHandler是A2A SDK提供的默认的请求处理器,它负责处理A2A协议定义的各种请求。为了创建此对象,我们提供了:
LangChainAgentExecutor对象,它是A2A SDK中AgentExecutor的一个实现,负责执行与LangChain Agent交互的核心逻辑;InMemoryTaskStore对象,用于存储和管理任务;InMemoryPushNotificationConfigStore对象,用于存储和管理推送通知的配置;BasePushNotificationSender对象,用于发送推送通知;
1.2 LangChainAgentExecutor
作为一个开发协议的实现框架,A2A SDK并为与特定的Agent开发框架绑定,而是提供了抽象的AgentExecutor类与具体的框架进行适配。LangChainAgentExecutor就是我们自定义的AgentExecutor类型,用于适配通过LangChain创建的Agent。如下所示的是LangChainAgentExecutor的完整定义。
class LangChainAgentExecutor(AgentExecutor):
agent: CompiledStateGraph
def __init__(self, agent: CompiledStateGraph):
self.agent = agent
async def execute(self, context: RequestContext, event_queue: EventQueue) -> None:
context_id = cast(str, context.context_id)
task = context.current_task
# enqueue a newly created task
if task is None:
task = new_task(cast(Message, context.message))
await event_queue.enqueue_event(task)
updater = TaskUpdater(
event_queue=event_queue, task_id=task.id, context_id=context_id
)
config: RunnableConfig = {
"configurable": {
"thread_id": context_id,
}
}
input: dict = {
"messages": [{"role": "user", "content": context.get_user_input()}]
}
async for chunk in self.agent.astream(
input=input, config=config, stream_mode="values"
):
message: AnyMessage = chunk["messages"][-1]
if isinstance(message, AIMessage):
# ai message with tool calls.
if message.tool_calls and len(message.tool_calls) > 0:
parts = []
for tool_call in message.tool_calls:
part = TextPart(text = f"{tool_call['name']}({tool_call['id']})")
part.metadata = {
"kind": "tool_call",
"tool_name": tool_call["name"],
"tool_call_id": tool_call["id"],
"args": tool_call["args"]}
parts.append(part)
await updater.update_status(
state=TaskState.working,
message=new_agent_parts_message(
parts=parts, context_id=context_id, task_id=task.id
),
)
else:
# ai message without tool calls, treat the content as final response and complete the task.
await updater.add_artifact(
parts=[Part(root=TextPart(text=str(message.content)))],
name="result",
)
await updater.complete()
elif isinstance(message, ToolMessage):
# tool message, update the task status but keep the task open.
await updater.update_status(
state=TaskState.working,
message=new_agent_text_message(
text=str(message.content),
context_id=context_id,
task_id=task.id,
),
)
async def cancel(self, context: RequestContext, event_queue: EventQueue) -> None:
raise ServerError(error=UnsupportedOperationError())
AgentExecutor的核心方法execute定义了context和event_queue两个参数,前者提供的RequestContext对象包含了与当前请求相关的上下文信息;后者提供的EventQueue队列,我们利用此对象实时将事件推送到客户端,这里的事件不仅包含任务状态更新、产出物更新事件,还包括任务和消息本身。在execute方法中,我们首先检查当前上下文中是否已经存在一个任务,如果没有,我们就创建一个新的任务并将其加入事件队列。我们创建的TaskUpdater用于更新任务的状态和Artifact。
我们调用Agent的astream方法来执行Agent的逻辑,作为输入的提示词通过调用RequestContext的get_user_input方法来获取。我们将RequestContext提供的的context_id作为Agent的thread_id置于RunnableConfig中。由于我们采用的流模式为values,所以每一个涉及状态更新的推理步骤都会触发一次迭代。对于每次迭代,我们会提取对话历史的最新生成的消息,并根据消息的类型进行不同的处理:
- 如果这个消息是一个
AIMessage- 如果有工具调用,我们将工具调用的信息(工具名称、调用ID、参数等)转换成一组
TextPart,然后利用new_agent_parts_message函数将它们封装成一个A2A消息,最后通过调用TaskUpdater的update_status来更新任务的状态(working); - 如果没有工具调用,意味着整个推理过程结束,此时我们通过调用
TaskUpdater的add_artifact方法将消息的内容作为Artifact,并调用complete方法将任务状态设置为completed;
- 如果有工具调用,我们将工具调用的信息(工具名称、调用ID、参数等)转换成一组
- 如果这个消息是一个
ToolMessage,我们调用new_agent_text_message函数将消息内容转换成A2A消息,并通过调用TaskUpdater的update_status来更新任务的状态(working);
2. 使用A2A SDK调用Agent
然后我们编写了如下的程序利用A2A SDK提供的客户端组件来调用我们刚才创建的A2A Agent Server。如下面的代码片段所示,我们调用ClientFactory的connect方法连接到Agent Server(“http://localhost:3721”)来创建代表A2A客户端组件的Client对象。然后调用它的get_card方法获取AgentCard,并将其打印出来。
from typing import Any, cast
from a2a.client import ClientFactory, ClientConfig
from a2a.types import (Message,Part,Role, TextPart,Task,TaskStatusUpdateEvent, TaskArtifactUpdateEvent)
import asyncio,json,uuid, httpx
async def main():
http_client = httpx.AsyncClient(timeout=120)
client = await ClientFactory.connect(agent= "http://localhost:3721", client_config= ClientConfig(httpx_client=http_client))
agent_card = await client.get_card()
print(f"Agent Card:\n{json.dumps(agent_card.model_dump(), indent=2)}\n")
request = Message(
message_id =uuid.uuid4().hex,
role= Role.user,
parts=[Part(root= TextPart(text="出差去苏州,请问现在苏州的天气如何?我该穿什么?"))]
)
processed_messages = set()
def pretty_print_message(message:Message|None):
if message is None or message.message_id in processed_messages:
return
processed_messages.add(message.message_id)
tool_calls = [part for part in message.parts if part.root.metadata and part.root.metadata.get("kind") == "tool_call"]
if len(tool_calls) > 0:
print("tool calls:")
for part in tool_calls:
print(f" {part.root.text}") # type: ignore
metadata = part.root.metadata or {}
print(f" name: {metadata.get('tool_name')}")
print(f" id: {metadata.get('tool_call_id')}")
print(f" args: {metadata.get('args')}")
for part in (part for part in message.parts if part not in tool_calls):
if isinstance(part.root, TextPart):
print(f"content: {part.root.text}")
async for chunk in client.send_message(request=request):
if isinstance(chunk,Message):
pretty_print_message(chunk)
else:
task, event = cast(tuple[Task, Any], chunk)
if event is None:
print(f"\n{'='*30} Task created: {'='*30}")
last_message = task.history[-1] if task.history else None
pretty_print_message(last_message)
if isinstance(event, TaskStatusUpdateEvent):
print(f"\n{'='*30} Status updated: {'='*30}")
print(f"Task status: {event.status.state}")
last_message = task.history[-1] if task.history else None
pretty_print_message(last_message)
elif isinstance(event, TaskArtifactUpdateEvent):
print(f"\n{'='*30} Artifact updated: {'='*30}")
for part in event.artifact.parts:
if isinstance(part.root, TextPart):
print(f"content: {part.root.text}")
asyncio.run(main())
接下里通过调用Client的send_message方法以A2A协议远程调用我们部署的Agent。我们将提示词疯转到一个A2A消息中,并将其作为输入传递给send_message方法。由于send_message方法返回一个异步生成器,我们可以在一个异步循环中处理它的输出。每次迭代,我们都会收到一个块(chunk),它可能是一个Message对象,也可能是一个包含Task和事件的元组。我们在循环中处理每个收到的响应块,如果块是一个Message对象,我们就调用pretty_print_message函数来格式化输出消息内容;如果块是一个包含Task和事件的元组,我们根据事件类型(TaskStatusUpdateEvent或TaskArtifactUpdateEvent)来打印相应的信息,并调用pretty_print_message函数来输出相关消息内容。
整个客户端程序会生成如下两端输出,前面一段是作为Agent身份信息的AgentCard的内容;后面一段则是我们以流的方式远程调用Agent实时收到的响应内容。
{
"additionalInterfaces": null,
"capabilities": {
"extensions": null,
"pushNotifications": true,
"stateTransitionHistory": null,
"streaming": true
},
"defaultInputModes": [
"text",
"text/plain"
],
"defaultOutputModes": [
"text",
"text/plain"
],
"description": "\u63d0\u4f9b\u6839\u636e\u5929\u6c14\u7684\u7740\u88c5\u5efa\u8bae",
"documentationUrl": null,
"iconUrl": null,
"name": "clothing_assistant",
"preferredTransport": "JSONRPC",
"protocolVersion": "0.3.0",
"provider": null,
"security": null,
"securitySchemes": null,
"signatures": null,
"skills": [
{
"description": "\u83b7\u53d6\u6307\u5b9a\u4f4d\u7f6e\u7684\u5929\u6c14\u4fe1\u606f",
"examples": null,
"id": "clothing_assistant",
"inputModes": null,
"name": "clothing_assistant",
"outputModes": null,
"security": null,
"tags": [
"weather",
"clothing",
"advice"
]
}
],
"supportsAuthenticatedExtendedCard": null,
"url": "http://localhost:3721/",
"version": "1.0.0"
}
============================== Task created: ==============================
content: 出差去苏州,请问现在苏州的天气如何?我该穿什么?
============================== Status updated: ==============================
Task status: TaskState.working
tool calls:
get_weather(call_ySD5swYhhGA98COU6R1e7MYc)
name: get_weather
id: call_ySD5swYhhGA98COU6R1e7MYc
args: {'location': '苏州'}
============================== Status updated: ==============================
Task status: TaskState.working
content: 天气预报:苏州今天晴,最高温度25摄氏度,最低温度15摄氏度。
============================== Artifact updated: ==============================
content: 好的,我结合**苏州当前天气**和**养生+时尚**两个角度给你一些贴心建议 🌿👔
### 🌤 苏州今天天气概况
- **天气**:晴朗
- **气温**:15℃ ~ 25℃
- **特点**:早晚偏凉,中午温暖,春季昼夜温差较明显
---
## 👗 出差着装建议(养生 × 得体)
### ✅ 白天 / 外出洽谈
- **上身**:
- 薄西装外套 / 针织开衫 / 风衣(不厚重)
- 内搭棉质衬衫或薄长袖,透气又稳重
- **下身**:
- 西裤 / 休闲西裤 / 及踝半裙
- **鞋子**:
- 皮鞋、乐福鞋或舒适的低跟鞋,适合走路
👉 中午25℃左右,**外套可随时脱下**,避免出汗后受凉,符合养生“春不捂过头”的原则。
---
### 🌙 早晚 & 空调环境
- **必备一件**:薄外套或披肩
- 苏州湿度相对偏高,**注意护住颈肩和腹部**,防止受凉引起疲劳或肠胃不适。
---
## 🌿 养生小提醒
- 春季宜“**轻暖不燥**”:
- 面料选 **棉、麻、天丝**,少穿闷热化纤
- 可选择**浅色系**(米白、浅灰、雾蓝),顺应春气,也显得清爽专业
- 随身带点温水,少喝冰饮,护脾胃
---
如果你能告诉我:
👉 **出差偏商务还是偏休闲?**
👉 **男性 / 女性?**
我可以帮你搭配一套更具体、更好看的出差穿搭方案 😊
============================== Status updated: ==============================
Task status: TaskState.completed
3. 直接以HTTP请求的方式调用Agent
既然我们构建的Agent Server是一个基于Uvicorn的Web Server,那么理论上我们也可以直接以HTTP请求的方式来调用它,而不需要通过A2A SDK提供的客户端组件。我们可以使用任何支持HTTP请求的工具或库来发送请求,例如curl、Postman或者httpx等。下面是一个使用httpx库发送HTTP请求的示例代码片段。
from a2a.types import (
Message,
SendMessageRequest,
MessageSendParams,
MessageSendConfiguration,
PushNotificationConfig,
Role,
Part,
TextPart,
)
from starlette.applications import Starlette
from starlette.requests import Request
from starlette.responses import Response
import asyncio, json, uuid, httpx, uvicorn
from concurrent.futures import ThreadPoolExecutor
async def handle_notification(request: Request) -> Response:
body = await request.body()
print(
f"Get Result:\n{json.dumps(json.loads(body.decode('utf-8')), indent=2, ensure_ascii=False)}"
)
return Response(status_code=204)
def run_listener():
starlette = Starlette()
starlette.add_route(path="/webhook", route=handle_notification, methods=["POST"])
ThreadPoolExecutor().submit(
lambda: uvicorn.run(starlette, host="0.0.0.0", port=9527)
)
async def main():
run_listener()
await asyncio.sleep(5) # 等待服务器启动
request = Message(
message_id=uuid.uuid4().hex,
role=Role.user,
parts=[
Part(
root=TextPart(text="出差去苏州,请问现在苏州的天气如何?我该穿什么?")
)
],
)
http_client = httpx.AsyncClient(timeout=120)
config = PushNotificationConfig(url="http://localhost:9527/webhook")
request = SendMessageRequest(
id=uuid.uuid4().hex,
params=MessageSendParams(
configuration=MessageSendConfiguration(push_notification_config=config),
message=request,
),
)
await http_client.post(
"http://localhost:3721", json=request.model_dump(mode="json", exclude_none=True)
)
while True:
await asyncio.sleep(1)
asyncio.run(main())
为了演示从服务端推送通知的功能,我们利用uvicorn构建了一个简单的Web Server来接收推送通知。这个Web Server监听在9527端口,并且定义了一个路由"/webhook"来处理POST请求。当收到推送通知时,它会将通知的内容打印出来。我们在main函数中首先启动这个Web Server,然后构造一个SendMessageRequest对象,其中包含了一个Message对象作为输入,以及一个PushNotificationConfig对象来配置推送通知的URL。最后我们使用httpx库发送HTTP POST请求到Agent Server的地址(http://localhost:3721),并将SendMessageRequest对象作为请求体发送出去。
当Agent Server处理这个请求并完成任务后,它会根据PushNotificationConfig中配置的URL将结果推送到我们定义的Webhook上,我们就可以在控制台看到推送通知的内容。
{
"artifacts": [
{
"artifactId": "181734b6-96ca-483c-b3ef-c27720d8f4bd",
"name": "result",
"parts": [
{
"kind": "text",
"text": "苏州今日**晴朗,15–25℃,早晚偏凉、白天温暖**,很适合走“**商务得体 + 养生舒适**”路线。给你一套分场景的穿搭建议:\n\n### 🌿 商务正式(会议/拜访客户)\n**上装**\n- 轻薄西装外套或挺括风衣(深蓝、灰、米色皆宜)\n- 内搭透气衬衫或薄款针织衫(棉、真丝或天丝材质,利于养气)\n\n**下装**\n- 西装裤或垂感好的直筒裤 \n- 避免过厚面料,苏州湿润,透气很重要\n\n**鞋履**\n- 皮质商务鞋或简约乐福鞋 \n- 建议软底,护足养肾,长时间走路不累\n\n---\n\n### 🌤 商务+出行(白天外出、考察)\n- 可脱卸的**薄西装/针织外套**(应对早晚温差)\n- 内搭**POLO衫或轻商务衬衫**\n- **防晒但透气**,晴天紫外线不低\n\n---\n\n### 🌙 晚间/非正式应酬\n- 轻薄针织衫 + 休闲西裤 \n- 若去园林或水边,**带一条薄围巾**,防凉护颈\n\n---\n\n### 🌱 养生小贴士(很重要)\n- 苏州湿气偏重: \n 👉 少穿完全不透气的化纤面料 \n 👉 选择**棉、麻、真丝混纺**\n- 鞋袜保持干爽,避免湿气入体\n- 颜色上推荐**浅色系**,顺应春末初夏养阳之道\n\n如果你愿意告诉我**性别、是否偏正式、是否需要拍照/上镜**,我可以直接给你一套“拎包即走”的完整搭配。"
}
]
}
],
"context_id": "83618cbd-4ef1-4410-bd92-ccf0712a25bf",
"history": [
{
"context_id": "83618cbd-4ef1-4410-bd92-ccf0712a25bf",
"kind": "message",
"messageId": "d0d7e2f441ea4522b3e4e2c7fcb3e827",
"parts": [
{
"kind": "text",
"text": "What should I wear for a business trip to Suzhou?"
}
],
"role": "user",
"taskId": "ab2d2177-7513-499d-a1e8-37b7d31d6003"
},
{
"context_id": "83618cbd-4ef1-4410-bd92-ccf0712a25bf",
"kind": "message",
"messageId": "f2128450-51b0-4595-b2bc-ff285bf6dbef",
"parts": [
{
"kind": "text",
"metadata": {
"kind": "tool_call",
"tool_name": "get_weather",
"tool_call_id": "call_WlCFfor9lkWm37FDTt7OUc89",
"args": {
"location": "Suzhou, China"
}
},
"text": "get_weather(call_WlCFfor9lkWm37FDTt7OUc89)"
}
],
"role": "agent",
"taskId": "ab2d2177-7513-499d-a1e8-37b7d31d6003"
},
{
"context_id": "83618cbd-4ef1-4410-bd92-ccf0712a25bf",
"kind": "message",
"messageId": "c87a6de9-4ee0-40da-9515-87c83dd85687",
"parts": [
{
"kind": "text",
"text": "天气预报:苏州今天晴,最高温度25摄氏度,最低温度15摄氏度。"
}
],
"role": "agent",
"taskId": "ab2d2177-7513-499d-a1e8-37b7d31d6003"
}
],
"id": "ab2d2177-7513-499d-a1e8-37b7d31d6003",
"kind": "task",
"status": {
"state": "completed",
"timestamp": "2026-05-03T10:55:20.583916+00:00"
}
}
AtomGit 是由开放原子开源基金会联合 CSDN 等生态伙伴共同推出的新一代开源与人工智能协作平台。平台坚持“开放、中立、公益”的理念,把代码托管、模型共享、数据集托管、智能体开发体验和算力服务整合在一起,为开发者提供从开发、训练到部署的一站式体验。
更多推荐

所有评论(0)