Agent间能力发现与注册:动态服务发现机制设计

关键词

  • Agent能力发现
  • 动态服务注册
  • 多Agent系统
  • 分布式系统架构
  • 服务发现协议
  • 能力匹配算法
  • 去中心化通信

摘要

在当今分布式计算和智能系统蓬勃发展的时代,多Agent系统(MAS)正成为解决复杂问题的重要范式。本文深入探讨Agent间能力发现与注册机制的设计原理与实现方法,这是构建高效、可扩展的多Agent协作系统的核心基础。我们将从基础概念出发,逐步解析动态服务发现的关键技术,包括能力描述模型、注册机制、发现协议、匹配算法等核心要素。文章结合生动类比、详细数学模型、完整流程图和实用代码示例,带领读者全面理解这一复杂技术领域。最后,我们展望该技术的未来发展趋势及其在物联网、边缘计算、智能协作等领域的广阔应用前景。


1. 背景介绍

1.1 主题背景和重要性

想象一下,在一个繁忙的城市中,有无数的专业服务提供者——医生、律师、工程师、厨师等等。每个服务提供者都有自己独特的技能和服务能力,但如果没有一个高效的方式让需要这些服务的人找到他们,整个城市的运转效率将会大大降低。这就是为什么我们需要电话簿、服务平台和推荐系统的原因。

在数字世界中,多Agent系统面临着完全相同的挑战。随着分布式系统规模的不断扩大,系统中可能包含成百上千个具有不同能力的Agent。这些Agent可能运行在不同的硬件设备上,采用不同的编程语言实现,提供各种各样的功能服务。如果没有一套高效的能力发现与注册机制,Agent之间就像城市中互不相识的居民,无法有效地协作完成复杂任务。

动态服务发现机制正是解决这一问题的关键技术。它就像数字世界中的"智能服务平台",让Agent能够:

  1. 主动注册自己的能力和服务
  2. 动态发现其他Agent提供的能力
  3. 根据任务需求匹配合适的服务提供者
  4. 在系统环境变化时自动适应调整

这一机制的重要性体现在多个方面:

  • 系统可扩展性:支持Agent的动态加入和离开,无需手动配置
  • 容错性:能够检测和应对Agent故障,维持系统稳定运行
  • 灵活性:适应能力变化和服务更新,保持系统活力
  • 协作效率:减少Agent间通信开销,提高任务完成效率

1.2 目标读者

本文适合以下几类读者:

  • 分布式系统开发者:希望了解如何在系统中实现服务发现功能
  • 多Agent系统研究者:对Agent协作机制和动态交互感兴趣
  • 微服务架构师:寻求更灵活的服务治理解决方案
  • 人工智能从业者:关注智能体协作和能力共享
  • 技术爱好者:对前沿分布式计算技术有学习热情

无论您是初学者还是有经验的专家,本文都将通过由浅入深的方式,带您全面了解Agent间能力发现与注册机制的设计与实现。

1.3 核心问题或挑战

设计一个高效、可靠的Agent能力发现与注册机制并非易事,我们需要面对以下核心挑战:

  1. 能力描述的标准化:如何用统一的方式描述不同Agent的多样化能力?
  2. 动态环境的适应性:如何处理Agent的频繁加入、离开和能力变化?
  3. 发现效率与资源消耗的平衡:如何在保证发现及时性的同时,避免过多的网络和计算资源消耗?
  4. 异构系统的互操作性:如何让运行在不同平台、使用不同技术的Agent能够相互理解和协作?
  5. 可扩展性:如何设计能够支持成千上万个Agent的发现机制?
  6. 安全性与信任:如何确保能力信息的真实性,防止恶意Agent的欺骗行为?

在接下来的章节中,我们将逐一探讨这些问题的解决方案,并构建一个完整的动态服务发现机制设计方案。


2. 核心概念解析

2.1 使用生活化比喻解释关键概念

在深入技术细节之前,让我们先用一个生活化的比喻来帮助理解Agent间能力发现与注册的核心概念。

比喻:数字世界的"人才市场"

我们可以将多Agent系统想象成一个大型的"数字人才市场",在这个市场中:

  • Agent = 市场中的求职者/服务提供者
  • 能力(Capability) = 求职者的技能和专长
  • 注册(Registration) = 求职者在人才市场登记自己的信息和技能
  • 发现(Discovery) = 雇主在人才市场中寻找具有特定技能的人才
  • 匹配(Matching) = 将雇主的需求与求职者的技能进行配对
  • 目录服务(Directory Service) = 人才市场的管理机构,负责维护求职者信息
  • 查询(Query) = 雇主发布的职位需求描述

就像人才市场需要一套高效的运营机制一样,多Agent系统也需要精心设计的能力发现与注册机制,才能让"求职者"(Agent)和"雇主"(请求服务的Agent)高效匹配。

主要参与者及其角色

