WeClaw 主动陪伴引擎实战:五大组件如何协力决策“何时“和“说什么“
WeClaw 主动陪伴引擎实战:五大组件如何协力决策"何时"和"说什么"
系列文章第 21 篇 - 从事件驱动到 asyncio 非阻塞调度,揭秘 AI 主动关怀背后的决策引擎
📚 专栏信息
《从零到一构建跨平台 AI 助手:WeClaw 实战指南》专栏
专栏定位:面向开发者和技术决策者的实战专栏,用真实案例和完整代码带你理解如何构建生产级 AI 应用
本系列共 25 篇,分为八大模块:
📖 模块一【通讯架构设计】(3 篇):混合通讯、设备绑定、请求路由
🔧 模块二【核心技术实现】(4 篇):WebSocket 路由、心跳重连、离线队列
🛡️ 模块三【安全与治理】(3 篇):密钥管理、Token 吊销、速率限制
🔍 模块四【调试与监控】(2 篇):全链路追踪、日志分析
💡 模块五【问题诊断实战】(3 篇):典型问题排查与修复
⚙️ 模块六【性能优化】(1 篇):启动速度、内存优化
🤖 模块七【主动陪伴系统】(3 篇):决策引擎、防骚扰机制、渐进式建档
🚀 模块八【架构演进史】(1 篇):从 0 到 1 的完整历程
- 模块定位:主动陪伴系统 · 第 1 篇(共 3 篇)
- 前置知识:了解 EventBus 发布-订阅模式、asyncio 基础
- 关联文章:第 22 篇(防骚扰冷却机制)、第 23 篇(渐进式建档)
👨💻 作者与项目
作者简介:翁勇刚 WENG YONGGANG
新概念龙虾-WeClaw 开发团队负责人,一群专注于跨平台 AI 应用的实践者
理念:“让 AI 不只是被动工具,更是主动关怀用户的生活伙伴”
- 💻 项目地址:https://github.com/wyg5208/weclaw.git
- 🌐 官网地址:https://weclaw.link
- 📝 作者 CSDN:https://blog.csdn.net/yweng18
- 📦 PyPI:[待发布]
- ⭐ 欢迎 Star⭐、Fork🍴、贡献代码🤝

