WeClaw 主动陪伴引擎实战:五大组件如何协力决策"何时"和"说什么"

系列文章第 21 篇 - 从事件驱动到 asyncio 非阻塞调度,揭秘 AI 主动关怀背后的决策引擎


📚 专栏信息

《从零到一构建跨平台 AI 助手:WeClaw 实战指南》专栏

专栏定位:面向开发者和技术决策者的实战专栏,用真实案例和完整代码带你理解如何构建生产级 AI 应用

本系列共 25 篇,分为八大模块

📖 模块一【通讯架构设计】(3 篇):混合通讯、设备绑定、请求路由
🔧 模块二【核心技术实现】(4 篇):WebSocket 路由、心跳重连、离线队列
🛡️ 模块三【安全与治理】(3 篇):密钥管理、Token 吊销、速率限制
🔍 模块四【调试与监控】(2 篇):全链路追踪、日志分析
💡 模块五【问题诊断实战】(3 篇):典型问题排查与修复
⚙️ 模块六【性能优化】(1 篇):启动速度、内存优化
🤖 模块七【主动陪伴系统】(3 篇):决策引擎、防骚扰机制、渐进式建档
🚀 模块八【架构演进史】(1 篇):从 0 到 1 的完整历程

  • 模块定位:主动陪伴系统 · 第 1 篇(共 3 篇)
  • 前置知识:了解 EventBus 发布-订阅模式、asyncio 基础
  • 关联文章:第 22 篇(防骚扰冷却机制)、第 23 篇(渐进式建档)

👨‍💻 作者与项目

作者简介:翁勇刚 WENG YONGGANG
新概念龙虾-WeClaw 开发团队负责人,一群专注于跨平台 AI 应用的实践者
理念:“让 AI 不只是被动工具,更是主动关怀用户的生活伙伴”

  • 💻 项目地址:https://github.com/wyg5208/weclaw.git
  • 🌐 官网地址:https://weclaw.link
  • 📝 作者 CSDN:https://blog.csdn.net/yweng18
  • 📦 PyPI:[待发布]
  • ⭐ 欢迎 Star⭐、Fork🍴、贡献代码🤝

在这里插入图片描述

📝 摘要

本文结构概览
本文首先从三个并发关怀请求的真实场景出发,分析为什么需要事件驱动的主动陪伴架构;然后用"餐厅后厨"比喻讲解五大组件(CooldownManager、MoodDetector、OpportunityDetector、InteractionOrchestrator、CompanionEngine)的协作原理;接着通过 1400+ 行核心代码详解七层评分系统和四个时机检测器的实现;随后还原一次"三请求只执行一个"的问题排查过程;最后给出评分缓存和事件优先级等性能优化策略。

背景:在构建 WeClaw 跨平台 AI 助手时,我们面临一个关键挑战:如何让 AI 主动关怀用户,而不是只被动等待用户提问?传统的定时轮询方案要么骚扰用户,要么错过最佳时机。

核心问题:如何设计一套决策系统,能够智能判断"何时"发起关怀(时机检测)、"说什么"关怀内容(主题选择)、同时避免过度骚扰(冷却控制)?

解决方案:采用五大组件协作的事件驱动架构。CooldownManager 控制频率,MoodDetector 感知情绪,OpportunityDetector 捕捉时机,InteractionOrchestrator 编排执行,CompanionEngine 统一调度决策。

关键成果

  • 支持 15+ 种关怀主题(早安问候、心情关怀、生日提醒等)
  • 七层评分系统实现精准的主动关怀时机判断
  • 防骚扰机制:每日配额 + 拒绝惩罚 + 连续忽略检测
  • 事件驱动:工具调用自动触发上下文关怀(旅行意图 → 攻略建议)

适合读者:有 Python 基础,对 AI 助手设计、事件驱动架构、asyncio 异步编程感兴趣的开发者

阅读时长:约 20 分钟

关键词CompanionEngine事件驱动asyncio主动关怀评分系统防骚扰EventBus


一、为什么需要"事件驱动"的主动陪伴?——从三个并发请求说起

1.1 场景重现:当三种关怀同时到来

想象这个早晨场景:

  • 8:00 AM:系统检测到"早安问候"的定时触发条件满足
  • 8:01 AM:用户刚才的消息里提到"昨晚没睡好,有点累",MoodDetector 识别出疲惫情绪
  • 8:02 AM:健康工具记录到血压 145/95,超过警戒线

三个关怀请求几乎同时到来:

┌─────────────────────────────────────────────────────────────┐
│  [08:00] 定时触发 → 早安问候                                  │
│  [08:01] 情绪检测 → 心情关怀(疲惫安慰)                        │
│  [08:02] 健康警报 → 就医建议(血压偏高)                        │
└─────────────────────────────────────────────────────────────┘

问题来了:AI 应该说什么?全说?选一个?怎么选?

如果全部发送,用户会被连续三条消息轰炸;如果只选一个,选哪个最合适?

1.2 方案对比:定时轮询 vs 硬性规则 vs 事件驱动

让我们看看三种主动关怀方案的优缺点:

方案 像什么?(比喻) 优点 缺点
定时轮询 闹钟按时响铃 实现简单,可预测 不考虑用户状态,容易骚扰;错过突发时机
硬性规则 红绿灯固定切换 逻辑清晰,易调试 缺乏灵活性;规则爆炸(组合情况太多)
事件驱动 餐厅服务员察言观色 响应及时,上下文感知 架构复杂;需要精心设计事件优先级
# ❌ 错误示范:定时轮询,不管用户状态
class BadTimerApproach:
    def __init__(self):
        # 每小时检查一次,不管用户在干什么
        schedule.every().hour.do(self.send_care)
    
    def send_care(self):
        # 问题 1:用户可能正在忙碌会议
        # 问题 2:连续发送会造成骚扰
        # 问题 3:错过"用户刚提到累了"这种即时时机
        self.send("今天过得怎么样?")
