本文基于开源项目 Customer-Agent(Agno + PyQt6 + 拼多多商家 WebSocket),面向已经做过 RAG / 客服 Bot、准备接真实电商渠道的工程师。
重点不是 pip install,而是三条工程线里的设计巧思、参数怎么调、线上踩过什么坑
文中 Python / JSON 代码块均摘自仓库实现(略有删减注释),便于你对照 main 分支阅读。

标签Python PyQt6 RAG Agno 电商客服 系统设计 LanceDB
仓库:https://github.com/paidaxin-12138/customer-agent-agno


目录

  1. 背景与整体拆分
  2. 端到端:一条消息的生命周期
  3. 消息链与并发:责任链 + 双层限流
  4. 父子知识库:总部手册与分店补丁
  5. 商品同步、分块与「双 Manager」坑
  6. RAG 与 Tool:价库存绝不能只靠向量库
  7. 三层会话记忆:为什么 Agno 历史还不够
  8. 排队降级:进 LLM 之前的「认输」
  9. AI 回复兜底 v2:从二次 LLM 到 epoch + 150s
  10. 买家连发合并:避免只盯最后一个字
  11. 配置调参手册(含算例)
  12. 脱敏对话案例
  13. 压测结论与快速上手
  14. 结语

1. 背景与整体拆分

拼多多商家日常接待里,大量问题是重复的:价格、颜色、功率、物流、售后政策。纯人工成本高;纯关键词机器人又容易答非所问;纯大模型则爱编 SKU、爱编库存

Customer-Agent 的定位:

约束 我们的选择
数据与登录态 桌面本机 + Playwright 登商家后台,Cookie 本地保存
渠道真实性 WebSocket 收消息、mms.pinduoduo.com HTTP 发回复
价格/库存准确性 Tool 实时拉商品 API,RAG 只答「说明类」
高峰稳定性 排队降级(不进 LLM)+ Watchdog(进了 LLM 仍卡住则转人工)

工程上拆成三条可独立演进的主线:

消息编排  →  队列 + 责任链 + 同买家串行
知识检索  →  LanceDB + 父子库 inherit_key
稳定性    →  queue_degrade + ai_watchdog v2

下面按「生命周期 → 各模块设计 → 配置与案例」展开。


2. 端到端:一条消息的生命周期

理解兜底与降级,必须先看清时间轴上各阶段算不算进 Watchdog。下面是一条 TEXT 消息从进来到出站的典型阶段(与当前 main 分支代码一致)。

阶段 组件 是否启动 Watchdog 是否可能排队降级
A 拼多多 WebSocket 下行
B PDDChannel 解析为 ContextputMessageQueue
C MessageConsumer 从队列 get 取出
D 等待消费者 Semaphore(默认 28 槽)
E 等待同买家 Lockuser_key 维度)
F Handler 链:物流 → 图视频 → 关键词 → …
G 进入 AIReplyHandler.handle (在此判断)
H buyer_burst_merge 合并连发 已在 G 判断
I should_queue_degrade() 为真 → 发降级话术 已执行
J begin_watchdog_turn + schedule_watchdog T0
K async with ai_inflight() + 调 LLM 是(计时中)
L _send_reply 成功 → mark_delivered 停止 Watchdog

设计要点

  • 排队很久(C~E) 不算进 150s——那是容量问题,用降级在 G 点拦截,而不是让 Watchdog 误报。
  • T0 在 J,在抢 ai_inflight 之前——Semaphore / 同买家锁等待不计入 150s。
  • 若 G 点已降级,不会走到 J,也就没有 Watchdog 任务。

session_keyAIReplyHandler._get_session_key 生成,Watchdog、连发合并、运维埋点共用:

# Message/handlers/ai_handler.py
def _get_session_key(self, context: Context, metadata: Dict[str, Any]) -> Optional[str]:
    channel_name = str(metadata.get("channel_name") or "pinduoduo")
    shop_id = str(metadata.get("shop_id") or "")
    user_id = str(metadata.get("user_id") or "")
    buyer_uid = self._resolve_buyer_uid(context, metadata)
    if not all([shop_id, user_id, buyer_uid]):
        return None
    return f"{channel_name}:{shop_id}:{user_id}:{buyer_uid}"
# 例:pinduoduo:123456:789012:3456789012

3. 消息链与并发:责任链 + 双层限流

3.1 为什么不用 LangChain Chain

买家消息由 WebSocket 推送 驱动,不是一次 chain.run(user_input)。我们需要:

  • 按消息类型短路(改地址直接人工,不必过 AI);
  • 按渠道扩展(拼多多 Context、Cookie 刷新与 Agent 解耦);
  • asyncio 队列消费者 天然契合。

Message/handler_chain_factory.py 注册的顺序(handler_chain(use_ai=True, bot=...)):

# Handler 典型命中
1 OrderLogisticsHandler 改地址/收件人 → 人工;「物流到哪」→ 可选开放平台轨迹
2 ImageVideoHumanHandler 图片/视频 → 转人工(主链路不做实时识图)
3 KeywordDetectionHandler DB 关键词 / 「人工」类短语
4 AIReplyHandler 文本、商品咨询、订单卡片等
5 CatchAllHandler 兜底

每个 Handler:can_handlehandle → 返回 True 表示已处理,链截断

处理器链注册代码(Message/handler_chain_factory.py):

