构建可控的 AI Agent Harness Engineering:约束、规则与政策引擎

关键词

  • AI Agent 控制
  • Harness Engineering
  • 约束引擎
  • 规则系统
  • 政策框架
  • 安全对齐
  • 可信AI

摘要

随着AI代理(Agent)系统在各个领域的广泛应用,如何确保这些系统在复杂环境中安全、可靠且符合人类价值观地运行,已成为AI领域的关键挑战。本文深入探讨"AI Agent Harness Engineering"——一套用于构建可控AI代理的工程方法,重点关注约束、规则与政策引擎的设计与实现。我们将从核心概念解析开始,通过生动比喻和技术原理分析,展示如何构建多层级的控制框架。文章包含详细的算法设计、代码实现和实际案例,为开发者提供构建安全可控AI代理系统的完整指南。


1. 背景介绍

1.1 AI Agent 时代的到来与挑战

想象一下,你拥有一位高度智能的私人助手,它可以帮你管理日程、处理邮件、甚至代表你进行简单的商务谈判。这个助手不仅理解你的指令,还能根据环境变化自主做出决策。这听起来像是科幻小说中的场景,但随着大语言模型(LLM)和自主代理技术的快速发展,这正在成为现实。

然而,随着AI代理能力的增强,一个关键问题浮出水面:我们如何确保这些自主系统的行为始终符合我们的期望、价值观和安全要求?

在传统的软件系统中,我们通过明确的代码逻辑来控制系统行为。但AI代理,特别是基于大语言模型的代理,具有高度的自主性和适应性,它们的决策过程往往是"黑盒"式的,难以预测和控制。这就引发了所谓的"控制问题"——如何在赋予AI系统强大能力的同时,确保它们安全可靠地服务于人类利益。

1.2 从"对齐"到"工程化控制"

AI安全研究领域已经提出了"对齐"(Alignment)概念,指的是确保AI系统的目标与人类价值观保持一致。然而,对齐研究更多是理论性的,而我们需要的是一套实用的工程方法,能够在实际系统中落地实施。

这就是"AI Agent Harness Engineering"应运而生的背景。"Harness"这个词有多重含义:它既指马具(用于控制和引导马匹),也指利用和开发(自然资源)。在AI语境下,它代表了一套工程方法,用于"驾驭"和"利用"AI代理的强大能力,同时确保它们在安全可控的范围内运行。

1.3 目标读者与核心问题

本文的目标读者包括:

  • AI应用开发者,希望构建安全可靠的代理系统
  • 企业技术决策者,需要评估和管理AI系统风险
  • AI安全研究者,对实用控制方法感兴趣
  • 对AI代理技术有兴趣的技术爱好者

我们将重点解决以下核心问题:

  1. 如何设计多层次的控制框架,平衡AI代理的自主性与可控性?
  2. 约束、规则和政策引擎各自的作用是什么?它们如何协同工作?
  3. 如何在实际系统中实现这些控制机制,同时不显著降低AI代理的性能和用户体验?
  4. 有哪些最佳实践和常见陷阱需要注意?

2. 核心概念解析

2.1 什么是 AI Agent Harness Engineering?

让我们用一个生动的比喻来理解这个概念。想象你正在驾驶一辆具有高级自动驾驶功能的汽车。这辆车非常智能,可以自动规划路线、避让障碍物、甚至根据交通状况调整驾驶风格。但无论它多么智能,你——作为人类驾驶员——仍然保持最终控制权。你可以随时介入,纠正车辆的行为,或者设定约束条件(比如最高车速)。

AI Agent Harness Engineering 就是要为AI代理系统构建这样一套"驾驶控制系统"。它不是要完全替代AI代理的自主决策,而是要建立一个框架,确保AI代理的行为始终在可接受的范围内,同时保留其适应环境和解决问题的能力。

2.2 核心概念:约束、规则与政策

在AI Agent Harness Engineering中,有三个核心概念需要理解:约束(Constraints)、规则(Rules)和政策(Policies)。虽然它们经常被互换使用,但在我们的框架中,它们代表了不同层级和类型的控制机制。

2.2.1 约束(Constraints)

约束是最底层、最刚性的控制机制。它们定义了AI代理绝对不能逾越的边界,就像汽车的物理限制(比如无法超过其机械设计的最高速度)或者道路上的物理障碍。

在技术实现上,约束通常表现为硬性限制:

  • 资源约束:AI代理不能使用超过指定限额的计算资源、API调用次数或资金
  • 操作约束:AI代理不能执行某些特定类型的操作(如删除系统文件、进行大额金融交易)
  • 输出约束:AI代理的输出必须符合特定格式或长度限制
2.2.2 规则(Rules)

