构建可控的 AI Agent Harness Engineering:约束、规则与政策引擎
构建可控的 AI Agent Harness Engineering:约束、规则与政策引擎
关键词
- AI Agent 控制
- Harness Engineering
- 约束引擎
- 规则系统
- 政策框架
- 安全对齐
- 可信AI
摘要
随着AI代理(Agent)系统在各个领域的广泛应用,如何确保这些系统在复杂环境中安全、可靠且符合人类价值观地运行,已成为AI领域的关键挑战。本文深入探讨"AI Agent Harness Engineering"——一套用于构建可控AI代理的工程方法,重点关注约束、规则与政策引擎的设计与实现。我们将从核心概念解析开始,通过生动比喻和技术原理分析,展示如何构建多层级的控制框架。文章包含详细的算法设计、代码实现和实际案例,为开发者提供构建安全可控AI代理系统的完整指南。
1. 背景介绍
1.1 AI Agent 时代的到来与挑战
想象一下,你拥有一位高度智能的私人助手,它可以帮你管理日程、处理邮件、甚至代表你进行简单的商务谈判。这个助手不仅理解你的指令,还能根据环境变化自主做出决策。这听起来像是科幻小说中的场景,但随着大语言模型(LLM)和自主代理技术的快速发展,这正在成为现实。
然而,随着AI代理能力的增强,一个关键问题浮出水面:我们如何确保这些自主系统的行为始终符合我们的期望、价值观和安全要求?
在传统的软件系统中,我们通过明确的代码逻辑来控制系统行为。但AI代理,特别是基于大语言模型的代理,具有高度的自主性和适应性,它们的决策过程往往是"黑盒"式的,难以预测和控制。这就引发了所谓的"控制问题"——如何在赋予AI系统强大能力的同时,确保它们安全可靠地服务于人类利益。
1.2 从"对齐"到"工程化控制"
AI安全研究领域已经提出了"对齐"(Alignment)概念,指的是确保AI系统的目标与人类价值观保持一致。然而,对齐研究更多是理论性的,而我们需要的是一套实用的工程方法,能够在实际系统中落地实施。
这就是"AI Agent Harness Engineering"应运而生的背景。"Harness"这个词有多重含义:它既指马具(用于控制和引导马匹),也指利用和开发(自然资源)。在AI语境下,它代表了一套工程方法,用于"驾驭"和"利用"AI代理的强大能力,同时确保它们在安全可控的范围内运行。
1.3 目标读者与核心问题
本文的目标读者包括:
- AI应用开发者,希望构建安全可靠的代理系统
- 企业技术决策者,需要评估和管理AI系统风险
- AI安全研究者,对实用控制方法感兴趣
- 对AI代理技术有兴趣的技术爱好者
我们将重点解决以下核心问题:
- 如何设计多层次的控制框架,平衡AI代理的自主性与可控性?
- 约束、规则和政策引擎各自的作用是什么?它们如何协同工作?
- 如何在实际系统中实现这些控制机制,同时不显著降低AI代理的性能和用户体验?
- 有哪些最佳实践和常见陷阱需要注意?
2. 核心概念解析
2.1 什么是 AI Agent Harness Engineering?
让我们用一个生动的比喻来理解这个概念。想象你正在驾驶一辆具有高级自动驾驶功能的汽车。这辆车非常智能,可以自动规划路线、避让障碍物、甚至根据交通状况调整驾驶风格。但无论它多么智能,你——作为人类驾驶员——仍然保持最终控制权。你可以随时介入,纠正车辆的行为,或者设定约束条件(比如最高车速)。
AI Agent Harness Engineering 就是要为AI代理系统构建这样一套"驾驶控制系统"。它不是要完全替代AI代理的自主决策,而是要建立一个框架,确保AI代理的行为始终在可接受的范围内,同时保留其适应环境和解决问题的能力。
2.2 核心概念:约束、规则与政策
在AI Agent Harness Engineering中,有三个核心概念需要理解:约束(Constraints)、规则(Rules)和政策(Policies)。虽然它们经常被互换使用,但在我们的框架中,它们代表了不同层级和类型的控制机制。
2.2.1 约束(Constraints)
约束是最底层、最刚性的控制机制。它们定义了AI代理绝对不能逾越的边界,就像汽车的物理限制(比如无法超过其机械设计的最高速度)或者道路上的物理障碍。
在技术实现上,约束通常表现为硬性限制:
- 资源约束:AI代理不能使用超过指定限额的计算资源、API调用次数或资金
- 操作约束:AI代理不能执行某些特定类型的操作(如删除系统文件、进行大额金融交易)
- 输出约束:AI代理的输出必须符合特定格式或长度限制
2.2.2 规则(Rules)
规则位于约束之上,是更加结构化和条件化的控制机制。如果说约束是"不能做什么",那么规则就是"在什么情况下应该/不应该做什么"。
规则可以类比为交通法规:红灯停、绿灯行、限速行驶等。它们更加灵活,可以根据具体情况应用,但仍然是相对明确和可执行的。
在AI系统中,规则可能表现为:
- 如果用户询问竞争对手的敏感信息,应拒绝回答
- 当执行金融交易时,必须获得用户的明确确认
- 如果检测到特定类型的内容(如暴力、仇恨言论),应进行过滤
2.2.3 政策(Policies)
政策是最高层级、最抽象的控制机制。它们定义了AI代理应该遵循的原则、价值观和目标,类似于一个组织的使命宣言或道德准则。
政策不像规则那样具体明确,它们需要根据具体情境进行解释和应用。例如,一个"以用户最佳利益为行动准则"的政策,在不同情况下可能有不同的具体表现形式。
在AI系统中,政策可能包括:
- 尊重用户隐私和数据所有权
- 避免造成伤害("首先,不伤害"原则)
- 促进公平和包容
- 保持透明和可解释性
2.3 概念间的关系与层次结构
约束、规则和政策不是孤立存在的,它们形成了一个金字塔状的层次结构,共同作用于AI代理系统:
这个层次结构体现了几个重要原则:
-
自上而下的指导:政策指导规则的制定,规则指导约束的设置。每一层都是上一层的具体实现。
-
自下而上的执行:约束直接作用于AI代理,是最直接的控制机制;规则通过约束实现;政策则通过规则和约束间接发挥作用。
-
反馈循环:AI代理的行为会反馈到各个控制层,用于持续优化和调整控制机制。
为了更清晰地理解这三个概念的区别,我们可以通过以下维度进行对比:
| 维度 | 约束(Constraints) | 规则(Rules) | 政策(Policies) |
|---|---|---|---|
| 性质 | 刚性边界 | 条件性指令 | 指导原则 |
| 具体程度 | 高度具体 | 中等具体 | 抽象 |
| 灵活性 | 低 | 中 | 高 |
| 执行方式 | 自动强制执行 | 条件触发执行 | 解释性应用 |
| 变更频率 | 低 | 中 | 高 |
| 主要目的 | 防止灾难性后果 | 规范常见行为 | 引导价值对齐 |
| 类比 | 物理围栏 | 交通法规 | 道德准则 |
2.4 实体关系与交互模型
现在让我们进一步探讨这些控制组件如何与AI代理系统交互,以及它们之间的关系。我们可以用实体关系图来表示:
这个ER图展示了各个实体及其关系:
- 政策可以指导规则的制定,也可以直接映射为约束
- 规则可以实现为具体的约束
- 约束、规则和政策分别以不同方式作用于AI代理
- AI代理产生行为,行为生成反馈
- 反馈用于优化政策、调整规则和更新约束
最后,让我们来看一个更完整的交互关系图,展示整个控制系统的工作流程:
这个序列图展示了一个完整的交互流程:
- 用户向AI代理发送请求
- AI代理依次查询政策、规则和约束引擎
- 基于控制信息,AI代理规划行动
- 如果行动符合要求,则执行并记录;否则拒绝并记录
- 行为数据被发送到反馈收集器
- 反馈用于持续优化各个控制层
3. 技术原理与实现
3.1 约束引擎设计与实现
约束引擎是整个控制系统的第一道防线,负责实施最刚性的边界限制。让我们深入探讨其设计原理和实现方法。
3.1.1 约束的数学模型
从数学角度来看,约束可以定义为对AI代理状态空间的限制。假设AI代理的状态可以表示为一个向量 s∈Ss \in Ss∈S,其中 SSS 是所有可能状态的集合,那么约束 CCC 就定义了一个允许的状态子集 SC⊆SS_C \subseteq SSC⊆S:
C:S→{true,false}C: S \rightarrow \{true, false\}C:S→{true,false}
其中 C(s)=trueC(s) = trueC(s)=true 当且仅当 s∈SCs \in S_Cs∈SC,即状态 sss 满足约束 CCC。
对于多个约束 C1,C2,...,CnC_1, C_2, ..., C_nC1,C2,...,Cn,AI代理的允许状态空间是所有约束允许状态的交集:
Sallowed=SC1∩SC2∩...∩SCnS_{allowed} = S_{C_1} \cap S_{C_2} \cap ... \cap S_{C_n}Sallowed=SC1∩SC2∩...∩SCn
在实际应用中,我们通常将约束分为几种类型:
- 硬约束:必须始终满足,违反会导致系统立即拒绝执行
- 软约束:尽量满足,但在必要时可以违反(通常带有惩罚)
- 单调性约束:一旦满足,将始终满足(如资源使用上限)
- 瞬时约束:只在特定时刻需要满足(如操作前的权限检查)
3.1.2 约束引擎架构
一个实用的约束引擎通常包含以下核心组件:
让我们逐一介绍这些组件:
-
约束执行器(Constraint Executor):作为引擎的入口点,负责协调整个约束检查和执行过程。
-
约束检查器(Constraint Checker):负责具体的约束验证逻辑,评估AI代理的状态或计划行动是否满足所有约束。
-
约束管理器(Constraint Manager):管理约束的生命周期,包括添加、删除、更新约束,以及处理约束之间的依赖关系和冲突。
-
约束存储(Constraint Store):持久化存储约束定义的地方,可以是数据库、配置文件或专门的知识库。
-
约束监控器(Constraint Monitor):在AI代理执行过程中持续监控其状态,确保约束在整个执行过程中都得到满足,而不仅仅是在开始时检查。
3.1.3 约束引擎实现代码
下面是一个简化版的约束引擎实现,使用Python语言编写:
from abc import ABC, abstractmethod
from dataclasses import dataclass
from enum import Enum
from typing import Any, Callable, Dict, List, Optional, Set, Union
import time
import threading
class ConstraintType(Enum):
HARD = "hard" # 硬约束,必须满足
SOFT = "soft" # 软约束,尽量满足
MONOTONIC = "monotonic" # 单调约束,一旦满足则始终满足
INSTANTANEOUS = "instantaneous" # 瞬时约束,只在特定时刻检查
class ConstraintStatus(Enum):
SATISFIED = "satisfied"
VIOLATED = "violated"
PENDING = "pending"
@dataclass
class ConstraintContext:
"""约束检查上下文,包含AI代理的当前状态和环境信息"""
agent_id: str
agent_state: Dict[str, Any]
environment: Dict[str, Any]
action: Optional[Dict[str, Any]] = None
timestamp: float = time.time()
class Constraint(ABC):
"""约束基类"""
def __init__(
self,
id: str,
name: str,
type: ConstraintType,
description: str = "",
priority: int = 0
):
self.id = id
self.name = name
self.type = type
self.description = description
self.priority = priority
self.last_check_time: Optional[float] = None
self.last_status: ConstraintStatus = ConstraintStatus.PENDING
@abstractmethod
def check(self, context: ConstraintContext) -> ConstraintStatus:
"""检查约束是否满足"""
pass
def __str__(self) -> str:
return f"Constraint(id={self.id}, name={self.name}, type={self.type})"
class ResourceConstraint(Constraint):
"""资源使用约束"""
def __init__(
self,
id: str,
name: str,
resource_type: str,
max_limit: float,
type: ConstraintType = ConstraintType.HARD,
description: str = "",
priority: int = 0
):
super().__init__(id, name, type, description, priority)
self.resource_type = resource_type
self.max_limit = max_limit
def check(self, context: ConstraintContext) -> ConstraintStatus:
self.last_check_time = context.timestamp
if self.resource_type not in context.agent_state:
self.last_status = ConstraintStatus.SATISFIED
return self.last_status
usage = context.agent_state[self.resource_type]
if usage > self.max_limit:
self.last_status = ConstraintStatus.VIOLATED
else:
self.last_status = ConstraintStatus.SATISFIED
return self.last_status
class ActionConstraint(Constraint):
"""行动约束,限制AI代理可以执行的特定行动"""
def __init__(
self,
id: str,
name: str,
forbidden_actions: Set[str],
type: ConstraintType = ConstraintType.HARD,
description: str = "",
priority: int = 0
):
super().__init__(id, name, type, description, priority)
self.forbidden_actions = forbidden_actions
def check(self, context: ConstraintContext) -> ConstraintStatus:
self.last_check_time = context.timestamp
if context.action is None or "action_type" not in context.action:
self.last_status = ConstraintStatus.SATISFIED
return self.last_status
action_type = context.action["action_type"]
if action_type in self.forbidden_actions:
self.last_status = ConstraintStatus.VIOLATED
else:
self.last_status = ConstraintStatus.SATISFIED
return self.last_status
class ConditionalConstraint(Constraint):
"""条件约束,基于特定条件应用的约束"""
def __init__(
self,
id: str,
name: str,
condition: Callable[[ConstraintContext], bool],
inner_constraint: Constraint,
type: ConstraintType = ConstraintType.HARD,
description: str = "",
priority: int = 0
):
super().__init__(id, name, type, description, priority)
self.condition = condition
self.inner_constraint = inner_constraint
def check(self, context: ConstraintContext) -> ConstraintStatus:
self.last_check_time = context.timestamp
if not self.condition(context):
self.last_status = ConstraintStatus.SATISFIED
return self.last_status
self.last_status = self.inner_constraint.check(context)
return self.last_status
class ConstraintStore:
"""约束存储,管理约束的增删改查"""
def __init__(self):
self._constraints: Dict[str, Constraint] = {}
def add(self, constraint: Constraint) -> None:
"""添加约束"""
self._constraints[constraint.id] = constraint
def remove(self, constraint_id: str) -> bool:
"""移除约束"""
if constraint_id in self._constraints:
del self._constraints[constraint_id]
return True
return False
def get(self, constraint_id: str) -> Optional[Constraint]:
"""获取特定约束"""
return self._constraints.get(constraint_id)
def get_all(self) -> List[Constraint]:
"""获取所有约束"""
return list(self._constraints.values())
def get_by_type(self, constraint_type: ConstraintType) -> List[Constraint]:
"""根据类型获取约束"""
return [c for c in self._constraints.values() if c.type == constraint_type]
class ConstraintChecker:
"""约束检查器,负责检查约束是否满足"""
def __init__(self, store: ConstraintStore):
self.store = store
def check_all(self, context: ConstraintContext) -> Dict[str, ConstraintStatus]:
"""检查所有约束"""
results = {}
constraints = self.store.get_all()
# 按优先级排序,优先级高的先检查
constraints.sort(key=lambda c: c.priority, reverse=True)
for constraint in constraints:
results[constraint.id] = constraint.check(context)
return results
def has_violation(self, results: Dict[str, ConstraintStatus]) -> bool:
"""检查是否有硬约束被违反"""
for constraint_id, status in results.items():
constraint = self.store.get(constraint_id)
if constraint and constraint.type == ConstraintType.HARD and status == ConstraintStatus.VIOLATED:
return True
return False
def get_violations(self, results: Dict[str, ConstraintStatus]) -> List[Constraint]:
"""获取所有被违反的约束"""
violations = []
for constraint_id, status in results.items():
constraint = self.store.get(constraint_id)
if constraint and status == ConstraintStatus.VIOLATED:
violations.append(constraint)
return violations
class ConstraintMonitor:
"""约束监控器,在执行过程中持续监控约束"""
def __init__(self, checker: ConstraintChecker, check_interval: float = 1.0):
self.checker = checker
self.check_interval = check_interval
self._context: Optional[ConstraintContext] = None
self._running = False
self._thread: Optional[threading.Thread] = None
self._violation_callbacks: List[Callable[[List[Constraint]], None]] = []
def start(self, context: ConstraintContext) -> None:
"""启动监控"""
self._context = context
self._running = True
self._thread = threading.Thread(target=self._monitor_loop, daemon=True)
self._thread.start()
def stop(self) -> None:
"""停止监控"""
self._running = False
if self._thread:
self._thread.join(timeout=2.0)
def update_context(self, context: ConstraintContext) -> None:
"""更新监控上下文"""
self._context = context
def add_violation_callback(self, callback: Callable[[List[Constraint]], None]) -> None:
"""添加违规回调"""
self._violation_callbacks.append(callback)
def _monitor_loop(self) -> None:
"""监控循环"""
while self._running and self._context:
results = self.checker.check_all(self._context)
violations = self.checker.get_violations(results)
if violations:
for callback in self._violation_callbacks:
try:
callback(violations)
except Exception as e:
print(f"Error in violation callback: {e}")
time.sleep(self.check_interval)
class ConstraintEngine:
"""约束引擎,整合所有组件"""
def __init__(self):
self.store = ConstraintStore()
self.checker = ConstraintChecker(self.store)
self.monitor = ConstraintMonitor(self.checker)
def add_constraint(self, constraint: Constraint) -> None:
"""添加约束"""
self.store.add(constraint)
def remove_constraint(self, constraint_id: str) -> bool:
"""移除约束"""
return self.store.remove(constraint_id)
def check_action(self, context: ConstraintContext) -> tuple[bool, List[Constraint]]:
"""
检查行动是否满足所有约束
返回:
(是否允许执行, 违规约束列表)
"""
results = self.checker.check_all(context)
violations = self.checker.get_violations(results)
# 检查是否有硬约束被违反
has_hard_violation = any(
v.type == ConstraintType.HARD for v in violations
)
return not has_hard_violation, violations
def start_monitoring(self, context: ConstraintContext) -> None:
"""启动执行监控"""
self.monitor.start(context)
def stop_monitoring(self) -> None:
"""停止执行监控"""
self.monitor.stop()
def update_monitoring_context(self, context: ConstraintContext) -> None:
"""更新监控上下文"""
self.monitor.update_context(context)
def add_violation_callback(self, callback: Callable[[List[Constraint]], None]) -> None:
"""添加违规回调"""
self.monitor.add_violation_callback(callback)
这个实现提供了一个灵活的约束引擎框架,可以根据具体需求进行扩展。让我们来看一个使用示例:
# 创建约束引擎
engine = ConstraintEngine()
# 添加资源约束:API调用次数不超过100
api_constraint = ResourceConstraint(
id="api_limit",
name="API调用限制",
resource_type="api_calls",
max_limit=100,
type=ConstraintType.HARD,
description="限制API调用次数,防止滥用"
)
engine.add_constraint(api_constraint)
# 添加行动约束:禁止执行删除操作
delete_constraint = ActionConstraint(
id="no_delete",
name="禁止删除操作",
forbidden_actions={"delete", "remove", "erase"},
type=ConstraintType.HARD,
description="禁止执行任何删除操作"
)
engine.add_constraint(delete_constraint)
# 创建一个示例上下文
context = ConstraintContext(
agent_id="agent_001",
agent_state={"api_calls": 95},
environment={"time": "work_hours"},
action={"action_type": "query", "parameters": {"table": "users"}}
)
# 检查行动
allowed, violations = engine.check_action(context)
print(f"行动是否允许: {allowed}")
if violations:
print("违规约束:")
for v in violations:
print(f" - {v.name}")
# 现在尝试一个会违反约束的行动
context.action = {"action_type": "delete", "parameters": {"id": 123}}
allowed, violations = engine.check_action(context)
print(f"\n行动是否允许: {allowed}")
if violations:
print("违规约束:")
for v in violations:
print(f" - {v.name}")
# 添加一个违规回调
def on_violation(violations):
print(f"\n监控检测到违规:")
for v in violations:
print(f" - {v.name}")
engine.add_violation_callback(on_violation)
# 启动监控
print("\n启动监控...")
engine.start_monitoring(context)
# 模拟一段时间的执行
time.sleep(3)
# 更新上下文,API调用次数超过限制
context.agent_state["api_calls"] = 105
engine.update_monitoring_context(context)
# 继续监控一段时间
time.sleep(3)
# 停止监控
engine.stop_monitoring()
print("\n监控已停止")
3.2 规则引擎设计与实现
规则引擎位于约束引擎之上,提供了更加灵活和条件化的控制机制。它可以根据特定条件触发相应的行动或限制,比约束引擎更加动态和智能。
3.2.1 规则的数学模型
规则通常可以表示为"如果-那么"(If-Then)结构:
IF condition THEN action\text{IF } condition \text{ THEN } actionIF condition THEN action
其中,conditionconditioncondition 是一个关于当前状态的谓词,actionactionaction 是在条件满足时应该执行的操作。
从逻辑角度来看,规则系统可以被视为一种产生式系统(Production System),由三个主要部分组成:
- 工作内存(Working Memory):包含当前状态信息
- 规则库(Rule Base):包含所有规则的集合
- 推理引擎(Inference Engine):负责匹配规则并执行相应操作
推理引擎通常采用两种主要策略:
- 前向链(Forward Chaining):从已知事实出发,应用规则得出新结论
- 后向链(Backward Chaining):从目标出发,反向查找支持该目标的规则
在AI Agent控制场景中,我们通常使用前向链推理,因为我们需要根据当前状态主动应用规则。
3.2.2 规则引擎架构
一个实用的规则引擎通常包含以下组件:
让我们逐一介绍这些组件:
-
规则执行器(Rule Executor):作为引擎的入口点,协调整个规则应用过程。
-
推理引擎(Inference Engine):负责规则匹配和推理过程,实现前向链或后向链推理。
-
规则管理器(Rule Manager):管理规则的生命周期,包括添加、删除、更新规则,以及处理规则之间的依赖关系。
-
规则存储(Rule Store):持久化存储规则定义的地方。
-
工作内存(Working Memory):临时存储当前状态信息,供推理引擎使用。
-
冲突解决器(Conflict Resolver):当多个规则同时匹配时,决定应该优先应用哪个规则。
3.2.3 规则引擎实现代码
下面是一个简化版的规则引擎实现:
from abc import ABC, abstractmethod
from dataclasses import dataclass, field
from enum import Enum
from typing import Any, Callable, Dict, List, Optional, Set, Union
import time
import re
class RulePriority(Enum):
LOW = 0
MEDIUM = 1
HIGH = 2
CRITICAL = 3
class RuleStatus(Enum):
INACTIVE = "inactive"
ACTIVE = "active"
TRIGGERED = "triggered"
EXECUTED = "executed"
ERROR = "error"
class ConflictResolutionStrategy(Enum):
FIRST_MATCH = "first_match" # 选择第一个匹配的规则
PRIORITY = "priority" # 选择优先级最高的规则
SPECIFICITY = "specificity" # 选择最具体的规则
RECENCY = "recency" # 选择最近添加或修改的规则
SALIENCE = "salience" # 基于规则的显著性评分
@dataclass
class RuleContext:
"""规则执行上下文"""
agent_id: str
state: Dict[str, Any] = field(default_factory=dict)
environment: Dict[str, Any] = field(default_factory=dict)
action: Optional[Dict[str, Any]] = None
history: List[Dict[str, Any]] = field(default_factory=list)
timestamp: float = time.time()
additional_data: Dict[str, Any] = field(default_factory=dict)
class Condition(ABC):
"""条件基类"""
@abstractmethod
def evaluate(self, context: RuleContext) -> bool:
"""评估条件是否满足"""
pass
@abstractmethod
def get_specificity(self) -> int:
"""获取条件的具体度,用于冲突解决"""
pass
class AlwaysTrueCondition(Condition):
"""始终为真的条件"""
def evaluate(self, context: RuleContext) -> bool:
return True
def get_specificity(self) -> int:
return 0
class AndCondition(Condition):
"""逻辑与条件"""
def __init__(self, *conditions: Condition):
self.conditions = conditions
def evaluate(self, context: RuleContext) -> bool:
return all(cond.evaluate(context) for cond in self.conditions)
def get_specificity(self) -> int:
return sum(cond.get_specificity() for cond in self.conditions)
class OrCondition(Condition):
"""逻辑或条件"""
def __init__(self, *conditions: Condition):
self.conditions = conditions
def evaluate(self, context: RuleContext) -> bool:
return any(cond.evaluate(context) for cond in self.conditions)
def get_specificity(self) -> int:
return max(cond.get_specificity() for cond in self.conditions) if self.conditions else 0
class NotCondition(Condition):
"""逻辑非条件"""
def __init__(self, condition: Condition):
self.condition = condition
def evaluate(self, context: RuleContext) -> bool:
return not self.condition.evaluate(context)
def get_specificity(self) -> int:
return self.condition.get_specificity()
class StateCondition(Condition):
"""状态条件,检查特定状态值"""
def __init__(
self,
key: str,
operator: str,
value: Any,
path: Optional[str] = None # 用于嵌套状态的点分隔路径,如 "user.profile.age"
):
self.key = key
self.operator = operator
self.value = value
self.path = path
def _get_value(self, context: RuleContext) -> Any:
"""从上下文中获取值"""
if self.path:
# 处理嵌套路径
parts = self.path.split('.')
current = context.state
for part in parts:
if isinstance(current, dict) and part in current:
current = current[part]
else:
return None
return current
else:
return context.state.get(self.key)
def evaluate(self, context: RuleContext) -> bool:
actual_value = self._get_value(context)
if actual_value is None:
return False
operators = {
"==": lambda a, b: a == b,
"!=": lambda a, b: a != b,
">": lambda a, b: a > b,
">=": lambda a, b: a >= b,
"<": lambda a, b: a < b,
"<=": lambda a, b: a <= b,
"in": lambda a, b: a in b,
"not_in": lambda a, b: a not in b,
"contains": lambda a, b: b in a,
"not_contains": lambda a, b: b not in a,
"matches": lambda a, b: bool(re.match(b, str(a))),
"is_type": lambda a, b: isinstance(a, b),
}
if self.operator not in operators:
raise ValueError(f"未知操作符: {self.operator}")
try:
return operators[self.operator](actual_value, self.value)
except Exception:
return False
def get_specificity(self) -> int:
# 状态条件比简单条件更具体
return 10
class ActionCondition(Condition):
"""行动条件,检查AI代理计划执行的行动"""
def __init__(
self,
action_type: Optional[str] = None,
action_properties: Optional[Dict[str, Any]] = None
):
self.action_type = action_type
self.action_properties = action_properties
def evaluate(self, context: RuleContext) -> bool:
if context.action is None:
return False
# 检查行动类型
if self.action_type is not None and context.action.get("action_type") != self.action_type:
return False
# 检查行动属性
if self.action_properties:
for key, expected_value in self.action_properties.items():
actual_value = context.action.get(key)
if actual_value != expected_value:
return False
return True
def get_specificity(self) -> int:
# 行动条件的具体度取决于指定的属性数量
specificity = 5 # 基础具体度
if self.action_type:
specificity += 3
if self.action_properties:
specificity += len(self.action_properties) * 2
return specificity
class TimeCondition(Condition):
"""时间条件,基于时间因素触发"""
def __init__(
self,
start_hour: Optional[int] = None,
end_hour: Optional[int] = None,
days_of_week: Optional[List[int]] = None, # 0=周一, 6=周日
specific_times: Optional[List[float]] = None # 一天中的具体时间,单位为小时
):
self.start_hour = start_hour
self.end_hour = end_hour
self.days_of_week = days_of_week
self.specific_times = specific_times
def evaluate(self, context: RuleContext) -> bool:
t = time.localtime(context.timestamp)
hour = t.tm_hour
minute = t.tm_min
day_of_week = t.tm_wday # 0=周一, 6=周日
current_time = hour + minute / 60.0
# 检查时间段
if self.start_hour is not None and self.end_hour is not None:
if not (self.start_hour <= hour < self.end_hour):
return False
# 检查星期几
if self.days_of_week is not None and day_of_week not in self.days_of_week:
return False
# 检查具体时间
if self.specific_times is not None:
# 检查是否接近任何一个特定时间(10分钟内)
near_specific_time = any(
abs(current_time - specific_time) < 10/60.0
for specific_time in self.specific_times
)
if not near_specific_time:
return False
return True
def get_specificity(self) -> int:
specificity = 3 # 基础具体度
if self.start_hour is not None and self.end_hour is not None:
specificity += 3
if self.days_of_week:
specificity += 2
if self.specific_times:
specificity += 4
return specificity
class CustomCondition(Condition):
"""自定义条件,使用回调函数"""
def __init__(
self,
evaluator: Callable[[RuleContext], bool],
specificity: int = 5
):
self.evaluator = evaluator
self._specificity = specificity
def evaluate(self, context: RuleContext) -> bool:
return self.evaluator(context)
def get_specificity(self) -> int:
return self._specificity
class Action(ABC):
"""行动基类"""
@abstractmethod
def execute(self, context: RuleContext) -> Dict[str, Any]:
"""执行行动并返回结果"""
pass
class ModifyStateAction(Action):
"""修改状态的行动"""
def __init__(self, updates: Dict[str, Any]):
self.updates = updates
def execute(self, context: RuleContext) -> Dict[str, Any]:
# 在实际系统中,这里可能会直接修改上下文或返回修改
return {
"action": "modify_state",
"updates": self.updates,
"success": True
}
class RejectAction(Action):
"""拒绝当前请求的行动"""
def __init__(self, reason: str = ""):
self.reason = reason
def execute(self, context: RuleContext) -> Dict[str, Any]:
return {
"action": "reject",
"reason": self.reason,
"success": True
}
class ModifyRequestAction(Action):
"""修改请求的行动"""
def __init__(self, modifications: Dict[str, Any]):
self.modifications = modifications
def execute(self, context: RuleContext) -> Dict[str, Any]:
return {
"action": "modify_request",
"modifications": self.modifications,
"success": True
}
class LogAction(Action):
"""记录日志的行动"""
def __init__(self, message: str, level: str = "info"):
self.message = message
self.level = level
def execute(self, context: RuleContext) -> Dict[str, Any]:
# 在实际系统中,这里会写入日志系统
print(f"[{self.level.upper()}] {self.message}")
return {
"action": "log",
"message": self.message,
"level": self.level,
"success": True
}
class NotifyAction(Action):
"""发送通知的行动"""
def __init__(self, recipient: str, message: str):
self.recipient = recipient
self.message = message
def execute(self, context: RuleContext) -> Dict[str, Any]:
# 在实际系统中,这里会发送真实通知
print(f"通知 {self.recipient}: {self.message}")
return {
"action": "notify",
"recipient": self.recipient,
"message": self.message,
"success": True
}
class CustomAction(Action):
"""自定义行动,使用回调函数"""
def __init__(self, executor: Callable[[RuleContext], Dict[str, Any]]):
self.executor = executor
def execute(self, context: RuleContext) -> Dict[str, Any]:
return self.executor(context)
@dataclass
class Rule:
"""规则类"""
id: str
name: str
condition: Condition
actions: List[Action]
priority: RulePriority = RulePriority.MEDIUM
status: RuleStatus = RuleStatus.ACTIVE
description: str = ""
salience: int = 0 # 显著性评分,用于冲突解决
created_at: float = field(default_factory=time.time)
updated_at: float = field(default_factory=time.time)
metadata: Dict[str, Any] = field(default_factory=dict)
def get_specificity(self) -> int:
"""获取规则的具体度"""
return self.condition.get_specificity()
def evaluate(self, context: RuleContext) -> bool:
"""评估规则条件是否满足"""
if self.status != RuleStatus.ACTIVE:
return False
return self.condition.evaluate(context)
def execute(self, context: RuleContext) -> List[Dict[str, Any]]:
"""执行规则的所有行动"""
results = []
for action in self.actions:
try:
result = action.execute(context)
results.append(result)
except Exception as e:
results.append({
"action": str(action),
"error": str(e),
"success": False
})
return results
class WorkingMemory:
"""工作内存,存储当前状态和事实"""
def __init__(self):
self._facts: Dict[str, Any] = {}
self._history: List[Dict[str, Any]] = []
def set(self, key: str, value: Any) -> None:
"""设置事实"""
self._facts[key] = value
def get(self, key: str, default: Any = None) -> Any:
"""获取事实"""
return self._facts.get(key, default)
def delete(self, key: str) -> bool:
"""删除事实"""
if key in self._facts:
del self._facts[key]
return True
return False
def get_all(self) -> Dict[str, Any]:
"""
AtomGit 是由开放原子开源基金会联合 CSDN 等生态伙伴共同推出的新一代开源与人工智能协作平台。平台坚持“开放、中立、公益”的理念,把代码托管、模型共享、数据集托管、智能体开发体验和算力服务整合在一起,为开发者提供从开发、训练到部署的一站式体验。
更多推荐
所有评论(0)