def handler_chain(use_ai=True, businessHours=None, bot=None):
    handlers = []
    ol_handler = _get_order_logistics_handler()
    if ol_handler is not None:
        handlers.append(ol_handler)
    iv_handler = _get_image_video_handler()
    if iv_handler is not None:
        handlers.append(iv_handler)
    keyword_handler = _get_keyword_handler()
    if keyword_handler is not None:
        handlers.append(keyword_handler)
    if use_ai:
        handlers.append(_create_ai_handler(bot))
    handlers.append(CatchAllHandler())
    return handlers

3.2 双层并发控制

第一层:店铺队列 + 消费者 Semaphore

消费者初始化与同买家锁(Message/core/consumer.py):

class MessageConsumer:
    def __init__(self, queue_name: str, max_concurrent: int = 28):
        self.semaphore = asyncio.Semaphore(max_concurrent)
        # 同一买家多条消息串行处理,避免多条并行 LLM 导致回复乱序
        self._buyer_seq_locks: Dict[str, asyncio.Lock] = {}

    async def _process_message(self, wrapper: MessageWrapper):
        user_key = self._extract_user_id(wrapper.context)
        lock = self._buyer_seq_locks.setdefault(user_key, asyncio.Lock())
        async with self.semaphore:
            async with lock:
                metadata = wrapper.to_metadata()
                # ... 注入 shop_id / user_id / from_uid ...
                for handler in self.handlers:
                    if handler.can_handle(wrapper.context):
                        success = await handler.handle(wrapper.context, metadata)
                        if success:
                            break  # 责任链截断
  • config.jsonchat.message_consumer_max_concurrent(默认 28);
  • 入站侧 PDDChannel 另有 Semaphore(默认 50),防止 WebSocket 回调把事件循环打满。

第二层:同买家 asyncio.Lock

注释写得很直白:避免多条并行 LLM 导致回复乱序、旧问新答叠在一起

压测验证(程序内 sleep 80ms、不调 LLM):

场景 峰值并行 结论
30 条 / 不同买家 / max=10 ≈10 接近 Semaphore 上限
15 条 / 同一买家 ≈1 锁生效,串行

巧思:全局可以 28 路并行,但单个买家永远串行——既提高吞吐,又保证对话顺序像真人微信。

3.3 人工协助总线(与降级、Watchdog 联动)

core/human_assist_bus.py 用 PyQt 信号把 asyncio 线程里的「需要人」事件抛到主界面弹窗:

class HumanAssistBus(QObject):
    assist_requested = pyqtSignal(dict)
    buyer_conversation_ended = pyqtSignal(dict)

常见 reason

reason 触发方
queue_degrade _handle_queue_degrade
ai_timeout Watchdog _run_watchdog
ai_failed _escalate_immediate(空回复、发送失败等)

escalate_to_human 里统一调用:

from core.human_assist_bus import emit_human_assist
emit_human_assist(reason, context, metadata, question)
ok = await handler._send_reply(context, notice, metadata)
if ok and session_key and epoch > 0:
    mark_delivered(session_key, epoch)

买家侧仍会收到占位话术(可配置),避免已转人工却长时间沉默。


4. 父子知识库:总部手册与分店补丁

4.1 业务场景

场景 期望行为
全店统一退货政策 所有店检索都能看到
A 店改某款灯的话术 A 店只看到新版,不要父版+子版各一条
A 店同步了商品 123 检索商品说明以 A 店为准;价格仍应走 API
B 店没改 仍用父库通用说明

我们没有建两个物理向量库,而是在 LanceDB + knowledge_docs.json 元数据 上用字段表达「继承」。模块头注释即设计文档(agent_knowledge.py):

"""
知识库层级(逻辑):
- 父知识库:`platform_shop_id` 为空 → 全店通用;
- 子知识库:绑定某拼多多 `platform_shop_id`;
- 可选 `inherit_key`:父子填同一键时,子库可声明覆盖意图;
- 仅当父条显式 `allow_child_override=true` 时,检索才会隐藏父条。
"""
_CURRENT_PLATFORM_SHOP_ID: ContextVar[Optional[str]] = ContextVar(
    "current_platform_shop_id", default=None
)

对话前 CustomerAgent 调用 set_platform_shop_context(shop_id),检索只返回「父库 + 本店子库」。

4.2 字段语义