在这个"数字人才市场"中,有几个关键参与者:

  1. 能力提供者(Provider Agent):拥有特定能力并愿意提供服务的Agent,类似于人才市场中的求职者。
  2. 能力请求者(Requester Agent):需要特定能力来完成任务的Agent,类似于人才市场中的雇主。
  3. 能力目录(Directory Facilitator):维护能力提供者信息的中介服务,类似于人才市场的管理中心。
  4. 能力匹配器(Matchmaker):负责将请求与可用能力进行匹配的组件,类似于人才市场中的猎头顾问。

2.2 概念间的关系和相互作用

现在让我们更系统地理解这些核心概念及其相互关系:

核心概念定义
  1. Agent能力(Capability):Agent能够执行的功能或提供的服务的正式描述。能力通常包括:

    • 功能描述:Agent能做什么
    • 输入输出规范:需要什么参数,返回什么结果
    • 质量属性:响应时间、可靠性、成本等
    • 约束条件:使用限制、依赖关系等
  2. 能力注册(Capability Registration):能力提供者向目录服务公布其能力信息的过程。注册信息可能包括:

    • Agent标识符和地址
    • 能力描述
    • 可用性状态
    • 注册时间和有效期
  3. 能力发现(Capability Discovery):能力请求者查找具有所需能力的Agent的过程。发现可以分为:

    • 主动发现:请求者主动查询目录服务
    • 被动发现:提供者广播能力信息,请求者监听
    • 混合发现:结合上述两种方式
  4. 能力匹配(Capability Matching):将请求的能力描述与注册的能力描述进行比较,确定它们是否兼容的过程。匹配可以基于:

    • 语法匹配:能力描述的字面比较
    • 语义匹配:基于能力含义的理解
    • 质量匹配:考虑非功能属性的匹配
  5. 动态更新(Dynamic Update):处理Agent能力变化、加入和离开的机制,包括:

    • 重新注册:能力变化时更新注册信息
    • 心跳检测:定期确认Agent是否活跃
    • 注销:Agent离开时移除注册信息
概念交互流程

这些概念之间的典型交互流程如下:

  1. 能力提供者启动后,向能力目录注册自己的能力
  2. 能力目录验证并存储提供者的能力信息
  3. 能力请求者有任务需求时,向能力目录发送能力查询
  4. 能力目录使用能力匹配器查找合适的提供者
  5. 匹配结果返回给请求者
  6. 请求者与选定的提供者直接通信,使用其能力
  7. 提供者和请求者定期更新自己的状态,目录维护信息的时效性

2.3 概念结构与核心要素组成

为了更清晰地展示这些概念的结构,让我们详细分析每个核心概念的组成要素:

Agent能力描述模型

一个完整的Agent能力描述应包含以下要素:

Capability {
  // 基本标识信息
  id: UniqueIdentifier          // 能力唯一标识
  name: String                  // 能力名称
  description: String           // 能力详细描述
  provider: AgentIdentifier     // 提供者Agent标识
  
  // 功能规格
  inputs: List<Parameter>       // 输入参数列表
  outputs: List<Parameter>      // 输出结果列表
  preconditions: List<Condition> // 前置条件
  postconditions: List<Condition> // 后置条件
  sideEffects: List<Effect>     // 副作用描述
  
  // 质量属性
  quality: QualityAttributes {
    responseTime: Range         // 响应时间范围
    reliability: Float          // 可靠性(0-1)
    availability: Float         // 可用性(0-1)
    throughput: Integer         // 吞吐量
    cost: Currency              // 使用成本
  }
  
  // 分类与标签
  category: Category            // 能力分类
  tags: List<String>            // 相关标签
  keywords: List<String>        // 搜索关键词
  
  // 元数据
  version: Version              // 能力版本
  createdAt: Timestamp          // 创建时间
  updatedAt: Timestamp          // 更新时间
  expiresAt: Timestamp          // 过期时间
}
能力注册信息结构

能力注册是能力提供者向目录服务提交的信息包,结构如下:

Registration {
  // 注册标识
  id: RegistrationId            // 注册记录唯一标识
  
  // 提供者信息
  provider: AgentInfo {
    id: AgentIdentifier         // Agent唯一标识
    address: NetworkAddress     // 网络地址
    protocol: CommunicationProtocol // 通信协议
    status: AgentStatus         // 状态(活跃/离线/繁忙等)
  }
  
  // 能力信息
  capabilities: List<Capability> // 提供的能力列表
  
  // 注册策略
  policy: RegistrationPolicy {
    leaseDuration: Duration     // 租约持续时间
    visibility: VisibilityLevel  // 可见性级别
    discoveryPreference: DiscoveryMode // 发现偏好
  }
  
  // 元数据
  timestamp: Timestamp          // 注册时间
  lastRenewed: Timestamp        // 最后更新时间
}
能力查询结构

能力请求者发送的查询信息结构:

Query {
  // 查询标识
  id: QueryId                   // 查询唯一标识
  
  // 请求者信息
  requester: AgentInfo          // 请求者Agent信息
  
  // 能力需求
  required: CapabilityRequirement {
    // 功能需求
    functionality: FunctionDescription // 所需功能描述
    
    // 约束条件
    constraints: List<Constraint>     // 约束条件列表
    
    // 质量需求
    qualityPreferences: QualityPreferences // 质量偏好
    
    // 优先级
    priorities: Map<Criterion, Weight> // 各标准的权重
  }
  
  // 查询策略
  strategy: QueryStrategy {
    searchScope: SearchScope     // 搜索范围
    maxResults: Integer          // 最大结果数
    timeout: Duration            // 查询超时时间
    ranking: RankingCriteria     // 排序标准
  }
}
匹配结果结构

目录服务返回的匹配结果结构:

MatchResult {
  // 结果标识
  id: ResultId                  // 结果唯一标识
  queryId: QueryId              // 对应查询的标识
  
  // 匹配项列表
  matches: List<Match> {
    provider: AgentInfo         // 匹配的提供者信息
    capability: Capability      // 匹配的能力
    score: Float                // 匹配得分(0-1)
    explanation: String         // 匹配解释
    qualityAssessment: QualityAssessment // 质量评估
  }
  
  // 元数据
  timestamp: Timestamp          // 结果生成时间
  totalMatches: Integer         // 总匹配数
  returnedMatches: Integer      // 返回匹配数
}

2.4 概念之间的关系:对比与联系

为了更清晰地理解这些概念之间的关系,我们从多个维度进行对比分析,并使用图表可视化展示它们的联系。

核心概念属性对比表
概念 主要目的 发起者 核心内容 时效性 主要交互对象
能力描述 标准化表达Agent功能 能力提供者 功能、接口、质量属性 相对稳定 注册、查询、匹配
注册 公布Agent能力信息 能力提供者 Agent信息+能力列表 动态变化 目录服务
查询 寻找满足需求的能力 能力请求者 能力需求描述 一次性 目录服务、匹配器
匹配 比较需求与能力 目录服务/匹配器 相似度计算 按需执行 查询、注册信息
发现 获取能力提供者信息 能力请求者 匹配结果集合 按需执行 目录服务、广播
更新 维护信息时效性 能力提供者/目录 状态变化、能力变更 周期性/事件驱动 目录服务
概念实体关系图

下面是一个表示这些核心概念及其关系的实体关系(ER)图:

provides

submits

issues

included_in

stored_in

triggers

used_in

produces

received_by

AGENT

CAPABILITY

REGISTRATION

QUERY

DIRECTORY

MATCHING

MATCH_RESULT

概念交互流程图

以下是这些概念在典型场景中的交互流程图:

渲染错误: Mermaid 渲染失败: Parse error on line 13: ... Note over R,D,M: 发现阶段 R->>D: ----------------------^ Expecting 'TXT', got ','

通过这些图表,我们可以更直观地理解Agent能力发现与注册机制中各个概念的角色和相互关系。在接下来的章节中,我们将深入探讨这些概念背后的技术原理与实现方法。


3. 技术原理与实现

3.1 能力描述模型与本体论

为了让Agent能够有效地描述和理解彼此的能力,我们需要一个标准化、语义丰富的能力描述模型。这就好比我们需要一份标准的简历格式,让雇主能够快速了解求职者的技能。

能力描述的层次结构

一个完善的能力描述模型应该包含以下几个层次:

  1. 语法层(Syntactic Level):定义能力描述的结构和格式
  2. 语义层(Semantic Level):定义能力描述的含义和概念关系
  3. 语用层(Pragmatic Level):定义能力的使用场景和上下文

让我们使用Web本体语言(OWL)的思想来构建一个能力描述本体。本体是一种形式化的、对于共享概念体系的明确而又详细的说明,非常适合用于能力描述。

能力描述本体设计

我们的能力描述本体将包含以下核心类和属性:

本体: AgentCapabilityOntology

核心类:
- Agent: 表示智能体
- Capability: 表示能力
- Parameter: 表示参数
- Condition: 表示条件
- QualityAttribute: 表示质量属性
- ServiceCategory: 表示服务分类

对象属性:
- provides: Agent → Capability (Agent提供能力)
- hasInput: Capability → Parameter (能力有输入参数)
- hasOutput: Capability → Parameter (能力有输出参数)
- hasPrecondition: Capability → Condition (能力有前置条件)
- hasPostcondition: Capability → Condition (能力有后置条件)
- hasQuality: Capability → QualityAttribute (能力有质量属性)
- belongsTo: Capability → ServiceCategory (能力属于某分类)

数据属性:
- hasName: Agent/Capability/Parameter → String (名称)
- hasDescription: Agent/Capability → String (描述)
- hasID: Agent/Capability → String (唯一标识符)
- hasType: Parameter → String (参数类型)
- hasUnit: QualityAttribute → String (单位)
- hasValue: QualityAttribute → Float (数值)
- hasTimestamp: Capability → DateTime (时间戳)