# ✅ 正确做法:事件驱动 + 多维度评分
class GoodEventDrivenApproach:
    async def on_event(self, event_type: str, data: Any):
        """响应各类事件,动态决策"""
        # 1. 情绪变化事件 → 可能需要心情关怀
        # 2. 工具调用事件 → 可能需要上下文关怀
        # 3. 定时事件 → 兜底的日常关怀
        
        # 计算此刻最适合的关怀主题
        best_topic = self.calculate_best_topic()
        
        # 检查是否满足触发条件
        if self.can_interact() and best_topic.score > threshold:
            await self.initiate_care(best_topic)

1.3 核心挑战:五组件协作的难题

要实现智能的主动陪伴,我们需要解决五个核心问题:

  1. 时机判断:何时发起关怀?(OpportunityDetector)
  2. 频率控制:如何避免骚扰?(CooldownManager)
  3. 情绪感知:用户当前心情如何?(MoodDetector)
  4. 执行编排:关怀消息怎么构建和发送?(InteractionOrchestrator)
  5. 统一调度:多个组件如何协同工作?(CompanionEngine)

这就引出了我们的五大组件架构——让五个"专家"各司其职,协力决策。


二、核心概念解析 —— 用"餐厅后厨"理解五大组件

2.1 什么是五大组件架构?

官方定义

五大组件架构是 WeClaw 主动陪伴系统的核心设计模式,通过职责分离将复杂的决策逻辑拆解为五个独立组件,各组件通过 EventBus 通信,由 CompanionEngine 统一调度。

大白话解释
想象一家高档餐厅的后厨,每个关怀消息就像一道菜。要把"菜"端到用户面前,需要五个角色配合:

┌─────────────────────────────────────────────────────────────────────┐
│                        餐厅后厨比喻                                  │
├─────────────────────────────────────────────────────────────────────┤
│  CooldownManager     = 厨师长      控制出菜节奏,防止上太快撑着客人     │
│  MoodDetector        = 食评家      观察客人表情,判断口味偏好           │
│  OpportunityDetector = 服务员      听到客人聊天内容,发现推荐时机       │
│  InteractionOrchestrator = 传菜员  把菜装盘、端上桌、记录反馈           │
│  CompanionEngine     = 餐厅经理    统筹全局,决定今天推什么特色菜       │
└─────────────────────────────────────────────────────────────────────┘

2.2 工作原理流程图:从事件到关怀的完整链路

                          ┌──────────────────┐
                          │  外部事件输入     │
                          │ (用户输入/工具调用/定时器) │
                          └────────┬─────────┘
                                   │
                                   ▼
┌─────────────────────────────────────────────────────────────────────┐
│                         EventBus 事件总线                            │
│    ┌─────────────┐  ┌─────────────┐  ┌─────────────┐               │
│    │ USER_INPUT  │  │ TOOL_CALL   │  │ CRON_TRIGGER│               │
│    └──────┬──────┘  └──────┬──────┘  └──────┬──────┘               │
└───────────┼────────────────┼────────────────┼───────────────────────┘
            │                │                │
            ▼                ▼                ▼
┌───────────────────┐ ┌───────────────────┐ ┌───────────────────┐
│   MoodDetector    │ │ OpportunityDetector│ │  CompanionEngine  │
│  (情绪检测)        │ │  (时机检测)         │ │  (定时调度)        │
│  ┌─────────────┐  │ │  ┌─────────────┐  │ │  ┌─────────────┐  │
│  │ 关键词匹配   │  │ │  │ 旅行意图检测 │  │ │  │ 30分钟轮询  │  │
│  │ 情绪得分计算 │  │ │  │ 大额支出检测 │  │ │  │ 时段主题匹配 │  │
│  └─────────────┘  │ │  │ 健康警报检测 │  │ │  └─────────────┘  │
│                   │ │  └─────────────┘  │ │                   │
└─────────┬─────────┘ └─────────┬─────────┘ └─────────┬─────────┘
          │                     │                     │
          │   mood_adjustment   │  contextual_care    │  cron_trigger
          │                     │                     │
          └──────────────────┐  │  ┌──────────────────┘
                             │  │  │
                             ▼  ▼  ▼
              ┌────────────────────────────────────┐
              │         CompanionEngine            │
              │     calculate_interaction_score()  │
              │  ┌──────────────────────────────┐  │
              │  │       七层评分系统            │  │
              │  │  1. 基础分 (priority * 10)   │  │
              │  │  2. 时段匹配 (+0~20)         │  │
              │  │  3. 冷却期检查 (0/pass)      │  │
              │  │  4. 紧急度加成 (+0~30)       │  │
              │  │  5. 特殊紧急 (+0~30)         │  │
              │  │  6. 用户空闲度 (+/-20)       │  │
              │  │  7. 情绪调节 (+/-15)         │  │
              │  └──────────────────────────────┘  │
              │                 │                  │
              │         score > 30?                │
              │            │    │                  │
              │         YES│    │NO                │
              │            ▼    ▼                  │
              └────────────┬────┬──────────────────┘
                           │    │
                      触发  │    │ 跳过
                           ▼    
              ┌────────────────────────────────────┐
              │    InteractionOrchestrator         │
              │         initiate_care()            │
              │  ┌──────────────────────────────┐  │
              │  │ 1. 获取交互锁                 │  │
              │  │ 2. 冷却检查                   │  │
              │  │ 3. 用户忙碌检查               │  │
              │  │ 4. 构建消息                   │  │
              │  │ 5. EventBus.emit()           │  │
              │  │ 6. 记录交互                   │  │
              │  │ 7. 释放锁                     │  │
              │  └──────────────────────────────┘  │
              └────────────────────────────────────┘
                               │
                               ▼
              ┌────────────────────────────────────┐
              │       CooldownManager              │
              │  ┌──────────────────────────────┐  │
              │  │ • 每日配额检查 (5次/天)       │  │
              │  │ • 拒绝惩罚期 (4小时)          │  │
              │  │ • 连续忽略计数 (≤2次)         │  │
              │  │ • 交互锁 (同时只允许1个)      │  │
              │  └──────────────────────────────┘  │
              └────────────────────────────────────┘
                               │
                               ▼
              ┌────────────────────────────────────┐
              │           UI 层响应                 │
              │   COMPANION_CARE_TRIGGERED 事件    │
              └────────────────────────────────────┘