字段 父库 子库
platform_shop_id 拼多多店铺 ID
inherit_key 可选,如 policy:return 与父库相同才参与覆盖
allow_child_override 仅父条可设 true 子条上无意义(商品 sync 子条为 false
source manual / 导入 goods_sync

可见性(检索池 pool):

@staticmethod
def _doc_visible_for_shop(doc: Dict[str, Any], shop_id: Optional[str]) -> bool:
    raw = doc.get("platform_shop_id")
    if raw is None or str(raw).strip() == "":
        return True   # 父库:全店可见
    if shop_id is None or str(shop_id).strip() == "":
        return False  # 未绑店:不看任何子库(防串店)
    return str(raw).strip() == str(shop_id).strip()

父条是否允许被覆盖(默认否,兼容旧 JSON):

@staticmethod
def _parent_allows_child_override(doc: Dict[str, Any]) -> bool:
    v = doc.get("allow_child_override")
    if v is True:
        return True
    if v is False or v is None:
        return False
    return str(v).strip().lower() in ("1", "true", "yes", "on")

覆盖过滤(检索 topK 之后执行):

def _apply_parent_override_filter(self, ranked, pool, shop_id):
    override_keys = self._shop_override_inherit_keys(pool, shop_id)
    if not override_keys:
        return ranked
    out = []
    for score, d in ranked:
        ps = (d.get("platform_shop_id") or "").strip()
        ik = self._inherit_key(d)
        if not ps and ik and ik in override_keys and self._parent_allows_child_override(d):
            continue  # 丢弃父条,保留子库版本
        out.append((score, d))
    return out

4.3 巧思:默认 allow_child_override=false

踩坑回顾(真实线上逻辑演进)

早期若规则是「子库只要有 inherit_key 就隐藏父条」,会出现:

  • 子店为 goods:123 加了一条补充说明;
  • 父库 inherit_key=policy:safety 的通用安全须知也被误杀(键集合污染或误配);
  • 或父+子同键但未打算覆盖时,父条消失,检索变空。

现规则:只有父条显式 allow_child_override=true,且本店存在同 inherit_key 的子文档,检索才隐藏父条。旧数据默认 false,升级无感。

4.4 三个配置故事(帮助记忆)

故事 A:总部允许分店改退货话术

  • 父库:inherit_key=policy:returnallow_child_override=true,内容为通用 7 天无理由;
  • A 店子库:同 inherit_key,内容为「本店 15 天」;
  • A 店检索「能退吗」→ 只有 15 天,不会出现两条矛盾政策。

故事 B:分店同步商品,总部也有同 goods 父条

  • 父库:inherit_key=goods:111allow_child_override=true,总部统一详情;
  • A 店 goods_sync 子条:inherit_key=goods:111allow_child_override=false(子条字段);
  • A 店检索 → 子条参与 override_keys,父条被滤掉 → 只剩 A 店同步的正文

故事 C:子店加了 goods 键,但父条未允许覆盖

  • 父条 allow_child_override=false
  • 子店仍有 goods:111 子文档;
  • 检索可能父+子都在 topK——模型可能看到两份说明。
    运维建议:要么父条开 allow_child_override,要么父库不要用与 goods_sync 相同的 inherit_key

4.5 Agent 侧店铺上下文与 retriever

Agno 传空 knowledge={} 会导致检索恒为 None,因此用自定义 knowledge_retriever 接本地 search_knowledgeagent.py):

def _customer_agno_knowledge_retriever(km: "KnowledgeManager"):
    def _retriever(agent, query: str, num_documents=None, **kwargs):
        q = (query or "").strip()
        if not q:
            return None
        limit = 5
        if num_documents is not None:
            try:
                n = int(num_documents)
                if n > 0:
                    limit = n
            except (TypeError, ValueError):
                pass
        hits = km.search_knowledge(q, top_k=limit)
        # ... 转为 Agno Document 格式 ...
    return _retriever

UI 导入文档时可指定 platform_shop_id;不指定则进父库。


5. 商品同步、分块与「双 Manager」坑

5.1 同步流程(scripts/sync_goods_to_kb.py

  1. validate_pinduoduo_account:未在「用户管理」登录则直接失败(曾踩坑:API 失败却 UI 显示「成功 0 条」);
  2. delete_goods_sync_documents(shop_id):删掉本店旧 source=goods_sync,避免重复 chunk;
  3. 分页 get_product_list(字段 products,兼容 product_list);
  4. 每条 get_product_detail → 拼 Markdown → upsert_goods_sync_document

成功条件success=truesynced_count > 0;店铺零商品或 API 失败必须 success=false 并带 error

登录校验与失败返回(修复「0 条却显示成功」):

def validate_pinduoduo_account(shop_id: str, user_id: str) -> Optional[str]:
    acc = db_manager.get_account("pinduoduo", str(shop_id), str(user_id))
    if not acc:
        return (
            f"未找到已登录的拼多多账号(店铺 {shop_id} / 用户 {user_id})。"
            "请先在「用户管理」完成商家后台登录后再同步商品。"
        )
    return None

# sync_all_products 内
login_err = validate_pinduoduo_account(self.shop_id, self.user_id)
if login_err:
    return {"success": False, "error": login_err, "synced_count": 0, ...}

product_list = result.get("products") or result.get("product_list") or []

if total_synced == 0:
    return {
        "success": False,
        "error": "未同步任何商品(店铺可能暂无在售商品,或全部写入失败)",
        "synced_count": 0,
        ...
    }

5.2 写入字段(与检索的关系)

upsert_goods_sync_document 完整写入逻辑:

def upsert_goods_sync_document(self, *, platform_shop_id, goods_id, title, content) -> bool:
    doc_id = f"goods_sync_{sid}_{gid}"
    inherit_key = f"goods:{gid}"
    row = {
        "id": doc_id,
        "title": title,
        "content": content,
        "source": "goods_sync",
        "import_format": "markdown",
        "platform_shop_id": sid,
        "inherit_key": inherit_key,
        "allow_child_override": False,
    }
    if self._document_should_use_chunks(content):
        row["chunks"] = self._build_chunk_entries(content)
        row["embedding"] = row["chunks"][0].get("embedding") if row["chunks"] else ...
    else:
        row["embedding"] = self._embed_text(content)
    # 写入 documents[] + LanceDB ...

5.3 同步文档长什么样(节选)

# LIMEGIRL SUNone 美甲灯

**商品 ID**: 111127661
**拼单价**: ¥10.28

## SKU 规格(名称 / 价格 / 库存)

- **颜色: 白色 | 功率: 24W** | 价格: ¥10.28 | 库存: 120件 | SKU ID: 90001
- **颜色: 粉色 | 功率: 24W** | 价格: ¥10.28 | 库存: 0件 | SKU ID: 90002

## 商品详情

(详情正文;CLI 可加 --with-ocr 用 PaddleOCR 补图内文字,UI 默认不 OCR)

SKU 段落由 ProductManager 解析后写入(sync_goods_to_kb._build_product_document):

lines.extend(["", "## SKU 规格(名称 / 价格 / 库存)", ""])
for sku in detail.get("sku_list") or []:
    name = sku.get("sku_name") or "默认规格"
    sid = sku.get("sku_id")
    qty = sku.get("quantity")
    price_yuan = sku.get("price")
    row = f"- **{name}**"
    if price_yuan is not None:
        row += f" | 价格: ¥{price_yuan}"
    if qty is not None:
        row += f" | 库存: {qty}件"
    if sid is not None:
        row += f" | SKU ID: {sid}"
    lines.append(row)

5.4 长文档分块(与父子库正交)

类常量直接写在 NailLampKnowledgeManager 上(便于对照调参):

class NailLampKnowledgeManager:
    _SCORE_SNIPPET_CHARS = 12000      # 打分只看前 N 字,返回仍用全文
    _EMBED_QUERY_TEXT_MAX = 3800
    _CHUNK_LONG_DOC_THRESHOLD = 520   # 超过则分块 embedding
    _CHUNK_TARGET = 480
    _CHUNK_OVERLAP = 96
    _CHUNK_MIN_MERGE = 72
参数 作用
_CHUNK_LONG_DOC_THRESHOLD 超过则分块,避免整篇一个向量
_CHUNK_OVERLAP 块重叠,SKU 段不被截断在边界
_SCORE_SNIPPET_CHARS 超长导入时加速打分

踩坑:整篇文档一个向量 → 问「粉色有没有货」可能召回整篇灯介绍,分数却一般;分块后 SKU 段更容易排到前面。但库存数字仍可能过期 → 见下一节 Tool。

5.5 双 KnowledgeManager

路径 用途
UI / 同步脚本 NailLampKnowledgeManager 导入、商品 sync、界面列表
Agent 对话 LanceDBKnowledgeManageragent_knowledge_lancedb.py Agno knowledge_retriever

二者应指向同一 LanceDB 目录get_temp_path()/lancedb + knowledge_docs.json)。
踩坑:路径不一致 → 「知识库界面能搜到,AI 说不知道」。


