A2A 协议实战指南:用 TypeScript 构建可互通的 AI Agent 服务

作者导读:MCP 还没搞清楚,A2A 又来了?本文带你从协议本质出发,一步步用 TypeScript 实现一个符合 A2A 规范的 Agent 服务,并演示两个 Agent 之间的跨框架协作全流程。读完你就明白:A2A 不是 MCP 的竞争者,它们解决的根本就是不同层次的问题。


一、为什么需要 A2A?

随着 AI Agent 从单点工具演进为企业级工作流,一个绕不开的问题浮出水面:

不同框架、不同厂商构建的 Agent,怎么互相"对话"?

  • LangChain Agent 如何调用 AutoGen 构建的分析 Agent?
  • 客户的私有 Agent 如何安全地调用你提供的能力服务?
  • 多 Agent 协作时,任务状态、进度、结果如何统一传递?

Google 于 2026 年正式推出 A2A(Agent2Agent)协议,联合 50+ 合作伙伴(Salesforce、SAP、Atlassian 等),目标就是成为 AI Agent 世界的 HTTP 协议

A2A vs MCP:两者不是竞争关系

很多人把 A2A 和 MCP 混为一谈,其实它们层次完全不同:

维度 MCP(Model Context Protocol) A2A(Agent2Agent)
解决什么 AI 模型 ↔ 工具/数据源 Agent ↔ Agent
通信方向 单向:模型调用工具 双向:Agent 间平等协作
类比 函数调用(Function Call) 微服务间 REST/gRPC
状态管理 无状态 有状态(Task 生命周期)

简单记:MCP 让 AI 用工具,A2A 让 Agent 找 Agent。


二、A2A 协议核心概念

2.1 三个核心实体

┌─────────────────────────────────────────────────────────┐
│                      A2A 协议栈                          │
│                                                          │
│  ┌──────────────┐          ┌──────────────────────────┐ │
│  │  Client Agent │ ──────► │      Remote Agent         │ │
│  │  (调用方)     │         │  (被调用方 / Agent Card)  │ │
│  └──────────────┘          └──────────────────────────┘ │
│                                    │                     │
│                              Task + Artifacts            │
│                              (任务 + 产物)               │
└─────────────────────────────────────────────────────────┘

Agent Card:每个 A2A Agent 必须在固定地址 /.well-known/agent.json 暴露自己的能力描述卡片,包括支持的 skills、认证方式、流式能力等。

Task:A2A 中的核心工作单元,有完整生命周期(submitted → working → completed/failed),支持异步查询和流式更新。

Artifact:Task 完成后产出的结果物,可以是文本、文件、结构化数据等。

2.2 通信方式

A2A 支持三种通信模式:

  1. 同步 JSON-RPC — 简单请求响应
  2. Server-Sent Events (SSE) — 流式进度推送
  3. Push Notification — Webhook 回调(适合长任务)

三、动手实战:TypeScript 实现 A2A Agent

我们来构建一个代码审查 Agent,它接受代码片段,返回审查建议。

3.1 项目初始化

mkdir a2a-code-review-agent && cd a2a-code-review-agent
npm init -y
npm install express zod uuid
npm install -D typescript @types/express @types/node ts-node
npx tsc --init

tsconfig.json 关键配置:

{
  "compilerOptions": {
    "target": "ES2022",
    "module": "commonjs",
    "outDir": "./dist",
    "strict": true,
    "esModuleInterop": true
  }
}

3.2 定义 A2A 类型

// src/types/a2a.ts
export interface AgentCard {
  name: string;
  description: string;
  url: string;
  version: string;
  capabilities: {
    streaming?: boolean;
    pushNotifications?: boolean;
  };
  skills: AgentSkill[];
  authentication?: {
    schemes: string[];
  };
}

export interface AgentSkill {
  id: string;
  name: string;
  description: string;
  inputModes: string[];   // "text" | "file" | "data"
  outputModes: string[];
}

export type TaskStatus =
  | "submitted"
  | "working"
  | "input-required"
  | "completed"
  | "failed"
  | "canceled";

export interface Task {
  id: string;
  status: TaskStatus;
  message?: Message;       // 最新消息
  artifacts?: Artifact[];  // 产物列表
  createdAt: string;
  updatedAt: string;
}