这种基于本体的能力描述方法为我们提供了:

  1. 共享词汇表:所有Agent使用相同的术语描述能力
  2. 形式化语义:明确定义了概念之间的关系
  3. 推理能力:可以基于描述逻辑进行推理和推导
  4. 扩展性:可以轻松添加新的概念和属性

3.2 能力匹配算法

能力匹配是发现机制的核心环节,它决定了如何将请求的能力与注册的能力进行比较。让我们深入探讨几种常见的匹配算法。

语法匹配算法

语法匹配是最简单的匹配方式,它基于能力描述的字面相似度进行匹配。

向量空间模型(Vector Space Model)

我们可以将能力描述转换为向量,然后计算向量之间的相似度:

  1. 首先,从能力描述中提取关键词,构建词汇表
  2. 然后,将每个能力描述表示为一个TF-IDF向量
  3. 最后,计算两个向量之间的余弦相似度

TF-IDF计算方法:
TF(t,d)=词t在文档d中出现的次数文档d中的总词数TF(t,d) = \frac{\text{词t在文档d中出现的次数}}{\text{文档d中的总词数}}TF(t,d)=文档d中的总词数t在文档d中出现的次数
IDF(t)=log⁡(文档总数包含词t的文档数+1)IDF(t) = \log\left(\frac{\text{文档总数}}{\text{包含词t的文档数}+1}\right)IDF(t)=log(包含词t的文档数+1文档总数)
TFIDF(t,d)=TF(t,d)×IDF(t)TFIDF(t,d) = TF(t,d) \times IDF(t)TFIDF(t,d)=TF(t,d)×IDF(t)

余弦相似度计算:
similarity(A,B)=A⋅B∥A∥∥B∥=∑i=1nAiBi∑i=1nAi2∑i=1nBi2\text{similarity}(A,B) = \frac{A \cdot B}{\|A\| \|B\|} = \frac{\sum_{i=1}^{n} A_i B_i}{\sqrt{\sum_{i=1}^{n} A_i^2} \sqrt{\sum_{i=1}^{n} B_i^2}}similarity(A,B)=A∥∥BAB=i=1nAi2 i=1nBi2 i=1nAiBi

语义匹配算法

语法匹配虽然简单,但它无法理解能力描述的真正含义。语义匹配则试图解决这个问题,它基于概念的含义而非字面形式进行匹配。

基于本体的语义匹配

利用我们之前定义的能力描述本体,可以实现更智能的匹配:

  1. 概念等价匹配:检查两个概念是否在本体中被定义为等价
  2. 概念包含匹配:检查一个概念是否是另一个概念的子类
  3. 属性兼容性匹配:检查输入输出参数的类型是否兼容
  4. 约束满足匹配:检查前置条件和后置条件是否满足

我们可以使用描述逻辑(DL)来形式化这些匹配条件:

  • 等价匹配:C≡DC \equiv DCD
  • 包含匹配:C⊑DC \sqsubseteq DCDD⊑CD \sqsubseteq CDC
  • 兼容匹配:C⊓D≢⊥C \sqcap D \not\equiv \botCD (概念不相交)

相似度计算

基于本体的相似度计算可以考虑以下因素:

  1. 概念层次距离:两个概念在本体层次结构中的距离
  2. 信息内容相似度:基于概念在语料库中的出现频率
  3. 属性重叠度:两个概念共享属性的程度

概念层次距离计算公式:
sim(C,D)=2×depth(LCS(C,D))depth(C)+depth(D)\text{sim}(C,D) = \frac{2 \times \text{depth}(LCS(C,D))}{\text{depth}(C) + \text{depth}(D)}sim(C,D)=depth(C)+depth(D)2×depth(LCS(C,D))

其中,LCS(C,D)LCS(C,D)LCS(C,D)表示概念CCCDDD的最小公共祖先,depth(X)\text{depth}(X)depth(X)表示概念XXX在层次结构中的深度。

混合匹配算法

在实际应用中,我们通常会结合语法和语义匹配,形成混合匹配算法,以获得更好的匹配效果。

混合匹配的步骤:

  1. 首先使用语法匹配进行初步筛选,快速排除明显不相关的能力
  2. 然后对筛选结果进行语义匹配,更精确地评估能力的兼容性
  3. 最后考虑质量属性,对匹配结果进行排序和优化

综合匹配得分公式:
KaTeX parse error: Expected 'EOF', got '_' at position 12: \text{total_̲score}(R,P) = \…

其中,RRR表示请求的能力,PPP表示提供者的能力,α,β,γ\alpha, \beta, \gammaα,β,γ是权重系数,满足α+β+γ=1\alpha + \beta + \gamma = 1α+β+γ=1

3.3 发现机制设计模式