关键步骤解读

  1. 事件捕获:EventBus 捕获用户输入、工具调用、定时触发等事件
  2. 并行处理:MoodDetector 分析情绪,OpportunityDetector 检测时机
  3. 评分决策:CompanionEngine 综合七层因素计算执行分数
  4. 执行编排:分数超过阈值后,InteractionOrchestrator 执行完整的关怀流程
  5. 冷却控制:CooldownManager 全程参与,确保不骚扰用户

2.3 对比表:单体架构 vs 五组件架构

维度 单体架构 五组件架构 区别说明
可维护性 差(God Class) 好(单一职责) 五组件各管一摊,改 MoodDetector 不影响 CooldownManager
可测试性 难(高耦合) 易(可独立测试) 可单独 Mock EventBus 测试 OpportunityDetector
扩展性 低(改一处动全身) 高(插件式扩展) 新增"天气关怀"只需注册新 CareTopic
调试复杂度 简单(单入口) 中等(需跟踪事件) 通过日志追踪 topic_id 可还原完整链路
性能开销 略高(事件分发) EventBus 分发开销 < 1ms,可忽略

为什么选择五组件架构?

因为主动陪伴系统的复杂性在于多维度决策——时机、情绪、频率、内容缺一不可。单体架构会导致 if-else 爆炸,而五组件架构让每个维度的逻辑内聚、易于演进。


三、实战代码详解 —— 核心实现逐行解析

3.1 数据结构:CareTopic 与 CareTopicRegistry

首先看关怀主题的数据结构定义:

# src/core/companion_topics.py

@dataclass
class CareTopic:
    """
    关怀主题数据类
    
    定义单个关怀主题的所有属性,包括触发条件、时间约束和对话模板。
    
    Attributes:
        topic_id: 唯一标识符
        name: 显示名称
        category: 主题分类 (health/finance/social/family/emotional/lifestyle)
        priority: 优先级 (1-10, 10为最高)
        min_interval_hours: 最小触发间隔(小时)
        best_time_slots: 最佳触发时段列表 ["morning", "afternoon", "evening"]
        prompt_template: 对话模板,支持 {变量} 替换
        requires_profile: 依赖的用户档案字段列表
        enabled: 是否启用
    """
    topic_id: str
    name: str
    category: str
    priority: int                                       # 关键:1-10 的优先级
    min_interval_hours: int                             # 关键:冷却期时长
    best_time_slots: list[str] = field(default_factory=list)
    prompt_template: str = ""
    requires_profile: list[str] = field(default_factory=list)
    enabled: bool = True

字段说明

  • priority: 优先级决定基础分数,生日提醒(9) > 心情关怀(7) > 早安问候(5)
  • min_interval_hours: 最小间隔,防止同一主题重复触发(早安=24h,心情=12h)
  • best_time_slots: 最佳时段,早安只在 morning,心情在 afternoon/evening

预定义主题示例

# src/core/companion_topics.py

class CareTopicRegistry:
    """关怀主题注册表"""
    
    def _register_defaults(self) -> None:
        """注册所有预定义的关怀主题"""
        default_topics = [
            # 日常关怀主题
            CareTopic(
                topic_id="morning_greeting",
                name="早安问候",
                category="lifestyle",
                priority=5,
                min_interval_hours=24,
                best_time_slots=["morning"],
                prompt_template="早上好呀!今天{weather_hint},{schedule_hint}有什么计划吗?"
            ),
            CareTopic(
                topic_id="mood_check",
                name="心情关怀",
                category="emotional",
                priority=7,                              # 关键:情感类优先级较高
                min_interval_hours=12,
                best_time_slots=["afternoon", "evening"],
                prompt_template="今天过得怎么样呀?最近心情如何?"
            ),
            CareTopic(
                topic_id="birthday_remind",
                name="生日提醒",
                category="social",
                priority=9,                              # 关键:最高优先级
                min_interval_hours=8760,                 # 一年才触发一次
                best_time_slots=["morning"],
                prompt_template="{person_name}的生日就在{days_left}天后了!要不要准备一下?",
                requires_profile=["social_contacts"]
            ),
            CareTopic(
                topic_id="medical_remind",
                name="就医提醒",
                category="health",
                priority=8,                              # 关键:健康警报优先级高
                min_interval_hours=168,
                best_time_slots=["morning"],
                prompt_template="注意到你最近的{metric_name}数据偏高,建议找时间去医院检查一下比较放心",
                requires_profile=["health_data"]
            ),
            # ... 更多主题
        ]
        
        for topic in default_topics:
            self.register(topic)