6. RAG 与 Tool:价库存绝不能只靠向量库

6.1 分工表

买家问法 应走 原因
多少钱、有没有白色、库存 get_shop_products / get_product_skus 实时 Cookie API
怎么用、保修、话术、PDF LanceDB RAG 静态说明
「发我看看」 send_goods_link 平台商品卡片 send_mallGoodsCard,不是文本 URL

Agent 内 _KNOWLEDGE_GROUNDING 是写进 instructions 的硬约束(节选):

_KNOWLEDGE_GROUNDING: List[str] = [
    "当买家询问商品相关信息(价格、规格、库存、款式、颜色等)时,"
    "必须优先使用 get_shop_products 或 get_product_skus(goods_id) 查询;"
    "无需先同步知识库。禁止凭空猜测或编造商品信息。",
    "不要引导买家「再发图」「发照片」来辨认商品:本链路中 AI 无法查看聊天图片。",
    "禁止使用「转人工客服」「转人工」… 统一改为「我去问问产品经理」。",
    "【三层记忆】输入中含【长期摘要】【任务状态】【短期记忆】…",
]

6.2 CustomerAgent 装配(节选)

from Agent.CustomerAgent.agent_knowledge_lancedb import (
    LanceDBKnowledgeManager as KnowledgeManager,
    set_platform_shop_context,
)
from Agent.CustomerAgent.tools.get_product_list import get_shop_products, get_product_skus
from Agent.CustomerAgent.tools.send_goods_link import send_goods_link

self._agent = Agent(
    db=SqliteDb(db_file=db_path),
    knowledge=None,
    knowledge_retriever=_customer_agno_knowledge_retriever(self.knowledge_manager),
    model=OpenAILike(**model_kw),
    tools=[
        transfer_conversation,
        send_goods_link,       # send_mallGoodsCard 商品卡片
        get_shop_products,
        get_product_skus,
    ],
    search_knowledge=True,
    instructions=_KNOWLEDGE_GROUNDING + ...,
)

出站发消息在 AIReplyHandler._send_reply 里走拼多多 HTTP(与知识库无关):

from Channel.pinduoduo.utils.API.send_message import SendMessage
sender = SendMessage(shop_id, user_id)
result = await asyncio.to_thread(sender.send_text, from_uid, reply)
if isinstance(result, dict) and result.get("success"):
    await asyncio.to_thread(persist_ai_message, ...)
    return True

6.3 Cookie 是一切的根

能力 Cookie 失效后
WebSocket 收消息 可能断连
SendMessage.send_text 失败
ProductManager 列表/详情 失败
知识库 RAG 仍可能正常(误导:以为系统还活着)

运维第一反应应是「用户管理 → 重新 Playwright 登录」,而不是调 embedding。


7. 三层会话记忆:为什么 Agno 历史还不够

database/models.pyChatSession 扩展字段:

class ChatSession(Base):
    __tablename__ = "chat_sessions"
    ...
    ai_mode = Column(Boolean, default=True, comment="是否 AI 自动回复")
    task_state_json = Column(Text, nullable=True)
    long_term_summary = Column(Text, nullable=True)
    memory_summary_through_id = Column(Integer, default=0)

conversation_memory.py 中的任务状态结构:

@dataclass
class TaskState:
    intent: str = "general"
    slots: Dict[str, str] = field(default_factory=dict)
    pending_confirm: List[str] = field(default_factory=list)
    flow_node: str = "general"

@dataclass
class LongTermSummary:
    user_requests: List[str] = field(default_factory=list)
    confirmed: List[str] = field(default_factory=list)
    open_issues: List[str] = field(default_factory=list)