规则位于约束之上,是更加结构化和条件化的控制机制。如果说约束是"不能做什么",那么规则就是"在什么情况下应该/不应该做什么"。

规则可以类比为交通法规:红灯停、绿灯行、限速行驶等。它们更加灵活,可以根据具体情况应用,但仍然是相对明确和可执行的。

在AI系统中,规则可能表现为:

  • 如果用户询问竞争对手的敏感信息,应拒绝回答
  • 当执行金融交易时,必须获得用户的明确确认
  • 如果检测到特定类型的内容(如暴力、仇恨言论),应进行过滤
2.2.3 政策(Policies)

政策是最高层级、最抽象的控制机制。它们定义了AI代理应该遵循的原则、价值观和目标,类似于一个组织的使命宣言或道德准则。

政策不像规则那样具体明确,它们需要根据具体情境进行解释和应用。例如,一个"以用户最佳利益为行动准则"的政策,在不同情况下可能有不同的具体表现形式。

在AI系统中,政策可能包括:

  • 尊重用户隐私和数据所有权
  • 避免造成伤害("首先,不伤害"原则)
  • 促进公平和包容
  • 保持透明和可解释性

2.3 概念间的关系与层次结构

约束、规则和政策不是孤立存在的,它们形成了一个金字塔状的层次结构,共同作用于AI代理系统:

指导原则

具体实现

直接控制

行为反馈

行为反馈

行为反馈

政策层 Policies

规则层 Rules

约束层 Constraints

AI Agent

这个层次结构体现了几个重要原则:

  1. 自上而下的指导:政策指导规则的制定,规则指导约束的设置。每一层都是上一层的具体实现。

  2. 自下而上的执行:约束直接作用于AI代理,是最直接的控制机制;规则通过约束实现;政策则通过规则和约束间接发挥作用。

  3. 反馈循环:AI代理的行为会反馈到各个控制层,用于持续优化和调整控制机制。

为了更清晰地理解这三个概念的区别,我们可以通过以下维度进行对比:

维度 约束(Constraints) 规则(Rules) 政策(Policies)
性质 刚性边界 条件性指令 指导原则
具体程度 高度具体 中等具体 抽象
灵活性
执行方式 自动强制执行 条件触发执行 解释性应用
变更频率
主要目的 防止灾难性后果 规范常见行为 引导价值对齐
类比 物理围栏 交通法规 道德准则

2.4 实体关系与交互模型

现在让我们进一步探讨这些控制组件如何与AI代理系统交互,以及它们之间的关系。我们可以用实体关系图来表示:

指导制定

实现为

直接映射为

控制

指导

引导

产生

生成

优化

调整

更新

POLICY

string

id

string

name

string

description

int

priority

RULE

string

id

string

name

string

condition

string

action

int

priority

CONSTRAINT

string

id

string

name

string

type

string

limit

boolean

enforced

AGENT

string

id

string

name

string

type

string

version

BEHAVIOR

string

id

string

agent_id

timestamp

timestamp

string

action

string

context

FEEDBACK

string

id

string

behavior_id

int

rating

string

comment

string

source

这个ER图展示了各个实体及其关系:

  • 政策可以指导规则的制定,也可以直接映射为约束
  • 规则可以实现为具体的约束
  • 约束、规则和政策分别以不同方式作用于AI代理
  • AI代理产生行为,行为生成反馈
  • 反馈用于优化政策、调整规则和更新约束

最后,让我们来看一个更完整的交互关系图,展示整个控制系统的工作流程:

反馈收集器 行为监控器 约束引擎 规则引擎 政策引擎 AI Agent 用户 反馈收集器 行为监控器 约束引擎 规则引擎 政策引擎 AI Agent 用户 alt [行动符合所有控制要求] [行动违反控制要求] 发送请求/任务 查询适用政策 返回政策指导 检查相关规则 返回规则要求 验证约束条件 返回约束检查结果 基于控制信息规划行动 执行行动/返回结果 记录行为 返回拒绝/调整建议 记录违规尝试 发送行为数据 提供政策优化建议 提供规则调整建议 提供约束更新建议

这个序列图展示了一个完整的交互流程:

  1. 用户向AI代理发送请求
  2. AI代理依次查询政策、规则和约束引擎
  3. 基于控制信息,AI代理规划行动
  4. 如果行动符合要求,则执行并记录;否则拒绝并记录
  5. 行为数据被发送到反馈收集器
  6. 反馈用于持续优化各个控制层

3. 技术原理与实现

3.1 约束引擎设计与实现

约束引擎是整个控制系统的第一道防线,负责实施最刚性的边界限制。让我们深入探讨其设计原理和实现方法。

3.1.1 约束的数学模型