根据系统架构和应用场景的不同,我们可以采用不同的发现机制设计模式。

集中式发现模式

在集中式模式中,所有Agent的能力注册信息都存储在一个中央目录服务中,类似于电话簿的工作方式。

优点

  • 实现简单,易于管理
  • 可以提供复杂的查询和匹配功能
  • 便于实现全局优化和资源调度

缺点

  • 单点故障风险
  • 可扩展性有限
  • 可能成为性能瓶颈

适用场景

  • 中小型系统
  • 需要复杂查询和匹配的场景
  • 管理和控制需求较高的环境
分布式发现模式

在分布式模式中,没有中央目录服务,Agent通过点对点的方式直接交换能力信息。

实现方式

  1. 广播/多播:Agent定期广播自己的能力信息
  2. 闲聊协议(Gossip Protocol):Agent随机选择其他Agent交换信息
  3. 分布式哈希表(DHT):使用DHT组织和查找能力信息

优点

  • 无单点故障
  • 可扩展性好
  • 适合动态性强的环境

缺点

  • 实现复杂
  • 查询效率可能较低
  • 资源消耗较大

适用场景

  • 大规模系统
  • 动态性高的环境
  • 对可靠性要求高的场景
混合发现模式

混合模式结合了集中式和分布式的优点,通常采用层次化结构:

  • 局部区域使用集中式目录
  • 区域之间使用分布式方式互连

优点

  • 平衡了效率和可扩展性
  • 可以根据需求调整层次结构
  • 兼具集中式和分布式的优点

缺点

  • 设计和实现较为复杂
  • 需要处理好层次间的交互

3.4 注册与更新机制

高效的注册与更新机制是保持能力信息时效性的关键。

租约机制(Lease Mechanism)

租约机制是一种常见的管理注册信息有效期的方法:

  1. Agent注册时请求一个特定期限的租约
  2. 目录服务批准租约并设置过期时间
  3. Agent在租约过期前必须续订
  4. 过期未续订的注册信息自动失效

租约机制的好处是:

  • 自动处理Agent异常退出的情况
  • 控制注册信息的新鲜度
  • 减少显式注销的需要

数学模型:
LLL为租约时长,RRR为续订周期,NNN为系统中Agent数量,则:

  • 理想情况下:R<LR < LR<L,确保租约不会过期
  • 续订请求率:N/RN/RN/R (请求/秒)
  • 目录服务负载与NNN成正比,与RRR成反比
心跳机制(Heartbeat Mechanism)

心跳机制用于检测Agent的活性状态:

  1. Agent定期发送"心跳"消息到目录服务
  2. 目录服务记录最后一次收到心跳的时间
  3. 如果超过一定时间没有收到心跳,则认为Agent已失效
  4. 失效Agent的注册信息被标记或移除

心跳协议可以是:

  • 推模式:Agent主动发送心跳
  • 拉模式:目录服务主动询问
  • 混合模式:结合上述两种方式

3.5 算法流程图

为了更清晰地展示整个能力发现与注册流程,我们提供以下几个关键流程图。

能力注册流程

集中式

分布式

验证成功

验证失败

Agent启动

准备能力描述

选择注册模式

发送注册请求到目录

通过广播/DHT分享能力

目录验证能力描述

分配租约并存储

返回错误信息

注册确认

等待其他Agent确认

启动心跳/租约续订

修正能力描述

能力发现流程

查询目录

直接发现

可用

不可用

有能力需求

构建能力查询

选择发现方式

发送查询到目录服务

广播查询消息

目录执行匹配算法

生成匹配结果列表

等待Agent响应

收集响应并排序

选择最佳匹配

验证提供者可用性

建立连接并使用能力

选择下一个候选

能力匹配流程

接收查询和注册库

语法匹配筛选

计算TF-IDF向量

计算余弦相似度

设置阈值过滤

获取候选集合

语义匹配细化

本体概念推理

输入输出兼容性检查

前置后置条件验证

计算语义相似度

质量评估排序

综合得分计算

按优先级排序

返回匹配结果

3.6 算法源代码

现在让我们通过Python代码实现一个简化版的Agent能力发现与注册机制。这个示例将包含基本的能力描述、注册、发现和匹配功能。

import uuid
import time
import heapq
from typing import List, Dict, Any, Optional, Tuple
from dataclasses import dataclass, field
from enum import Enum
from abc import ABC, abstractmethod
import re
import math


# 基础数据结构定义
class AgentStatus(Enum):
    ACTIVE = "active"
    BUSY = "busy"
    OFFLINE = "offline"
    ERROR = "error"


@dataclass
class Parameter:
    name: str
    type: str
    description: str = ""
    optional: bool = False


@dataclass
class QualityAttributes:
    response_time: Optional[Tuple[float, float]] = None  # 最小/最大响应时间(秒)
    reliability: Optional[float] = None  # 可靠性 0-1
    availability: Optional[float] = None  # 可用性 0-1
    throughput: Optional[int] = None  # 吞吐量(请求/秒)
    cost: Optional[float] = None  # 使用成本