设计亮点

  1. 数据驱动:新增关怀主题只需添加 CareTopic 实例,无需改代码逻辑
  2. 模板变量{weather_hint}{person_name} 支持个性化消息
  3. 依赖声明requires_profile 声明所需用户数据,缺失时可触发建档

3.2 核心方法 1:calculate_interaction_score() — 七层评分系统

这是 CompanionEngine 的核心决策方法,决定某个主题此刻是否应该触发:

# src/core/companion_engine.py

def calculate_interaction_score(self, topic: CareTopic) -> float:
    """计算主题此刻的执行分数。
    
    综合考虑优先级、时段匹配度、冷却期、紧急度、用户状态等因素。
    
    Args:
        topic: 关怀主题
        
    Returns:
        执行分数 (0-100),越高越应该执行
    """
    # ========== 第 1 层:基础分数 ==========
    # 优先级 * 10,范围 10-100
    score = topic.priority * 10
    
    # ========== 第 2 层:时段匹配度 +0~20 ==========
    current_slot = get_time_slot()  # "morning" / "afternoon" / "evening"
    if current_slot in topic.best_time_slots:
        score += 20                  # 关键:在最佳时段加 20 分
    elif topic.best_time_slots:
        score += 5                   # 不在最佳时段,但仍可执行
    
    # ========== 第 3 层:冷却期硬性检查 ==========
    hours = self._cooldown.hours_since(topic.topic_id)
    if hours < topic.min_interval_hours:
        logger.debug("主题 %s 仍在冷却期 (%.1f < %d 小时)", 
                    topic.topic_id, hours, topic.min_interval_hours)
        return 0                     # 关键:冷却期内直接返回 0 分
    
    # ========== 第 4 层:紧急度加成 +0~30 ==========
    # 基于距离冷却期结束的程度,越久没触发越紧急
    cooldown_ratio = min(hours / topic.min_interval_hours, 2.0)
    urgency_bonus = min((cooldown_ratio - 1) * 15, 30)
    score += urgency_bonus
    
    # ========== 第 5 层:特殊紧急情况 +0~30 ==========
    # 如生日临近
    if topic.topic_id == "birthday_remind":
        context = self._build_context(topic)
        days_left = context.get("days_left_raw", 999)
        if days_left <= 3:
            score += 30              # 关键:生日 3 天内加 30 分
        elif days_left <= 7:
            score += 15
    
    # ========== 第 6 层:用户空闲度 +/-0~20 ==========
    if self._orchestrator._is_user_idle_for(5):
        score += 10                  # 空闲 5 分钟以上
    if self._orchestrator._is_user_idle_for(15):
        score += 10                  # 空闲 15 分钟以上
    if self._orchestrator._is_user_busy():
        score -= 20                  # 关键:用户正忙减 20 分
    
    # ========== 第 7 层:情绪调节 +/-0~15 ==========
    mood_adjustment = self._mood_detector.get_mood_adjusted_topic_score(
        topic, self._current_mood
    )
    score += mood_adjustment
    
    # ========== 最终检查:每日预算 ==========
    if not self._cooldown.can_interact():
        logger.debug("每日预算已用完或处于冷却期")
        return 0
    
    return max(0, min(100, score))   # 注意:限制在 0-100 范围

评分公式可视化

最终分数 = 基础分(priority×10)
         + 时段匹配(+0~20)
         + 紧急度(+0~30)
         + 特殊紧急(+0~30)
         + 用户空闲度(+/-20)
         + 情绪调节(+/-15)
         - 冷却期内则直接归零
         - 每日预算用完则直接归零

示例:早安问候在 8:00 AM
- 基础分:5 × 10 = 50
- 时段匹配:morning ∈ ["morning"] → +20
- 紧急度:假设 26 小时未触发,ratio=1.08 → +1.2
- 用户空闲:空闲 10 分钟 → +10
- 情绪:中性 → +0
= 50 + 20 + 1.2 + 10 = 81.2 分 ✅ 超过阈值 30,触发!

3.3 核心方法 2:OpportunityDetector 的四个检测器

OpportunityDetector 通过监听 EventBus 的 TOOL_CALL 事件,从用户行为中发现关怀时机:

# src/core/companion_engine.py

class OpportunityDetector:
    """通过 EventBus 监听用户行为,检测关怀时机。"""
    
    def _setup_listeners(self) -> None:
        """设置事件监听器。"""
        # 关键:监听工具调用事件,优先级 200(较高)
        self._event_bus.on(
            EventType.TOOL_CALL,
            self._on_tool_call,
            priority=200
        )
        # 监听 Agent 响应事件
        self._event_bus.on(
            EventType.AGENT_RESPONSE,
            self._on_agent_response,
            priority=200
        )
    
    async def _on_tool_call(self, event_type: str, data: Any) -> None:
        """处理工具调用事件。"""
        if self._engine is None:
            return
        
        # 安全获取工具名和参数
        tool_name = getattr(data, "tool_name", None) or (
            data.get("tool_name") if isinstance(data, dict) else None
        )
        arguments = getattr(data, "arguments", {}) or (
            data.get("arguments", {}) if isinstance(data, dict) else {}
        )
        
        if not tool_name:
            return
        
        logger.debug("检测到工具调用: %s", tool_name)
        
        # 关键:并行检查四种关怀时机
        try:
            await self._check_inference_rules(tool_name, arguments)
            await self._check_travel_intent(tool_name, arguments)
            await self._check_budget_alert(tool_name, arguments)
            await self._check_health_concern(tool_name, arguments)
        except Exception as e:
            logger.error("时机检测异常: %s", e)