从数学角度来看,约束可以定义为对AI代理状态空间的限制。假设AI代理的状态可以表示为一个向量 s∈Ss \in SsS,其中 SSS 是所有可能状态的集合,那么约束 CCC 就定义了一个允许的状态子集 SC⊆SS_C \subseteq SSCS

C:S→{true,false}C: S \rightarrow \{true, false\}C:S{true,false}

其中 C(s)=trueC(s) = trueC(s)=true 当且仅当 s∈SCs \in S_CsSC,即状态 sss 满足约束 CCC

对于多个约束 C1,C2,...,CnC_1, C_2, ..., C_nC1,C2,...,Cn,AI代理的允许状态空间是所有约束允许状态的交集:

Sallowed=SC1∩SC2∩...∩SCnS_{allowed} = S_{C_1} \cap S_{C_2} \cap ... \cap S_{C_n}Sallowed=SC1SC2...SCn

在实际应用中,我们通常将约束分为几种类型:

  1. 硬约束:必须始终满足,违反会导致系统立即拒绝执行
  2. 软约束:尽量满足,但在必要时可以违反(通常带有惩罚)
  3. 单调性约束:一旦满足,将始终满足(如资源使用上限)
  4. 瞬时约束:只在特定时刻需要满足(如操作前的权限检查)
3.1.2 约束引擎架构

一个实用的约束引擎通常包含以下核心组件:

约束引擎

请求执行

检查约束

获取约束

读写

执行中监控

违规通知

批准/拒绝

更新状态

约束执行器

约束检查器

约束管理器

约束存储

约束监控器

AI Agent

让我们逐一介绍这些组件:

  1. 约束执行器(Constraint Executor):作为引擎的入口点,负责协调整个约束检查和执行过程。

  2. 约束检查器(Constraint Checker):负责具体的约束验证逻辑,评估AI代理的状态或计划行动是否满足所有约束。

  3. 约束管理器(Constraint Manager):管理约束的生命周期,包括添加、删除、更新约束,以及处理约束之间的依赖关系和冲突。

  4. 约束存储(Constraint Store):持久化存储约束定义的地方,可以是数据库、配置文件或专门的知识库。

  5. 约束监控器(Constraint Monitor):在AI代理执行过程中持续监控其状态,确保约束在整个执行过程中都得到满足,而不仅仅是在开始时检查。

3.1.3 约束引擎实现代码

下面是一个简化版的约束引擎实现,使用Python语言编写:

from abc import ABC, abstractmethod
from dataclasses import dataclass
from enum import Enum
from typing import Any, Callable, Dict, List, Optional, Set, Union
import time
import threading


class ConstraintType(Enum):
    HARD = "hard"  # 硬约束,必须满足
    SOFT = "soft"  # 软约束,尽量满足
    MONOTONIC = "monotonic"  # 单调约束,一旦满足则始终满足
    INSTANTANEOUS = "instantaneous"  # 瞬时约束,只在特定时刻检查


class ConstraintStatus(Enum):
    SATISFIED = "satisfied"
    VIOLATED = "violated"
    PENDING = "pending"


@dataclass
class ConstraintContext:
    """约束检查上下文,包含AI代理的当前状态和环境信息"""
    agent_id: str
    agent_state: Dict[str, Any]
    environment: Dict[str, Any]
    action: Optional[Dict[str, Any]] = None
    timestamp: float = time.time()


class Constraint(ABC):
    """约束基类"""
    
    def __init__(
        self, 
        id: str, 
        name: str, 
        type: ConstraintType, 
        description: str = "",
        priority: int = 0
    ):
        self.id = id
        self.name = name
        self.type = type
        self.description = description
        self.priority = priority
        self.last_check_time: Optional[float] = None
        self.last_status: ConstraintStatus = ConstraintStatus.PENDING
    
    @abstractmethod
    def check(self, context: ConstraintContext) -> ConstraintStatus:
        """检查约束是否满足"""
        pass
    
    def __str__(self) -> str:
        return f"Constraint(id={self.id}, name={self.name}, type={self.type})"


class ResourceConstraint(Constraint):
    """资源使用约束"""
    
    def __init__(
        self, 
        id: str, 
        name: str, 
        resource_type: str,
        max_limit: float,
        type: ConstraintType = ConstraintType.HARD,
        description: str = "",
        priority: int = 0
    ):
        super().__init__(id, name, type, description, priority)
        self.resource_type = resource_type
        self.max_limit = max_limit
    
    def check(self, context: ConstraintContext) -> ConstraintStatus:
        self.last_check_time = context.timestamp
        
        if self.resource_type not in context.agent_state:
            self.last_status = ConstraintStatus.SATISFIED
            return self.last_status
        
        usage = context.agent_state[self.resource_type]
        if usage > self.max_limit:
            self.last_status = ConstraintStatus.VIOLATED
        else:
            self.last_status = ConstraintStatus.SATISFIED
        
        return self.last_status