export interface Message {
  role: "user" | "agent";
  parts: MessagePart[];
}

export interface MessagePart {
  type: "text" | "file" | "data";
  text?: string;
  data?: unknown;
}

export interface Artifact {
  name?: string;
  parts: MessagePart[];
}

3.3 实现 Agent 核心逻辑

// src/agent/code-review-agent.ts
import { Task, Message, Artifact } from "../types/a2a";

interface ReviewResult {
  issues: Array<{
    line?: number;
    severity: "error" | "warning" | "info";
    message: string;
  }>;
  score: number;
  summary: string;
}

export async function reviewCode(code: string): Promise<ReviewResult> {
  // 实际场景中可以调用 LLM 或静态分析工具
  // 这里演示规则检测逻辑
  const issues: ReviewResult["issues"] = [];

  // 检测 1:console.log 残留
  const consoleLines = code.split("\n").reduce<number[]>((acc, line, idx) => {
    if (line.includes("console.log")) acc.push(idx + 1);
    return acc;
  }, []);
  consoleLines.forEach((line) => {
    issues.push({
      line,
      severity: "warning",
      message: `${line} 行存在 console.log,请在生产代码中移除`,
    });
  });

  // 检测 2:any 类型滥用
  const anyMatches = (code.match(/:\s*any\b/g) || []).length;
  if (anyMatches > 2) {
    issues.push({
      severity: "warning",
      message: `检测到 ${anyMatches} 处 any 类型,建议使用具体类型替代`,
    });
  }

  // 检测 3:函数过长(超过50行)
  const functionBlocks = code.match(/\{[^{}]*\}/gs) || [];
  functionBlocks.forEach((block) => {
    const lines = block.split("\n").length;
    if (lines > 50) {
      issues.push({
        severity: "info",
        message: `发现超过 50 行的函数体(${lines} 行),建议拆分为更小的函数`,
      });
    }
  });

  const errorCount = issues.filter((i) => i.severity === "error").length;
  const warnCount = issues.filter((i) => i.severity === "warning").length;
  const score = Math.max(0, 100 - errorCount * 20 - warnCount * 5);

  return {
    issues,
    score,
    summary: `代码质量评分:${score}/100。发现 ${errorCount} 个错误,${warnCount} 个警告`,
  };
}

export function buildTaskResponse(
  taskId: string,
  result: ReviewResult
): Task {
  const artifactText = [
    `## 代码审查报告`,
    ``,
    `**总评分:${result.score}/100**`,
    ``,
    `${result.summary}`,
    ``,
    `### 问题详情`,
    ...result.issues.map(
      (i) =>
        `- [${i.severity.toUpperCase()}]${i.line ? `${i.line}` : ""} ${i.message}`
    ),
    result.issues.length === 0 ? "✅ 未发现问题,代码质量良好!" : "",
  ].join("\n");

  return {
    id: taskId,
    status: "completed",
    artifacts: [
      {
        name: "review-report",
        parts: [{ type: "text", text: artifactText }],
      },
    ],
    createdAt: new Date().toISOString(),
    updatedAt: new Date().toISOString(),
  };
}

3.4 搭建 A2A HTTP 服务

// src/server.ts
import express from "express";
import { v4 as uuidv4 } from "uuid";
import { AgentCard, Task } from "./types/a2a";
import { reviewCode, buildTaskResponse } from "./agent/code-review-agent";

const app = express();
app.use(express.json());

// Task 内存存储(生产环境应使用 Redis/DB)
const taskStore = new Map<string, Task>();

// ── 1. Agent Card 端点(A2A 必须实现)──────────────────────
app.get("/.well-known/agent.json", (_req, res) => {
  const agentCard: AgentCard = {
    name: "Code Review Agent",
    description: "基于规则和 AI 的代码审查服务,支持 TypeScript/JavaScript",
    url: `http://localhost:${PORT}`,
    version: "1.0.0",
    capabilities: {
      streaming: true,
      pushNotifications: false,
    },
    skills: [
      {
        id: "code-review",
        name: "代码审查",
        description: "分析代码质量,输出问题报告和评分",
        inputModes: ["text"],
        outputModes: ["text"],
      },
    ],
    authentication: {
      schemes: ["Bearer"],
    },
  };
  res.json(agentCard);
});