检测器 1:旅行意图检测
# src/core/companion_engine.py

async def _check_travel_intent(self, tool_name: str, arguments: dict) -> None:
    """检查旅行意图 → 建议旅游攻略。"""
    if tool_name != "search":
        return
    
    query = str(arguments.get("query", "")).lower()
    travel_keywords = ["机票", "酒店", "旅游", "景点", "攻略", "订票", "住宿"]
    
    # 关键:关键词匹配检测旅行意图
    if any(kw in query for kw in travel_keywords):
        logger.info("检测到旅行意图: %s", query)
        await self._engine.suggest_contextual_care(
            "suggest_travel_guide",
            {"query": query, "detected_keywords": [kw for kw in travel_keywords if kw in query]}
        )
检测器 2:大额支出检测
# src/core/companion_engine.py

async def _check_budget_alert(self, tool_name: str, arguments: dict) -> None:
    """检查大额支出 → 预算提醒。"""
    if tool_name != "finance":
        return
    
    amount = arguments.get("amount", 0)
    try:
        amount = float(amount)
    except (TypeError, ValueError):
        return
    
    # 关键:大额支出阈值(可配置)
    threshold = 1000
    if amount >= threshold:
        logger.info("检测到大额支出: %.2f", amount)
        await self._engine.suggest_contextual_care(
            "budget_alert",
            {"amount": amount, "threshold": threshold}
        )
检测器 3:健康警报检测
# src/core/companion_engine.py

async def _check_health_concern(self, tool_name: str, arguments: dict) -> None:
    """检查异常健康指标 → 就医建议。"""
    if tool_name != "health":
        return
    
    # 检查血压
    bp_systolic = arguments.get("bp_systolic")
    if bp_systolic is not None:
        try:
            bp_sys = int(bp_systolic)
            if bp_sys >= 140:                           # 关键:高血压阈值
                logger.info("检测到高血压: %d", bp_sys)
                await self._engine.suggest_contextual_care(
                    "health_alert",
                    {"metric_name": "血压", "value": bp_sys, "threshold": 140}
                )
                return
        except (TypeError, ValueError):
            pass
    
    # 检查血糖
    blood_glucose = arguments.get("blood_glucose")
    if blood_glucose is not None:
        try:
            glucose = float(blood_glucose)
            if glucose >= 7.0:                          # 关键:高血糖阈值
                logger.info("检测到高血糖: %.1f", glucose)
                await self._engine.suggest_contextual_care(
                    "health_alert",
                    {"metric_name": "血糖", "value": glucose, "threshold": 7.0}
                )
        except (TypeError, ValueError):
            pass
检测器 4:通用推断规则检测
# src/core/companion_engine.py

async def _check_inference_rules(self, tool_name: str, arguments: dict) -> None:
    """检查通用推断规则。"""
    if self._engine is None:
        return
    
    # 将参数转为字符串进行关键词匹配
    args_text = " ".join(str(v) for v in arguments.values() if v)
    
    for rule in INFERENCE_RULES:
        if rule.tool != tool_name:
            continue
        
        # 关键:检查关键词匹配
        matched_keywords = [kw for kw in rule.keyword_match if kw in args_text]
        if not matched_keywords:
            continue
        
        logger.info("推断规则匹配: tool=%s, keywords=%s, infer=%s", 
                   tool_name, matched_keywords, rule.infer)
        
        # 如果有关联动作,触发上下文关怀
        if rule.action:
            await self._engine.suggest_contextual_care(
                rule.action,
                {
                    "inferred": rule.infer,
                    "confidence": rule.confidence,
                    "matched_keywords": matched_keywords,
                }
            )

推断规则配置示例

# src/core/companion_topics.py

INFERENCE_RULES: list[InferenceRule] = [
    InferenceRule(
        tool="finance",
        keyword_match=["幼儿园", "学费", "奶粉", "尿布", "童装", "儿童"],
        infer={"has_children": "true"},
        confidence=0.8
    ),
    InferenceRule(
        tool="search",
        keyword_match=["机票", "酒店", "旅游", "景点", "攻略"],
        infer={"travel_intent": "true"},
        confidence=0.7,
        action="suggest_travel_guide"  # 关键:触发旅行攻略建议
    ),
    InferenceRule(
        tool="health",
        keyword_match=["高血压", "血压偏高"],
        infer={"hypertension_risk": "true"},
        confidence=0.9,
        action="schedule_bp_monitoring"  # 关键:触发血压监测提醒
    ),
]

3.4 核心方法 3:InteractionOrchestrator.initiate_care() — 交互完整生命周期

# src/core/companion_engine.py