class ActionConstraint(Constraint):
    """行动约束,限制AI代理可以执行的特定行动"""
    
    def __init__(
        self, 
        id: str, 
        name: str, 
        forbidden_actions: Set[str],
        type: ConstraintType = ConstraintType.HARD,
        description: str = "",
        priority: int = 0
    ):
        super().__init__(id, name, type, description, priority)
        self.forbidden_actions = forbidden_actions
    
    def check(self, context: ConstraintContext) -> ConstraintStatus:
        self.last_check_time = context.timestamp
        
        if context.action is None or "action_type" not in context.action:
            self.last_status = ConstraintStatus.SATISFIED
            return self.last_status
        
        action_type = context.action["action_type"]
        if action_type in self.forbidden_actions:
            self.last_status = ConstraintStatus.VIOLATED
        else:
            self.last_status = ConstraintStatus.SATISFIED
        
        return self.last_status


class ConditionalConstraint(Constraint):
    """条件约束,基于特定条件应用的约束"""
    
    def __init__(
        self, 
        id: str, 
        name: str, 
        condition: Callable[[ConstraintContext], bool],
        inner_constraint: Constraint,
        type: ConstraintType = ConstraintType.HARD,
        description: str = "",
        priority: int = 0
    ):
        super().__init__(id, name, type, description, priority)
        self.condition = condition
        self.inner_constraint = inner_constraint
    
    def check(self, context: ConstraintContext) -> ConstraintStatus:
        self.last_check_time = context.timestamp
        
        if not self.condition(context):
            self.last_status = ConstraintStatus.SATISFIED
            return self.last_status
        
        self.last_status = self.inner_constraint.check(context)
        return self.last_status


class ConstraintStore:
    """约束存储,管理约束的增删改查"""
    
    def __init__(self):
        self._constraints: Dict[str, Constraint] = {}
    
    def add(self, constraint: Constraint) -> None:
        """添加约束"""
        self._constraints[constraint.id] = constraint
    
    def remove(self, constraint_id: str) -> bool:
        """移除约束"""
        if constraint_id in self._constraints:
            del self._constraints[constraint_id]
            return True
        return False
    
    def get(self, constraint_id: str) -> Optional[Constraint]:
        """获取特定约束"""
        return self._constraints.get(constraint_id)
    
    def get_all(self) -> List[Constraint]:
        """获取所有约束"""
        return list(self._constraints.values())
    
    def get_by_type(self, constraint_type: ConstraintType) -> List[Constraint]:
        """根据类型获取约束"""
        return [c for c in self._constraints.values() if c.type == constraint_type]


class ConstraintChecker:
    """约束检查器,负责检查约束是否满足"""
    
    def __init__(self, store: ConstraintStore):
        self.store = store
    
    def check_all(self, context: ConstraintContext) -> Dict[str, ConstraintStatus]:
        """检查所有约束"""
        results = {}
        constraints = self.store.get_all()
        
        # 按优先级排序,优先级高的先检查
        constraints.sort(key=lambda c: c.priority, reverse=True)
        
        for constraint in constraints:
            results[constraint.id] = constraint.check(context)
        
        return results
    
    def has_violation(self, results: Dict[str, ConstraintStatus]) -> bool:
        """检查是否有硬约束被违反"""
        for constraint_id, status in results.items():
            constraint = self.store.get(constraint_id)
            if constraint and constraint.type == ConstraintType.HARD and status == ConstraintStatus.VIOLATED:
                return True
        return False
    
    def get_violations(self, results: Dict[str, ConstraintStatus]) -> List[Constraint]:
        """获取所有被违反的约束"""
        violations = []
        for constraint_id, status in results.items():
            constraint = self.store.get(constraint_id)
            if constraint and status == ConstraintStatus.VIOLATED:
                violations.append(constraint)
        return violations


