A2A 协议实战指南:用 TypeScript 构建可互通的 AI Agent 服务
A2A 协议实战指南:用 TypeScript 构建可互通的 AI Agent 服务
作者导读:MCP 还没搞清楚,A2A 又来了?本文带你从协议本质出发,一步步用 TypeScript 实现一个符合 A2A 规范的 Agent 服务,并演示两个 Agent 之间的跨框架协作全流程。读完你就明白:A2A 不是 MCP 的竞争者,它们解决的根本就是不同层次的问题。
一、为什么需要 A2A?
随着 AI Agent 从单点工具演进为企业级工作流,一个绕不开的问题浮出水面:
不同框架、不同厂商构建的 Agent,怎么互相"对话"?
- LangChain Agent 如何调用 AutoGen 构建的分析 Agent?
- 客户的私有 Agent 如何安全地调用你提供的能力服务?
- 多 Agent 协作时,任务状态、进度、结果如何统一传递?
Google 于 2026 年正式推出 A2A(Agent2Agent)协议,联合 50+ 合作伙伴(Salesforce、SAP、Atlassian 等),目标就是成为 AI Agent 世界的 HTTP 协议。
A2A vs MCP:两者不是竞争关系
很多人把 A2A 和 MCP 混为一谈,其实它们层次完全不同:
| 维度 | MCP(Model Context Protocol) | A2A(Agent2Agent) |
|---|---|---|
| 解决什么 | AI 模型 ↔ 工具/数据源 | Agent ↔ Agent |
| 通信方向 | 单向:模型调用工具 | 双向:Agent 间平等协作 |
| 类比 | 函数调用(Function Call) | 微服务间 REST/gRPC |
| 状态管理 | 无状态 | 有状态(Task 生命周期) |
简单记:MCP 让 AI 用工具,A2A 让 Agent 找 Agent。
二、A2A 协议核心概念
2.1 三个核心实体
┌─────────────────────────────────────────────────────────┐
│ A2A 协议栈 │
│ │
│ ┌──────────────┐ ┌──────────────────────────┐ │
│ │ Client Agent │ ──────► │ Remote Agent │ │
│ │ (调用方) │ │ (被调用方 / Agent Card) │ │
│ └──────────────┘ └──────────────────────────┘ │
│ │ │
│ Task + Artifacts │
│ (任务 + 产物) │
└─────────────────────────────────────────────────────────┘
Agent Card:每个 A2A Agent 必须在固定地址 /.well-known/agent.json 暴露自己的能力描述卡片,包括支持的 skills、认证方式、流式能力等。
Task:A2A 中的核心工作单元,有完整生命周期(submitted → working → completed/failed),支持异步查询和流式更新。
Artifact:Task 完成后产出的结果物,可以是文本、文件、结构化数据等。
2.2 通信方式
A2A 支持三种通信模式:
- 同步 JSON-RPC — 简单请求响应
- Server-Sent Events (SSE) — 流式进度推送
- Push Notification — Webhook 回调(适合长任务)
三、动手实战:TypeScript 实现 A2A Agent
我们来构建一个代码审查 Agent,它接受代码片段,返回审查建议。
3.1 项目初始化
mkdir a2a-code-review-agent && cd a2a-code-review-agent
npm init -y
npm install express zod uuid
npm install -D typescript @types/express @types/node ts-node
npx tsc --init
tsconfig.json 关键配置:
{
"compilerOptions": {
"target": "ES2022",
"module": "commonjs",
"outDir": "./dist",
"strict": true,
"esModuleInterop": true
}
}
3.2 定义 A2A 类型
// src/types/a2a.ts
export interface AgentCard {
name: string;
description: string;
url: string;
version: string;
capabilities: {
streaming?: boolean;
pushNotifications?: boolean;
};
skills: AgentSkill[];
authentication?: {
schemes: string[];
};
}
export interface AgentSkill {
id: string;
name: string;
description: string;
inputModes: string[]; // "text" | "file" | "data"
outputModes: string[];
}
export type TaskStatus =
| "submitted"
| "working"
| "input-required"
| "completed"
| "failed"
| "canceled";
export interface Task {
id: string;
status: TaskStatus;
message?: Message; // 最新消息
artifacts?: Artifact[]; // 产物列表
createdAt: string;
updatedAt: string;
}
export interface Message {
role: "user" | "agent";
parts: MessagePart[];
}
export interface MessagePart {
type: "text" | "file" | "data";
text?: string;
data?: unknown;
}
export interface Artifact {
name?: string;
parts: MessagePart[];
}
3.3 实现 Agent 核心逻辑
// src/agent/code-review-agent.ts
import { Task, Message, Artifact } from "../types/a2a";
interface ReviewResult {
issues: Array<{
line?: number;
severity: "error" | "warning" | "info";
message: string;
}>;
score: number;
summary: string;
}
export async function reviewCode(code: string): Promise<ReviewResult> {
// 实际场景中可以调用 LLM 或静态分析工具
// 这里演示规则检测逻辑
const issues: ReviewResult["issues"] = [];
// 检测 1:console.log 残留
const consoleLines = code.split("\n").reduce<number[]>((acc, line, idx) => {
if (line.includes("console.log")) acc.push(idx + 1);
return acc;
}, []);
consoleLines.forEach((line) => {
issues.push({
line,
severity: "warning",
message: `第 ${line} 行存在 console.log,请在生产代码中移除`,
});
});
// 检测 2:any 类型滥用
const anyMatches = (code.match(/:\s*any\b/g) || []).length;
if (anyMatches > 2) {
issues.push({
severity: "warning",
message: `检测到 ${anyMatches} 处 any 类型,建议使用具体类型替代`,
});
}
// 检测 3:函数过长(超过50行)
const functionBlocks = code.match(/\{[^{}]*\}/gs) || [];
functionBlocks.forEach((block) => {
const lines = block.split("\n").length;
if (lines > 50) {
issues.push({
severity: "info",
message: `发现超过 50 行的函数体(${lines} 行),建议拆分为更小的函数`,
});
}
});
const errorCount = issues.filter((i) => i.severity === "error").length;
const warnCount = issues.filter((i) => i.severity === "warning").length;
const score = Math.max(0, 100 - errorCount * 20 - warnCount * 5);
return {
issues,
score,
summary: `代码质量评分:${score}/100。发现 ${errorCount} 个错误,${warnCount} 个警告`,
};
}
export function buildTaskResponse(
taskId: string,
result: ReviewResult
): Task {
const artifactText = [
`## 代码审查报告`,
``,
`**总评分:${result.score}/100**`,
``,
`${result.summary}`,
``,
`### 问题详情`,
...result.issues.map(
(i) =>
`- [${i.severity.toUpperCase()}]${i.line ? ` 第${i.line}行` : ""} ${i.message}`
),
result.issues.length === 0 ? "✅ 未发现问题,代码质量良好!" : "",
].join("\n");
return {
id: taskId,
status: "completed",
artifacts: [
{
name: "review-report",
parts: [{ type: "text", text: artifactText }],
},
],
createdAt: new Date().toISOString(),
updatedAt: new Date().toISOString(),
};
}
3.4 搭建 A2A HTTP 服务
// src/server.ts
import express from "express";
import { v4 as uuidv4 } from "uuid";
import { AgentCard, Task } from "./types/a2a";
import { reviewCode, buildTaskResponse } from "./agent/code-review-agent";
const app = express();
app.use(express.json());
// Task 内存存储(生产环境应使用 Redis/DB)
const taskStore = new Map<string, Task>();
// ── 1. Agent Card 端点(A2A 必须实现)──────────────────────
app.get("/.well-known/agent.json", (_req, res) => {
const agentCard: AgentCard = {
name: "Code Review Agent",
description: "基于规则和 AI 的代码审查服务,支持 TypeScript/JavaScript",
url: `http://localhost:${PORT}`,
version: "1.0.0",
capabilities: {
streaming: true,
pushNotifications: false,
},
skills: [
{
id: "code-review",
name: "代码审查",
description: "分析代码质量,输出问题报告和评分",
inputModes: ["text"],
outputModes: ["text"],
},
],
authentication: {
schemes: ["Bearer"],
},
};
res.json(agentCard);
});
// ── 2. 发送任务(tasks/send)────────────────────────────────
app.post("/tasks/send", async (req, res) => {
const { id, message } = req.body;
const taskId = id || uuidv4();
// 提取用户发来的代码
const codeText = message?.parts
?.filter((p: { type: string }) => p.type === "text")
?.map((p: { text: string }) => p.text)
?.join("\n");
if (!codeText) {
return res.status(400).json({
error: { code: -32602, message: "message 中未找到 text 类型的代码内容" },
});
}
// 立即返回 working 状态
const workingTask: Task = {
id: taskId,
status: "working",
createdAt: new Date().toISOString(),
updatedAt: new Date().toISOString(),
};
taskStore.set(taskId, workingTask);
// 异步执行审查
reviewCode(codeText).then((result) => {
const completedTask = buildTaskResponse(taskId, result);
taskStore.set(taskId, completedTask);
});
return res.json(workingTask);
});
// ── 3. 查询任务状态(tasks/get)─────────────────────────────
app.get("/tasks/:taskId", (req, res) => {
const task = taskStore.get(req.params.taskId);
if (!task) {
return res.status(404).json({
error: { code: -32001, message: "Task not found" },
});
}
return res.json(task);
});
// ── 4. SSE 流式订阅(tasks/sendSubscribe)───────────────────
app.post("/tasks/sendSubscribe", async (req, res) => {
const { message } = req.body;
const taskId = uuidv4();
res.setHeader("Content-Type", "text/event-stream");
res.setHeader("Cache-Control", "no-cache");
res.setHeader("Connection", "keep-alive");
// 发送初始状态
const sendEvent = (data: object) => {
res.write(`data: ${JSON.stringify(data)}\n\n`);
};
sendEvent({ id: taskId, status: "working" });
const codeText = message?.parts
?.filter((p: { type: string }) => p.type === "text")
?.map((p: { text: string }) => p.text)
?.join("\n") ?? "";
const result = await reviewCode(codeText);
const completedTask = buildTaskResponse(taskId, result);
sendEvent(completedTask);
res.end();
});
const PORT = process.env.PORT || 3000;
app.listen(PORT, () => {
console.log(`✅ A2A Code Review Agent running at http://localhost:${PORT}`);
console.log(`📋 Agent Card: http://localhost:${PORT}/.well-known/agent.json`);
});
四、构建 Client Agent:调用远程 A2A 服务
// src/client/a2a-client.ts
import { AgentCard, Task } from "../types/a2a";
export class A2AClient {
private baseUrl: string;
private agentCard?: AgentCard;
constructor(baseUrl: string) {
this.baseUrl = baseUrl.replace(/\/$/, "");
}
// Step 1:发现 Agent 能力
async discover(): Promise<AgentCard> {
const response = await fetch(
`${this.baseUrl}/.well-known/agent.json`
);
if (!response.ok) throw new Error("Failed to fetch Agent Card");
this.agentCard = await response.json();
return this.agentCard!;
}
// Step 2:发送任务并轮询等待完成
async sendAndWait(code: string, timeoutMs = 30000): Promise<Task> {
// 发送任务
const sendRes = await fetch(`${this.baseUrl}/tasks/send`, {
method: "POST",
headers: { "Content-Type": "application/json" },
body: JSON.stringify({
message: {
role: "user",
parts: [{ type: "text", text: code }],
},
}),
});
const task: Task = await sendRes.json();
if (task.status === "completed") return task;
// 轮询直到完成
const deadline = Date.now() + timeoutMs;
while (Date.now() < deadline) {
await new Promise((r) => setTimeout(r, 800));
const pollRes = await fetch(`${this.baseUrl}/tasks/${task.id}`);
const updated: Task = await pollRes.json();
if (updated.status === "completed" || updated.status === "failed") {
return updated;
}
}
throw new Error(`Task ${task.id} timed out`);
}
// Step 3:流式接收
async sendStreaming(
code: string,
onEvent: (task: Partial<Task>) => void
): Promise<void> {
const response = await fetch(`${this.baseUrl}/tasks/sendSubscribe`, {
method: "POST",
headers: { "Content-Type": "application/json" },
body: JSON.stringify({
message: { role: "user", parts: [{ type: "text", text: code }] },
}),
});
const reader = response.body!.getReader();
const decoder = new TextDecoder();
while (true) {
const { done, value } = await reader.read();
if (done) break;
const chunk = decoder.decode(value);
chunk
.split("\n")
.filter((line) => line.startsWith("data:"))
.forEach((line) => {
const data = JSON.parse(line.replace("data:", "").trim());
onEvent(data);
});
}
}
}
使用示例
// src/demo.ts
import { A2AClient } from "./client/a2a-client";
async function main() {
const client = new A2AClient("http://localhost:3000");
// 1. 发现 Agent
const card = await client.discover();
console.log(`🤖 连接到:${card.name} v${card.version}`);
console.log(`📦 支持技能:${card.skills.map((s) => s.name).join(", ")}`);
// 2. 发送待审查代码
const codeToReview = `
function processData(data: any) {
console.log("processing", data);
const result: any = {};
for (let i = 0; i < data.length; i++) {
const item: any = data[i];
result[item.id] = item.value;
console.log("item:", item);
}
return result;
}
`;
console.log("\n⏳ 正在审查代码...");
const task = await client.sendAndWait(codeToReview);
// 3. 获取结果
if (task.status === "completed" && task.artifacts) {
const report = task.artifacts[0].parts[0].text;
console.log("\n📋 审查报告:");
console.log(report);
}
}
main().catch(console.error);
运行效果:
🤖 连接到:Code Review Agent v1.0.0
📦 支持技能:代码审查
⏳ 正在审查代码...
📋 审查报告:
## 代码审查报告
**总评分:85/100**
代码质量评分:85/100。发现 0 个错误,3 个警告
### 问题详情
- [WARNING] 第 3 行 存在 console.log,请在生产代码中移除
- [WARNING] 第 9 行 存在 console.log,请在生产代码中移除
- [WARNING] 检测到 3 处 any 类型,建议使用具体类型替代
五、企业级场景:多 Agent 协作流水线
用户请求
│
▼
┌──────────────┐ A2A ┌──────────────────┐
│ Orchestrator │ ────────► │ Code Review Agent│
│ Agent │ └──────────────────┘
│ │ A2A ┌──────────────────┐
│ │ ────────► │ Doc Gen Agent │
│ │ └──────────────────┘
│ │ A2A ┌──────────────────┐
│ │ ────────► │ Deploy Agent │
└──────────────┘ └──────────────────┘
每个 Agent 只负责自己的领域,通过 A2A 协议互相调用,实现真正的关注点分离。
六、踩坑总结
| 坑点 | 说明 | 解决方案 |
|---|---|---|
| Agent Card 地址固定 | 必须是 /.well-known/agent.json |
不能自定义路径 |
| Task ID 唯一性 | 同一 taskId 重复提交会覆盖 | Client 端每次生成新 UUID |
| SSE 连接超时 | 长任务 SSE 会被代理断开 | 改用 Push Notification 或定时心跳 |
| 跨域问题 | Client Agent 跨域调用 Remote Agent | 服务端加 CORS 头或走 API 网关 |
| 认证传递 | Bearer Token 在 Authorization Header | 每次请求都带上,不能缓存 |
七、总结
A2A 协议的核心价值在于标准化 Agent 间的通信契约,就像 REST 统一了 Web 服务调用一样。掌握 A2A,你就具备了构建多 Agent 系统的基础设施能力。
AtomGit 是由开放原子开源基金会联合 CSDN 等生态伙伴共同推出的新一代开源与人工智能协作平台。平台坚持“开放、中立、公益”的理念,把代码托管、模型共享、数据集托管、智能体开发体验和算力服务整合在一起,为开发者提供从开发、训练到部署的一站式体验。
更多推荐


所有评论(0)