class InteractionOrchestrator:
    """管理主动交互的执行流程。"""
    
    async def initiate_care(self, topic: CareTopic, context: dict[str, Any]) -> dict[str, Any]:
        """发起一次主动关怀。
        
        Args:
            topic: 关怀主题
            context: 上下文数据
            
        Returns:
            结果记录:
            - topic_id: 主题 ID
            - message: 消息内容
            - outcome: 结果 (triggered/deferred/blocked)
        """
        result = {
            "topic_id": topic.topic_id,
            "message": "",
            "outcome": "blocked",
        }
        
        # ========== 步骤 1:尝试获取交互锁 ==========
        # 关键:同一时间只允许一个主动交互
        if not await self._cooldown.acquire_interaction_lock(topic.topic_id):
            logger.debug("无法获取交互锁,跳过: %s", topic.topic_id)
            result["outcome"] = "blocked"
            return result
        
        try:
            # ========== 步骤 2:冷却检查 ==========
            if not self._cooldown.can_interact():
                logger.debug("冷却检查未通过,跳过: %s", topic.topic_id)
                result["outcome"] = "blocked"
                return result
            
            # ========== 步骤 3:用户忙碌检查 ==========
            # 30 秒内有输入认为用户正忙
            if self._is_user_busy():
                logger.debug("用户正忙,延迟关怀: %s", topic.topic_id)
                result["outcome"] = "deferred"
                return result
            
            # ========== 步骤 4:构建消息 ==========
            message = self._build_message(topic, context)
            result["message"] = message
            
            # ========== 步骤 5:通过 EventBus 发布关怀触发事件 ==========
            # UI 层订阅此事件来显示关怀消息
            await self._event_bus.emit(
                self.COMPANION_CARE_TRIGGERED,
                {
                    "topic_id": topic.topic_id,
                    "topic_name": topic.name,
                    "category": topic.category,
                    "message": message,
                    "priority": topic.priority,
                    "timestamp": datetime.now().isoformat(),
                }
            )
            
            # ========== 步骤 6:记录交互 ==========
            self._cooldown.record_interaction(topic.topic_id, "triggered")
            result["outcome"] = "triggered"
            
            logger.info("主动关怀已触发: %s - %s", topic.topic_id, message[:50])
            
        finally:
            # ========== 步骤 7:释放锁 ==========
            # 关键:无论成功失败,都要释放锁
            self._cooldown.release_interaction_lock()
        
        return result

3.5 易错点与最佳实践

易错点 1:asyncio.Lock 不可重入
# ❌ 错误示范:在已持有锁的情况下再次 acquire
async def bad_nested_lock():
    await lock.acquire()
    # ... 一些逻辑 ...
    await lock.acquire()  # 死锁!asyncio.Lock 不可重入
    lock.release()
    lock.release()

# ✅ 正确做法:使用 locked() 检查或重构逻辑
async def good_lock_check():
    if lock.locked():
        logger.debug("锁已被占用,跳过")
        return False
    
    await lock.acquire()
    try:
        # 业务逻辑
        pass
    finally:
        lock.release()
    return True
易错点 2:EventBus 优先级设置
# ⚠️ 注意:优先级数值越大越先执行
self._event_bus.on(
    EventType.TOOL_CALL,
    self._on_tool_call,
    priority=200  # 关键:设置较高优先级,确保时机检测在其他处理之前
)

# 优先级参考:
# 200+ : 系统级监控(时机检测、审计日志)
# 100  : 核心业务处理
# 0    : 默认优先级
# -100 : 后处理(清理、统计)
最佳实践清单
  1. 锁的使用:始终用 try-finally 确保锁释放
  2. 事件优先级:监控类订阅者用高优先级(200+)
  3. 冷却期设计:不同主题设置不同间隔(早安 24h,心情 12h)
  4. 日志追踪:每个关键步骤打印 topic_id,便于链路追踪

四、问题诊断与修复 —— 从"三请求只执行一个"到完美并发

4.1 问题现象:请求被"吞掉"

用户报告

“早上同时触发了早安问候、心情关怀、健康提醒三个请求,但只看到了一条消息。其他两条去哪了?”

服务器日志

2026-03-22 08:00:01 | companion | INFO | 定时触发评分: morning_greeting = 81.2
2026-03-22 08:00:01 | companion | INFO | 主动关怀已触发: morning_greeting
2026-03-22 08:01:02 | companion | DEBUG | 无法获取交互锁,跳过: mood_check
2026-03-22 08:02:03 | companion | DEBUG | 冷却检查未通过,跳过: medical_remind

问题:三个请求,两个被跳过!

4.2 排查步骤:追踪评分系统的决策过程

1️⃣ 检查交互锁状态

# 查看 CooldownManager.acquire_interaction_lock()
async def acquire_interaction_lock(self, topic_id: str) -> bool:
    if self._interaction_lock.locked():
        logger.debug("交互锁已被占用,topic_id: %s", self._current_topic_id)
        return False  # ✅ 问题所在:mood_check 在 morning_greeting 还没完成时就尝试获取锁
    
    await self._interaction_lock.acquire()
    self._current_topic_id = topic_id
    return True

2️⃣ 检查冷却状态

# 查看 CooldownManager.can_interact()
def can_interact(self) -> bool:
    # 每日配额检查
    daily_count = self.get_daily_count()
    if daily_count >= self.daily_budget:
        return False  # ❌ morning_greeting 触发后 count+1,但还没到上限
    
    # 拒绝惩罚冷却期检查
    # ...
    return True  # ✅ 这里不是问题

3️⃣ 发现问题链路

┌───────────────────────────────────────────────────────────────────┐
│  时间线                                                           │
├───────────────────────────────────────────────────────────────────┤
│  08:00:01  morning_greeting 获取锁,开始执行                       │
│            │                                                      │
│  08:01:02  │ mood_check 尝试获取锁 → 失败(锁被占用)→ blocked     │
│            │                                                      │
│  08:01:05  morning_greeting 完成,释放锁                           │
│            │                                                      │
│  08:02:03  medical_remind 尝试获取锁 → 成功                        │
│            但 can_interact() 返回 False → blocked                 │
│            原因:daily_count=1,但冷却期检查...                    │
└───────────────────────────────────────────────────────────────────┘