class ConstraintMonitor:
    """约束监控器,在执行过程中持续监控约束"""
    
    def __init__(self, checker: ConstraintChecker, check_interval: float = 1.0):
        self.checker = checker
        self.check_interval = check_interval
        self._context: Optional[ConstraintContext] = None
        self._running = False
        self._thread: Optional[threading.Thread] = None
        self._violation_callbacks: List[Callable[[List[Constraint]], None]] = []
    
    def start(self, context: ConstraintContext) -> None:
        """启动监控"""
        self._context = context
        self._running = True
        self._thread = threading.Thread(target=self._monitor_loop, daemon=True)
        self._thread.start()
    
    def stop(self) -> None:
        """停止监控"""
        self._running = False
        if self._thread:
            self._thread.join(timeout=2.0)
    
    def update_context(self, context: ConstraintContext) -> None:
        """更新监控上下文"""
        self._context = context
    
    def add_violation_callback(self, callback: Callable[[List[Constraint]], None]) -> None:
        """添加违规回调"""
        self._violation_callbacks.append(callback)
    
    def _monitor_loop(self) -> None:
        """监控循环"""
        while self._running and self._context:
            results = self.checker.check_all(self._context)
            violations = self.checker.get_violations(results)
            
            if violations:
                for callback in self._violation_callbacks:
                    try:
                        callback(violations)
                    except Exception as e:
                        print(f"Error in violation callback: {e}")
            
            time.sleep(self.check_interval)


class ConstraintEngine:
    """约束引擎,整合所有组件"""
    
    def __init__(self):
        self.store = ConstraintStore()
        self.checker = ConstraintChecker(self.store)
        self.monitor = ConstraintMonitor(self.checker)
    
    def add_constraint(self, constraint: Constraint) -> None:
        """添加约束"""
        self.store.add(constraint)
    
    def remove_constraint(self, constraint_id: str) -> bool:
        """移除约束"""
        return self.store.remove(constraint_id)
    
    def check_action(self, context: ConstraintContext) -> tuple[bool, List[Constraint]]:
        """
        检查行动是否满足所有约束
        
        返回:
            (是否允许执行, 违规约束列表)
        """
        results = self.checker.check_all(context)
        violations = self.checker.get_violations(results)
        
        # 检查是否有硬约束被违反
        has_hard_violation = any(
            v.type == ConstraintType.HARD for v in violations
        )
        
        return not has_hard_violation, violations
    
    def start_monitoring(self, context: ConstraintContext) -> None:
        """启动执行监控"""
        self.monitor.start(context)
    
    def stop_monitoring(self) -> None:
        """停止执行监控"""
        self.monitor.stop()
    
    def update_monitoring_context(self, context: ConstraintContext) -> None:
        """更新监控上下文"""
        self.monitor.update_context(context)
    
    def add_violation_callback(self, callback: Callable[[List[Constraint]], None]) -> None:
        """添加违规回调"""
        self.monitor.add_violation_callback(callback)

这个实现提供了一个灵活的约束引擎框架,可以根据具体需求进行扩展。让我们来看一个使用示例:

# 创建约束引擎
engine = ConstraintEngine()

# 添加资源约束:API调用次数不超过100
api_constraint = ResourceConstraint(
    id="api_limit",
    name="API调用限制",
    resource_type="api_calls",
    max_limit=100,
    type=ConstraintType.HARD,
    description="限制API调用次数,防止滥用"
)
engine.add_constraint(api_constraint)

# 添加行动约束:禁止执行删除操作
delete_constraint = ActionConstraint(
    id="no_delete",
    name="禁止删除操作",
    forbidden_actions={"delete", "remove", "erase"},
    type=ConstraintType.HARD,
    description="禁止执行任何删除操作"
)
engine.add_constraint(delete_constraint)

# 创建一个示例上下文
context = ConstraintContext(
    agent_id="agent_001",
    agent_state={"api_calls": 95},
    environment={"time": "work_hours"},
    action={"action_type": "query", "parameters": {"table": "users"}}
)

# 检查行动
allowed, violations = engine.check_action(context)
print(f"行动是否允许: {allowed}")
if violations:
    print("违规约束:")
    for v in violations:
        print(f"  - {v.name}")

# 现在尝试一个会违反约束的行动
context.action = {"action_type": "delete", "parameters": {"id": 123}}
allowed, violations = engine.check_action(context)
print(f"\n行动是否允许: {allowed}")
if violations:
    print("违规约束:")
    for v in violations:
        print(f"  - {v.name}")

# 添加一个违规回调
def on_violation(violations):
    print(f"\n监控检测到违规:")
    for v in violations:
        print(f"  - {v.name}")

engine.add_violation_callback(on_violation)

# 启动监控
print("\n启动监控...")
engine.start_monitoring(context)

# 模拟一段时间的执行
time.sleep(3)

# 更新上下文,API调用次数超过限制
context.agent_state["api_calls"] = 105
engine.update_monitoring_context(context)

# 继续监控一段时间
time.sleep(3)

# 停止监控
engine.stop_monitoring()
print("\n监控已停止")

3.2 规则引擎设计与实现

规则引擎位于约束引擎之上,提供了更加灵活和条件化的控制机制。它可以根据特定条件触发相应的行动或限制,比约束引擎更加动态和智能。

3.2.1 规则的数学模型