// ── 2. 发送任务(tasks/send)────────────────────────────────
app.post("/tasks/send", async (req, res) => {
  const { id, message } = req.body;
  const taskId = id || uuidv4();

  // 提取用户发来的代码
  const codeText = message?.parts
    ?.filter((p: { type: string }) => p.type === "text")
    ?.map((p: { text: string }) => p.text)
    ?.join("\n");

  if (!codeText) {
    return res.status(400).json({
      error: { code: -32602, message: "message 中未找到 text 类型的代码内容" },
    });
  }

  // 立即返回 working 状态
  const workingTask: Task = {
    id: taskId,
    status: "working",
    createdAt: new Date().toISOString(),
    updatedAt: new Date().toISOString(),
  };
  taskStore.set(taskId, workingTask);

  // 异步执行审查
  reviewCode(codeText).then((result) => {
    const completedTask = buildTaskResponse(taskId, result);
    taskStore.set(taskId, completedTask);
  });

  return res.json(workingTask);
});

// ── 3. 查询任务状态(tasks/get)─────────────────────────────
app.get("/tasks/:taskId", (req, res) => {
  const task = taskStore.get(req.params.taskId);
  if (!task) {
    return res.status(404).json({
      error: { code: -32001, message: "Task not found" },
    });
  }
  return res.json(task);
});

// ── 4. SSE 流式订阅(tasks/sendSubscribe)───────────────────
app.post("/tasks/sendSubscribe", async (req, res) => {
  const { message } = req.body;
  const taskId = uuidv4();

  res.setHeader("Content-Type", "text/event-stream");
  res.setHeader("Cache-Control", "no-cache");
  res.setHeader("Connection", "keep-alive");

  // 发送初始状态
  const sendEvent = (data: object) => {
    res.write(`data: ${JSON.stringify(data)}\n\n`);
  };

  sendEvent({ id: taskId, status: "working" });

  const codeText = message?.parts
    ?.filter((p: { type: string }) => p.type === "text")
    ?.map((p: { text: string }) => p.text)
    ?.join("\n") ?? "";

  const result = await reviewCode(codeText);
  const completedTask = buildTaskResponse(taskId, result);

  sendEvent(completedTask);
  res.end();
});

const PORT = process.env.PORT || 3000;
app.listen(PORT, () => {
  console.log(`✅ A2A Code Review Agent running at http://localhost:${PORT}`);
  console.log(`📋 Agent Card: http://localhost:${PORT}/.well-known/agent.json`);
});

四、构建 Client Agent:调用远程 A2A 服务

// src/client/a2a-client.ts
import { AgentCard, Task } from "../types/a2a";

export class A2AClient {
  private baseUrl: string;
  private agentCard?: AgentCard;

  constructor(baseUrl: string) {
    this.baseUrl = baseUrl.replace(/\/$/, "");
  }

  // Step 1:发现 Agent 能力
  async discover(): Promise<AgentCard> {
    const response = await fetch(
      `${this.baseUrl}/.well-known/agent.json`
    );
    if (!response.ok) throw new Error("Failed to fetch Agent Card");
    this.agentCard = await response.json();
    return this.agentCard!;
  }

  // Step 2:发送任务并轮询等待完成
  async sendAndWait(code: string, timeoutMs = 30000): Promise<Task> {
    // 发送任务
    const sendRes = await fetch(`${this.baseUrl}/tasks/send`, {
      method: "POST",
      headers: { "Content-Type": "application/json" },
      body: JSON.stringify({
        message: {
          role: "user",
          parts: [{ type: "text", text: code }],
        },
      }),
    });

    const task: Task = await sendRes.json();
    if (task.status === "completed") return task;

    // 轮询直到完成
    const deadline = Date.now() + timeoutMs;
    while (Date.now() < deadline) {
      await new Promise((r) => setTimeout(r, 800));
      const pollRes = await fetch(`${this.baseUrl}/tasks/${task.id}`);
      const updated: Task = await pollRes.json();
      if (updated.status === "completed" || updated.status === "failed") {
        return updated;
      }
    }
    throw new Error(`Task ${task.id} timed out`);
  }