根本原因

  1. 交互锁串行化:同一时间只允许一个关怀执行,后续请求被阻塞
  2. 没有请求队列:被阻塞的请求直接丢弃,而不是排队等待
  3. 时机窗口过短:mood_check 在 morning_greeting 执行期间到来,错过了

4.3 修复方案:请求队列 + 延迟执行

修复 1:增加请求队列

# ✅ 修改后:添加待处理请求队列
class InteractionOrchestrator:
    def __init__(self, cooldown: CooldownManager, event_bus: EventBus):
        self._cooldown = cooldown
        self._event_bus = event_bus
        self._pending_requests: list[tuple[CareTopic, dict]] = []  # 新增
    
    async def initiate_care(self, topic: CareTopic, context: dict) -> dict:
        if not await self._cooldown.acquire_interaction_lock(topic.topic_id):
            # 关键:无法获取锁时,加入队列而不是直接丢弃
            self._pending_requests.append((topic, context))
            logger.debug("加入待处理队列: %s (队列长度: %d)", 
                        topic.topic_id, len(self._pending_requests))
            return {"topic_id": topic.topic_id, "outcome": "queued"}
        
        try:
            # ... 原有执行逻辑 ...
            pass
        finally:
            self._cooldown.release_interaction_lock()
            # 关键:执行完成后,处理队列中的下一个请求
            await self._process_pending()
    
    async def _process_pending(self) -> None:
        """处理待处理队列中的请求"""
        if not self._pending_requests:
            return
        
        # 等待一小段时间,避免连续骚扰
        await asyncio.sleep(60)  # 1分钟间隔
        
        if self._pending_requests and self._cooldown.can_interact():
            topic, context = self._pending_requests.pop(0)
            await self.initiate_care(topic, context)

修复 2:评分时考虑队列状态

# ✅ 修改后:在评分时降低低优先级请求的分数
def calculate_interaction_score(self, topic: CareTopic) -> float:
    score = topic.priority * 10
    # ... 原有评分逻辑 ...
    
    # 新增:如果队列中已有高优先级请求,降低当前请求分数
    pending_high_priority = any(
        t.priority > topic.priority 
        for t, _ in self._orchestrator._pending_requests
    )
    if pending_high_priority:
        score -= 10  # 关键:让高优先级请求先执行
    
    return max(0, min(100, score))

4.4 验证结果

✅ 步骤 1:08:00:01 morning_greeting 获取锁,执行
✅ 步骤 2:08:01:02 mood_check 加入队列 (outcome=queued)
✅ 步骤 3:08:01:05 morning_greeting 完成,触发 _process_pending
✅ 步骤 4:08:02:05 mood_check 从队列取出,执行(间隔 1 分钟)
✅ 步骤 5:08:02:08 medical_remind 健康警报优先级高,插队执行