规则通常可以表示为"如果-那么"(If-Then)结构:

IF condition THEN action\text{IF } condition \text{ THEN } actionIF condition THEN action

其中,conditionconditioncondition 是一个关于当前状态的谓词,actionactionaction 是在条件满足时应该执行的操作。

从逻辑角度来看,规则系统可以被视为一种产生式系统(Production System),由三个主要部分组成:

  1. 工作内存(Working Memory):包含当前状态信息
  2. 规则库(Rule Base):包含所有规则的集合
  3. 推理引擎(Inference Engine):负责匹配规则并执行相应操作

推理引擎通常采用两种主要策略:

  1. 前向链(Forward Chaining):从已知事实出发,应用规则得出新结论
  2. 后向链(Backward Chaining):从目标出发,反向查找支持该目标的规则

在AI Agent控制场景中,我们通常使用前向链推理,因为我们需要根据当前状态主动应用规则。

3.2.2 规则引擎架构

一个实用的规则引擎通常包含以下组件:

规则引擎

状态更新

请求规则应用

获取规则

读写

执行推理

读取

冲突规则

选定规则

执行结果

规则决策

更新

规则执行器

推理引擎

规则管理器

规则存储

工作内存

冲突解决器

AI Agent

让我们逐一介绍这些组件:

  1. 规则执行器(Rule Executor):作为引擎的入口点,协调整个规则应用过程。

  2. 推理引擎(Inference Engine):负责规则匹配和推理过程,实现前向链或后向链推理。

  3. 规则管理器(Rule Manager):管理规则的生命周期,包括添加、删除、更新规则,以及处理规则之间的依赖关系。

  4. 规则存储(Rule Store):持久化存储规则定义的地方。

  5. 工作内存(Working Memory):临时存储当前状态信息,供推理引擎使用。

  6. 冲突解决器(Conflict Resolver):当多个规则同时匹配时,决定应该优先应用哪个规则。

3.2.3 规则引擎实现代码

下面是一个简化版的规则引擎实现:

from abc import ABC, abstractmethod
from dataclasses import dataclass, field
from enum import Enum
from typing import Any, Callable, Dict, List, Optional, Set, Union
import time
import re


class RulePriority(Enum):
    LOW = 0
    MEDIUM = 1
    HIGH = 2
    CRITICAL = 3


class RuleStatus(Enum):
    INACTIVE = "inactive"
    ACTIVE = "active"
    TRIGGERED = "triggered"
    EXECUTED = "executed"
    ERROR = "error"


class ConflictResolutionStrategy(Enum):
    FIRST_MATCH = "first_match"  # 选择第一个匹配的规则
    PRIORITY = "priority"  # 选择优先级最高的规则
    SPECIFICITY = "specificity"  # 选择最具体的规则
    RECENCY = "recency"  # 选择最近添加或修改的规则
    SALIENCE = "salience"  # 基于规则的显著性评分


@dataclass
class RuleContext:
    """规则执行上下文"""
    agent_id: str
    state: Dict[str, Any] = field(default_factory=dict)
    environment: Dict[str, Any] = field(default_factory=dict)
    action: Optional[Dict[str, Any]] = None
    history: List[Dict[str, Any]] = field(default_factory=list)
    timestamp: float = time.time()
    additional_data: Dict[str, Any] = field(default_factory=dict)


class Condition(ABC):
    """条件基类"""
    
    @abstractmethod
    def evaluate(self, context: RuleContext) -> bool:
        """评估条件是否满足"""
        pass
    
    @abstractmethod
    def get_specificity(self) -> int:
        """获取条件的具体度,用于冲突解决"""
        pass


class AlwaysTrueCondition(Condition):
    """始终为真的条件"""
    
    def evaluate(self, context: RuleContext) -> bool:
        return True
    
    def get_specificity(self) -> int:
        return 0


class AndCondition(Condition):
    """逻辑与条件"""
    
    def __init__(self, *conditions: Condition):
        self.conditions = conditions
    
    def evaluate(self, context: RuleContext) -> bool:
        return all(cond.evaluate(context) for cond in self.conditions)
    
    def get_specificity(self) -> int:
        return sum(cond.get_specificity() for cond in self.conditions)


class OrCondition(Condition):
    """逻辑或条件"""
    
    def __init__(self, *conditions: Condition):
        self.conditions = conditions
    
    def evaluate(self, context: RuleContext) -> bool:
        return any(cond.evaluate(context) for cond in self.conditions)
    
    def get_specificity(self) -> int:
        return max(cond.get_specificity() for cond in self.conditions) if self.conditions else 0


