从“协议标准化“到“工具落地“:AI Agent 的通信协议与执行闭环

本篇导读:本文是 AI Agent 逻辑层深度解析系列的下篇,聚焦"执行层"核心技术——标准化协议与工具调用。结合 AWS Bedrock、LangGraph、Redis 工业级实践,通过 3 个完整实战场景、4 张 Mermaid 架构/流程图、6 段可直接复用代码,系统拆解如何让多 Agent 统一通信、安全可靠地执行工具调用,并提供外卖客服与医疗预约两个端到端完整实现,直击"协议不兼容""工具调用失控"两大落地痛点。
系列说明:本文为下篇,重点讲解标准化协议与工具调用;上篇已深入讲解意图识别与槽位填充,建议先阅读上篇。
1. 承上启下:从"理解"到"执行"的关键桥梁
在上篇中,我们深入拆解了 AI Agent 如何"听懂"用户需求(意图识别)并将其转化为结构化参数(槽位填充)。但"听懂"只是第一步,Agent 要真正产生价值,还需要:
- 统一通信协议:让意图 Agent、槽位 Agent、工具 Agent、记忆 Agent 之间能无缝协作
- 安全工具调用:将结构化参数转化为实际的 API 调用,并处理异常、重试、回滚
本文(下篇)将聚焦这两大"执行层"技术,并通过外卖客服 Agent 与医疗预约 Agent 的端到端实现,展示四大模块如何协同工作。
四大模块回顾与本篇聚焦
2. 标准化消息协议:Agent 间的"普通话"
在复杂 Multi-Agent Collaboration 系统中(意图 Agent + 槽位 Agent + 工具 Agent + 记忆 Agent),各 Agent 之间必须使用统一通信协议——否则会出现"消息无法解析"“参数传递错误”"状态丢失"等问题。
2.1 核心协议字段定义(工业级标准)
| 字段名 | 类型 | 说明 | 架构师注释 |
|---|---|---|---|
role |
String | user / assistant / tool / system / agent | 定义消息来源与权限边界,防止权限越权 |
content |
String/JSON | 消息文本或工具返回数据 | 工具返回必须为 JSON,便于下游解析 |
tool_calls |
Array | [{"name": "工具名", "parameters": {...}}] |
实现决策与执行分离,LLM 只生成指令 |
session_id |
String | 会话唯一标识 | LangGraph 持久化关键,追踪长对话状态 |
task_id |
String | 任务唯一标识 | 多 Agent 协作时关联同一任务所有消息 |
confidence |
Float | 0.0-1.0 置信度评分 | Reflect-Refine 触发器,< 0.6 进入反思流程 |
timestamp |
String | 消息发送时间(ISO 格式) | 日志审计、状态回溯 |
error |
String/null | 错误信息(可选) | 异常状态下传递错误详情,不直接暴露给用户 |
新增字段说明(v2.0 扩展):生产环境中还建议增加
user_role(权限标识,用于工具鉴权)、retry_count(重试计数,防止无限循环)、parent_task_id(子任务关联,支持任务分解)三个字段,详见第 3.4 节工具权限分级。
2.2 多 Agent 协作消息示例
// Step 1: 用户发起请求 (role=user)
{
"role": "user",
"content": "帮我订一张2024-06-20从上海去北京的机票,乘客张三,身份证号110101199001011234",
"tool_calls": [],
"session_id": "session_20240618123456",
"task_id": "task_20240618123456_001",
"confidence": 1.0,
"timestamp": "2024-06-18T12:34:56Z",
"error": null
}
// Step 2: 意图 Agent 返回 (role=agent)
{
"role": "agent",
"content": "意图识别完成,核心意图:PURCHASE_FLIGHT,置信度:0.95",
"tool_calls": [],
"session_id": "session_20240618123456",
"task_id": "task_20240618123456_001",
"confidence": 0.95,
"timestamp": "2024-06-18T12:35:00Z",
"error": null
}
// Step 3: 槽位 Agent 返回 + 触发工具调用 (role=agent)
{
"role": "agent",
"content": {
"status": "success",
"slot": {
"destination": "北京",
"origin": "上海",
"departure_date": "2024-06-20",
"passenger_name": "张三",
"passenger_id": "110101199001011234"
}
},
"tool_calls": [{
"name": "book_flight_ticket",
"parameters": {
"origin": "上海",
"destination": "北京",
"departure_date": "2024-06-20",
"passenger_name": "张三",
"passenger_id": "110101199001011234"
}
}],
"session_id": "session_20240618123456",
"task_id": "task_20240618123456_001",
"confidence": 0.98,
"timestamp": "2024-06-18T12:35:05Z",
"error": null
}
// Step 4: 工具 Agent 返回 (role=tool)
{
"role": "tool",
"content": {
"status": "success",
"data": {
"order_no": "FL-20240620001",
"origin": "上海",
"destination": "北京",
"departure_date": "2024-06-20",
"departure_time": "09:30",
"arrival_time": "11:50",
"passenger_name": "张三"
}
},
"tool_calls": [],
"session_id": "session_20240618123456",
"task_id": "task_20240618123456_001",
"confidence": 1.0,
"timestamp": "2024-06-18T12:35:10Z",
"error": null
}
// Step 5: 最终回复用户 (role=assistant)
{
"role": "assistant",
"content": "机票预订成功!订单号 FL-20240620001,上海→北京,2024-06-20 09:30 出发,预计 11:50 到达。请携带身份证原件乘机。",
"tool_calls": [],
"session_id": "session_20240618123456",
"task_id": "task_20240618123456_001",
"confidence": 1.0,
"timestamp": "2024-06-18T12:35:15Z",
"error": null
}
2.3 LangGraph + Redis 实现"时间旅行"(状态持久化与回滚)
LangGraph 的核心能力:Persistence(持久化)——通过在协议中强制包含 session_id、task_id 和上下文快照,系统可在 Agent 出错时回滚到任意历史状态重新推理。
import redis
import json
from datetime import datetime
# 初始化 Redis
redis_client = redis.Redis(host='your-redis-host', port=6379, db=0, decode_responses=True)
def persist_protocol_message(message: dict) -> dict:
"""
将协议消息存入 Redis,关联 session_id 和 task_id
使用 list 结构保留整个链路记录,支持回滚
"""
message["timestamp"] = datetime.now().isoformat()
key = f"agent:msg:{message['session_id']}:{message['task_id']}"
redis_client.rpush(key, json.dumps(message, ensure_ascii=False))
# 设置过期时间:24小时(避免内存积压)
redis_client.expire(key, 86400)
return message
def rollback_protocol_message(session_id: str, task_id: str, rollback_step: int = 1) -> dict:
"""
回滚到指定步数前的协议消息(时间旅行)
- rollback_step=1:回滚一步
- rollback_step=2:回滚两步
"""
key = f"agent:msg:{session_id}:{task_id}"
message_list = redis_client.lrange(key, 0, -1)
if not message_list:
raise ValueError(f"未找到会话记录:{key}")
if len(message_list) <= rollback_step:
return json.loads(message_list[0]) # 回滚至初始状态
rollback_index = len(message_list) - 1 - rollback_step
return json.loads(message_list[rollback_index])
def tool_agent_node(state: dict) -> dict:
"""
LangGraph 工具节点:工具调用失败时自动回滚
"""
protocol_message = state["protocol_message"]
try:
tool_name = protocol_message["tool_calls"][0]["name"]
tool_params = protocol_message["tool_calls"][0]["parameters"]
tool_result = call_tool(tool_name, tool_params) # 自定义工具调用函数
protocol_message["role"] = "tool"
protocol_message["content"] = tool_result
protocol_message["tool_calls"] = []
protocol_message["error"] = None
persist_protocol_message(protocol_message)
return {"protocol_message": protocol_message, "current_state": "success"}
except Exception as e:
# 工具调用失败:自动回滚至上一步
rollback_message = rollback_protocol_message(
protocol_message["session_id"],
protocol_message["task_id"],
rollback_step=1
)
rollback_message["error"] = str(e)
return {
"protocol_message": rollback_message,
"current_state": "retry",
"error": str(e)
}
3. Tool Calling 深度实践:解耦决策与执行
工具调用是 Agent 产生实际价值的"最后一公里"。核心思维:模型负责决策(做什么、调哪个工具、传什么参数),系统负责执行(如何通过 API 实现、处理异常、返回结果)。这种解耦能大幅提升可维护性,避免工具变更导致整个 Agent 重构。
3.1 命名规范:Relevance AI 风格指南
工具命名必须具备自解释性,避免模糊命名导致 LLM 混淆:
| 等级 | 错误命名 | 正确命名 | 规则 |
|---|---|---|---|
| ❌ 初级 | search_it、get_data、call_api |
- | 模糊,无明确动作和对象 |
| ✅ 资深 | - | get_linkedin_company_postsbook_flight_ticketrefund_order_by_id |
动作(Action)+ 对象(Object)+ 系统(可选) |
Purpose Statement(目的陈述):每个工具必须声明用途,帮助 LLM 正确判断是否调用:
# ✅ 标准工具定义(含 purpose 字段)
TOOL_DEFINITIONS = [
{
"name": "get_linkedin_company_posts",
"purpose": "获取特定公司最近三个月内的 LinkedIn 帖子信息,包含内容、发布时间、点赞数、评论数。",
"when_to_use": "用户需要了解某公司的 LinkedIn 动态、品牌宣传内容或社媒活跃度时调用。",
"when_not_to_use": "用户询问公司财务数据、员工信息或非 LinkedIn 平台的内容时,不得调用此工具。",
"parameters": {
"company_name": "String,必填,公司全称(如:字节跳动)",
"company_domain": "String,必填,公司官网域名(如:bytedance.com)",
"time_range": "String,可选,默认最近3个月,格式:YYYY-MM-DD 至 YYYY-MM-DD"
},
"return_format": "JSON,包含 posts 数组,每个 post 含 content、publish_time、like_count、comment_count"
}
]
架构师笔记:增加
when_not_to_use字段是减少工具误调的关键技巧,可降低约 30% 的工具混淆率。
3.2 ReAct 框架流程图
ReAct(Reason + Act)让 Agent 执行工具调用前先"思考",明确"为什么调用、需要什么参数、能得到什么结果":
3.3 工业级 Agent Prompt 模板(工具调用"安全护栏")
## Agent Role
你是资深市场调研专家,可访问 Google Search 和 LinkedIn API,完成用户的市场调研需求。
## Tool Constraints(严格遵守)
1. 调用工具前,必须校验所有必填参数,缺失则反问用户,补充后再调用;
2. 调用 `get_linkedin_company_posts` 前,必须先通过 `search_company_domain` 确认域名;
3. 禁止向用户展示 API 密钥、原始 JSON 错误、工具调用地址等敏感信息;
4. 工具返回 404:告知"未找到相关企业信息",不编造结果;
5. 工具返回 500:告知"工具暂时不可用,请稍后重试",触发重试(最多3次,间隔1秒);
6. 工具调用结果必须整理成自然语言,禁止直接返回原始 JSON。
## Workflow(严格遵循)
1. **Thought(思考)**:分析需求,判断是否需要工具、调哪个、需要什么参数;
2. **Action(行动)**:生成工具调用指令,严格按工具定义传递参数;
3. **Observation(观察)**:接收工具结果,判断是否满足需求;
4. **Loop(循环)**:不满足则重新思考;满足则整理结果生成最终答案。
## Tool Definitions
{tool_definitions}
## User Query
{user_query}
## Output Format(严格遵守)
<thought>思考过程:是否需要工具、选哪个、参数是否完整、为什么这么调用</thought>
<action>{"name": "工具名", "parameters": {"参数名": "参数值"}}</action>
<observation>工具返回结果摘要(若未调用工具,留空)</observation>
<final_answer>最终回答(自然语言,不含敏感信息)</final_answer>
3.4 实战场景一:金融风控的串行工具调用链
金融场景中,工具调用必须严格按顺序执行,前一步结果是后一步的输入,且任何一步失败都必须终止并告警:
import time
from typing import Callable, Any
def execute_financial_tool_chain(
user_id: str,
transfer_params: dict,
tools: dict[str, Callable]
) -> dict:
"""
金融风控串行工具调用链(任意一步失败则终止)
- tools: {"verify_user_identity": fn, "check_balance": fn, ...}
"""
steps = [
("verify_user_identity", {"user_id": user_id}, "身份验证失败,请重试"),
("check_account_balance", {"user_id": user_id, "amount": transfer_params["amount"]}, "账户余额不足"),
("check_transfer_risk", {"target_account": transfer_params["target_account"]}, "风控检查异常"),
]
results = {}
for tool_name, params, error_msg in steps:
try:
result = _call_with_retry(tools[tool_name], params, max_retries=3, backoff=1.0)
# 风险分特殊处理
if tool_name == "check_transfer_risk" and result.get("risk_score", 0) >= 0.7:
return {
"status": "pending_review",
"message": "您的转账正在安全审核,预计2小时内完成",
"risk_score": result["risk_score"]
}
if not result.get("success"):
return {"status": "failed", "step": tool_name, "message": error_msg}
results[tool_name] = result
except Exception as e:
return {"status": "error", "step": tool_name, "message": f"系统异常:{str(e)}"}
# 执行转账(最终步骤)
try:
transfer_result = tools["execute_transfer"](transfer_params)
if transfer_result.get("success"):
return {
"status": "success",
"message": f"转账成功,流水号:{transfer_result['transaction_id']}",
"amount": transfer_params["amount"]
}
else:
# 转账失败:触发回滚
tools["rollback_transfer"](transfer_result.get("transaction_id"))
return {"status": "failed", "message": "转账执行失败,资金已原路返回"}
except Exception as e:
return {"status": "error", "message": f"转账异常,请联系客服:{str(e)}"}
def _call_with_retry(fn: Callable, params: dict, max_retries: int = 3, backoff: float = 1.0) -> Any:
"""带指数退避的重试机制"""
for attempt in range(max_retries):
try:
return fn(**params)
except Exception as e:
if attempt == max_retries - 1:
raise
time.sleep(backoff * (2 ** attempt))
3.5 六大避坑点(工业级落地核心注意事项)
| # | 避坑点 | 根因 | 解决方案 |
|---|---|---|---|
| 1 | 参数校验双重保险 | 仅依赖 LLM 提取参数,格式错误率 15-30% | LLM 提取 + Pydantic 系统校验,双重过滤 |
| 2 | 工具调用超时处理 | API 响应缓慢导致整个 Agent 流程卡死 | 设置 3-5s 超时 + 指数退避重试(最多3次)+ 降级告知用户 |
| 3 | 工具权限分级 | 低权限 Agent 调用高权限工具导致数据泄露 | 协议中关联 user_role,工具层二次鉴权 |
| 4 | 结果缓存机制 | 高频相同查询重复调用 API,成本爆炸 | Redis 缓存工具结果,TTL 按业务场景(1h-24h)设置 |
| 5 | 日志审计全覆盖 | 出错无法回溯,问题排查困难 | 记录 query/工具/参数/结果/耗时/状态,关联 session_id |
| 6 | 多工具调用优先级 | 并行/乱序调用导致参数依赖缺失 | 明确 DAG 执行顺序,前置依赖工具必须先完成 |
4. 端到端实战一:外卖客服 Agent 完整实现
结合上篇的意图识别、槽位填充,以及本篇的标准化协议、工具调用,我们实现一个完整的外卖客服 Agent,支持订单查询、退款、催单三大核心场景。
4.1 系统架构全景图
4.2 完整代码实现
import json
import redis
from typing import Dict, Any
from datetime import datetime
# ========== 1. 初始化 Redis ==========
redis_client = redis.Redis(host='localhost', port=6379, db=0, decode_responses=True)
# ========== 2. 意图识别 Agent ==========
INTENT_PROMPT = """
你是外卖客服意图识别专家,根据用户输入判断核心意图:
- CHECK_ORDER_STATUS:查询订单状态
- REQUEST_REFUND:申请退款
- URGE_DELIVERY:催单
用户输入:{user_input}
输出格式(JSON):
{{"intent": "意图名称", "confidence": 0.0-1.0}}
"""
def intent_recognition_agent(user_input: str) -> Dict[str, Any]:
"""意图识别(实际应调用 LLM,此处简化为规则)"""
if "还没到" in user_input or "订单" in user_input:
return {"intent": "CHECK_ORDER_STATUS", "confidence": 0.92}
elif "退款" in user_input or "退单" in user_input:
return {"intent": "REQUEST_REFUND", "confidence": 0.95}
elif "催" in user_input or "快点" in user_input:
return {"intent": "URGE_DELIVERY", "confidence": 0.88}
else:
return {"intent": "UNKNOWN", "confidence": 0.3}
# ========== 3. 槽位填充 Agent ==========
def slot_filling_agent(user_input: str, history: list) -> Dict[str, Any]:
"""槽位填充(实际应调用 LLM,此处简化为正则)"""
import re
match = re.search(r'OD-\d{8}-\d{3}', user_input)
if match:
return {"order_id": match.group(), "missing_slots": []}
else:
return {"order_id": None, "missing_slots": ["order_id"]}
# ========== 4. 标准化协议生成 ==========
def generate_protocol_message(
role: str,
content: Any,
tool_calls: list,
session_id: str,
task_id: str,
confidence: float
) -> Dict[str, Any]:
"""生成标准化协议消息"""
message = {
"role": role,
"content": content,
"tool_calls": tool_calls,
"session_id": session_id,
"task_id": task_id,
"confidence": confidence,
"timestamp": datetime.now().isoformat(),
"error": None
}
# 持久化到 Redis
key = f"agent:msg:{session_id}:{task_id}"
redis_client.rpush(key, json.dumps(message, ensure_ascii=False))
redis_client.expire(key, 86400)
return message
# ========== 5. 工具调用 Agent ==========
def query_order_status(order_id: str) -> Dict[str, Any]:
"""模拟查询订单状态(实际应调用外卖系统 API)"""
mock_orders = {
"OD-20240618-001": {"status": "配送中", "eta": "15分钟"},
"OD-20240618-002": {"status": "已送达", "eta": None}
}
if order_id in mock_orders:
return {"success": True, "data": mock_orders[order_id]}
else:
return {"success": False, "error": "订单不存在"}
def tool_calling_agent(protocol_message: Dict[str, Any]) -> Dict[str, Any]:
"""工具调用执行器"""
tool_name = protocol_message["tool_calls"][0]["name"]
tool_params = protocol_message["tool_calls"][0]["parameters"]
if tool_name == "query_order_status":
result = query_order_status(**tool_params)
protocol_message["role"] = "tool"
protocol_message["content"] = result
protocol_message["tool_calls"] = []
return protocol_message
else:
raise ValueError(f"未知工具:{tool_name}")
# ========== 6. 主流程编排 ==========
def delivery_customer_service_agent(user_input: str, session_id: str, history: list = []) -> str:
"""外卖客服 Agent 主流程"""
task_id = f"task_{datetime.now().strftime('%Y%m%d%H%M%S')}"
# Step 1: 意图识别
intent_result = intent_recognition_agent(user_input)
generate_protocol_message("agent", f"意图识别完成:{intent_result['intent']}",
[], session_id, task_id, intent_result["confidence"])
if intent_result["intent"] == "UNKNOWN":
return "抱歉,我没理解您的需求,请问您是要查询订单、申请退款还是催单?"
# Step 2: 槽位填充
slot_result = slot_filling_agent(user_input, history)
if slot_result["missing_slots"]:
return "请提供您的订单号(格式:OD-YYYYMMDD-XXX)"
# Step 3: 生成工具调用协议消息
protocol_message = generate_protocol_message(
"agent", {"status": "success", "slot": slot_result},
[{"name": "query_order_status", "parameters": {"order_id": slot_result["order_id"]}}],
session_id, task_id, 0.98
)
# Step 4: 工具调用
tool_result = tool_calling_agent(protocol_message)
generate_protocol_message("tool", tool_result["content"], [], session_id, task_id, 1.0)
# Step 5: 生成最终回复
if tool_result["content"]["success"]:
data = tool_result["content"]["data"]
if data["status"] == "配送中":
return f"您的订单正在配送中,预计 {data['eta']} 到达,请耐心等待。"
elif data["status"] == "已送达":
return "您的订单已送达,如有问题请联系骑手。"
else:
return "未找到该订单,请核对订单号后重试。"
# ========== 7. 测试运行 ==========
if __name__ == "__main__":
session_id = "session_20240618_001"
# 场景1:用户直接提供订单号
response1 = delivery_customer_service_agent(
"我的订单 OD-20240618-001 怎么还没到?", session_id
)
print(f"Agent: {response1}")
# 输出:您的订单正在配送中,预计 15分钟 到达,请耐心等待。
# 场景2:用户未提供订单号
response2 = delivery_customer_service_agent("我的订单怎么还没到?", session_id)
print(f"Agent: {response2}")
# 输出:请提供您的订单号(格式:OD-YYYYMMDD-XXX)
5. 工业级落地经验总结
5.1 协议设计三大原则
向后兼容:新增字段不影响旧版本 Agent,使用 optional 标记可选字段。当系统升级时,旧版本 Agent 应能优雅忽略未知字段,而非直接报错崩溃。
最小化传输:避免在协议中传递大文件(如图片、视频),改用 URL 引用。每条协议消息建议控制在 5KB 以内,超出则拆分为多条或外链存储。
安全优先:敏感字段(如 API 密钥、用户密码、完整身份证号)禁止出现在协议消息中。协议中只传递脱敏标识符,真实数据通过加密通道单独传递。
5.2 工具调用成本优化
| 优化策略 | 成本降低 | 实施难度 |
|---|---|---|
| Redis 缓存高频查询结果 | 40-60% | 低 |
| 批量工具调用(Batch API) | 30-50% | 中 |
| 工具调用前置条件判断(避免无效调用) | 20-30% | 低 |
| 使用更小模型做工具路由(Haiku 替代 Sonnet) | 50-70% | 中 |
| 并行工具调用替代串行(独立依赖场景) | 响应时间降低 30-50% | 中 |
5.3 监控指标体系
# 关键监控指标(Prometheus + Grafana)
METRICS = {
"intent_recognition_accuracy": "意图识别准确率(> 90%)",
"slot_filling_completion_rate": "槽位填充完成率(> 85%)",
"tool_call_success_rate": "工具调用成功率(> 95%)",
"tool_call_latency_p99": "工具调用 P99 延迟(< 3s)",
"protocol_message_size_avg": "协议消息平均大小(< 5KB)",
"redis_cache_hit_rate": "Redis 缓存命中率(> 70%)",
# 新增:并行调用相关指标
"parallel_tool_call_ratio": "并行工具调用占比(目标 > 30%)",
"rollback_rate": "事务回滚率(告警阈值 > 1%)",
"slot_refill_turns_avg": "平均槽位补全轮次(< 2 轮为优)"
}
5.4 三大场景对比:选型指南
根据本文三个实战场景,整理出工具调用模式的选型建议:
| 场景 | 调用模式 | 核心关注点 | 典型行业 |
|---|---|---|---|
| 外卖客服(订单查询) | 简单串行 | 用户体验、响应速度 | 电商、物流 |
| 金融风控(转账) | 严格串行 + 强一致性 | 安全合规、回滚机制 | 银行、支付、保险 |
| 医疗预约(跨系统) | 串并行混合 + 事务 | 数据一致性、并发控制 | 医疗、政务、教育 |
6. 总结与展望
本文(下篇)深入拆解了 AI Agent 逻辑层的"执行层"核心技术:
标准化协议方面,通过 role、content、tool_calls、session_id 等字段,实现多 Agent 统一通信,结合 LangGraph + Redis 支持状态持久化与回滚。
工具调用方面,基于 ReAct 框架,解耦决策与执行,通过命名规范、参数校验、重试机制、权限分级六大避坑点,确保工具调用安全可靠。
端到端实战方面,通过外卖客服(简单串行)、金融风控(强一致性串行)、医疗预约(串并行混合)三个场景,系统展示了不同复杂度下四大模块的协同工作方式。
结合上篇的意图识别与槽位填充,至此我们已完整覆盖 AI Agent 逻辑层的四大核心模块。下一篇将聚焦记忆层(Memory Layer),讲解如何让 Agent 具备长期记忆能力,支持跨会话上下文理解。
AtomGit 是由开放原子开源基金会联合 CSDN 等生态伙伴共同推出的新一代开源与人工智能协作平台。平台坚持“开放、中立、公益”的理念,把代码托管、模型共享、数据集托管、智能体开发体验和算力服务整合在一起,为开发者提供从开发、训练到部署的一站式体验。
更多推荐
所有评论(0)