@dataclass
class Capability:
    id: str = field(default_factory=lambda: str(uuid.uuid4()))
    name: str = ""
    description: str = ""
    inputs: List[Parameter] = field(default_factory=list)
    outputs: List[Parameter] = field(default_factory=list)
    quality: QualityAttributes = field(default_factory=QualityAttributes)
    category: str = ""
    tags: List[str] = field(default_factory=list)
    keywords: List[str] = field(default_factory=list)
    version: str = "1.0"
    created_at: float = field(default_factory=time.time)
    updated_at: float = field(default_factory=time.time)


@dataclass
class AgentInfo:
    id: str = field(default_factory=lambda: str(uuid.uuid4()))
    name: str = ""
    address: str = ""
    protocol: str = "http"
    status: AgentStatus = AgentStatus.ACTIVE
    last_heartbeat: float = field(default_factory=time.time)


@dataclass
class Registration:
    id: str = field(default_factory=lambda: str(uuid.uuid4()))
    provider: AgentInfo = field(default_factory=AgentInfo)
    capabilities: List[Capability] = field(default_factory=list)
    lease_duration: float = 3600.0  # 租约持续时间(秒)
    registered_at: float = field(default_factory=time.time)
    expires_at: float = 0.0
    
    def __post_init__(self):
        if self.expires_at == 0.0:
            self.expires_at = self.registered_at + self.lease_duration


@dataclass
class CapabilityRequirement:
    functionality: str = ""
    input_constraints: List[Parameter] = field(default_factory=list)
    output_constraints: List[Parameter] = field(default_factory=list)
    quality_preferences: QualityAttributes = field(default_factory=QualityAttributes)
    category: str = ""
    keywords: List[str] = field(default_factory=list)


@dataclass
class Query:
    id: str = field(default_factory=lambda: str(uuid.uuid4()))
    requester: AgentInfo = field(default_factory=AgentInfo)
    requirement: CapabilityRequirement = field(default_factory=CapabilityRequirement)
    max_results: int = 10
    timeout: float = 30.0
    timestamp: float = field(default_factory=time.time)


@dataclass
class Match:
    provider: AgentInfo
    capability: Capability
    score: float = 0.0
    explanation: str = ""


@dataclass
class MatchResult:
    id: str = field(default_factory=lambda: str(uuid.uuid4()))
    query_id: str = ""
    matches: List[Match] = field(default_factory=list)
    timestamp: float = field(default_factory=time.time)


# 匹配算法实现
class MatchingStrategy(ABC):
    """匹配策略抽象基类"""
    
    @abstractmethod
    def match(self, requirement: CapabilityRequirement, 
              registrations: List[Registration]) -> List[Match]:
        pass


class SimpleKeywordMatching(MatchingStrategy):
    """简单关键词匹配策略"""
    
    def match(self, requirement: CapabilityRequirement, 
              registrations: List[Registration]) -> List[Match]:
        matches = []
        
        # 提取查询关键词
        query_keywords = set(requirement.keywords)
        query_keywords.update(re.findall(r'\w+', requirement.functionality.lower()))
        
        if requirement.category:
            query_keywords.add(requirement.category.lower())
        
        for registration in registrations:
            if registration.provider.status != AgentStatus.ACTIVE:
                continue
                
            for capability in registration.capabilities:
                # 提取能力关键词
                capability_keywords = set(capability.tags)
                capability_keywords.update(capability.keywords)
                capability_keywords.update(re.findall(r'\w+', capability.name.lower()))
                capability_keywords.update(re.findall(r'\w+', capability.description.lower()))
                
                if capability.category:
                    capability_keywords.add(capability.category.lower())
                
                # 计算关键词匹配得分
                if not query_keywords:
                    score = 0.0
                else:
                    common_keywords = query_keywords.intersection(capability_keywords)
                    score = len(common_keywords) / len(query_keywords)
                
                if score > 0:
                    match = Match(
                        provider=registration.provider,
                        capability=capability,
                        score=score,
                        explanation=f"关键词匹配: {common_keywords}"
                    )
                    matches.append(match)
        
        # 按得分排序
        matches.sort(key=lambda m: m.score, reverse=True)
        return matches