class NotCondition(Condition):
    """逻辑非条件"""
    
    def __init__(self, condition: Condition):
        self.condition = condition
    
    def evaluate(self, context: RuleContext) -> bool:
        return not self.condition.evaluate(context)
    
    def get_specificity(self) -> int:
        return self.condition.get_specificity()


class StateCondition(Condition):
    """状态条件,检查特定状态值"""
    
    def __init__(
        self, 
        key: str, 
        operator: str, 
        value: Any,
        path: Optional[str] = None  # 用于嵌套状态的点分隔路径,如 "user.profile.age"
    ):
        self.key = key
        self.operator = operator
        self.value = value
        self.path = path
    
    def _get_value(self, context: RuleContext) -> Any:
        """从上下文中获取值"""
        if self.path:
            # 处理嵌套路径
            parts = self.path.split('.')
            current = context.state
            for part in parts:
                if isinstance(current, dict) and part in current:
                    current = current[part]
                else:
                    return None
            return current
        else:
            return context.state.get(self.key)
    
    def evaluate(self, context: RuleContext) -> bool:
        actual_value = self._get_value(context)
        
        if actual_value is None:
            return False
        
        operators = {
            "==": lambda a, b: a == b,
            "!=": lambda a, b: a != b,
            ">": lambda a, b: a > b,
            ">=": lambda a, b: a >= b,
            "<": lambda a, b: a < b,
            "<=": lambda a, b: a <= b,
            "in": lambda a, b: a in b,
            "not_in": lambda a, b: a not in b,
            "contains": lambda a, b: b in a,
            "not_contains": lambda a, b: b not in a,
            "matches": lambda a, b: bool(re.match(b, str(a))),
            "is_type": lambda a, b: isinstance(a, b),
        }
        
        if self.operator not in operators:
            raise ValueError(f"未知操作符: {self.operator}")
        
        try:
            return operators[self.operator](actual_value, self.value)
        except Exception:
            return False
    
    def get_specificity(self) -> int:
        # 状态条件比简单条件更具体
        return 10


class ActionCondition(Condition):
    """行动条件,检查AI代理计划执行的行动"""
    
    def __init__(
        self, 
        action_type: Optional[str] = None,
        action_properties: Optional[Dict[str, Any]] = None
    ):
        self.action_type = action_type
        self.action_properties = action_properties
    
    def evaluate(self, context: RuleContext) -> bool:
        if context.action is None:
            return False
        
        # 检查行动类型
        if self.action_type is not None and context.action.get("action_type") != self.action_type:
            return False
        
        # 检查行动属性
        if self.action_properties:
            for key, expected_value in self.action_properties.items():
                actual_value = context.action.get(key)
                if actual_value != expected_value:
                    return False
        
        return True
    
    def get_specificity(self) -> int:
        # 行动条件的具体度取决于指定的属性数量
        specificity = 5  # 基础具体度
        if self.action_type:
            specificity += 3
        if self.action_properties:
            specificity += len(self.action_properties) * 2
        return specificity


class TimeCondition(Condition):
    """时间条件,基于时间因素触发"""
    
    def __init__(
        self, 
        start_hour: Optional[int] = None,
        end_hour: Optional[int] = None,
        days_of_week: Optional[List[int]] = None,  # 0=周一, 6=周日
        specific_times: Optional[List[float]] = None  # 一天中的具体时间,单位为小时
    ):
        self.start_hour = start_hour
        self.end_hour = end_hour
        self.days_of_week = days_of_week
        self.specific_times = specific_times
    
    def evaluate(self, context: RuleContext) -> bool:
        t = time.localtime(context.timestamp)
        hour = t.tm_hour
        minute = t.tm_min
        day_of_week = t.tm_wday  # 0=周一, 6=周日
        current_time = hour + minute / 60.0
        
        # 检查时间段
        if self.start_hour is not None and self.end_hour is not None:
            if not (self.start_hour <= hour < self.end_hour):
                return False
        
        # 检查星期几
        if self.days_of_week is not None and day_of_week not in self.days_of_week:
            return False
        
        # 检查具体时间
        if self.specific_times is not None:
            # 检查是否接近任何一个特定时间(10分钟内)
            near_specific_time = any(
                abs(current_time - specific_time) < 10/60.0 
                for specific_time in self.specific_times
            )
            if not near_specific_time:
                return False
        
        return True
    
    def get_specificity(self) -> int:
        specificity = 3  # 基础具体度
        if self.start_hour is not None and self.end_hour is not None:
            specificity += 3
        if self.days_of_week:
            specificity += 2
        if self.specific_times:
            specificity += 4
        return specificity