配置 作用
短期 chat.memory.short_term_rounds(6~12) 最近原文轮次
任务状态 task_state_json intent / slots / 待确认
长期摘要 long_term_summary 更早事实压缩

build_layered_prompt() 输出形如 【短期记忆】…【任务状态】…【长期摘要】… + 本轮买家话,再传给 arun
agent.py 另有 _NATURAL_STYLE_INSTRUCTIONS,压制「每条都自我介绍」的仿写倾向。

巧思:电商买家是burst 式连发 + 短句;只靠模型窗口里的最后一条 user 消息,极易答偏 → 需要 DB 拉取 + 连发合并(第 10 节)。


8. 排队降级:进 LLM 之前的「认输」

8.1 动机

大促时 28 路 AI 仍可能排队。若 estimated_wait > 2 分钟 才进 LLM,买家体验差于立刻一句安抚 + 可选弹人工窗

降级发生在 ai_inflight 之前,不占 LLM 槽位、不烧 token。

8.2 公式与实现(Message/ai_queue_load.py

核心类(建议对照仓库全文阅读):

class AIQueueLoadTracker:
    def __init__(self) -> None:
        self._lock = asyncio.Lock()
        self._active = 0
        self._window: Deque[float] = deque(maxlen=100)

    def effective_duration_sec(self) -> float:
        cap = _cfg_float("chat.queue_p95_cap_sec", 30.0, 5.0, 120.0)
        prior = _cfg_float("chat.queue_prior_duration_sec", 8.0, 1.0, 60.0)
        min_samples = _cfg_int("chat.queue_stats_min_samples", 10, 1, 100)
        recent_n = _cfg_int("chat.queue_stats_recent_size", 20, 5, 100)

        if len(self._window) < min_samples:
            return min(prior, cap)

        vals = list(self._window)
        raw_p95 = _percentile(sorted(vals), 95.0)
        recent = vals[-recent_n:] if len(vals) >= recent_n else vals
        recent_med = statistics.median(recent) if recent else prior
        return min(raw_p95, recent_med * 2.0, cap)

    def estimated_wait_sec(self) -> float:
        return (self._active + 1) * self.effective_duration_sec()

    def should_queue_degrade(self) -> bool:
        if not bool(get_config("chat.queue_degrade_enabled", True)):
            return False
        threshold = _cfg_float("chat.queue_degrade_threshold_sec", 120.0, 30.0, 600.0)
        return self.estimated_wait_sec() > threshold

    @asynccontextmanager
    async def ai_inflight(self):
        async with self._lock:
            self._active += 1
        try:
            yield
        finally:
            async with self._lock:
                self._active = max(0, self._active - 1)

成功回复后写入窗口(仅 send 成功才记):

# ai_handler.handle 内
if success:
    tracker.record_success_duration(time.perf_counter() - ai_t0)

为什么不用「消息在 Queue 里等了多久」?
队列前面可能是 Keyword 毫秒级返回,也可能是 AI 卡 90s;深度 × 未知类型 估不准。active × 单任务耗时 更接近「我现在再进 AI 要排多久」。

8.3 数值算例

假设:effective_duration_sec = 12s(P95 已稳定),active_ai_tasks = 9threshold = 120

estimated_wait = (9 + 1) × 12 = 120s  →  边界,不降级(需 >120)
active = 10 → estimated_wait = 132s  →  降级

冷启动:active=14effective=8(先验)→ (15)×8=120,再来一条就降级。
踩坑:刚启动、样本少时,用 8s 先验可能偏敏感;营业一段时间 P95 上升后反而更「宽容」。

8.4 降级时发生什么

async def _handle_queue_degrade(self, context, metadata, processed_content: str) -> bool:
    notice = self._degrade_notice()  # chat.queue_degrade_notice 或默认文案
    if bool(config.get("chat.queue_degrade_emit_assist", True)):
        emit_human_assist("queue_degrade", context, metadata, processed_content or notice)
    ok = await self._send_reply(context, notice, metadata)
    self.logger.info(
        "排队降级: estimated_wait={:.1f}s active={} effective={:.1f}s",
        get_ai_queue_tracker().estimated_wait_sec(),
        get_ai_queue_tracker().active_tasks,
        get_ai_queue_tracker().effective_duration_sec(),
    )
    return ok

handle 里,降级判断位于 burst 合并之后、Watchdog 之前

tracker = get_ai_queue_tracker()
if tracker.should_queue_degrade():
    return await self._handle_queue_degrade(context, metadata, processed_content)
# 以下才会 begin_watchdog_turn / ai_inflight

日志示例:

排队降级: estimated_wait=132.0s active=10 effective=12.0s

8.5 与 Watchdog 的分工

机制 触发点 是否调 LLM
排队降级 进 AI 前,负载过高
Watchdog 进 AI 后,150s 未 mark_delivered 已调过,不再调第二次
LLM 传输重试 ai_inflight 内,网络瞬断 同一次请求重试 1 次

9. AI 回复兜底 v2:从二次 LLM 到 epoch + 150s

9.1 v1 为什么被推翻

v1 做法 线上问题
30s 发提醒 与业务话术重复、打扰买家
90s 再调一次 AI 双倍 token;可能与第一次回复矛盾
从入队开始计时 Semaphore / 同买家锁等待算进超时 → 误转人工
转人工后未挡发送 买家收到「人工句 + AI 长文」叠罗汉

9.2 v2 三条铁律

  1. T0 = schedule_watchdog() 调用时刻(begin_watchdog_turn 之后、ai_inflight 之前)。
  2. 150sai_watchdog_escalate_sec)内未 mark_deliveredescalate_to_human不再第二次 LLM
  3. epoch:同 session_key 新消息 → epoch++,cancel 旧 asyncio.Task,防旧定时器误伤新回合。