📝 摘要
本文结构概览:
本文首先从三个并发关怀请求的真实场景出发,分析为什么需要事件驱动的主动陪伴架构;然后用"餐厅后厨"比喻讲解五大组件(CooldownManager、MoodDetector、OpportunityDetector、InteractionOrchestrator、CompanionEngine)的协作原理;接着通过 1400+ 行核心代码详解七层评分系统和四个时机检测器的实现;随后还原一次"三请求只执行一个"的问题排查过程;最后给出评分缓存和事件优先级等性能优化策略。
背景:在构建 WeClaw 跨平台 AI 助手时,我们面临一个关键挑战:如何让 AI 主动关怀用户,而不是只被动等待用户提问?传统的定时轮询方案要么骚扰用户,要么错过最佳时机。
核心问题:如何设计一套决策系统,能够智能判断"何时"发起关怀(时机检测)、"说什么"关怀内容(主题选择)、同时避免过度骚扰(冷却控制)?
解决方案:采用五大组件协作的事件驱动架构。CooldownManager 控制频率,MoodDetector 感知情绪,OpportunityDetector 捕捉时机,InteractionOrchestrator 编排执行,CompanionEngine 统一调度决策。
关键成果:
- 支持 15+ 种关怀主题(早安问候、心情关怀、生日提醒等)
- 七层评分系统实现精准的主动关怀时机判断
- 防骚扰机制:每日配额 + 拒绝惩罚 + 连续忽略检测
- 事件驱动:工具调用自动触发上下文关怀(旅行意图 → 攻略建议)
适合读者:有 Python 基础,对 AI 助手设计、事件驱动架构、asyncio 异步编程感兴趣的开发者
阅读时长:约 20 分钟
关键词:CompanionEngine、事件驱动、asyncio、主动关怀、评分系统、防骚扰、EventBus
一、为什么需要"事件驱动"的主动陪伴?——从三个并发请求说起
1.1 场景重现:当三种关怀同时到来
想象这个早晨场景:
- 8:00 AM:系统检测到"早安问候"的定时触发条件满足
- 8:01 AM:用户刚才的消息里提到"昨晚没睡好,有点累",MoodDetector 识别出疲惫情绪
- 8:02 AM:健康工具记录到血压 145/95,超过警戒线
三个关怀请求几乎同时到来:
┌─────────────────────────────────────────────────────────────┐
│ [08:00] 定时触发 → 早安问候 │
│ [08:01] 情绪检测 → 心情关怀(疲惫安慰) │
│ [08:02] 健康警报 → 就医建议(血压偏高) │
└─────────────────────────────────────────────────────────────┘
问题来了:AI 应该说什么?全说?选一个?怎么选?
如果全部发送,用户会被连续三条消息轰炸;如果只选一个,选哪个最合适?
1.2 方案对比:定时轮询 vs 硬性规则 vs 事件驱动
让我们看看三种主动关怀方案的优缺点:
| 方案 | 像什么?(比喻) | 优点 | 缺点 |
|---|---|---|---|
| 定时轮询 | 闹钟按时响铃 | 实现简单,可预测 | 不考虑用户状态,容易骚扰;错过突发时机 |
| 硬性规则 | 红绿灯固定切换 | 逻辑清晰,易调试 | 缺乏灵活性;规则爆炸(组合情况太多) |
| 事件驱动 ✅ | 餐厅服务员察言观色 | 响应及时,上下文感知 | 架构复杂;需要精心设计事件优先级 |
# ❌ 错误示范:定时轮询,不管用户状态
class BadTimerApproach:
def __init__(self):
# 每小时检查一次,不管用户在干什么
schedule.every().hour.do(self.send_care)
def send_care(self):
# 问题 1:用户可能正在忙碌会议
# 问题 2:连续发送会造成骚扰
# 问题 3:错过"用户刚提到累了"这种即时时机
self.send("今天过得怎么样?")
# ✅ 正确做法:事件驱动 + 多维度评分
class GoodEventDrivenApproach:
async def on_event(self, event_type: str, data: Any):
"""响应各类事件,动态决策"""
# 1. 情绪变化事件 → 可能需要心情关怀
# 2. 工具调用事件 → 可能需要上下文关怀
# 3. 定时事件 → 兜底的日常关怀
# 计算此刻最适合的关怀主题
best_topic = self.calculate_best_topic()
# 检查是否满足触发条件
if self.can_interact() and best_topic.score > threshold:
await self.initiate_care(best_topic)
1.3 核心挑战:五组件协作的难题
要实现智能的主动陪伴,我们需要解决五个核心问题:
- 时机判断:何时发起关怀?(OpportunityDetector)
- 频率控制:如何避免骚扰?(CooldownManager)
- 情绪感知:用户当前心情如何?(MoodDetector)
- 执行编排:关怀消息怎么构建和发送?(InteractionOrchestrator)
- 统一调度:多个组件如何协同工作?(CompanionEngine)
这就引出了我们的五大组件架构——让五个"专家"各司其职,协力决策。
二、核心概念解析 —— 用"餐厅后厨"理解五大组件
2.1 什么是五大组件架构?
官方定义:
五大组件架构是 WeClaw 主动陪伴系统的核心设计模式,通过职责分离将复杂的决策逻辑拆解为五个独立组件,各组件通过 EventBus 通信,由 CompanionEngine 统一调度。
大白话解释:
想象一家高档餐厅的后厨,每个关怀消息就像一道菜。要把"菜"端到用户面前,需要五个角色配合:
┌─────────────────────────────────────────────────────────────────────┐
│ 餐厅后厨比喻 │
├─────────────────────────────────────────────────────────────────────┤
│ CooldownManager = 厨师长 控制出菜节奏,防止上太快撑着客人 │
│ MoodDetector = 食评家 观察客人表情,判断口味偏好 │
│ OpportunityDetector = 服务员 听到客人聊天内容,发现推荐时机 │
│ InteractionOrchestrator = 传菜员 把菜装盘、端上桌、记录反馈 │
│ CompanionEngine = 餐厅经理 统筹全局,决定今天推什么特色菜 │
└─────────────────────────────────────────────────────────────────────┘
2.2 工作原理流程图:从事件到关怀的完整链路
┌──────────────────┐
│ 外部事件输入 │
│ (用户输入/工具调用/定时器) │
└────────┬─────────┘
│
▼
┌─────────────────────────────────────────────────────────────────────┐
│ EventBus 事件总线 │
│ ┌─────────────┐ ┌─────────────┐ ┌─────────────┐ │
│ │ USER_INPUT │ │ TOOL_CALL │ │ CRON_TRIGGER│ │
│ └──────┬──────┘ └──────┬──────┘ └──────┬──────┘ │
└───────────┼────────────────┼────────────────┼───────────────────────┘
│ │ │
▼ ▼ ▼
┌───────────────────┐ ┌───────────────────┐ ┌───────────────────┐
│ MoodDetector │ │ OpportunityDetector│ │ CompanionEngine │
│ (情绪检测) │ │ (时机检测) │ │ (定时调度) │
│ ┌─────────────┐ │ │ ┌─────────────┐ │ │ ┌─────────────┐ │
│ │ 关键词匹配 │ │ │ │ 旅行意图检测 │ │ │ │ 30分钟轮询 │ │
│ │ 情绪得分计算 │ │ │ │ 大额支出检测 │ │ │ │ 时段主题匹配 │ │
│ └─────────────┘ │ │ │ 健康警报检测 │ │ │ └─────────────┘ │
│ │ │ └─────────────┘ │ │ │
└─────────┬─────────┘ └─────────┬─────────┘ └─────────┬─────────┘
│ │ │
│ mood_adjustment │ contextual_care │ cron_trigger
│ │ │
└──────────────────┐ │ ┌──────────────────┘
│ │ │
▼ ▼ ▼
┌────────────────────────────────────┐
│ CompanionEngine │
│ calculate_interaction_score() │
│ ┌──────────────────────────────┐ │
│ │ 七层评分系统 │ │
│ │ 1. 基础分 (priority * 10) │ │
│ │ 2. 时段匹配 (+0~20) │ │
│ │ 3. 冷却期检查 (0/pass) │ │
│ │ 4. 紧急度加成 (+0~30) │ │
│ │ 5. 特殊紧急 (+0~30) │ │
│ │ 6. 用户空闲度 (+/-20) │ │
│ │ 7. 情绪调节 (+/-15) │ │
│ └──────────────────────────────┘ │
│ │ │
│ score > 30? │
│ │ │ │
│ YES│ │NO │
│ ▼ ▼ │
└────────────┬────┬──────────────────┘
│ │
触发 │ │ 跳过
▼
┌────────────────────────────────────┐
│ InteractionOrchestrator │
│ initiate_care() │
│ ┌──────────────────────────────┐ │
│ │ 1. 获取交互锁 │ │
│ │ 2. 冷却检查 │ │
│ │ 3. 用户忙碌检查 │ │
│ │ 4. 构建消息 │ │
│ │ 5. EventBus.emit() │ │
│ │ 6. 记录交互 │ │
│ │ 7. 释放锁 │ │
│ └──────────────────────────────┘ │
└────────────────────────────────────┘
│
▼
┌────────────────────────────────────┐
│ CooldownManager │
│ ┌──────────────────────────────┐ │
│ │ • 每日配额检查 (5次/天) │ │
│ │ • 拒绝惩罚期 (4小时) │ │
│ │ • 连续忽略计数 (≤2次) │ │
│ │ • 交互锁 (同时只允许1个) │ │
│ └──────────────────────────────┘ │
└────────────────────────────────────┘
│
▼
┌────────────────────────────────────┐
│ UI 层响应 │
│ COMPANION_CARE_TRIGGERED 事件 │
└────────────────────────────────────┘
关键步骤解读:
- 事件捕获:EventBus 捕获用户输入、工具调用、定时触发等事件
- 并行处理:MoodDetector 分析情绪,OpportunityDetector 检测时机
- 评分决策:CompanionEngine 综合七层因素计算执行分数
- 执行编排:分数超过阈值后,InteractionOrchestrator 执行完整的关怀流程
- 冷却控制:CooldownManager 全程参与,确保不骚扰用户
2.3 对比表:单体架构 vs 五组件架构
| 维度 | 单体架构 | 五组件架构 | 区别说明 |
|---|---|---|---|
| 可维护性 | 差(God Class) | 好(单一职责) | 五组件各管一摊,改 MoodDetector 不影响 CooldownManager |
| 可测试性 | 难(高耦合) | 易(可独立测试) | 可单独 Mock EventBus 测试 OpportunityDetector |
| 扩展性 | 低(改一处动全身) | 高(插件式扩展) | 新增"天气关怀"只需注册新 CareTopic |
| 调试复杂度 | 简单(单入口) | 中等(需跟踪事件) | 通过日志追踪 topic_id 可还原完整链路 |
| 性能开销 | 低 | 略高(事件分发) | EventBus 分发开销 < 1ms,可忽略 |
为什么选择五组件架构?
因为主动陪伴系统的复杂性在于多维度决策——时机、情绪、频率、内容缺一不可。单体架构会导致 if-else 爆炸,而五组件架构让每个维度的逻辑内聚、易于演进。
三、实战代码详解 —— 核心实现逐行解析
3.1 数据结构:CareTopic 与 CareTopicRegistry
首先看关怀主题的数据结构定义:
# src/core/companion_topics.py
@dataclass
class CareTopic:
"""
关怀主题数据类
定义单个关怀主题的所有属性,包括触发条件、时间约束和对话模板。
Attributes:
topic_id: 唯一标识符
name: 显示名称
category: 主题分类 (health/finance/social/family/emotional/lifestyle)
priority: 优先级 (1-10, 10为最高)
min_interval_hours: 最小触发间隔(小时)
best_time_slots: 最佳触发时段列表 ["morning", "afternoon", "evening"]
prompt_template: 对话模板,支持 {变量} 替换
requires_profile: 依赖的用户档案字段列表
enabled: 是否启用
"""
topic_id: str
name: str
category: str
priority: int # 关键:1-10 的优先级
min_interval_hours: int # 关键:冷却期时长
best_time_slots: list[str] = field(default_factory=list)
prompt_template: str = ""
requires_profile: list[str] = field(default_factory=list)
enabled: bool = True
字段说明:
priority: 优先级决定基础分数,生日提醒(9) > 心情关怀(7) > 早安问候(5)min_interval_hours: 最小间隔,防止同一主题重复触发(早安=24h,心情=12h)best_time_slots: 最佳时段,早安只在 morning,心情在 afternoon/evening
预定义主题示例:
# src/core/companion_topics.py
class CareTopicRegistry:
"""关怀主题注册表"""
def _register_defaults(self) -> None:
"""注册所有预定义的关怀主题"""
default_topics = [
# 日常关怀主题
CareTopic(
topic_id="morning_greeting",
name="早安问候",
category="lifestyle",
priority=5,
min_interval_hours=24,
best_time_slots=["morning"],
prompt_template="早上好呀!今天{weather_hint},{schedule_hint}有什么计划吗?"
),
CareTopic(
topic_id="mood_check",
name="心情关怀",
category="emotional",
priority=7, # 关键:情感类优先级较高
min_interval_hours=12,
best_time_slots=["afternoon", "evening"],
prompt_template="今天过得怎么样呀?最近心情如何?"
),
CareTopic(
topic_id="birthday_remind",
name="生日提醒",
category="social",
priority=9, # 关键:最高优先级
min_interval_hours=8760, # 一年才触发一次
best_time_slots=["morning"],
prompt_template="{person_name}的生日就在{days_left}天后了!要不要准备一下?",
requires_profile=["social_contacts"]
),
CareTopic(
topic_id="medical_remind",
name="就医提醒",
category="health",
priority=8, # 关键:健康警报优先级高
min_interval_hours=168,
best_time_slots=["morning"],
prompt_template="注意到你最近的{metric_name}数据偏高,建议找时间去医院检查一下比较放心",
requires_profile=["health_data"]
),
# ... 更多主题
]
for topic in default_topics:
self.register(topic)
设计亮点:
- 数据驱动:新增关怀主题只需添加 CareTopic 实例,无需改代码逻辑
- 模板变量:
{weather_hint}、{person_name}支持个性化消息 - 依赖声明:
requires_profile声明所需用户数据,缺失时可触发建档
3.2 核心方法 1:calculate_interaction_score() — 七层评分系统
这是 CompanionEngine 的核心决策方法,决定某个主题此刻是否应该触发:
# src/core/companion_engine.py
def calculate_interaction_score(self, topic: CareTopic) -> float:
"""计算主题此刻的执行分数。
综合考虑优先级、时段匹配度、冷却期、紧急度、用户状态等因素。
Args:
topic: 关怀主题
Returns:
执行分数 (0-100),越高越应该执行
"""
# ========== 第 1 层:基础分数 ==========
# 优先级 * 10,范围 10-100
score = topic.priority * 10
# ========== 第 2 层:时段匹配度 +0~20 ==========
current_slot = get_time_slot() # "morning" / "afternoon" / "evening"
if current_slot in topic.best_time_slots:
score += 20 # 关键:在最佳时段加 20 分
elif topic.best_time_slots:
score += 5 # 不在最佳时段,但仍可执行
# ========== 第 3 层:冷却期硬性检查 ==========
hours = self._cooldown.hours_since(topic.topic_id)
if hours < topic.min_interval_hours:
logger.debug("主题 %s 仍在冷却期 (%.1f < %d 小时)",
topic.topic_id, hours, topic.min_interval_hours)
return 0 # 关键:冷却期内直接返回 0 分
# ========== 第 4 层:紧急度加成 +0~30 ==========
# 基于距离冷却期结束的程度,越久没触发越紧急
cooldown_ratio = min(hours / topic.min_interval_hours, 2.0)
urgency_bonus = min((cooldown_ratio - 1) * 15, 30)
score += urgency_bonus
# ========== 第 5 层:特殊紧急情况 +0~30 ==========
# 如生日临近
if topic.topic_id == "birthday_remind":
context = self._build_context(topic)
days_left = context.get("days_left_raw", 999)
if days_left <= 3:
score += 30 # 关键:生日 3 天内加 30 分
elif days_left <= 7:
score += 15
# ========== 第 6 层:用户空闲度 +/-0~20 ==========
if self._orchestrator._is_user_idle_for(5):
score += 10 # 空闲 5 分钟以上
if self._orchestrator._is_user_idle_for(15):
score += 10 # 空闲 15 分钟以上
if self._orchestrator._is_user_busy():
score -= 20 # 关键:用户正忙减 20 分
# ========== 第 7 层:情绪调节 +/-0~15 ==========
mood_adjustment = self._mood_detector.get_mood_adjusted_topic_score(
topic, self._current_mood
)
score += mood_adjustment
# ========== 最终检查:每日预算 ==========
if not self._cooldown.can_interact():
logger.debug("每日预算已用完或处于冷却期")
return 0
return max(0, min(100, score)) # 注意:限制在 0-100 范围
评分公式可视化:
最终分数 = 基础分(priority×10)
+ 时段匹配(+0~20)
+ 紧急度(+0~30)
+ 特殊紧急(+0~30)
+ 用户空闲度(+/-20)
+ 情绪调节(+/-15)
- 冷却期内则直接归零
- 每日预算用完则直接归零
示例:早安问候在 8:00 AM
- 基础分:5 × 10 = 50
- 时段匹配:morning ∈ ["morning"] → +20
- 紧急度:假设 26 小时未触发,ratio=1.08 → +1.2
- 用户空闲:空闲 10 分钟 → +10
- 情绪:中性 → +0
= 50 + 20 + 1.2 + 10 = 81.2 分 ✅ 超过阈值 30,触发!
3.3 核心方法 2:OpportunityDetector 的四个检测器
OpportunityDetector 通过监听 EventBus 的 TOOL_CALL 事件,从用户行为中发现关怀时机:
# src/core/companion_engine.py
class OpportunityDetector:
"""通过 EventBus 监听用户行为,检测关怀时机。"""
def _setup_listeners(self) -> None:
"""设置事件监听器。"""
# 关键:监听工具调用事件,优先级 200(较高)
self._event_bus.on(
EventType.TOOL_CALL,
self._on_tool_call,
priority=200
)
# 监听 Agent 响应事件
self._event_bus.on(
EventType.AGENT_RESPONSE,
self._on_agent_response,
priority=200
)
async def _on_tool_call(self, event_type: str, data: Any) -> None:
"""处理工具调用事件。"""
if self._engine is None:
return
# 安全获取工具名和参数
tool_name = getattr(data, "tool_name", None) or (
data.get("tool_name") if isinstance(data, dict) else None
)
arguments = getattr(data, "arguments", {}) or (
data.get("arguments", {}) if isinstance(data, dict) else {}
)
if not tool_name:
return
logger.debug("检测到工具调用: %s", tool_name)
# 关键:并行检查四种关怀时机
try:
await self._check_inference_rules(tool_name, arguments)
await self._check_travel_intent(tool_name, arguments)
await self._check_budget_alert(tool_name, arguments)
await self._check_health_concern(tool_name, arguments)
except Exception as e:
logger.error("时机检测异常: %s", e)
检测器 1:旅行意图检测
# src/core/companion_engine.py
async def _check_travel_intent(self, tool_name: str, arguments: dict) -> None:
"""检查旅行意图 → 建议旅游攻略。"""
if tool_name != "search":
return
query = str(arguments.get("query", "")).lower()
travel_keywords = ["机票", "酒店", "旅游", "景点", "攻略", "订票", "住宿"]
# 关键:关键词匹配检测旅行意图
if any(kw in query for kw in travel_keywords):
logger.info("检测到旅行意图: %s", query)
await self._engine.suggest_contextual_care(
"suggest_travel_guide",
{"query": query, "detected_keywords": [kw for kw in travel_keywords if kw in query]}
)
检测器 2:大额支出检测
# src/core/companion_engine.py
async def _check_budget_alert(self, tool_name: str, arguments: dict) -> None:
"""检查大额支出 → 预算提醒。"""
if tool_name != "finance":
return
amount = arguments.get("amount", 0)
try:
amount = float(amount)
except (TypeError, ValueError):
return
# 关键:大额支出阈值(可配置)
threshold = 1000
if amount >= threshold:
logger.info("检测到大额支出: %.2f", amount)
await self._engine.suggest_contextual_care(
"budget_alert",
{"amount": amount, "threshold": threshold}
)
检测器 3:健康警报检测
# src/core/companion_engine.py
async def _check_health_concern(self, tool_name: str, arguments: dict) -> None:
"""检查异常健康指标 → 就医建议。"""
if tool_name != "health":
return
# 检查血压
bp_systolic = arguments.get("bp_systolic")
if bp_systolic is not None:
try:
bp_sys = int(bp_systolic)
if bp_sys >= 140: # 关键:高血压阈值
logger.info("检测到高血压: %d", bp_sys)
await self._engine.suggest_contextual_care(
"health_alert",
{"metric_name": "血压", "value": bp_sys, "threshold": 140}
)
return
except (TypeError, ValueError):
pass
# 检查血糖
blood_glucose = arguments.get("blood_glucose")
if blood_glucose is not None:
try:
glucose = float(blood_glucose)
if glucose >= 7.0: # 关键:高血糖阈值
logger.info("检测到高血糖: %.1f", glucose)
await self._engine.suggest_contextual_care(
"health_alert",
{"metric_name": "血糖", "value": glucose, "threshold": 7.0}
)
except (TypeError, ValueError):
pass
检测器 4:通用推断规则检测
# src/core/companion_engine.py
async def _check_inference_rules(self, tool_name: str, arguments: dict) -> None:
"""检查通用推断规则。"""
if self._engine is None:
return
# 将参数转为字符串进行关键词匹配
args_text = " ".join(str(v) for v in arguments.values() if v)
for rule in INFERENCE_RULES:
if rule.tool != tool_name:
continue
# 关键:检查关键词匹配
matched_keywords = [kw for kw in rule.keyword_match if kw in args_text]
if not matched_keywords:
continue
logger.info("推断规则匹配: tool=%s, keywords=%s, infer=%s",
tool_name, matched_keywords, rule.infer)
# 如果有关联动作,触发上下文关怀
if rule.action:
await self._engine.suggest_contextual_care(
rule.action,
{
"inferred": rule.infer,
"confidence": rule.confidence,
"matched_keywords": matched_keywords,
}
)
推断规则配置示例:
# src/core/companion_topics.py
INFERENCE_RULES: list[InferenceRule] = [
InferenceRule(
tool="finance",
keyword_match=["幼儿园", "学费", "奶粉", "尿布", "童装", "儿童"],
infer={"has_children": "true"},
confidence=0.8
),
InferenceRule(
tool="search",
keyword_match=["机票", "酒店", "旅游", "景点", "攻略"],
infer={"travel_intent": "true"},
confidence=0.7,
action="suggest_travel_guide" # 关键:触发旅行攻略建议
),
InferenceRule(
tool="health",
keyword_match=["高血压", "血压偏高"],
infer={"hypertension_risk": "true"},
confidence=0.9,
action="schedule_bp_monitoring" # 关键:触发血压监测提醒
),
]
3.4 核心方法 3:InteractionOrchestrator.initiate_care() — 交互完整生命周期
# src/core/companion_engine.py
class InteractionOrchestrator:
"""管理主动交互的执行流程。"""
async def initiate_care(self, topic: CareTopic, context: dict[str, Any]) -> dict[str, Any]:
"""发起一次主动关怀。
Args:
topic: 关怀主题
context: 上下文数据
Returns:
结果记录:
- topic_id: 主题 ID
- message: 消息内容
- outcome: 结果 (triggered/deferred/blocked)
"""
result = {
"topic_id": topic.topic_id,
"message": "",
"outcome": "blocked",
}
# ========== 步骤 1:尝试获取交互锁 ==========
# 关键:同一时间只允许一个主动交互
if not await self._cooldown.acquire_interaction_lock(topic.topic_id):
logger.debug("无法获取交互锁,跳过: %s", topic.topic_id)
result["outcome"] = "blocked"
return result
try:
# ========== 步骤 2:冷却检查 ==========
if not self._cooldown.can_interact():
logger.debug("冷却检查未通过,跳过: %s", topic.topic_id)
result["outcome"] = "blocked"
return result
# ========== 步骤 3:用户忙碌检查 ==========
# 30 秒内有输入认为用户正忙
if self._is_user_busy():
logger.debug("用户正忙,延迟关怀: %s", topic.topic_id)
result["outcome"] = "deferred"
return result
# ========== 步骤 4:构建消息 ==========
message = self._build_message(topic, context)
result["message"] = message
# ========== 步骤 5:通过 EventBus 发布关怀触发事件 ==========
# UI 层订阅此事件来显示关怀消息
await self._event_bus.emit(
self.COMPANION_CARE_TRIGGERED,
{
"topic_id": topic.topic_id,
"topic_name": topic.name,
"category": topic.category,
"message": message,
"priority": topic.priority,
"timestamp": datetime.now().isoformat(),
}
)
# ========== 步骤 6:记录交互 ==========
self._cooldown.record_interaction(topic.topic_id, "triggered")
result["outcome"] = "triggered"
logger.info("主动关怀已触发: %s - %s", topic.topic_id, message[:50])
finally:
# ========== 步骤 7:释放锁 ==========
# 关键:无论成功失败,都要释放锁
self._cooldown.release_interaction_lock()
return result
3.5 易错点与最佳实践
易错点 1:asyncio.Lock 不可重入
# ❌ 错误示范:在已持有锁的情况下再次 acquire
async def bad_nested_lock():
await lock.acquire()
# ... 一些逻辑 ...
await lock.acquire() # 死锁!asyncio.Lock 不可重入
lock.release()
lock.release()
# ✅ 正确做法:使用 locked() 检查或重构逻辑
async def good_lock_check():
if lock.locked():
logger.debug("锁已被占用,跳过")
return False
await lock.acquire()
try:
# 业务逻辑
pass
finally:
lock.release()
return True
易错点 2:EventBus 优先级设置
# ⚠️ 注意:优先级数值越大越先执行
self._event_bus.on(
EventType.TOOL_CALL,
self._on_tool_call,
priority=200 # 关键:设置较高优先级,确保时机检测在其他处理之前
)
# 优先级参考:
# 200+ : 系统级监控(时机检测、审计日志)
# 100 : 核心业务处理
# 0 : 默认优先级
# -100 : 后处理(清理、统计)
最佳实践清单
- 锁的使用:始终用
try-finally确保锁释放 - 事件优先级:监控类订阅者用高优先级(200+)
- 冷却期设计:不同主题设置不同间隔(早安 24h,心情 12h)
- 日志追踪:每个关键步骤打印
topic_id,便于链路追踪
四、问题诊断与修复 —— 从"三请求只执行一个"到完美并发
4.1 问题现象:请求被"吞掉"
用户报告:
“早上同时触发了早安问候、心情关怀、健康提醒三个请求,但只看到了一条消息。其他两条去哪了?”
服务器日志:
2026-03-22 08:00:01 | companion | INFO | 定时触发评分: morning_greeting = 81.2
2026-03-22 08:00:01 | companion | INFO | 主动关怀已触发: morning_greeting
2026-03-22 08:01:02 | companion | DEBUG | 无法获取交互锁,跳过: mood_check
2026-03-22 08:02:03 | companion | DEBUG | 冷却检查未通过,跳过: medical_remind
问题:三个请求,两个被跳过!
4.2 排查步骤:追踪评分系统的决策过程
1️⃣ 检查交互锁状态:
# 查看 CooldownManager.acquire_interaction_lock()
async def acquire_interaction_lock(self, topic_id: str) -> bool:
if self._interaction_lock.locked():
logger.debug("交互锁已被占用,topic_id: %s", self._current_topic_id)
return False # ✅ 问题所在:mood_check 在 morning_greeting 还没完成时就尝试获取锁
await self._interaction_lock.acquire()
self._current_topic_id = topic_id
return True
2️⃣ 检查冷却状态:
# 查看 CooldownManager.can_interact()
def can_interact(self) -> bool:
# 每日配额检查
daily_count = self.get_daily_count()
if daily_count >= self.daily_budget:
return False # ❌ morning_greeting 触发后 count+1,但还没到上限
# 拒绝惩罚冷却期检查
# ...
return True # ✅ 这里不是问题
3️⃣ 发现问题链路:
┌───────────────────────────────────────────────────────────────────┐
│ 时间线 │
├───────────────────────────────────────────────────────────────────┤
│ 08:00:01 morning_greeting 获取锁,开始执行 │
│ │ │
│ 08:01:02 │ mood_check 尝试获取锁 → 失败(锁被占用)→ blocked │
│ │ │
│ 08:01:05 morning_greeting 完成,释放锁 │
│ │ │
│ 08:02:03 medical_remind 尝试获取锁 → 成功 │
│ 但 can_interact() 返回 False → blocked │
│ 原因:daily_count=1,但冷却期检查... │
└───────────────────────────────────────────────────────────────────┘
根本原因:
- 交互锁串行化:同一时间只允许一个关怀执行,后续请求被阻塞
- 没有请求队列:被阻塞的请求直接丢弃,而不是排队等待
- 时机窗口过短:mood_check 在 morning_greeting 执行期间到来,错过了
4.3 修复方案:请求队列 + 延迟执行
修复 1:增加请求队列
# ✅ 修改后:添加待处理请求队列
class InteractionOrchestrator:
def __init__(self, cooldown: CooldownManager, event_bus: EventBus):
self._cooldown = cooldown
self._event_bus = event_bus
self._pending_requests: list[tuple[CareTopic, dict]] = [] # 新增
async def initiate_care(self, topic: CareTopic, context: dict) -> dict:
if not await self._cooldown.acquire_interaction_lock(topic.topic_id):
# 关键:无法获取锁时,加入队列而不是直接丢弃
self._pending_requests.append((topic, context))
logger.debug("加入待处理队列: %s (队列长度: %d)",
topic.topic_id, len(self._pending_requests))
return {"topic_id": topic.topic_id, "outcome": "queued"}
try:
# ... 原有执行逻辑 ...
pass
finally:
self._cooldown.release_interaction_lock()
# 关键:执行完成后,处理队列中的下一个请求
await self._process_pending()
async def _process_pending(self) -> None:
"""处理待处理队列中的请求"""
if not self._pending_requests:
return
# 等待一小段时间,避免连续骚扰
await asyncio.sleep(60) # 1分钟间隔
if self._pending_requests and self._cooldown.can_interact():
topic, context = self._pending_requests.pop(0)
await self.initiate_care(topic, context)
修复 2:评分时考虑队列状态
# ✅ 修改后:在评分时降低低优先级请求的分数
def calculate_interaction_score(self, topic: CareTopic) -> float:
score = topic.priority * 10
# ... 原有评分逻辑 ...
# 新增:如果队列中已有高优先级请求,降低当前请求分数
pending_high_priority = any(
t.priority > topic.priority
for t, _ in self._orchestrator._pending_requests
)
if pending_high_priority:
score -= 10 # 关键:让高优先级请求先执行
return max(0, min(100, score))
4.4 验证结果
✅ 步骤 1:08:00:01 morning_greeting 获取锁,执行
✅ 步骤 2:08:01:02 mood_check 加入队列 (outcome=queued)
✅ 步骤 3:08:01:05 morning_greeting 完成,触发 _process_pending
✅ 步骤 4:08:02:05 mood_check 从队列取出,执行(间隔 1 分钟)
✅ 步骤 5:08:02:08 medical_remind 健康警报优先级高,插队执行
4.5 经验教训:Checklist
- 并发请求需要队列机制,不能直接丢弃
- 高优先级请求应该能"插队"
- 连续关怀之间需要间隔(至少 1 分钟)
- 日志要记录完整的决策链路(
score、outcome、queue_length)
避坑指南:
- asyncio.Lock 不可重入:不要在持有锁时再次 acquire,会死锁
- 队列大小限制:防止队列无限增长,建议最多保留 5 个待处理请求
- 优先级反转:低优先级请求不应该阻塞高优先级请求
五、性能优化与最佳实践
5.1 性能瓶颈分析
Profiling 数据:
calculate_interaction_score(): 5.2ms (含数据库查询)
_build_context(): 3.1ms (含数据库查询)
initiate_care(): 2.8ms (EventBus emit)
_check_inference_rules(): 0.3ms (纯内存)
get_time_slot(): 0.01ms (纯计算)
结论:数据库查询是主要瓶颈,hours_since() 和 _build_context() 每次都查库。
5.2 优化策略
策略 1:评分结果缓存
# ❌ 优化前:每次评分都查数据库
def calculate_interaction_score(self, topic: CareTopic) -> float:
hours = self._cooldown.hours_since(topic.topic_id) # 查库
# ...
# ✅ 优化后:缓存上次交互时间
class CooldownManager:
def __init__(self, db_path: Path):
self._hours_cache: dict[str, tuple[float, float]] = {} # topic_id -> (hours, timestamp)
self._cache_ttl = 60 # 缓存 60 秒
def hours_since(self, topic_id: str) -> float:
# 检查缓存
if topic_id in self._hours_cache:
cached_hours, cached_time = self._hours_cache[topic_id]
if time.time() - cached_time < self._cache_ttl:
return cached_hours + (time.time() - cached_time) / 3600
# 缓存未命中,查数据库
hours = self._query_hours_from_db(topic_id)
self._hours_cache[topic_id] = (hours, time.time())
return hours
代价:增加约 1KB 内存(15 个主题 × 64 字节)
收益:评分耗时从 5.2ms 降至 0.5ms(90% 提升)
策略 2:批量数据库查询
# ❌ 优化前:逐个查询
for topic in topics:
hours = self._cooldown.hours_since(topic.topic_id) # N 次查询
# ✅ 优化后:批量查询
def batch_hours_since(self, topic_ids: list[str]) -> dict[str, float]:
"""批量查询多个主题的上次交互时间"""
placeholders = ",".join("?" * len(topic_ids))
with self._conn() as conn:
rows = conn.execute(f"""
SELECT key, value FROM companion_state
WHERE key IN ({placeholders})
""", [f"last_interaction_{tid}" for tid in topic_ids]).fetchall()
result = {}
for key, value in rows:
topic_id = key.replace("last_interaction_", "")
result[topic_id] = self._calculate_hours(value)
return result
代价:代码稍复杂
收益:15 个主题的批量查询只需 1 次数据库访问
策略 3:事件订阅优先级优化
# ✅ 合理设置优先级,避免无效计算
class OpportunityDetector:
def _setup_listeners(self) -> None:
# 时机检测用高优先级,在其他处理之前完成
self._event_bus.on(EventType.TOOL_CALL, self._on_tool_call, priority=200)
class MoodDetector:
def _setup_listeners(self) -> None:
# 情绪检测用中等优先级
self._event_bus.on(EventType.USER_INPUT, self._on_user_input, priority=100)
class AuditLogger:
def _setup_listeners(self) -> None:
# 审计日志用低优先级,最后执行
self._event_bus.on(EventType.TOOL_CALL, self._on_tool_call, priority=-100)
5.3 最佳实践总结
Do’s(推荐做法):
- ✅ 使用缓存减少数据库查询(TTL 60 秒足够)
- ✅ 批量查询代替循环单查
- ✅ 为事件订阅设置合理的优先级
- ✅ 在日志中打印完整的评分明细,便于调试
- ✅ 使用
try-finally确保锁的释放
Don’ts(避免做法):
- ❌ 在评分方法中进行复杂的 I/O 操作
- ❌ 忽略并发请求的队列管理
- ❌ 所有事件订阅用相同优先级(会导致执行顺序不确定)
- ❌ 在 asyncio.Lock 内部再次 acquire(死锁风险)
- ❌ 硬编码阈值(应该使用配置文件)
黄金法则:
让评分计算尽量快(< 1ms),让执行逻辑尽量稳(锁 + 队列 + 重试)。
六、总结与展望
6.1 核心要点回顾
本文讲解了 WeClaw 主动陪伴系统的五大组件架构:
5 个核心组件:
- CooldownManager(厨师长):控制出菜节奏,防止骚扰
- MoodDetector(食评家):感知用户情绪,调整关怀内容
- OpportunityDetector(服务员):监听用户行为,捕捉关怀时机
- InteractionOrchestrator(传菜员):执行关怀流程,管理生命周期
- CompanionEngine(餐厅经理):统一调度,七层评分决策
1 个核心公式:
主动陪伴决策 =
七层评分(时段+紧急度+情绪+空闲度)
× 冷却控制(配额+惩罚+忽略)
× 事件驱动(工具调用+用户输入)
6.2 下一步学习方向
前置知识:
- ✅ Python asyncio 异步编程
- ✅ EventBus 发布-订阅模式
- ✅ SQLite 基本操作
后续主题:
- 📖 下一篇:《第 22 篇:防骚扰冷却机制深度解析 — 每日配额、拒绝惩罚与动态调整》
- 🔜 下下一篇:《第 23 篇:渐进式用户建档 — 如何在自然对话中收集用户信息》
扩展阅读:
6.3 互动环节
思考题:
- 如果用户连续 3 次忽略主动关怀,系统应该如何调整策略?(提示:动态配额调整)
- 当检测到用户情绪低落(negative)时,应该优先触发哪类关怀主题?为什么?
- 如何设计一个"紧急关怀"机制,让某些高优先级关怀(如健康警报)可以突破冷却限制?
讨论话题:
在你的 AI 助手项目中,你是如何实现"主动"功能的?是定时轮询还是事件驱动?遇到过哪些骚扰用户的问题?欢迎在评论区分享你的经验!
下期预告:《第 22 篇:防骚扰冷却机制深度解析》
- 🛡️ 每日配额的动态调整算法(正反馈 → 增加,负反馈 → 减少)
- ⏰ 拒绝惩罚期的设计思路(为什么是 4 小时?)
- 📊 连续忽略检测与用户活跃度分析
- 💾 状态持久化:跨进程的冷却状态同步
敬请期待!
附录 A:完整代码清单
| 文件路径 | 行数 | 作用 |
|---|---|---|
src/core/companion_engine.py |
1409 行 | 五大组件核心实现 |
src/core/companion_topics.py |
439 行 | CareTopic 数据类、注册表、推断规则 |
src/core/events.py |
226 行 | 事件类型定义(含 COMPANION_* 事件) |
总代码量:约 2074 行
关键方法:18 个(calculate_interaction_score、initiate_care、check* 等)
预定义主题:15 个(早安、心情、生日、健康等)
附录 B:参考资料
- Python asyncio 官方文档
- 事件驱动架构 | Martin Fowler
- SQLite 性能优化
- 上一篇:《第 20 篇:[主题待定]》
- 下一篇:《第 22 篇:防骚扰冷却机制深度解析》
版权声明:本文为 CSDN 博主「翁勇刚」的原创文章,遵循 CC 4.0 BY-SA 版权协议,转载请附上原文出处链接及本声明。
原文链接:https://blog.csdn.net/yweng18/article/details/xxxxxx(待发布后更新)
AtomGit 是由开放原子开源基金会联合 CSDN 等生态伙伴共同推出的新一代开源与人工智能协作平台。平台坚持“开放、中立、公益”的理念,把代码托管、模型共享、数据集托管、智能体开发体验和算力服务整合在一起,为开发者提供从开发、训练到部署的一站式体验。
更多推荐



所有评论(0)