class CustomCondition(Condition):
    """自定义条件,使用回调函数"""
    
    def __init__(
        self, 
        evaluator: Callable[[RuleContext], bool],
        specificity: int = 5
    ):
        self.evaluator = evaluator
        self._specificity = specificity
    
    def evaluate(self, context: RuleContext) -> bool:
        return self.evaluator(context)
    
    def get_specificity(self) -> int:
        return self._specificity


class Action(ABC):
    """行动基类"""
    
    @abstractmethod
    def execute(self, context: RuleContext) -> Dict[str, Any]:
        """执行行动并返回结果"""
        pass


class ModifyStateAction(Action):
    """修改状态的行动"""
    
    def __init__(self, updates: Dict[str, Any]):
        self.updates = updates
    
    def execute(self, context: RuleContext) -> Dict[str, Any]:
        # 在实际系统中,这里可能会直接修改上下文或返回修改
        return {
            "action": "modify_state",
            "updates": self.updates,
            "success": True
        }


class RejectAction(Action):
    """拒绝当前请求的行动"""
    
    def __init__(self, reason: str = ""):
        self.reason = reason
    
    def execute(self, context: RuleContext) -> Dict[str, Any]:
        return {
            "action": "reject",
            "reason": self.reason,
            "success": True
        }


class ModifyRequestAction(Action):
    """修改请求的行动"""
    
    def __init__(self, modifications: Dict[str, Any]):
        self.modifications = modifications
    
    def execute(self, context: RuleContext) -> Dict[str, Any]:
        return {
            "action": "modify_request",
            "modifications": self.modifications,
            "success": True
        }


class LogAction(Action):
    """记录日志的行动"""
    
    def __init__(self, message: str, level: str = "info"):
        self.message = message
        self.level = level
    
    def execute(self, context: RuleContext) -> Dict[str, Any]:
        # 在实际系统中,这里会写入日志系统
        print(f"[{self.level.upper()}] {self.message}")
        return {
            "action": "log",
            "message": self.message,
            "level": self.level,
            "success": True
        }


class NotifyAction(Action):
    """发送通知的行动"""
    
    def __init__(self, recipient: str, message: str):
        self.recipient = recipient
        self.message = message
    
    def execute(self, context: RuleContext) -> Dict[str, Any]:
        # 在实际系统中,这里会发送真实通知
        print(f"通知 {self.recipient}: {self.message}")
        return {
            "action": "notify",
            "recipient": self.recipient,
            "message": self.message,
            "success": True
        }


class CustomAction(Action):
    """自定义行动,使用回调函数"""
    
    def __init__(self, executor: Callable[[RuleContext], Dict[str, Any]]):
        self.executor = executor
    
    def execute(self, context: RuleContext) -> Dict[str, Any]:
        return self.executor(context)


@dataclass
class Rule:
    """规则类"""
    id: str
    name: str
    condition: Condition
    actions: List[Action]
    priority: RulePriority = RulePriority.MEDIUM
    status: RuleStatus = RuleStatus.ACTIVE
    description: str = ""
    salience: int = 0  # 显著性评分,用于冲突解决
    created_at: float = field(default_factory=time.time)
    updated_at: float = field(default_factory=time.time)
    metadata: Dict[str, Any] = field(default_factory=dict)
    
    def get_specificity(self) -> int:
        """获取规则的具体度"""
        return self.condition.get_specificity()
    
    def evaluate(self, context: RuleContext) -> bool:
        """评估规则条件是否满足"""
        if self.status != RuleStatus.ACTIVE:
            return False
        return self.condition.evaluate(context)
    
    def execute(self, context: RuleContext) -> List[Dict[str, Any]]:
        """执行规则的所有行动"""
        results = []
        for action in self.actions:
            try:
                result = action.execute(context)
                results.append(result)
            except Exception as e:
                results.append({
                    "action": str(action),
                    "error": str(e),
                    "success": False
                })
        return results


class WorkingMemory:
    """工作内存,存储当前状态和事实"""
    
    def __init__(self):
        self._facts: Dict[str, Any] = {}
        self._history: List[Dict[str, Any]] = []
    
    def set(self, key: str, value: Any) -> None:
        """设置事实"""
        self._facts[key] = value
    
    def get(self, key: str, default: Any = None) -> Any:
        """获取事实"""
        return self._facts.get(key, default)
    
    def delete(self, key: str) -> bool:
        """删除事实"""
        if key in self._facts:
            del self._facts[key]
            return True
        return False
    
    def get_all(self) -> Dict[str, Any]:
        """
Logo

AtomGit 是由开放原子开源基金会联合 CSDN 等生态伙伴共同推出的新一代开源与人工智能协作平台。平台坚持“开放、中立、公益”的理念,把代码托管、模型共享、数据集托管、智能体开发体验和算力服务整合在一起,为开发者提供从开发、训练到部署的一站式体验。

更多推荐