9.3 核心代码路径(AIReplyHandler.handle 节选)

processed_content = self.preprocessor.process(context.content, context.type)
processed_content = build_merged_buyer_query_for_ai(...)  # 连发合并

session_key = self._get_session_key(context, metadata)
tracker = get_ai_queue_tracker()
if tracker.should_queue_degrade():
    return await self._handle_queue_degrade(context, metadata, processed_content)

epoch = await begin_watchdog_turn(session_key)
if session_key and epoch:
    schedule_watchdog(self, context, metadata, processed_content, session_key, epoch)

ai_t0 = time.perf_counter()
async with tracker.ai_inflight():
    reply = await self._get_ai_reply_with_sync_retry(processed_content, context)

if is_escalated(session_key, epoch):
    self.logger.info("会话已转人工,跳过发送 AI 正文")
    return True

if self._is_invalid_ai_content(reply):
    return await self._escalate_immediate(..., reason="ai_failed")

success = await self._send_reply(context, reply, metadata)
if success:
    tracker.record_success_duration(time.perf_counter() - ai_t0)
    mark_delivered(session_key, epoch)

无效回复判定(占位符 / 空串):

_FAILURE_PLACEHOLDER_MARKERS = (
    "抱歉,我现在无法回复",
    "AI客服初始化失败",
)

def _is_invalid_ai_content(content: Optional[str]) -> bool:
    if not content or not str(content).strip():
        return True
    return any(m in str(content).strip() for m in _FAILURE_PLACEHOLDER_MARKERS)

mark_delivered 语义:本轮已对买家有一次有效出站(含转人工占位句)。escalate_to_human 发送成功也会 mark_delivered,避免 Watchdog 二次触发。

