A2A与MCP相关
Agent技术文档总结
本文档总结了一些核心技术,包括 Function Call 函数调用、MCP 协议、Agent 智能体以及 A2A 协议。内容涵盖概念、原理、使用方式及代码示例。
1. Function Call 函数调用
学习目标
- 熟悉 Function Call 概念和工作原理
- 熟悉 Function Call 的基本使用
1.1 什么是 Function Call
2023年6月13日 OpenAI 公布了 Function Call 功能,允许开发者向 GPT 模型描述函数,模型会智能地输出一个包含调用这些函数参数的 JSON 对象,从而更可靠地将 GPT 与外部工具和 API 相连接。
Function Call 解决大模型的问题:
- 信息实时性:获取实时数据(如新闻、股价)。
- 数据局限性:调用外部数据库或 API 获取专业领域信息。
- 功能扩展性:扩展模型能力,如调用外部工具进行复杂计算。
支持 Function Call 的模型包括 GPT 系列以及国内模型如百度文心一言、ChatGLM3-6B、讯飞星火3.0以及后续出现的基本的性能更泛用的模型如Qwen3.5等等。
1.2 Function Call 工作原理
无 Function Call 的流程:
用户请求 → 服务给 GPT 提示词 → 返回文本。
有 Function Call 的流程:
- 用户请求,服务将提示词和可调用函数发送给大模型。
- 模型判断用普通文本还是函数调用格式响应。
- 若为函数调用,服务执行该函数,并将结果返回给模型。
- 模型利用提供的数据生成连贯文本回答。
注意:大模型的 Function Call 不会执行任何函数调用,仅返回调用函数所需的参数,开发者需在应用中执行。
1.3 Function Call 使用方式
1.3.1 自定义 tool 结构
通过手动编写 JSON Schema 描述工具。
导包:
from langchain_openai import ChatOpenAI
from langchain_core.messages import HumanMessage, ToolMessage
from agent_learn.config import Config
conf = Config()
定义外部函数:
def add(a: int, b: int) -> int:
"""将数字a与数字b相加"""
return a + b
def multiply(a: int, b: int) -> int:
"""将数字a与数字b相乘"""
return a * b
描述函数功能(JSON Schema):
tools = [
{
"type": "function",
"function": {
"name": "add",
"description": "将数字a与数字b相加",
"parameters": {
"type": "object",
"properties": {
"a": {"type": "integer", "description": "第一个数字"},
"b": {"type": "integer", "description": "第二个数字"}
},
"required": ["a", "b"]
}
}
},
{
"type": "function",
"function": {
"name": "multiply",
"description": "将数字a与数字b相乘",
"parameters": {
"type": "object",
"properties": {
"a": {"type": "integer", "description": "第一个数字"},
"b": {"type": "integer", "description": "第二个数字"}
},
"required": ["a", "b"]
}
}
}
]
模型实例化并绑定工具:
llm = ChatOpenAI(base_url=conf.base_url, api_key=conf.api_key, model=conf.model_name, temperature=0.1)
llm_with_tools = llm.bind_tools(tools, tool_choice="auto")
模型调用与处理:
query = "2+1等于多少?"
messages = [HumanMessage(query)]
ai_msg = llm_with_tools.invoke(messages)
messages.append(ai_msg)
if hasattr(ai_msg, 'tool_calls') and ai_msg.tool_calls:
for tool_call in ai_msg.tool_calls:
selected_tool = {"add": add, "multiply": multiply}[tool_call["name"].lower()]
tool_output = selected_tool(**tool_call["args"])
messages.append(ToolMessage(content=tool_output, tool_call_id=tool_call["id"]))
final_response = llm_with_tools.invoke(messages)
print(final_response.content)
else:
print(ai_msg.content)
1.3.2 装饰器 tool 方式
使用 @tool 装饰器自动生成工具定义。
from langchain_core.tools import tool
@tool
def add(a: int, b: int) -> int:
"""将数字a与数字b相加"""
return a + b
@tool
def multiply(a: int, b: int) -> int:
"""将数字a与数字b相乘"""
return a * b
tools = [add, multiply]
llm_with_tools = llm.bind_tools(tools)
# 后续调用同上
1.3.3 Pydantic 的 tool 方式
使用 Pydantic 模型定义工具参数,并手动实现 invoke 方法。
from pydantic.v1 import BaseModel, Field
class Add(BaseModel):
"""将两个数字相加"""
a: int = Field(..., description="第一个数字")
b: int = Field(..., description="第二个数字")
def invoke(self, args):
tool_instance = self.__class__(**args)
return tool_instance.a + tool_instance.b
class Multiply(BaseModel):
"""将两个数字相乘"""
a: int = Field(..., description="第一个数字")
b: int = Field(..., description="第二个数字")
def invoke(self, args):
tool_instance = self.__class__(**args)
return tool_instance.a * tool_instance.b
tools = [Add, Multiply]
llm_with_tools = llm.bind_tools(tools)
# 调用时需实例化工具类并调用 invoke
三种方式对比:
| 特性 | JSON Schema | @tool 装饰器 | Pydantic |
|---|---|---|---|
| 定义方式 | 手动编写字典 | 装饰 Python 函数 | 继承 BaseModel |
| 自动化程度 | 低 | 高 | 中等 |
| 数据验证 | 需手动 | 基础类型检查 | 强大验证 |
| 适用场景 | 与其他系统集成 | 快速开发 | 复杂数据验证 |
1.4 Agent 调用 tool
Agent 是基于大模型的智能体,具备任务规划、工具调用和记忆能力。以下示例使用 LangChain 的 Agent 调用工具。
from langchain.agents import initialize_agent, AgentType
from langchain_openai import ChatOpenAI
from langchain_core.tools import tool
from agent_learn.config import Config
conf = Config()
@tool
def add(a: int, b: int) -> int:
return a + b
@tool
def multiply(a: int, b: int) -> int:
return a * b
tools = [add, multiply]
llm = ChatOpenAI(base_url=conf.base_url, api_key=conf.api_key, model=conf.model_name, temperature=0.1)
agent = initialize_agent(tools, llm, AgentType.STRUCTURED_CHAT_ZERO_SHOT_REACT_DESCRIPTION, verbose=True)
result = agent.invoke({"input": "2+1等于多少?"})
print(result["output"])
本节小结
本节介绍了 Function Call 的概念、工作原理及三种使用方式(自定义 JSON、装饰器、Pydantic),并演示了 Agent 如何调用工具。
2. MCP 协议
学习目标
- 理解协议在 AI 中的作用
- 掌握 MCP 的核心概念及应用
- 熟悉 MCP 的基本使用
2.1 背景
传统的工具调用由应用开发者负责描述工具,存在描述不一致、重复劳动等问题。MCP 协议旨在让工具“自描述”,实现“一次编写,到处调用”。
2.2 什么是 MCP 协议
MCP(Model Context Protocol)是由 Anthropic 提出的开放协议,旨在实现 LLM 与外部数据源和工具的无缝集成,被称为 AI 世界的“Type-C”接口。
2.2.1 MCP 核心架构
- MCP 服务端 (Tool Provider):将本地函数包装成标准 MCP 接口暴露出去。
- MCP 客户端 (Tool Consumer):连接到服务端,查询并调用工具。
2.2.2 MCP 工具调用流程
- 客户端注册并连接 MCP Server,获取工具描述列表。
- 客户端将工具元信息上报给 LLM。
- LLM 决定调用工具,生成 function calling 指令。
- 客户端通过 stdio 或 WebSocket 发送请求给服务端。
- 服务端执行工具逻辑并返回结果。
- 客户端将结果包装为 ToolMessage 返回给 LLM,生成最终回答。
2.2.3 MCP 的通信传输方式
- stdio:本地进程间通信,客户端启动服务端子进程,通过标准输入输出交互。
- SSE (Server-Sent Events):基于 HTTP 的单向推送协议,适用于网络环境。
- Streamable-HTTP:双向流式传输,支持复杂实时通信(已取代旧版 HTTP+SSE)。
| 传输方式 | 通信方向 | 适用场景 |
|---|---|---|
| stdio | 双向(请求-响应) | 本地命令行工具封装 |
| SSE | 单向(服务器推送) | 简单单向数据推送 |
| Streamable-HTTP | 双向 | 复杂实时双向流 |
2.3 mcp 包使用
2.3.1 stdio 传输方式
服务端 (server_stdio.py):
from mcp.server.fastmcp import FastMCP
mcp = FastMCP("sdg", log_level="ERROR")
@mcp.tool(name="query_high_frequency_question", description="从知识库中检索常见问题解答")
async def query_high_frequency_question() -> str:
return "问题是: 恐龙是怎么灭绝的?"
@mcp.tool(name="get_weather", description="查询天气")
async def get_weather() -> str:
return "北京的天气是多云"
if __name__ == "__main__":
mcp.run(transport="stdio")
客户端直接调用 (client_raw.py):
import asyncio
from langchain_mcp_adapters.tools import load_mcp_tools
from mcp import ClientSession, StdioServerParameters
from mcp.client.stdio import stdio_client
server_script = r".\server_stdio.py"
server_params = StdioServerParameters(command="python", args=[server_script])
async def run():
async with stdio_client(server_params) as (read, write):
async with ClientSession(read, write) as session:
await session.initialize()
tools = await load_mcp_tools(session)
response = await session.call_tool("get_weather", arguments={})
print(response)
asyncio.run(run())
客户端通过 Agent 调用 (client_agent.py):
import asyncio
from langchain_mcp_adapters.tools import load_mcp_tools
from langchain_openai import ChatOpenAI
from langchain.agents import create_tool_calling_agent, AgentExecutor
from langchain_core.prompts import ChatPromptTemplate
from mcp import ClientSession, StdioServerParameters
from mcp.client.stdio import stdio_client
from agent_learn.config import Config
conf = Config()
llm = ChatOpenAI(base_url=conf.base_url, api_key=conf.api_key, model=conf.model_name, temperature=0.1)
server_script = r".\server_stdio.py"
server_params = StdioServerParameters(command="python", args=[server_script])
async def run_agent():
async with stdio_client(server_params) as (read, write):
async with ClientSession(read, write) as session:
await session.initialize()
tools = await load_mcp_tools(session)
prompt = ChatPromptTemplate.from_messages([
("system", "你是一个乐于助人的助手,能够调用工具回答用户问题。"),
("human", "{input}"),
("placeholder", "{agent_scratchpad}"),
])
agent = create_tool_calling_agent(llm, tools, prompt)
agent_executor = AgentExecutor(agent=agent, tools=tools, verbose=True)
response = await agent_executor.ainvoke({"input": "北京的天气怎么样?"})
print(response)
asyncio.run(run_agent())
2.3.2 SSE 传输方式
服务端 (server_sse.py):
from mcp.server.fastmcp import FastMCP
mcp = FastMCP("sdg", log_level="ERROR", host="127.0.0.1", port=8001)
@mcp.tool(name="query_high_frequency_question", description="从知识库中检索常见问题解答")
async def query_high_frequency_question() -> str:
return "高频问题是: 恐龙是怎么灭绝的?"
@mcp.tool(name="get_weather", description="查询天气")
async def get_weather() -> str:
return "北京的天气是多云"
if __name__ == "__main__":
mcp.run(transport="sse")
客户端直接调用 (client_raw.py):
import asyncio
from mcp import ClientSession
from mcp.client.sse import sse_client
from langchain_mcp_adapters.tools import load_mcp_tools
server_url = "http://localhost:8001/sse"
async def run():
async with sse_client(url=server_url) as streams:
async with ClientSession(*streams) as session:
await session.initialize()
tools = await load_mcp_tools(session)
response = await session.call_tool("get_weather", arguments={})
print(response)
asyncio.run(run())
客户端通过 Agent 调用 (client_agent.py): 类似 stdio 版本,使用 sse_client 替代。
2.3.3 Streamable 方式
服务端 (server_streamable.py):
from mcp.server.fastmcp import FastMCP
mcp = FastMCP("sdg", log_level="ERROR", host="127.0.0.1", port=8001)
# 工具定义同上
if __name__ == "__main__":
mcp.run(transport="streamable-http")
客户端直接调用 (client_raw.py): 使用 streamablehttp_client。
2.4 python_a2a 包使用
python_a2a 是 A2A 协议的 Python 实现,也可用于创建 MCP 服务端。
服务端 (server_a2a.py):
import logging
import uvicorn
from python_a2a.mcp import FastMCP, create_fastapi_app
mcp = FastMCP(name="MyMCPTools", description="提供高频问题和天气查询工具", version="1.0.0")
@mcp.tool(name="query_high_frequency_question", description="获取知识库中的高频问答")
async def query_high_frequency_question(**kwargs) -> str:
return '{"status": "success", "data": [{"question_id": 1, "question_text": "恐龙是怎么灭绝的?", "answer_text": "可能是小行星撞击", "category": "历史", "frequency_score": 0.9}]}'
@mcp.tool(name="get_weather", description="查询天气")
async def get_weather(**kwargs) -> str:
return '{"status": "success", "data": "北京的天气是多云"}'
if __name__ == "__main__":
app = create_fastapi_app(mcp)
uvicorn.run(app, host="127.0.0.1", port=8010)
客户端直接调用 (client_raw.py):
import asyncio
from python_a2a.mcp import MCPClient
async def main():
client = MCPClient("http://localhost:8010")
tools = await client.get_tools()
result = await client.call_tool("get_weather")
print(result)
await client.close()
asyncio.run(main())
客户端通过 Agent 调用 (client_agent.py): 使用 to_langchain_tool 转换工具。
本节小结
本小节介绍了 MCP 协议的概念、核心架构、通信传输方式,并通过 stdio、SSE、Streamable 三种方式演示了 MCP 服务端和客户端的实现,以及与 LangChain Agent 的结合。
3. Agent 智能体
学习目标
- 理解什么是 Agent
- 理解什么是 Agentic
- 知道 Agent 的五种模式
3.1 什么是 Agent
Agent 是一个能够感知环境、做出决策并采取行动来完成特定目标的智能体。基础 LLM 已具备初步的 Agent 能力,但真正的 Agent 需要更强的能动性,能够自主反思、规划、协作。
3.2 什么是 Agentic
Agentic 描述一个系统表现出的“像 Agent 一样的程度”,即自主性、目标导向性和主动性。高 Agentic 系统能主动思考、修正错误、调用工具、分解任务。
3.3 Agent 五种模式
3.3.1 工具使用模式 (Tool use pattern)
Agent 调用外部工具弥补自身知识不足,通常用于单步任务。
工作流程: 用户提问 → LLM 判断需工具 → 调用工具 → 给出答案。
3.3.2 ReAct 模式 (ReAct Pattern)
将思考(Reasoning)与行动(Acting)结合,形成动态循环。
工作流程: 思考 → 行动 → 观察 → 循环直至目标达成。
3.3.3 反思模式 (Reflection pattern)
Agent 在完成任务后自我评估和反思,根据反思结果修正。
工作流程: 用户提问 → LLM 给出初稿 → 用户反馈 → LLM 反思 → 调整 → 最终答案。
3.3.4 规划模式 (Planning Pattern)
将复杂任务分解为有序步骤,再逐一执行(每步可能是一个 ReAct 循环)。
工作流程: 规划(分解任务)→ 执行(按顺序执行子任务)。
3.3.5 多智能体模式 (Multi-agent Pattern)
多个具有不同角色的 Agent 协同工作,模拟人类团队协作。
工作流程: 用户需求 → 协调者分配任务 → 各专家 Agent 执行 → 结果汇总。
3.3.6 Agent 模式的演进关系
Tool Use → ReAct → Planning → Reflection → Multi-Agent,层层递进,复杂任务常组合使用多种模式。
3.4 代码实战
3.4.1 工具使用模式
from langchain.agents import AgentExecutor, create_tool_calling_agent
from langchain_core.prompts import ChatPromptTemplate
from langchain_core.tools import tool
from langchain_openai import ChatOpenAI
from agent_learn.config import Config
conf = Config()
llm = ChatOpenAI(base_url=conf.base_url, api_key=conf.api_key, model=conf.model_name, temperature=0.1)
@tool
def multiply(a: int, b: int) -> int:
return a * b
@tool
def search_weather(city: str) -> str:
if "北京" in city:
return "北京今天是晴天,气温25摄氏度。"
return f"抱歉,我没有'{city}'的天气信息。"
tools = [multiply, search_weather]
prompt = ChatPromptTemplate.from_messages([
("system", "你是一个强大的AI助手,可以访问和使用各种工具来回答问题。"),
("user", "{input}"),
("placeholder", "{agent_scratchpad}"),
])
agent = create_tool_calling_agent(llm, tools, prompt)
executor = AgentExecutor(agent=agent, tools=tools, verbose=True)
executor.invoke({"input": "上海今天的天气怎么样?"})
3.4.2 ReAct 模式
from langchain.agents import AgentExecutor, create_react_agent
from langchain_core.prompts import ChatPromptTemplate
from langchain_core.tools import tool
from langchain_openai import ChatOpenAI
@tool
def multiply(numbers_str: str) -> int:
"""计算两个整数的乘积,输入格式:'100,25'"""
a, b = map(int, numbers_str.split(','))
return a * b
tools = [multiply, search_weather]
react_prompt = ChatPromptTemplate.from_template("""
你是一个有用的AI助手,可以访问以下工具:{tools}
请根据用户输入一步步推理,按规则操作:Thought: ... Action: ... Action Input: ... Final Answer: ...
可用的工具名称:{tool_names}
用户输入:{input}
{agent_scratchpad}
""")
agent = create_react_agent(llm, tools, react_prompt)
executor = AgentExecutor(agent=agent, tools=tools, verbose=True, handle_parsing_errors=True)
executor.invoke({"input": "100乘以25等于多少?"})
3.4.3 反思模式
from langchain_core.prompts import ChatPromptTemplate
from langchain_core.output_parsers import StrOutputParser
initial_prompt = ChatPromptTemplate.from_template("请根据以下问题给出你的初步回答: {question}")
initial_chain = initial_prompt | llm | StrOutputParser()
reflection_prompt = ChatPromptTemplate.from_template("""
你之前给出了以下回答:{previous_response}
现在收到用户反馈:{user_feedback}
请反思并生成新回答。
""")
reflection_chain = reflection_prompt | llm | StrOutputParser()
def reflect_and_refine(query, feedback):
initial = initial_chain.invoke({"question": query})
refined = reflection_chain.invoke({"previous_response": initial, "user_feedback": feedback})
return refined
reflect_and_refine("请用一句话介绍一下 LangChain。", "你的回答太简单了,请更详细地解释核心概念。")
3.4.4 规划模式
from langchain_core.prompts import ChatPromptTemplate
from langchain_core.output_parsers import StrOutputParser
from langchain.agents import AgentExecutor, create_react_agent
planner_prompt = ChatPromptTemplate.from_template("将以下任务分解为简单步骤:{user_input}")
planner_chain = planner_prompt | llm | StrOutputParser()
# 执行者使用 ReAct Agent
executor_agent = create_react_agent(llm, tools, react_prompt)
executor = AgentExecutor(agent=executor_agent, tools=tools, verbose=True)
def execute_plan(query):
plan = planner_chain.invoke({"user_input": query})
tasks = [task.strip() for task in plan.split('\n') if task.strip()]
for task in tasks:
executor.invoke({"input": task})
3.4.5 多智能体模式
创建两个专家 Agent:数学专家和信息专家,由主程序协调。
math_tools = [multiply, add]
math_prompt = ChatPromptTemplate.from_messages([
("system", "你是一个数学计算专家。"),
("user", "{input}"),
("placeholder", "{agent_scratchpad}"),
])
math_agent = create_tool_calling_agent(llm, math_tools, math_prompt)
math_executor = AgentExecutor(agent=math_agent, tools=math_tools, verbose=True)
info_tools = [search_weather, get_current_date]
info_agent = create_tool_calling_agent(llm, info_tools, info_prompt)
info_executor = AgentExecutor(agent=info_agent, tools=info_tools, verbose=True)
def multi_agent_workflow(query, math_task, info_task):
math_result = math_executor.invoke({"input": math_task})["output"]
info_result = info_executor.invoke({"input": info_task})["output"]
# 汇总结果
return f"计算结果:{math_result}\n信息查询:{info_result}"
本节小结
本部分介绍了 Agent 的概念、Agentic 特性,并详细阐述了五种 Agent 模式(工具使用、ReAct、反思、规划、多智能体),给出了相应的代码实现。
4. A2A 协议
学习目标
- 什么是 A2A 协议
- A2A 协议核心概念
- 实现 A2A 协议实战
4.1 Agent2Agent Protocol
2025年4月9日 Google 发布 A2A 协议,为不同类型的智能体之间搭建高效沟通与协作的桥梁。
核心特性:
- 安全协作:身份验证、数据加密。
- 任务与状态管理:跟踪任务生命周期。
- 用户体验协商:智能体间协商调整交互。
- 能力发现:通过 AgentCard 自动发现其他智能体的能力。
4.1.1 Agent2Agent 架构剖析
- User:用户,负责认证授权。
- Client Agent:任务发起者。
- Server Agent:任务执行者。
通信基于任务的请求与响应机制,一个 Agent 可同时作为客户端和服务端。
4.1.2 Agent2Agent 核心概念
AgentSkill:描述代理的具体能力。
from python_a2a import AgentSkill
skill = AgentSkill(name="book_ticket", description="预订火车票", examples=["预订从上海到北京的火车票"])
AgentCard:代理的元数据名片,用于服务发现。
from python_a2a import AgentCard
card = AgentCard(name="TicketAgent", description="一个可以预订票务的代理", url="http://127.0.0.1:5009", skills=[skill])
Task:具有明确状态的实体,由 Client 创建,Server 维护状态。
from python_a2a import Task, Message, MessageRole, TextContent
message = Message(content=TextContent(text="查询天气"), role=MessageRole.USER)
task = Task(message=message.to_dict())
TaskState:任务状态枚举(SUBMITTED, WAITING, INPUT_REQUIRED, COMPLETED, CANCELED, FAILED, UNKNOWN)。
TaskStatus:任务状态对象,包含 state、message、timestamp。
from python_a2a import TaskStatus, TaskState
status = TaskStatus(state=TaskState.COMPLETED)
A2AServer:构建代理服务器的核心类。
from python_a2a import A2AServer, run_server
class MyServer(A2AServer):
def handle_task(self, task):
# 处理任务
return task
server = MyServer(agent_card=card)
run_server(server, host="127.0.0.1", port=5009)
artifacts:任务执行后的输出产物。
task.artifacts = [{"parts": [{"type": "text", "text": "结果"}]}]
AgentNetwork:管理多个代理的网络。
from python_a2a import AgentNetwork
network = AgentNetwork(name="MyNetwork")
network.add("TicketAgent", "http://127.0.0.1:5009")
client = network.get_agent("TicketAgent")
AIAgentRouter:使用 LLM 智能路由查询到合适代理。
from python_a2a import AIAgentRouter
from langchain_openai import ChatOpenAI
llm = ChatOpenAI(...)
router = AIAgentRouter(llm_client=llm, agent_network=network)
agent_name, confidence = router.route_query("预订票")
4.2 A2AServer 结合 MCP Server
演示 A2A 服务器内部调用 MCP 工具。
MCP 工具服务 (mcp_weather_tool_agent.py):
from python_a2a.mcp import FastMCP, create_fastapi_app
import uvicorn
mcp = FastMCP(name="WeatherTool")
@mcp.tool(name="get_weather", description="获取城市天气")
def get_weather(city: str) -> str:
if city == "北京":
return "北京今天阳光明媚,29°C"
return f"找不到 {city} 的天气"
if __name__ == "__main__":
app = create_fastapi_app(mcp)
uvicorn.run(app, host="127.0.0.1", port=6005)
A2A 服务器 (a2a_weather_agent.py):
from python_a2a import A2AServer, run_server, AgentCard, AgentSkill, TaskStatus, TaskState
from python_a2a.mcp import MCPClient
import asyncio
agent_card = AgentCard(name="WeatherServer", description="用来查询天气", url="http://127.0.0.1:8005", skills=[AgentSkill(name="查询天气", description="查询指定城市的天气")])
class WeatherServer(A2AServer):
def __init__(self):
super().__init__(agent_card=agent_card)
self.mcp_client = MCPClient("http://127.0.0.1:6005")
def handle_task(self, task):
query = task.message.get("content", {}).get("text", "")
if "天气" in query:
city = "北京" # 简化提取
weather_result = asyncio.run(self.mcp_client.call_tool("get_weather", city=city))
task.artifacts = [{"parts": [{"type": "text", "text": weather_result}]}]
task.status = TaskStatus(state=TaskState.COMPLETED)
return task
if __name__ == "__main__":
server = WeatherServer()
run_server(server, host="127.0.0.1", port=8005)
客户端调用 (main_client.py):
import asyncio
from python_a2a import A2AClient
async def main():
client = A2AClient("http://127.0.0.1:8005")
result = client.ask("请帮我查一下北京的天气")
print(result)
asyncio.run(main())
4.3 A2AServer 串行
实现两个 Agent 串行协作:先查天气,再根据天气订票。
天气 Agent (weather_agent.py) 监听 5008,票务 Agent (ticket_agent.py) 监听 5009。
主控协调 (main_orchestrator.py):
import asyncio
from python_a2a import AgentNetwork, Task, Message, MessageRole, TextContent
import uuid
async def main():
network = AgentNetwork(name="TravelOrchestrator")
network.add("TicketAgent", "http://127.0.0.1:5009")
network.add("WeatherAgent", "http://127.0.0.1:5008")
# 第一步:查天气
weather_client = network.get_agent("WeatherAgent")
task_weather = Task(id=f"task-{uuid.uuid4()}", message=Message(content=TextContent(text="北京的天气怎么样"), role=MessageRole.USER).to_dict())
weather_result = await weather_client.send_task_async(task_weather)
weather_info = weather_result.artifacts[0]["parts"][0]["text"]
# 第二步:订票
ticket_client = network.get_agent("TicketAgent")
ticket_query = f"预订一张从北京到上海的火车票,当前天气是:{weather_info}"
task_ticket = Task(id=f"task-{uuid.uuid4()}", message=Message(content=TextContent(text=ticket_query), role=MessageRole.USER).to_dict())
ticket_result = await ticket_client.send_task_async(task_ticket)
print(ticket_result)
asyncio.run(main())
4.4 A2A 实战案例
构建一个 LLM 驱动的路由系统,包含路由服务器、天气代理、票务代理。
路由服务器 (router_A2Aagent_Server.py): 使用 LangChain LLM 转换为 A2A 服务器。
from langchain_openai import ChatOpenAI
from python_a2a import run_server
from python_a2a.langchain import to_a2a_server
import asyncio
from agent_learn.config import Config
conf = Config()
llm = ChatOpenAI(base_url=conf.base_url, api_key=conf.api_key, model=conf.model_name, temperature=0.1, streaming=True)
llm_server = to_a2a_server(llm)
asyncio.run(run_server(llm_server, port=5555))
天气代理 (weather_agent.py) 和 票务代理 (ticket_agent.py) 同上。
主控客户端 (main.py): 使用 AgentNetwork 和 AIAgentRouter 路由查询。
import asyncio
from python_a2a import AgentNetwork, AIAgentRouter, A2AClient, Task, Message, MessageRole, TextContent
import uuid
async def main():
network = AgentNetwork(name="TravelAgentNetwork")
network.add("TicketAgent", "http://127.0.0.1:5009")
network.add("WeatherAgent", "http://127.0.0.1:5008")
router = AIAgentRouter(llm_client=A2AClient("http://localhost:5555"), agent_network=network)
queries = ["帮我查下北京的天气", "预订一张从北京到上海的火车票"]
for query in queries:
agent_name, confidence = router.route_query(query)
if agent_name:
agent_client = network.get_agent(agent_name)
task = Task(id=f"task-{uuid.uuid4()}", message=Message(content=TextContent(text=query), role=MessageRole.USER).to_dict())
result = await agent_client.send_task_async(task)
print(result.to_dict())
asyncio.run(main())
4.5 扩展:多意图处理 (multi_intents.py)
演示将多意图查询分解为子查询,并行路由到不同 Agent。
# 使用 LLM 分解查询
from langchain_core.prompts import PromptTemplate
from langchain_core.output_parsers import StrOutputParser
import json, re, asyncio
decompose_chain = PromptTemplate.from_template("将查询分解为子查询,返回JSON:{{'sub_queries': [...]}}") | llm | StrOutputParser()
# 主逻辑:分解 -> 路由 -> 并行执行 -> 收集结果
本节小结
本小节介绍了 A2A 协议的核心概念(AgentSkill, AgentCard, Task 等),并通过多个实战案例演示了 A2A 服务器的创建、与 MCP 的结合、串行协作以及基于 LLM 的路由系统。
总结
本文档系统总结了 SmartVoyage 项目涉及的四项关键技术:Function Call 实现了大模型与外部工具的交互基础;MCP 协议规范了工具的描述与调用,使工具可复用;Agent 智能体通过多种模式(工具使用、ReAct、反思、规划、多智能体)赋予大模型自主解决问题的能力;A2A 协议则打通了不同 Agent 之间的协作通道。这些技术共同构成了现代智能体应用的核心架构。
AtomGit 是由开放原子开源基金会联合 CSDN 等生态伙伴共同推出的新一代开源与人工智能协作平台。平台坚持“开放、中立、公益”的理念,把代码托管、模型共享、数据集托管、智能体开发体验和算力服务整合在一起,为开发者提供从开发、训练到部署的一站式体验。
更多推荐


所有评论(0)