class TfIdfMatching(MatchingStrategy):
    """基于TF-IDF的匹配策略"""
    
    def __init__(self):
        self.vocabulary = set()
        self.idf_scores = {}
        self.document_count = 0
    
    def _prepare_document(self, capability: Capability) -> str:
        """将能力描述转换为文档"""
        parts = [
            capability.name,
            capability.description,
            capability.category,
            ' '.join(capability.tags),
            ' '.join(capability.keywords)
        ]
        return ' '.join(filter(None, parts)).lower()
    
    def _tokenize(self, text: str) -> List[str]:
        """分词"""
        return re.findall(r'\w+', text)
    
    def train(self, registrations: List[Registration]):
        """训练TF-IDF模型"""
        documents = []
        
        # 收集所有文档
        for registration in registrations:
            for capability in registration.capabilities:
                doc = self._prepare_document(capability)
                documents.append(doc)
        
        self.document_count = len(documents)
        
        # 构建词汇表
        for doc in documents:
            tokens = self._tokenize(doc)
            self.vocabulary.update(tokens)
        
        # 计算IDF
        term_document_count = {term: 0 for term in self.vocabulary}
        for doc in documents:
            tokens = set(self._tokenize(doc))
            for term in tokens:
                term_document_count[term] += 1
        
        for term, count in term_document_count.items():
            self.idf_scores[term] = math.log(self.document_count / (count + 1))
    
    def _compute_tfidf_vector(self, text: str) -> Dict[str, float]:
        """计算文本的TF-IDF向量"""
        tokens = self._tokenize(text)
        if not tokens:
            return {}
        
        # 计算TF
        tf = {}
        token_count = len(tokens)
        for token in tokens:
            tf[token] = tf.get(token, 0) + 1 / token_count
        
        # 计算TF-IDF
        tfidf = {}
        for token, tf_value in tf.items():
            if token in self.idf_scores:
                tfidf[token] = tf_value * self.idf_scores[token]
        
        return tfidf
    
    def _cosine_similarity(self, vec1: Dict[str, float], vec2: Dict[str, float]) -> float:
        """计算余弦相似度"""
        common_terms = set(vec1.keys()) & set(vec2.keys())
        dot_product = sum(vec1[term] * vec2[term] for term in common_terms)
        
        norm1 = math.sqrt(sum(value ** 2 for value in vec1.values()))
        norm2 = math.sqrt(sum(value ** 2 for value in vec2.values()))
        
        if norm1 == 0 or norm2 == 0:
            return 0.0
        
        return dot_product / (norm1 * norm2)
    
    def match(self, requirement: CapabilityRequirement, 
              registrations: List[Registration]) -> List[Match]:
        # 确保模型已训练
        if not self.vocabulary:
            self.train(registrations)
        
        # 准备查询文档
        query_parts = [
            requirement.functionality,
            requirement.category,
            ' '.join(requirement.keywords)
        ]
        query_text = ' '.join(filter(None, query_parts)).lower()
        query_vector = self._compute_tfidf_vector(query_text)
        
        matches = []
        for registration in registrations:
            if registration.provider.status != AgentStatus.ACTIVE:
                continue
                
            for capability in registration.capabilities:
                # 计算能力文档的TF-IDF向量
                capability_text = self._prepare_document(capability)
                capability_vector = self._compute_tfidf_vector(capability_text)
                
                # 计算相似度
                similarity = self._cosine_similarity(query_vector, capability_vector)
                
                if similarity > 0:
                    match = Match(
                        provider=registration.provider,
                        capability=capability,
                        score=similarity,
                        explanation=f"TF-IDF余弦相似度: {similarity:.4f}"
                    )
                    matches.append(match)
        
        # 按得分排序
        matches.sort(key=lambda m: m.score, reverse=True)
        return matches


class HybridMatching(MatchingStrategy):
    """混合匹配策略"""
    
    def __init__(self, keyword_weight=0.3, tfidf_weight=0.7):
        self.keyword_matcher = SimpleKeywordMatching()
        self.tfidf_matcher = TfIdfMatching()
        self.keyword_weight = keyword_weight
        self.tfidf_weight = tfidf_weight
    
    def match(self, requirement: CapabilityRequirement, 
              registrations: List[Registration]) -> List[Match]:
        # 获取两种匹配策略的结果
        keyword_matches = self.keyword_matcher.match(requirement, registrations)
        tfidf_matches = self.tfidf_matcher.match(requirement, registrations)
        
        # 为了便于合并,先创建字典
        keyword_scores = {(m.provider.id, m.capability.id): m.score for m in keyword_matches}
        tfidf_scores = {(m.provider.id, m.capability.id): m.score for m in tfidf_matches}
        
        # 找出所有匹配的能力
        all_candidates = set(keyword_scores.keys()).union(set(tfidf_scores.keys()))
        
        # 创建提供者和能力的查找表
        provider_lookup = {}
        capability_lookup = {}
        
        for registration in registrations:
            provider_lookup[registration.provider.id] = registration.provider
            for capability in registration.capabilities:
                capability_lookup[capability.id] = capability
        
        # 计算混合得分
        matches = []
        for provider_id, capability_id in all_candidates:
            keyword_score = keyword_scores.get((provider_id, capability_id), 0)
            tfidf_score = tfidf_scores.get((provider_id, capability_id), 0)
            
            hybrid_score = (self.keyword_weight * keyword_score + 
                          self.tfidf_weight * tfidf_score)
            
            match = Match(
                provider=provider_lookup[provider_id],
                capability=capability_lookup[capability_id],
                score=hybrid_score,
                explanation=f"混合得分: 关键词={keyword_score:.4f}, TF-IDF={tfidf_score:.4f}"
            )
            matches.append(match)
        
        # 按得分排序
        matches.sort(key=lambda m: m.score, reverse=True)
        return matches