4.5 经验教训:Checklist

  • 并发请求需要队列机制,不能直接丢弃
  • 高优先级请求应该能"插队"
  • 连续关怀之间需要间隔(至少 1 分钟)
  • 日志要记录完整的决策链路(scoreoutcomequeue_length

避坑指南

  1. asyncio.Lock 不可重入:不要在持有锁时再次 acquire,会死锁
  2. 队列大小限制:防止队列无限增长,建议最多保留 5 个待处理请求
  3. 优先级反转:低优先级请求不应该阻塞高优先级请求

五、性能优化与最佳实践

5.1 性能瓶颈分析

Profiling 数据

calculate_interaction_score():  5.2ms   (含数据库查询)
_build_context():               3.1ms   (含数据库查询)
initiate_care():                2.8ms   (EventBus emit)
_check_inference_rules():       0.3ms   (纯内存)
get_time_slot():                0.01ms  (纯计算)

结论:数据库查询是主要瓶颈,hours_since()_build_context() 每次都查库。

5.2 优化策略

策略 1:评分结果缓存
# ❌ 优化前:每次评分都查数据库
def calculate_interaction_score(self, topic: CareTopic) -> float:
    hours = self._cooldown.hours_since(topic.topic_id)  # 查库
    # ...

# ✅ 优化后:缓存上次交互时间
class CooldownManager:
    def __init__(self, db_path: Path):
        self._hours_cache: dict[str, tuple[float, float]] = {}  # topic_id -> (hours, timestamp)
        self._cache_ttl = 60  # 缓存 60 秒
    
    def hours_since(self, topic_id: str) -> float:
        # 检查缓存
        if topic_id in self._hours_cache:
            cached_hours, cached_time = self._hours_cache[topic_id]
            if time.time() - cached_time < self._cache_ttl:
                return cached_hours + (time.time() - cached_time) / 3600
        
        # 缓存未命中,查数据库
        hours = self._query_hours_from_db(topic_id)
        self._hours_cache[topic_id] = (hours, time.time())
        return hours

代价:增加约 1KB 内存(15 个主题 × 64 字节)
收益:评分耗时从 5.2ms 降至 0.5ms(90% 提升)

策略 2:批量数据库查询
# ❌ 优化前:逐个查询
for topic in topics:
    hours = self._cooldown.hours_since(topic.topic_id)  # N 次查询

# ✅ 优化后:批量查询
def batch_hours_since(self, topic_ids: list[str]) -> dict[str, float]:
    """批量查询多个主题的上次交互时间"""
    placeholders = ",".join("?" * len(topic_ids))
    with self._conn() as conn:
        rows = conn.execute(f"""
            SELECT key, value FROM companion_state 
            WHERE key IN ({placeholders})
        """, [f"last_interaction_{tid}" for tid in topic_ids]).fetchall()
    
    result = {}
    for key, value in rows:
        topic_id = key.replace("last_interaction_", "")
        result[topic_id] = self._calculate_hours(value)
    
    return result

代价:代码稍复杂
收益:15 个主题的批量查询只需 1 次数据库访问

策略 3:事件订阅优先级优化
# ✅ 合理设置优先级,避免无效计算
class OpportunityDetector:
    def _setup_listeners(self) -> None:
        # 时机检测用高优先级,在其他处理之前完成
        self._event_bus.on(EventType.TOOL_CALL, self._on_tool_call, priority=200)

class MoodDetector:
    def _setup_listeners(self) -> None:
        # 情绪检测用中等优先级
        self._event_bus.on(EventType.USER_INPUT, self._on_user_input, priority=100)

class AuditLogger:
    def _setup_listeners(self) -> None:
        # 审计日志用低优先级,最后执行
        self._event_bus.on(EventType.TOOL_CALL, self._on_tool_call, priority=-100)

5.3 最佳实践总结

Do’s(推荐做法):

  • ✅ 使用缓存减少数据库查询(TTL 60 秒足够)
  • ✅ 批量查询代替循环单查
  • ✅ 为事件订阅设置合理的优先级
  • ✅ 在日志中打印完整的评分明细,便于调试
  • ✅ 使用 try-finally 确保锁的释放

Don’ts(避免做法):

  • ❌ 在评分方法中进行复杂的 I/O 操作
  • ❌ 忽略并发请求的队列管理
  • ❌ 所有事件订阅用相同优先级(会导致执行顺序不确定)
  • ❌ 在 asyncio.Lock 内部再次 acquire(死锁风险)
  • ❌ 硬编码阈值(应该使用配置文件)

黄金法则

让评分计算尽量快(< 1ms),让执行逻辑尽量稳(锁 + 队列 + 重试)。


六、总结与展望

6.1 核心要点回顾

本文讲解了 WeClaw 主动陪伴系统的五大组件架构:

5 个核心组件

  1. CooldownManager(厨师长):控制出菜节奏,防止骚扰
  2. MoodDetector(食评家):感知用户情绪,调整关怀内容
  3. OpportunityDetector(服务员):监听用户行为,捕捉关怀时机
  4. InteractionOrchestrator(传菜员):执行关怀流程,管理生命周期
  5. CompanionEngine(餐厅经理):统一调度,七层评分决策

1 个核心公式

主动陪伴决策 = 
    七层评分(时段+紧急度+情绪+空闲度) 
    × 冷却控制(配额+惩罚+忽略) 
    × 事件驱动(工具调用+用户输入)

6.2 下一步学习方向

前置知识

  • ✅ Python asyncio 异步编程
  • ✅ EventBus 发布-订阅模式
  • ✅ SQLite 基本操作

后续主题

  • 📖 下一篇:《第 22 篇:防骚扰冷却机制深度解析 — 每日配额、拒绝惩罚与动态调整》
  • 🔜 下下一篇:《第 23 篇:渐进式用户建档 — 如何在自然对话中收集用户信息》

扩展阅读

6.3 互动环节

思考题

  1. 如果用户连续 3 次忽略主动关怀,系统应该如何调整策略?(提示:动态配额调整)
  2. 当检测到用户情绪低落(negative)时,应该优先触发哪类关怀主题?为什么?
  3. 如何设计一个"紧急关怀"机制,让某些高优先级关怀(如健康警报)可以突破冷却限制?

讨论话题

在你的 AI 助手项目中,你是如何实现"主动"功能的?是定时轮询还是事件驱动?遇到过哪些骚扰用户的问题?欢迎在评论区分享你的经验!


下期预告:《第 22 篇:防骚扰冷却机制深度解析》

  • 🛡️ 每日配额的动态调整算法(正反馈 → 增加,负反馈 → 减少)
  • ⏰ 拒绝惩罚期的设计思路(为什么是 4 小时?)
  • 📊 连续忽略检测与用户活跃度分析
  • 💾 状态持久化:跨进程的冷却状态同步

敬请期待!


附录 A:完整代码清单

文件路径 行数 作用
src/core/companion_engine.py 1409 行 五大组件核心实现
src/core/companion_topics.py 439 行 CareTopic 数据类、注册表、推断规则
src/core/events.py 226 行 事件类型定义(含 COMPANION_* 事件)

总代码量:约 2074 行
关键方法:18 个(calculate_interaction_score、initiate_care、check* 等)
预定义主题:15 个(早安、心情、生日、健康等)


附录 B:参考资料

  1. Python asyncio 官方文档
  2. 事件驱动架构 | Martin Fowler
  3. SQLite 性能优化
  4. 上一篇:《第 20 篇:[主题待定]》
  5. 下一篇:《第 22 篇:防骚扰冷却机制深度解析》

版权声明:本文为 CSDN 博主「翁勇刚」的原创文章,遵循 CC 4.0 BY-SA 版权协议,转载请附上原文出处链接及本声明。

原文链接:https://blog.csdn.net/yweng18/article/details/xxxxxx(待发布后更新)

Logo

AtomGit 是由开放原子开源基金会联合 CSDN 等生态伙伴共同推出的新一代开源与人工智能协作平台。平台坚持“开放、中立、公益”的理念,把代码托管、模型共享、数据集托管、智能体开发体验和算力服务整合在一起,为开发者提供从开发、训练到部署的一站式体验。

更多推荐