9.4 Watchdog 内部(ai_reply_watchdog.py

模块头注释即 v2 契约:

"""
AI 未及时回复兜底(v2):自 T0 起仅等待 escalate_sec(默认 150s),未 mark_delivered 则转人工。
不发起第二次 AI 调用。
"""

epoch 递增并取消旧任务:

async def begin_watchdog_turn(session_key: Optional[str]) -> int:
    if not session_key or not _watchdog_enabled():
        return 0
    async with _lock:
        old = _tasks.pop(session_key, None)
        _epoch[session_key] = _epoch.get(session_key, 0) + 1
        e = _epoch[session_key]
    if old is not None and not old.done():
        old.cancel()
    return e

等待循环与超时转人工:

async def _sleep_until_delivered(deadline, session_key, epoch) -> bool:
    """返回 True 表示超时且仍未 delivered,应转人工。"""
    while time.monotonic() < deadline:
        if _is_delivered(session_key, epoch):
            return False
        if _epoch.get(session_key, 0) != epoch:
            return False  # 新回合已 begin,本 task 作废
        await asyncio.sleep(min(1.0, max(0.05, deadline - time.monotonic())))
    return _is_delivered(session_key, epoch) is False and _epoch.get(session_key, 0) == epoch

async def _run_watchdog(handler, context, metadata, processed_query, session_key, epoch):
    deadline = time.monotonic() + _escalate_after_sec()  # 默认 150,限制 30~3600
    if not await _sleep_until_delivered(deadline, session_key, epoch):
        return
    await escalate_to_human(..., reason="ai_timeout", question=processed_query)

is_escalated 供发送前拦截:

def is_escalated(session_key: Optional[str], epoch: int) -> bool:
    if not session_key or epoch <= 0:
        return False
    return _escalated_epoch.get(session_key, 0) >= epoch

单元测试(test/test_ai_reply_watchdog.py):

@pytest.mark.asyncio
async def test_watchdog_epoch_increments_and_mark_delivered():
    key = "pinduoduo:test_shop:test_seller:test_buyer"
    e1 = await w.begin_watchdog_turn(key)
    e2 = await w.begin_watchdog_turn(key)
    assert e2 == e1 + 1
    w.mark_delivered(key, e2)
    assert w._is_delivered(key, e2)

9.5 传输层重试(不是业务重试)

瞬时网络错误遍历异常链判断(ai_handler.pyagent.py 各有一份,避免 import 拉起 LanceDB):

def _is_transient_llm_transport_error(exc: BaseException) -> bool:
    while cur is not None:
        if isinstance(cur, (BrokenPipeError, ConnectionResetError, asyncio.TimeoutError)):
            return True
        if isinstance(cur, OSError) and cur.errno in (errno.EPIPE, errno.ECONNRESET, ...):
            return True
        if type(cur).__name__ in ("ReadTimeout", "ConnectError", "RemoteProtocolError", ...):
            return True
        cur = cur.__cause__ or cur.__context__
    return False

async def _get_ai_reply_with_sync_retry(self, query, context):
    max_tries = 2 if config.get("chat.llm_sync_retry_enabled", True) else 1
    delay = float(config.get("chat.llm_sync_retry_delay_sec", 1.5))
    for attempt in range(1, max_tries + 1):
        try:
            content = await self._call_bot_once(query, context)
            if not self._is_invalid_ai_content(content):
                return content
        except Exception as e:
            if attempt < max_tries and _is_transient_llm_transport_error(e):
                await asyncio.sleep(delay)
                continue
            return None

踩坑:不要把「模型答不好」做成自动重问——成本高且可能重复发;v2 把「业务失败」统一走 _escalate_immediate

9.6 完整踩坑清单

现象 可能原因 处理
高峰大量误转人工 T0 开在 Semaphore 前 对齐 J 点:scheduleai_inflight
慢模型从不转人工 T0 开在 LLM 返回后 同上
转人工后又发 AI 长文 未检查 is_escalated 发送前判断 epoch
LLM 成功但买家没收到 send_text 失败未 mark 150s 后转人工(符合预期)
买家连问两句,第一句被旧 Watchdog 杀 未 cancel 旧 task 检查 begin_watchdog_turn
降级与 Watchdog 重复烧 token 降级路径未进 ai_inflight 正常;若仍调 LLM 查配置

9.7 ai_mode 与人工模式

def _is_ai_mode_enabled(self, context, metadata) -> bool:
    sess = db_manager.get_chat_session_by_buyer(int(acc["id"]), str(buyer_uid), "active")
    if not sess:
        return True
    return bool(sess.get("ai_mode", True))

async def handle(self, context, metadata):
    if not self._is_ai_mode_enabled(context, metadata):
        await self._maybe_send_manual_mode_notice(context, metadata)
        await self.log_message(context, "AI跳过", "会话处于人工模式(ai_mode=False)")
        return True
    # ... 降级 / Watchdog / LLM ...

manual_mode_send_notice 默认 false;开启后同一 session_key 180s 内只发一次占位,避免人工接待时刷屏。


10. 买家连发合并:避免只盯最后一个字

10.1 问题

买家常见输入序列:

[10:01:01] 在吗
[10:01:03] 这个多少钱
[10:01:05] 白色的

若只把 白色的 送给模型,极易答「您想了解什么白色」——上下文丢失

10.2 算法(utils/buyer_burst_merge.py

核心合并函数:

def merge_trailing_buyer_burst(rows, *, gap_seconds=45.0, max_parts=40) -> str:
    """rows: 时间正序 旧→新。从最后一条买家消息向前链式合并。"""
    i = n - 1
    while i >= 0 and (rows[i].get("sender_type") or "") != "customer":
        i -= 1
    if i < 0:
        return ""

    chain_new_to_old: List[int] = [i]
    cur = i
    while cur > 0:
        prev = cur - 1
        if (rows[prev].get("sender_type") or "") != "customer":
            break
        t_newer, t_older = _parse_dt(rows[cur]), _parse_dt(rows[prev])
        if t_newer and t_older and (t_newer - t_older).total_seconds() > gap_seconds:
            break
        chain_new_to_old.append(prev)
        cur = prev
        if len(chain_new_to_old) >= max_parts:
            break

    chain_new_to_old.reverse()
    return "".join(str(rows[j].get("content") or "") for j in chain_new_to_old).strip()

build_merged_buyer_query_for_ai 从 SQLite 拉最近消息再合并;失败则回退当前条 processed_fallback

handle 里的调用:

gap = float(config.get("chat.buyer_burst_merge_gap_sec", 45) or 45)
max_parts = int(config.get("chat.buyer_burst_merge_max_parts", 40) or 40)
processed_content = build_merged_buyer_query_for_ai(
    processed_content, context, metadata, gap_seconds=gap, max_parts=max_parts,
)

单元测试(test/test_buyer_burst_merge.py):

def test_merge_single_char_burst():
    rows = [
        _row("ai", "hi", t0),
        _row("customer", "你", t0 + timedelta(seconds=1)),
        _row("customer", "好", t0 + timedelta(seconds=2)),
        _row("customer", "啊", t0 + timedelta(seconds=3)),
    ]
    assert merge_trailing_buyer_burst(rows, gap_seconds=45) == "你好啊"

def test_merge_stops_at_ai():
    # 中间夹客服回复 → 只保留最后一 burst「新」
    assert merge_trailing_buyer_burst(rows, gap_seconds=45) == "新"

合并先于 should_queue_degrade 与 Watchdog,保证送进 LLM 的是整段意图。

10.3 与记忆层的关系

机制 粒度
burst merge 尾部几十秒内连发拼一句
短期记忆 6~12 轮结构化历史
长期摘要 更早事实

三者叠加,才能扛住「半句 + 半句」的拼多多聊天习惯。


11. 配置调参手册(含算例)

config.jsonchat 段完整示例(config.json.example):

"chat": {
  "manual_mode_send_notice": false,
  "ai_watchdog_enabled": true,
  "ai_watchdog_escalate_sec": 150,
  "ai_watchdog_escalate_notice": "",
  "queue_degrade_enabled": true,
  "queue_degrade_threshold_sec": 120,
  "queue_degrade_notice": "感谢亲亲选择我们的产品,当前咨询较多请耐心等待;如需人工请直接回复「人工」。",
  "queue_degrade_emit_assist": true,
  "queue_p95_cap_sec": 30,
  "queue_stats_window_size": 100,
  "queue_stats_recent_size": 20,
  "queue_prior_duration_sec": 8,
  "queue_stats_min_samples": 10,
  "llm_sync_retry_enabled": true,
  "llm_sync_retry_delay_sec": 1.5,
  "message_consumer_max_concurrent": 28,
  "memory": {
    "enabled": true,
    "short_term_rounds": 10,
    "short_term_rounds_min": 6,
    "short_term_rounds_max": 12
  }
}

config.py 默认里还有 buyer_burst_merge_gap_sec: 45buyer_burst_merge_max_parts: 40(未写入 example 时以代码默认为准)。

默认 建议
message_consumer_max_concurrent 28 LLM 约 7 req/s 时,28 并行≈4s 平均排队;模型更快可略增
queue_degrade_threshold_sec 120 敏感服务可 90;宁可多降级也别空等 3 分钟
queue_prior_duration_sec 8 冷启动敏感可提到 10~12
queue_p95_cap_sec 30 防止一次 90s 卡顿长期抬高预估
queue_stats_min_samples 10 样本少时用先验
ai_watchdog_escalate_sec 150 模型 timeout 35s + 重试,留足 2~3 轮传输
ai_watchdog_escalate_notice 空则用内置 宜短,与降级话术区分
queue_degrade_emit_assist true 高峰建议开,运营能看到队列
llm_sync_retry_enabled true 仅网络瞬断
buyer_burst_merge_gap_sec 45 impatient 买家可 60
memory.short_term_rounds 10 token 紧可 6

调参步骤

  1. 营业高峰看日志 排队降级: estimated_wait=... 频率;
  2. 若几乎从不降级但买家抱怨慢 → 降 threshold 或略降 message_consumer_max_concurrent 逼更早降级(反直觉:降并发有时让 estimated 更准);
  3. 若误降级多 → 提高 queue_stats_min_samplesprior,等 P95 稳定;
  4. Watchdog 误报 → 先核对 T0,再加大 escalate_sec;漏报 → 减小 escalate_sec 并查 send_text 失败率。

12. 脱敏对话案例

案例 1:价库存走 Tool,说明走 RAG

买家: 48w 白色的还有吗 多少钱

处理链:
  Keyword 未命中
  burst merge(若连发已合并)
  未触发 queue_degrade
  Agent: get_product_skus(goods_id=…) 或 get_shop_products
  RAG 检索「48W 功率说明」类文档

AI(示意): 白色 48W 目前有货,拼单价 xx 元;需要可发商品卡片~

对应代码路径(简化):

# handler_chain: KeywordDetectionHandler.can_handle → False
# AIReplyHandler.handle → build_merged_buyer_query_for_ai → should_queue_degrade → False
# begin_watchdog_turn + schedule_watchdog
# bot.async_reply → Agno 调 get_product_skus / search_knowledge
# _send_reply → mark_delivered

案例 2:高峰降级

背景: active_ai_tasks=12, effective_duration_sec=11 → estimated_wait=143s > 120

买家: 在吗

结果:
  不发 LLM
  买家收到 queue_degrade_notice
  可选弹人工协助(queue_degrade)
  无 Watchdog(未进入 schedule_watchdog)

案例 3:LLM 卡住 → Watchdog

背景: 未降级,T0 已 schedule;LLM 或 send 卡住 >150s

结果:
  escalate_to_human(ai_timeout)
  买家收到 ai_watchdog_escalate_notice(默认「稍等下 这边上报一下呢亲亲」)
  mark_delivered → 即使之后 LLM 返回,is_escalated 也会挡掉 AI 正文

案例 4:改地址 → 不进 AI

买家: 帮我把收货地址改成上海市…

OrderLogisticsHandler 命中 → 转人工 + 原因写入 assist
AIReplyHandler 不再执行

13. 压测结论与快速上手

13.1 压测(辅助容量规划)

类型 条件 结果
程序内 Handler sleep 80ms,30 不同买家 峰值并行 ≈ Semaphore 上限
程序内 15 条同买家 并行 ≈ 1
直连 LLM API DashScope 兼容,阶梯 1~30 并发 成功率 100%(示例环境)
直连 LLM API 30 并发持续 30s 7.1 req/s,P50 延迟约 2.5~3.9s

结论:瓶颈在单条 LLM 延迟,不是 Python 调度。message_consumer_max_concurrentqueue_degrade_threshold_sec 应随模型 SLA 调整。

仓库内测试:

uv run python -m pytest test/test_ai_reply_watchdog.py -v
uv run python -m pytest test/test_buyer_burst_merge.py -v
uv run python -m pytest test/ -q

13.2 快速上手

git clone https://github.com/paidaxin-12138/customer-agent-agno.git
cd customer-agent-agno
uv sync
uv run playwright install chromium
cp config.json.example config.json
# 填写 llm、embedder、pinduoduo.shop_id / user_id
uv run python app.py

推荐顺序:用户管理登录设置检查模型与开放平台 → 知识库(可选导入/商品同步)→ AI 测试 → 开启自动接待。

13.3 故障排查

现象 优先检查
商品同步 0 条却显示成功 已修复:须 validate_pinduoduo_account + API success + synced_count>0
AI 不说话 llm.api_keymodel_namelogs/app.log
界面能搜、AI 不能 LanceDB 路径是否双 Manager 一致
物流查不到 pinduoduo_open.enabled 与 token
Cookie 过期 收消息/发消息/商品 API 全挂 → 重新登录

14. 结语

三条线合在一起,解决的是三类不同失败模式:

父子知识库  →  答得像「这家店」、不像背错总部的稿
排队降级    →  高峰别让用户干等 LLM 槽位
Watchdog v2 →  进了 LLM 仍失败时,让人接手,且别烧第二次 token

若你正在做垂直电商客服,建议至少亲手核对这三处是否与线上一致:allow_child_override 默认 falseT0 在 ai_inflight降级公式用 active×P95。欢迎 Star、Issue 与 PR。


说明:实现以仓库 main 为准;敏感配置勿提交 Git。转载请注明出处与项目链接。

Logo

AtomGit 是由开放原子开源基金会联合 CSDN 等生态伙伴共同推出的新一代开源与人工智能协作平台。平台坚持“开放、中立、公益”的理念,把代码托管、模型共享、数据集托管、智能体开发体验和算力服务整合在一起,为开发者提供从开发、训练到部署的一站式体验。

更多推荐