# 目录服务实现
class DirectoryService:
    """能力目录服务"""
    
    def __init__(self, matching_strategy: MatchingStrategy = None):
        self.registrations: Dict[str, Registration] = {}  # 注册ID -> 注册信息
        self.agent_registrations: Dict[str, List[str]] = {}  # Agent ID -> 注册ID列表
        self.matching_strategy = matching_strategy or HybridMatching()
        self.last_cleanup = time.time()
        self.cleanup_interval = 300.0  # 每5分钟清理一次过期注册
    
    def _cleanup_expired_registrations(self):
        """清理过期的注册信息"""
        current_time = time.time()
        
        # 检查是否需要清理
        if current_time - self.last_cleanup < self.cleanup_interval:
            return
        
        expired_ids = []
        for reg_id, registration in self.registrations.items():
            if registration.expires_at < current_time:
                expired_ids.append(reg_id)
                
                # 更新Agent的注册列表
                agent_id = registration.provider.id
                if agent_id in self.agent_registrations:
                    if reg_id in self.agent_registrations[agent_id]:
                        self.agent_registrations[agent_id].remove(reg_id)
        
        # 删除过期注册
        for reg_id in expired_ids:
            del self.registrations[reg_id]
        
        self.last_cleanup = current_time
        print(f"Cleaned up {len(expired_ids)} expired registrations")
    
    def register(self, registration: Registration) -> str:
        """注册Agent能力"""
        # 先清理过期注册
        self._cleanup_expired_registrations()
        
        # 存储注册信息
        self.registrations[registration.id] = registration
        
        # 更新Agent的注册列表
        agent_id = registration.provider.id
        if agent_id not in self.agent_registrations:
            self.agent_registrations[agent_id] = []
        self.agent_registrations[agent_id].append(registration.id)
        
        print(f"Registered agent {registration.provider.name} with {len(registration.capabilities)} capabilities")
        return registration.id
    
    def renew_lease(self, registration_id: str, lease_extension: float = None) -> bool:
        """续订租约"""
        if registration_id not in self.registrations:
            return False
        
        registration = self.registrations[registration_id]
        lease_extension = lease_extension or registration.lease_duration
        registration.expires_at = time.time() + lease_extension
        
        # 更新提供者的最后心跳时间
        registration.provider.last_heartbeat = time.time()
        
        return True
    
    def unregister(self, registration_id: str) -> bool:
        """注销注册"""
        if registration_id not in self.registrations:
            return False
        
        registration = self.registrations[registration_id]
        agent_id = registration.provider.id
        
        # 从Agent的注册列表中移除
        if agent_id in self.agent_registrations:
            if registration_id in self.agent_registrations[agent_id]:
                self.agent_registrations[agent_id].remove(registration_id)
        
        # 删除注册
        del self.registrations[registration_id]
        
        print(f"Unregistered agent {registration.provider.name}")
        return True
    
    def update_agent_status(self, agent_id: str, status: AgentStatus) -> bool:
        """更新Agent状态"""
        if agent_id not in self.agent_registrations:
            return False
        
        for reg_id in self.agent_registrations[agent_id]:
            if reg_id in self.registrations:
                self.registrations[reg_id].provider.status = status
                self.registrations[reg_id].provider.last_heartbeat = time.time()
        
        return True
    
    def discover(self, query: Query) -> MatchResult:
        """发现满足需求的能力"""
        # 先清理过期注册
        self._cleanup_expired_registrations()
        
        # 获取所有注册信息
        active_registrations = [
            reg for reg in self.registrations.values()
            if reg.provider.status == AgentStatus.ACTIVE
        ]
        
        # 使用匹配策略查找匹配
        matches = self.matching_strategy.match(query.requirement, active_registrations)
        
        # 限制结果数量
        matches = matches[:query.max_results]
        
        # 创建并返回结果
        result = MatchResult(
            query_id=query.id,
            matches=matches,
            timestamp=time.time()
        )
        
        print(f"Discovery query returned {len(matches)} matches")
        return result
    
    def get_agent_capabilities(self, agent_id: str) -> List[Capability]:
        """获取特定Agent的所有能力"""
        capabilities = []
        
        if agent_id in self.agent_registrations:
            for reg_id in self
Logo

AtomGit 是由开放原子开源基金会联合 CSDN 等生态伙伴共同推出的新一代开源与人工智能协作平台。平台坚持“开放、中立、公益”的理念,把代码托管、模型共享、数据集托管、智能体开发体验和算力服务整合在一起,为开发者提供从开发、训练到部署的一站式体验。

更多推荐