  // Step 3:流式接收
  async sendStreaming(
    code: string,
    onEvent: (task: Partial<Task>) => void
  ): Promise<void> {
    const response = await fetch(`${this.baseUrl}/tasks/sendSubscribe`, {
      method: "POST",
      headers: { "Content-Type": "application/json" },
      body: JSON.stringify({
        message: { role: "user", parts: [{ type: "text", text: code }] },
      }),
    });

    const reader = response.body!.getReader();
    const decoder = new TextDecoder();

    while (true) {
      const { done, value } = await reader.read();
      if (done) break;
      const chunk = decoder.decode(value);
      chunk
        .split("\n")
        .filter((line) => line.startsWith("data:"))
        .forEach((line) => {
          const data = JSON.parse(line.replace("data:", "").trim());
          onEvent(data);
        });
    }
  }
}

使用示例

// src/demo.ts
import { A2AClient } from "./client/a2a-client";

async function main() {
  const client = new A2AClient("http://localhost:3000");

  // 1. 发现 Agent
  const card = await client.discover();
  console.log(`🤖 连接到:${card.name} v${card.version}`);
  console.log(`📦 支持技能:${card.skills.map((s) => s.name).join(", ")}`);

  // 2. 发送待审查代码
  const codeToReview = `
function processData(data: any) {
  console.log("processing", data);
  const result: any = {};
  for (let i = 0; i < data.length; i++) {
    const item: any = data[i];
    result[item.id] = item.value;
    console.log("item:", item);
  }
  return result;
}
  `;

  console.log("\n⏳ 正在审查代码...");
  const task = await client.sendAndWait(codeToReview);

  // 3. 获取结果
  if (task.status === "completed" && task.artifacts) {
    const report = task.artifacts[0].parts[0].text;
    console.log("\n📋 审查报告:");
    console.log(report);
  }
}

main().catch(console.error);

运行效果:

🤖 连接到:Code Review Agent v1.0.0
📦 支持技能:代码审查

⏳ 正在审查代码...

📋 审查报告:
## 代码审查报告

**总评分:85/100**

代码质量评分:85/100。发现 0 个错误,3 个警告

### 问题详情
- [WARNING] 第 3 行 存在 console.log,请在生产代码中移除
- [WARNING] 第 9 行 存在 console.log,请在生产代码中移除
- [WARNING] 检测到 3 处 any 类型,建议使用具体类型替代

五、企业级场景:多 Agent 协作流水线

用户请求
   │
   ▼
┌──────────────┐    A2A     ┌──────────────────┐
│  Orchestrator │ ────────► │  Code Review Agent│
│  Agent        │           └──────────────────┘
│               │    A2A     ┌──────────────────┐
│               │ ────────► │  Doc Gen Agent    │
│               │           └──────────────────┘
│               │    A2A     ┌──────────────────┐
│               │ ────────► │  Deploy Agent     │
└──────────────┘           └──────────────────┘

每个 Agent 只负责自己的领域,通过 A2A 协议互相调用,实现真正的关注点分离。


六、踩坑总结

坑点 说明 解决方案
Agent Card 地址固定 必须是 /.well-known/agent.json 不能自定义路径
Task ID 唯一性 同一 taskId 重复提交会覆盖 Client 端每次生成新 UUID
SSE 连接超时 长任务 SSE 会被代理断开 改用 Push Notification 或定时心跳
跨域问题 Client Agent 跨域调用 Remote Agent 服务端加 CORS 头或走 API 网关
认证传递 Bearer Token 在 Authorization Header 每次请求都带上,不能缓存

七、总结

A2A 协议的核心价值在于标准化 Agent 间的通信契约,就像 REST 统一了 Web 服务调用一样。掌握 A2A,你就具备了构建多 Agent 系统的基础设施能力。

Logo

AtomGit 是由开放原子开源基金会联合 CSDN 等生态伙伴共同推出的新一代开源与人工智能协作平台。平台坚持“开放、中立、公益”的理念,把代码托管、模型共享、数据集托管、智能体开发体验和算力服务整合在一起,为开发者提供从开发、训练到部署的一站式体验。

更多推荐