拼多多 AI 客服桌面端:父子知识库、排队降级与兜底机制的设计与踩坑
本文基于开源项目 Customer-Agent(Agno + PyQt6 + 拼多多商家 WebSocket),面向已经做过 RAG / 客服 Bot、准备接真实电商渠道的工程师。
重点不是pip install,而是三条工程线里的设计巧思、参数怎么调、线上踩过什么坑。
文中 Python / JSON 代码块均摘自仓库实现(略有删减注释),便于你对照main分支阅读。
标签:Python PyQt6 RAG Agno 电商客服 系统设计 LanceDB
仓库:https://github.com/paidaxin-12138/customer-agent-agno
目录
- 背景与整体拆分
- 端到端:一条消息的生命周期
- 消息链与并发:责任链 + 双层限流
- 父子知识库:总部手册与分店补丁
- 商品同步、分块与「双 Manager」坑
- RAG 与 Tool:价库存绝不能只靠向量库
- 三层会话记忆:为什么 Agno 历史还不够
- 排队降级:进 LLM 之前的「认输」
- AI 回复兜底 v2:从二次 LLM 到 epoch + 150s
- 买家连发合并:避免只盯最后一个字
- 配置调参手册(含算例)
- 脱敏对话案例
- 压测结论与快速上手
- 结语
1. 背景与整体拆分
拼多多商家日常接待里,大量问题是重复的:价格、颜色、功率、物流、售后政策。纯人工成本高;纯关键词机器人又容易答非所问;纯大模型则爱编 SKU、爱编库存。
Customer-Agent 的定位:
| 约束 | 我们的选择 |
|---|---|
| 数据与登录态 | 桌面本机 + Playwright 登商家后台,Cookie 本地保存 |
| 渠道真实性 | WebSocket 收消息、mms.pinduoduo.com HTTP 发回复 |
| 价格/库存准确性 | Tool 实时拉商品 API,RAG 只答「说明类」 |
| 高峰稳定性 | 排队降级(不进 LLM)+ Watchdog(进了 LLM 仍卡住则转人工) |
工程上拆成三条可独立演进的主线:
消息编排 → 队列 + 责任链 + 同买家串行
知识检索 → LanceDB + 父子库 inherit_key
稳定性 → queue_degrade + ai_watchdog v2
下面按「生命周期 → 各模块设计 → 配置与案例」展开。
2. 端到端:一条消息的生命周期
理解兜底与降级,必须先看清时间轴上各阶段算不算进 Watchdog。下面是一条 TEXT 消息从进来到出站的典型阶段(与当前 main 分支代码一致)。
| 阶段 | 组件 | 是否启动 Watchdog | 是否可能排队降级 |
|---|---|---|---|
| A | 拼多多 WebSocket 下行 | 否 | 否 |
| B | PDDChannel 解析为 Context,put 进 MessageQueue |
否 | 否 |
| C | MessageConsumer 从队列 get 取出 |
否 | 否 |
| D | 等待消费者 Semaphore(默认 28 槽) | 否 | 否 |
| E | 等待同买家 Lock(user_key 维度) |
否 | 否 |
| F | Handler 链:物流 → 图视频 → 关键词 → … | 否 | 否 |
| G | 进入 AIReplyHandler.handle |
否 | 是(在此判断) |
| H | buyer_burst_merge 合并连发 |
否 | 已在 G 判断 |
| I | should_queue_degrade() 为真 → 发降级话术 |
否 | 已执行 |
| J | begin_watchdog_turn + schedule_watchdog |
T0 | 否 |
| K | async with ai_inflight() + 调 LLM |
是(计时中) | 否 |
| L | _send_reply 成功 → mark_delivered |
停止 Watchdog | 否 |
设计要点:
- 排队很久(C~E) 不算进 150s——那是容量问题,用降级在 G 点拦截,而不是让 Watchdog 误报。
- T0 在 J,在抢
ai_inflight之前——Semaphore / 同买家锁等待不计入 150s。 - 若 G 点已降级,不会走到 J,也就没有 Watchdog 任务。
session_key 由 AIReplyHandler._get_session_key 生成,Watchdog、连发合并、运维埋点共用:
# Message/handlers/ai_handler.py
def _get_session_key(self, context: Context, metadata: Dict[str, Any]) -> Optional[str]:
channel_name = str(metadata.get("channel_name") or "pinduoduo")
shop_id = str(metadata.get("shop_id") or "")
user_id = str(metadata.get("user_id") or "")
buyer_uid = self._resolve_buyer_uid(context, metadata)
if not all([shop_id, user_id, buyer_uid]):
return None
return f"{channel_name}:{shop_id}:{user_id}:{buyer_uid}"
# 例:pinduoduo:123456:789012:3456789012
3. 消息链与并发:责任链 + 双层限流
3.1 为什么不用 LangChain Chain
买家消息由 WebSocket 推送 驱动,不是一次 chain.run(user_input)。我们需要:
- 按消息类型短路(改地址直接人工,不必过 AI);
- 按渠道扩展(拼多多
Context、Cookie 刷新与 Agent 解耦); - 与 asyncio 队列消费者 天然契合。
Message/handler_chain_factory.py 注册的顺序(handler_chain(use_ai=True, bot=...)):
| # | Handler | 典型命中 |
|---|---|---|
| 1 | OrderLogisticsHandler |
改地址/收件人 → 人工;「物流到哪」→ 可选开放平台轨迹 |
| 2 | ImageVideoHumanHandler |
图片/视频 → 转人工(主链路不做实时识图) |
| 3 | KeywordDetectionHandler |
DB 关键词 / 「人工」类短语 |
| 4 | AIReplyHandler |
文本、商品咨询、订单卡片等 |
| 5 | CatchAllHandler |
兜底 |
每个 Handler:can_handle → handle → 返回 True 表示已处理,链截断。
处理器链注册代码(Message/handler_chain_factory.py):
def handler_chain(use_ai=True, businessHours=None, bot=None):
handlers = []
ol_handler = _get_order_logistics_handler()
if ol_handler is not None:
handlers.append(ol_handler)
iv_handler = _get_image_video_handler()
if iv_handler is not None:
handlers.append(iv_handler)
keyword_handler = _get_keyword_handler()
if keyword_handler is not None:
handlers.append(keyword_handler)
if use_ai:
handlers.append(_create_ai_handler(bot))
handlers.append(CatchAllHandler())
return handlers
3.2 双层并发控制
第一层:店铺队列 + 消费者 Semaphore
消费者初始化与同买家锁(Message/core/consumer.py):
class MessageConsumer:
def __init__(self, queue_name: str, max_concurrent: int = 28):
self.semaphore = asyncio.Semaphore(max_concurrent)
# 同一买家多条消息串行处理,避免多条并行 LLM 导致回复乱序
self._buyer_seq_locks: Dict[str, asyncio.Lock] = {}
async def _process_message(self, wrapper: MessageWrapper):
user_key = self._extract_user_id(wrapper.context)
lock = self._buyer_seq_locks.setdefault(user_key, asyncio.Lock())
async with self.semaphore:
async with lock:
metadata = wrapper.to_metadata()
# ... 注入 shop_id / user_id / from_uid ...
for handler in self.handlers:
if handler.can_handle(wrapper.context):
success = await handler.handle(wrapper.context, metadata)
if success:
break # 责任链截断
config.json→chat.message_consumer_max_concurrent(默认 28);- 入站侧
PDDChannel另有 Semaphore(默认 50),防止 WebSocket 回调把事件循环打满。
第二层:同买家 asyncio.Lock
注释写得很直白:避免多条并行 LLM 导致回复乱序、旧问新答叠在一起。
压测验证(程序内 sleep 80ms、不调 LLM):
| 场景 | 峰值并行 | 结论 |
|---|---|---|
| 30 条 / 不同买家 / max=10 | ≈10 | 接近 Semaphore 上限 |
| 15 条 / 同一买家 | ≈1 | 锁生效,串行 |
巧思:全局可以 28 路并行,但单个买家永远串行——既提高吞吐,又保证对话顺序像真人微信。
3.3 人工协助总线(与降级、Watchdog 联动)
core/human_assist_bus.py 用 PyQt 信号把 asyncio 线程里的「需要人」事件抛到主界面弹窗:
class HumanAssistBus(QObject):
assist_requested = pyqtSignal(dict)
buyer_conversation_ended = pyqtSignal(dict)
常见 reason:
| reason | 触发方 |
|---|---|
queue_degrade |
_handle_queue_degrade |
ai_timeout |
Watchdog _run_watchdog |
ai_failed |
_escalate_immediate(空回复、发送失败等) |
escalate_to_human 里统一调用:
from core.human_assist_bus import emit_human_assist
emit_human_assist(reason, context, metadata, question)
ok = await handler._send_reply(context, notice, metadata)
if ok and session_key and epoch > 0:
mark_delivered(session_key, epoch)
买家侧仍会收到占位话术(可配置),避免已转人工却长时间沉默。
4. 父子知识库:总部手册与分店补丁
4.1 业务场景
| 场景 | 期望行为 |
|---|---|
| 全店统一退货政策 | 所有店检索都能看到 |
| A 店改某款灯的话术 | A 店只看到新版,不要父版+子版各一条 |
| A 店同步了商品 123 | 检索商品说明以 A 店为准;价格仍应走 API |
| B 店没改 | 仍用父库通用说明 |
我们没有建两个物理向量库,而是在 LanceDB + knowledge_docs.json 元数据 上用字段表达「继承」。模块头注释即设计文档(agent_knowledge.py):
"""
知识库层级(逻辑):
- 父知识库:`platform_shop_id` 为空 → 全店通用;
- 子知识库:绑定某拼多多 `platform_shop_id`;
- 可选 `inherit_key`:父子填同一键时,子库可声明覆盖意图;
- 仅当父条显式 `allow_child_override=true` 时,检索才会隐藏父条。
"""
_CURRENT_PLATFORM_SHOP_ID: ContextVar[Optional[str]] = ContextVar(
"current_platform_shop_id", default=None
)
对话前 CustomerAgent 调用 set_platform_shop_context(shop_id),检索只返回「父库 + 本店子库」。
4.2 字段语义
| 字段 | 父库 | 子库 |
|---|---|---|
platform_shop_id |
空 | 拼多多店铺 ID |
inherit_key |
可选,如 policy:return |
与父库相同才参与覆盖 |
allow_child_override |
仅父条可设 true |
子条上无意义(商品 sync 子条为 false) |
source |
manual / 导入 |
goods_sync 等 |
可见性(检索池 pool):
@staticmethod
def _doc_visible_for_shop(doc: Dict[str, Any], shop_id: Optional[str]) -> bool:
raw = doc.get("platform_shop_id")
if raw is None or str(raw).strip() == "":
return True # 父库:全店可见
if shop_id is None or str(shop_id).strip() == "":
return False # 未绑店:不看任何子库(防串店)
return str(raw).strip() == str(shop_id).strip()
父条是否允许被覆盖(默认否,兼容旧 JSON):
@staticmethod
def _parent_allows_child_override(doc: Dict[str, Any]) -> bool:
v = doc.get("allow_child_override")
if v is True:
return True
if v is False or v is None:
return False
return str(v).strip().lower() in ("1", "true", "yes", "on")
覆盖过滤(检索 topK 之后执行):
def _apply_parent_override_filter(self, ranked, pool, shop_id):
override_keys = self._shop_override_inherit_keys(pool, shop_id)
if not override_keys:
return ranked
out = []
for score, d in ranked:
ps = (d.get("platform_shop_id") or "").strip()
ik = self._inherit_key(d)
if not ps and ik and ik in override_keys and self._parent_allows_child_override(d):
continue # 丢弃父条,保留子库版本
out.append((score, d))
return out
4.3 巧思:默认 allow_child_override=false
踩坑回顾(真实线上逻辑演进):
早期若规则是「子库只要有 inherit_key 就隐藏父条」,会出现:
- 子店为
goods:123加了一条补充说明; - 父库
inherit_key=policy:safety的通用安全须知也被误杀(键集合污染或误配); - 或父+子同键但未打算覆盖时,父条消失,检索变空。
现规则:只有父条显式 allow_child_override=true,且本店存在同 inherit_key 的子文档,检索才隐藏父条。旧数据默认 false,升级无感。
4.4 三个配置故事(帮助记忆)
故事 A:总部允许分店改退货话术
- 父库:
inherit_key=policy:return,allow_child_override=true,内容为通用 7 天无理由; - A 店子库:同
inherit_key,内容为「本店 15 天」; - A 店检索「能退吗」→ 只有 15 天,不会出现两条矛盾政策。
故事 B:分店同步商品,总部也有同 goods 父条
- 父库:
inherit_key=goods:111,allow_child_override=true,总部统一详情; - A 店
goods_sync子条:inherit_key=goods:111,allow_child_override=false(子条字段); - A 店检索 → 子条参与
override_keys,父条被滤掉 → 只剩 A 店同步的正文。
故事 C:子店加了 goods 键,但父条未允许覆盖
- 父条
allow_child_override=false; - 子店仍有
goods:111子文档; - 检索可能父+子都在 topK——模型可能看到两份说明。
运维建议:要么父条开allow_child_override,要么父库不要用与goods_sync相同的inherit_key。
4.5 Agent 侧店铺上下文与 retriever
Agno 传空 knowledge={} 会导致检索恒为 None,因此用自定义 knowledge_retriever 接本地 search_knowledge(agent.py):
def _customer_agno_knowledge_retriever(km: "KnowledgeManager"):
def _retriever(agent, query: str, num_documents=None, **kwargs):
q = (query or "").strip()
if not q:
return None
limit = 5
if num_documents is not None:
try:
n = int(num_documents)
if n > 0:
limit = n
except (TypeError, ValueError):
pass
hits = km.search_knowledge(q, top_k=limit)
# ... 转为 Agno Document 格式 ...
return _retriever
UI 导入文档时可指定 platform_shop_id;不指定则进父库。
5. 商品同步、分块与「双 Manager」坑
5.1 同步流程(scripts/sync_goods_to_kb.py)
validate_pinduoduo_account:未在「用户管理」登录则直接失败(曾踩坑:API 失败却 UI 显示「成功 0 条」);delete_goods_sync_documents(shop_id):删掉本店旧source=goods_sync,避免重复 chunk;- 分页
get_product_list(字段products,兼容product_list); - 每条
get_product_detail→ 拼 Markdown →upsert_goods_sync_document。
成功条件:success=true 且 synced_count > 0;店铺零商品或 API 失败必须 success=false 并带 error。
登录校验与失败返回(修复「0 条却显示成功」):
def validate_pinduoduo_account(shop_id: str, user_id: str) -> Optional[str]:
acc = db_manager.get_account("pinduoduo", str(shop_id), str(user_id))
if not acc:
return (
f"未找到已登录的拼多多账号(店铺 {shop_id} / 用户 {user_id})。"
"请先在「用户管理」完成商家后台登录后再同步商品。"
)
return None
# sync_all_products 内
login_err = validate_pinduoduo_account(self.shop_id, self.user_id)
if login_err:
return {"success": False, "error": login_err, "synced_count": 0, ...}
product_list = result.get("products") or result.get("product_list") or []
if total_synced == 0:
return {
"success": False,
"error": "未同步任何商品(店铺可能暂无在售商品,或全部写入失败)",
"synced_count": 0,
...
}
5.2 写入字段(与检索的关系)
upsert_goods_sync_document 完整写入逻辑:
def upsert_goods_sync_document(self, *, platform_shop_id, goods_id, title, content) -> bool:
doc_id = f"goods_sync_{sid}_{gid}"
inherit_key = f"goods:{gid}"
row = {
"id": doc_id,
"title": title,
"content": content,
"source": "goods_sync",
"import_format": "markdown",
"platform_shop_id": sid,
"inherit_key": inherit_key,
"allow_child_override": False,
}
if self._document_should_use_chunks(content):
row["chunks"] = self._build_chunk_entries(content)
row["embedding"] = row["chunks"][0].get("embedding") if row["chunks"] else ...
else:
row["embedding"] = self._embed_text(content)
# 写入 documents[] + LanceDB ...
5.3 同步文档长什么样(节选)
# LIMEGIRL SUNone 美甲灯
**商品 ID**: 111127661
**拼单价**: ¥10.28
## SKU 规格(名称 / 价格 / 库存)
- **颜色: 白色 | 功率: 24W** | 价格: ¥10.28 | 库存: 120件 | SKU ID: 90001
- **颜色: 粉色 | 功率: 24W** | 价格: ¥10.28 | 库存: 0件 | SKU ID: 90002
## 商品详情
(详情正文;CLI 可加 --with-ocr 用 PaddleOCR 补图内文字,UI 默认不 OCR)
SKU 段落由 ProductManager 解析后写入(sync_goods_to_kb._build_product_document):
lines.extend(["", "## SKU 规格(名称 / 价格 / 库存)", ""])
for sku in detail.get("sku_list") or []:
name = sku.get("sku_name") or "默认规格"
sid = sku.get("sku_id")
qty = sku.get("quantity")
price_yuan = sku.get("price")
row = f"- **{name}**"
if price_yuan is not None:
row += f" | 价格: ¥{price_yuan}"
if qty is not None:
row += f" | 库存: {qty}件"
if sid is not None:
row += f" | SKU ID: {sid}"
lines.append(row)
5.4 长文档分块(与父子库正交)
类常量直接写在 NailLampKnowledgeManager 上(便于对照调参):
class NailLampKnowledgeManager:
_SCORE_SNIPPET_CHARS = 12000 # 打分只看前 N 字,返回仍用全文
_EMBED_QUERY_TEXT_MAX = 3800
_CHUNK_LONG_DOC_THRESHOLD = 520 # 超过则分块 embedding
_CHUNK_TARGET = 480
_CHUNK_OVERLAP = 96
_CHUNK_MIN_MERGE = 72
| 参数 | 作用 |
|---|---|
_CHUNK_LONG_DOC_THRESHOLD |
超过则分块,避免整篇一个向量 |
_CHUNK_OVERLAP |
块重叠,SKU 段不被截断在边界 |
_SCORE_SNIPPET_CHARS |
超长导入时加速打分 |
踩坑:整篇文档一个向量 → 问「粉色有没有货」可能召回整篇灯介绍,分数却一般;分块后 SKU 段更容易排到前面。但库存数字仍可能过期 → 见下一节 Tool。
5.5 双 KnowledgeManager
| 路径 | 类 | 用途 |
|---|---|---|
| UI / 同步脚本 | NailLampKnowledgeManager |
导入、商品 sync、界面列表 |
| Agent 对话 | LanceDBKnowledgeManager(agent_knowledge_lancedb.py) |
Agno knowledge_retriever |
二者应指向同一 LanceDB 目录(get_temp_path()/lancedb + knowledge_docs.json)。
踩坑:路径不一致 → 「知识库界面能搜到,AI 说不知道」。
6. RAG 与 Tool:价库存绝不能只靠向量库
6.1 分工表
| 买家问法 | 应走 | 原因 |
|---|---|---|
| 多少钱、有没有白色、库存 | get_shop_products / get_product_skus |
实时 Cookie API |
| 怎么用、保修、话术、PDF | LanceDB RAG | 静态说明 |
| 「发我看看」 | send_goods_link |
发平台商品卡片 send_mallGoodsCard,不是文本 URL |
Agent 内 _KNOWLEDGE_GROUNDING 是写进 instructions 的硬约束(节选):
_KNOWLEDGE_GROUNDING: List[str] = [
"当买家询问商品相关信息(价格、规格、库存、款式、颜色等)时,"
"必须优先使用 get_shop_products 或 get_product_skus(goods_id) 查询;"
"无需先同步知识库。禁止凭空猜测或编造商品信息。",
"不要引导买家「再发图」「发照片」来辨认商品:本链路中 AI 无法查看聊天图片。",
"禁止使用「转人工客服」「转人工」… 统一改为「我去问问产品经理」。",
"【三层记忆】输入中含【长期摘要】【任务状态】【短期记忆】…",
]
6.2 CustomerAgent 装配(节选)
from Agent.CustomerAgent.agent_knowledge_lancedb import (
LanceDBKnowledgeManager as KnowledgeManager,
set_platform_shop_context,
)
from Agent.CustomerAgent.tools.get_product_list import get_shop_products, get_product_skus
from Agent.CustomerAgent.tools.send_goods_link import send_goods_link
self._agent = Agent(
db=SqliteDb(db_file=db_path),
knowledge=None,
knowledge_retriever=_customer_agno_knowledge_retriever(self.knowledge_manager),
model=OpenAILike(**model_kw),
tools=[
transfer_conversation,
send_goods_link, # send_mallGoodsCard 商品卡片
get_shop_products,
get_product_skus,
],
search_knowledge=True,
instructions=_KNOWLEDGE_GROUNDING + ...,
)
出站发消息在 AIReplyHandler._send_reply 里走拼多多 HTTP(与知识库无关):
from Channel.pinduoduo.utils.API.send_message import SendMessage
sender = SendMessage(shop_id, user_id)
result = await asyncio.to_thread(sender.send_text, from_uid, reply)
if isinstance(result, dict) and result.get("success"):
await asyncio.to_thread(persist_ai_message, ...)
return True
6.3 Cookie 是一切的根
| 能力 | Cookie 失效后 |
|---|---|
| WebSocket 收消息 | 可能断连 |
SendMessage.send_text |
失败 |
ProductManager 列表/详情 |
失败 |
| 知识库 RAG | 仍可能正常(误导:以为系统还活着) |
运维第一反应应是「用户管理 → 重新 Playwright 登录」,而不是调 embedding。
7. 三层会话记忆:为什么 Agno 历史还不够
database/models.py 中 ChatSession 扩展字段:
class ChatSession(Base):
__tablename__ = "chat_sessions"
...
ai_mode = Column(Boolean, default=True, comment="是否 AI 自动回复")
task_state_json = Column(Text, nullable=True)
long_term_summary = Column(Text, nullable=True)
memory_summary_through_id = Column(Integer, default=0)
conversation_memory.py 中的任务状态结构:
@dataclass
class TaskState:
intent: str = "general"
slots: Dict[str, str] = field(default_factory=dict)
pending_confirm: List[str] = field(default_factory=list)
flow_node: str = "general"
@dataclass
class LongTermSummary:
user_requests: List[str] = field(default_factory=list)
confirmed: List[str] = field(default_factory=list)
open_issues: List[str] = field(default_factory=list)
| 层 | 配置 | 作用 |
|---|---|---|
| 短期 | chat.memory.short_term_rounds(6~12) |
最近原文轮次 |
| 任务状态 | task_state_json |
intent / slots / 待确认 |
| 长期摘要 | long_term_summary |
更早事实压缩 |
build_layered_prompt() 输出形如 【短期记忆】…【任务状态】…【长期摘要】… + 本轮买家话,再传给 arun。agent.py 另有 _NATURAL_STYLE_INSTRUCTIONS,压制「每条都自我介绍」的仿写倾向。
巧思:电商买家是burst 式连发 + 短句;只靠模型窗口里的最后一条 user 消息,极易答偏 → 需要 DB 拉取 + 连发合并(第 10 节)。
8. 排队降级:进 LLM 之前的「认输」
8.1 动机
大促时 28 路 AI 仍可能排队。若 estimated_wait > 2 分钟 才进 LLM,买家体验差于立刻一句安抚 + 可选弹人工窗。
降级发生在 ai_inflight 之前,不占 LLM 槽位、不烧 token。
8.2 公式与实现(Message/ai_queue_load.py)
核心类(建议对照仓库全文阅读):
class AIQueueLoadTracker:
def __init__(self) -> None:
self._lock = asyncio.Lock()
self._active = 0
self._window: Deque[float] = deque(maxlen=100)
def effective_duration_sec(self) -> float:
cap = _cfg_float("chat.queue_p95_cap_sec", 30.0, 5.0, 120.0)
prior = _cfg_float("chat.queue_prior_duration_sec", 8.0, 1.0, 60.0)
min_samples = _cfg_int("chat.queue_stats_min_samples", 10, 1, 100)
recent_n = _cfg_int("chat.queue_stats_recent_size", 20, 5, 100)
if len(self._window) < min_samples:
return min(prior, cap)
vals = list(self._window)
raw_p95 = _percentile(sorted(vals), 95.0)
recent = vals[-recent_n:] if len(vals) >= recent_n else vals
recent_med = statistics.median(recent) if recent else prior
return min(raw_p95, recent_med * 2.0, cap)
def estimated_wait_sec(self) -> float:
return (self._active + 1) * self.effective_duration_sec()
def should_queue_degrade(self) -> bool:
if not bool(get_config("chat.queue_degrade_enabled", True)):
return False
threshold = _cfg_float("chat.queue_degrade_threshold_sec", 120.0, 30.0, 600.0)
return self.estimated_wait_sec() > threshold
@asynccontextmanager
async def ai_inflight(self):
async with self._lock:
self._active += 1
try:
yield
finally:
async with self._lock:
self._active = max(0, self._active - 1)
成功回复后写入窗口(仅 send 成功才记):
# ai_handler.handle 内
if success:
tracker.record_success_duration(time.perf_counter() - ai_t0)
为什么不用「消息在 Queue 里等了多久」?
队列前面可能是 Keyword 毫秒级返回,也可能是 AI 卡 90s;深度 × 未知类型 估不准。active × 单任务耗时 更接近「我现在再进 AI 要排多久」。
8.3 数值算例
假设:effective_duration_sec = 12s(P95 已稳定),active_ai_tasks = 9,threshold = 120。
estimated_wait = (9 + 1) × 12 = 120s → 边界,不降级(需 >120)
active = 10 → estimated_wait = 132s → 降级
冷启动:active=14,effective=8(先验)→ (15)×8=120,再来一条就降级。
踩坑:刚启动、样本少时,用 8s 先验可能偏敏感;营业一段时间 P95 上升后反而更「宽容」。
8.4 降级时发生什么
async def _handle_queue_degrade(self, context, metadata, processed_content: str) -> bool:
notice = self._degrade_notice() # chat.queue_degrade_notice 或默认文案
if bool(config.get("chat.queue_degrade_emit_assist", True)):
emit_human_assist("queue_degrade", context, metadata, processed_content or notice)
ok = await self._send_reply(context, notice, metadata)
self.logger.info(
"排队降级: estimated_wait={:.1f}s active={} effective={:.1f}s",
get_ai_queue_tracker().estimated_wait_sec(),
get_ai_queue_tracker().active_tasks,
get_ai_queue_tracker().effective_duration_sec(),
)
return ok
在 handle 里,降级判断位于 burst 合并之后、Watchdog 之前:
tracker = get_ai_queue_tracker()
if tracker.should_queue_degrade():
return await self._handle_queue_degrade(context, metadata, processed_content)
# 以下才会 begin_watchdog_turn / ai_inflight
日志示例:
排队降级: estimated_wait=132.0s active=10 effective=12.0s
8.5 与 Watchdog 的分工
| 机制 | 触发点 | 是否调 LLM |
|---|---|---|
| 排队降级 | 进 AI 前,负载过高 | 否 |
| Watchdog | 进 AI 后,150s 未 mark_delivered |
已调过,不再调第二次 |
| LLM 传输重试 | ai_inflight 内,网络瞬断 |
同一次请求重试 1 次 |
9. AI 回复兜底 v2:从二次 LLM 到 epoch + 150s
9.1 v1 为什么被推翻
| v1 做法 | 线上问题 |
|---|---|
| 30s 发提醒 | 与业务话术重复、打扰买家 |
| 90s 再调一次 AI | 双倍 token;可能与第一次回复矛盾 |
| 从入队开始计时 | Semaphore / 同买家锁等待算进超时 → 误转人工 |
| 转人工后未挡发送 | 买家收到「人工句 + AI 长文」叠罗汉 |
9.2 v2 三条铁律
- T0 =
schedule_watchdog()调用时刻(begin_watchdog_turn之后、ai_inflight之前)。 - 150s(
ai_watchdog_escalate_sec)内未mark_delivered→escalate_to_human,不再第二次 LLM。 epoch:同session_key新消息 →epoch++,cancel 旧asyncio.Task,防旧定时器误伤新回合。
9.3 核心代码路径(AIReplyHandler.handle 节选)
processed_content = self.preprocessor.process(context.content, context.type)
processed_content = build_merged_buyer_query_for_ai(...) # 连发合并
session_key = self._get_session_key(context, metadata)
tracker = get_ai_queue_tracker()
if tracker.should_queue_degrade():
return await self._handle_queue_degrade(context, metadata, processed_content)
epoch = await begin_watchdog_turn(session_key)
if session_key and epoch:
schedule_watchdog(self, context, metadata, processed_content, session_key, epoch)
ai_t0 = time.perf_counter()
async with tracker.ai_inflight():
reply = await self._get_ai_reply_with_sync_retry(processed_content, context)
if is_escalated(session_key, epoch):
self.logger.info("会话已转人工,跳过发送 AI 正文")
return True
if self._is_invalid_ai_content(reply):
return await self._escalate_immediate(..., reason="ai_failed")
success = await self._send_reply(context, reply, metadata)
if success:
tracker.record_success_duration(time.perf_counter() - ai_t0)
mark_delivered(session_key, epoch)
无效回复判定(占位符 / 空串):
_FAILURE_PLACEHOLDER_MARKERS = (
"抱歉,我现在无法回复",
"AI客服初始化失败",
)
def _is_invalid_ai_content(content: Optional[str]) -> bool:
if not content or not str(content).strip():
return True
return any(m in str(content).strip() for m in _FAILURE_PLACEHOLDER_MARKERS)
mark_delivered 语义:本轮已对买家有一次有效出站(含转人工占位句)。escalate_to_human 发送成功也会 mark_delivered,避免 Watchdog 二次触发。
9.4 Watchdog 内部(ai_reply_watchdog.py)
模块头注释即 v2 契约:
"""
AI 未及时回复兜底(v2):自 T0 起仅等待 escalate_sec(默认 150s),未 mark_delivered 则转人工。
不发起第二次 AI 调用。
"""
epoch 递增并取消旧任务:
async def begin_watchdog_turn(session_key: Optional[str]) -> int:
if not session_key or not _watchdog_enabled():
return 0
async with _lock:
old = _tasks.pop(session_key, None)
_epoch[session_key] = _epoch.get(session_key, 0) + 1
e = _epoch[session_key]
if old is not None and not old.done():
old.cancel()
return e
等待循环与超时转人工:
async def _sleep_until_delivered(deadline, session_key, epoch) -> bool:
"""返回 True 表示超时且仍未 delivered,应转人工。"""
while time.monotonic() < deadline:
if _is_delivered(session_key, epoch):
return False
if _epoch.get(session_key, 0) != epoch:
return False # 新回合已 begin,本 task 作废
await asyncio.sleep(min(1.0, max(0.05, deadline - time.monotonic())))
return _is_delivered(session_key, epoch) is False and _epoch.get(session_key, 0) == epoch
async def _run_watchdog(handler, context, metadata, processed_query, session_key, epoch):
deadline = time.monotonic() + _escalate_after_sec() # 默认 150,限制 30~3600
if not await _sleep_until_delivered(deadline, session_key, epoch):
return
await escalate_to_human(..., reason="ai_timeout", question=processed_query)
is_escalated 供发送前拦截:
def is_escalated(session_key: Optional[str], epoch: int) -> bool:
if not session_key or epoch <= 0:
return False
return _escalated_epoch.get(session_key, 0) >= epoch
单元测试(test/test_ai_reply_watchdog.py):
@pytest.mark.asyncio
async def test_watchdog_epoch_increments_and_mark_delivered():
key = "pinduoduo:test_shop:test_seller:test_buyer"
e1 = await w.begin_watchdog_turn(key)
e2 = await w.begin_watchdog_turn(key)
assert e2 == e1 + 1
w.mark_delivered(key, e2)
assert w._is_delivered(key, e2)
9.5 传输层重试(不是业务重试)
瞬时网络错误遍历异常链判断(ai_handler.py 与 agent.py 各有一份,避免 import 拉起 LanceDB):
def _is_transient_llm_transport_error(exc: BaseException) -> bool:
while cur is not None:
if isinstance(cur, (BrokenPipeError, ConnectionResetError, asyncio.TimeoutError)):
return True
if isinstance(cur, OSError) and cur.errno in (errno.EPIPE, errno.ECONNRESET, ...):
return True
if type(cur).__name__ in ("ReadTimeout", "ConnectError", "RemoteProtocolError", ...):
return True
cur = cur.__cause__ or cur.__context__
return False
async def _get_ai_reply_with_sync_retry(self, query, context):
max_tries = 2 if config.get("chat.llm_sync_retry_enabled", True) else 1
delay = float(config.get("chat.llm_sync_retry_delay_sec", 1.5))
for attempt in range(1, max_tries + 1):
try:
content = await self._call_bot_once(query, context)
if not self._is_invalid_ai_content(content):
return content
except Exception as e:
if attempt < max_tries and _is_transient_llm_transport_error(e):
await asyncio.sleep(delay)
continue
return None
踩坑:不要把「模型答不好」做成自动重问——成本高且可能重复发;v2 把「业务失败」统一走 _escalate_immediate。
9.6 完整踩坑清单
| 现象 | 可能原因 | 处理 |
|---|---|---|
| 高峰大量误转人工 | T0 开在 Semaphore 前 | 对齐 J 点:schedule 在 ai_inflight 前 |
| 慢模型从不转人工 | T0 开在 LLM 返回后 | 同上 |
| 转人工后又发 AI 长文 | 未检查 is_escalated |
发送前判断 epoch |
| LLM 成功但买家没收到 | send_text 失败未 mark |
150s 后转人工(符合预期) |
| 买家连问两句,第一句被旧 Watchdog 杀 | 未 cancel 旧 task | 检查 begin_watchdog_turn |
| 降级与 Watchdog 重复烧 token | 降级路径未进 ai_inflight |
正常;若仍调 LLM 查配置 |
9.7 ai_mode 与人工模式
def _is_ai_mode_enabled(self, context, metadata) -> bool:
sess = db_manager.get_chat_session_by_buyer(int(acc["id"]), str(buyer_uid), "active")
if not sess:
return True
return bool(sess.get("ai_mode", True))
async def handle(self, context, metadata):
if not self._is_ai_mode_enabled(context, metadata):
await self._maybe_send_manual_mode_notice(context, metadata)
await self.log_message(context, "AI跳过", "会话处于人工模式(ai_mode=False)")
return True
# ... 降级 / Watchdog / LLM ...
manual_mode_send_notice 默认 false;开启后同一 session_key 180s 内只发一次占位,避免人工接待时刷屏。
10. 买家连发合并:避免只盯最后一个字
10.1 问题
买家常见输入序列:
[10:01:01] 在吗
[10:01:03] 这个多少钱
[10:01:05] 白色的
若只把 白色的 送给模型,极易答「您想了解什么白色」——上下文丢失。
10.2 算法(utils/buyer_burst_merge.py)
核心合并函数:
def merge_trailing_buyer_burst(rows, *, gap_seconds=45.0, max_parts=40) -> str:
"""rows: 时间正序 旧→新。从最后一条买家消息向前链式合并。"""
i = n - 1
while i >= 0 and (rows[i].get("sender_type") or "") != "customer":
i -= 1
if i < 0:
return ""
chain_new_to_old: List[int] = [i]
cur = i
while cur > 0:
prev = cur - 1
if (rows[prev].get("sender_type") or "") != "customer":
break
t_newer, t_older = _parse_dt(rows[cur]), _parse_dt(rows[prev])
if t_newer and t_older and (t_newer - t_older).total_seconds() > gap_seconds:
break
chain_new_to_old.append(prev)
cur = prev
if len(chain_new_to_old) >= max_parts:
break
chain_new_to_old.reverse()
return "".join(str(rows[j].get("content") or "") for j in chain_new_to_old).strip()
build_merged_buyer_query_for_ai 从 SQLite 拉最近消息再合并;失败则回退当前条 processed_fallback。
在 handle 里的调用:
gap = float(config.get("chat.buyer_burst_merge_gap_sec", 45) or 45)
max_parts = int(config.get("chat.buyer_burst_merge_max_parts", 40) or 40)
processed_content = build_merged_buyer_query_for_ai(
processed_content, context, metadata, gap_seconds=gap, max_parts=max_parts,
)
单元测试(test/test_buyer_burst_merge.py):
def test_merge_single_char_burst():
rows = [
_row("ai", "hi", t0),
_row("customer", "你", t0 + timedelta(seconds=1)),
_row("customer", "好", t0 + timedelta(seconds=2)),
_row("customer", "啊", t0 + timedelta(seconds=3)),
]
assert merge_trailing_buyer_burst(rows, gap_seconds=45) == "你好啊"
def test_merge_stops_at_ai():
# 中间夹客服回复 → 只保留最后一 burst「新」
assert merge_trailing_buyer_burst(rows, gap_seconds=45) == "新"
合并先于 should_queue_degrade 与 Watchdog,保证送进 LLM 的是整段意图。
10.3 与记忆层的关系
| 机制 | 粒度 |
|---|---|
| burst merge | 尾部几十秒内连发拼一句 |
| 短期记忆 | 6~12 轮结构化历史 |
| 长期摘要 | 更早事实 |
三者叠加,才能扛住「半句 + 半句」的拼多多聊天习惯。
11. 配置调参手册(含算例)
config.json → chat 段完整示例(config.json.example):
"chat": {
"manual_mode_send_notice": false,
"ai_watchdog_enabled": true,
"ai_watchdog_escalate_sec": 150,
"ai_watchdog_escalate_notice": "",
"queue_degrade_enabled": true,
"queue_degrade_threshold_sec": 120,
"queue_degrade_notice": "感谢亲亲选择我们的产品,当前咨询较多请耐心等待;如需人工请直接回复「人工」。",
"queue_degrade_emit_assist": true,
"queue_p95_cap_sec": 30,
"queue_stats_window_size": 100,
"queue_stats_recent_size": 20,
"queue_prior_duration_sec": 8,
"queue_stats_min_samples": 10,
"llm_sync_retry_enabled": true,
"llm_sync_retry_delay_sec": 1.5,
"message_consumer_max_concurrent": 28,
"memory": {
"enabled": true,
"short_term_rounds": 10,
"short_term_rounds_min": 6,
"short_term_rounds_max": 12
}
}
config.py 默认里还有 buyer_burst_merge_gap_sec: 45、buyer_burst_merge_max_parts: 40(未写入 example 时以代码默认为准)。
| 键 | 默认 | 建议 |
|---|---|---|
message_consumer_max_concurrent |
28 | LLM 约 7 req/s 时,28 并行≈4s 平均排队;模型更快可略增 |
queue_degrade_threshold_sec |
120 | 敏感服务可 90;宁可多降级也别空等 3 分钟 |
queue_prior_duration_sec |
8 | 冷启动敏感可提到 10~12 |
queue_p95_cap_sec |
30 | 防止一次 90s 卡顿长期抬高预估 |
queue_stats_min_samples |
10 | 样本少时用先验 |
ai_watchdog_escalate_sec |
150 | 模型 timeout 35s + 重试,留足 2~3 轮传输 |
ai_watchdog_escalate_notice |
空则用内置 | 宜短,与降级话术区分 |
queue_degrade_emit_assist |
true | 高峰建议开,运营能看到队列 |
llm_sync_retry_enabled |
true | 仅网络瞬断 |
buyer_burst_merge_gap_sec |
45 | impatient 买家可 60 |
memory.short_term_rounds |
10 | token 紧可 6 |
调参步骤:
- 营业高峰看日志
排队降级: estimated_wait=...频率; - 若几乎从不降级但买家抱怨慢 → 降
threshold或略降message_consumer_max_concurrent逼更早降级(反直觉:降并发有时让 estimated 更准); - 若误降级多 → 提高
queue_stats_min_samples或prior,等 P95 稳定; - Watchdog 误报 → 先核对 T0,再加大
escalate_sec;漏报 → 减小escalate_sec并查send_text失败率。
12. 脱敏对话案例
案例 1:价库存走 Tool,说明走 RAG
买家: 48w 白色的还有吗 多少钱
处理链:
Keyword 未命中
burst merge(若连发已合并)
未触发 queue_degrade
Agent: get_product_skus(goods_id=…) 或 get_shop_products
RAG 检索「48W 功率说明」类文档
AI(示意): 白色 48W 目前有货,拼单价 xx 元;需要可发商品卡片~
对应代码路径(简化):
# handler_chain: KeywordDetectionHandler.can_handle → False
# AIReplyHandler.handle → build_merged_buyer_query_for_ai → should_queue_degrade → False
# begin_watchdog_turn + schedule_watchdog
# bot.async_reply → Agno 调 get_product_skus / search_knowledge
# _send_reply → mark_delivered
案例 2:高峰降级
背景: active_ai_tasks=12, effective_duration_sec=11 → estimated_wait=143s > 120
买家: 在吗
结果:
不发 LLM
买家收到 queue_degrade_notice
可选弹人工协助(queue_degrade)
无 Watchdog(未进入 schedule_watchdog)
案例 3:LLM 卡住 → Watchdog
背景: 未降级,T0 已 schedule;LLM 或 send 卡住 >150s
结果:
escalate_to_human(ai_timeout)
买家收到 ai_watchdog_escalate_notice(默认「稍等下 这边上报一下呢亲亲」)
mark_delivered → 即使之后 LLM 返回,is_escalated 也会挡掉 AI 正文
案例 4:改地址 → 不进 AI
买家: 帮我把收货地址改成上海市…
OrderLogisticsHandler 命中 → 转人工 + 原因写入 assist
AIReplyHandler 不再执行
13. 压测结论与快速上手
13.1 压测(辅助容量规划)
| 类型 | 条件 | 结果 |
|---|---|---|
| 程序内 | Handler sleep 80ms,30 不同买家 | 峰值并行 ≈ Semaphore 上限 |
| 程序内 | 15 条同买家 | 并行 ≈ 1 |
| 直连 LLM API | DashScope 兼容,阶梯 1~30 并发 | 成功率 100%(示例环境) |
| 直连 LLM API | 30 并发持续 30s | 约 7.1 req/s,P50 延迟约 2.5~3.9s |
结论:瓶颈在单条 LLM 延迟,不是 Python 调度。message_consumer_max_concurrent 与 queue_degrade_threshold_sec 应随模型 SLA 调整。
仓库内测试:
uv run python -m pytest test/test_ai_reply_watchdog.py -v
uv run python -m pytest test/test_buyer_burst_merge.py -v
uv run python -m pytest test/ -q
13.2 快速上手
git clone https://github.com/paidaxin-12138/customer-agent-agno.git
cd customer-agent-agno
uv sync
uv run playwright install chromium
cp config.json.example config.json
# 填写 llm、embedder、pinduoduo.shop_id / user_id
uv run python app.py
推荐顺序:用户管理登录 → 设置检查模型与开放平台 → 知识库(可选导入/商品同步)→ AI 测试 → 开启自动接待。
13.3 故障排查
| 现象 | 优先检查 |
|---|---|
| 商品同步 0 条却显示成功 | 已修复:须 validate_pinduoduo_account + API success + synced_count>0 |
| AI 不说话 | llm.api_key、model_name、logs/app.log |
| 界面能搜、AI 不能 | LanceDB 路径是否双 Manager 一致 |
| 物流查不到 | pinduoduo_open.enabled 与 token |
| Cookie 过期 | 收消息/发消息/商品 API 全挂 → 重新登录 |
14. 结语
三条线合在一起,解决的是三类不同失败模式:
父子知识库 → 答得像「这家店」、不像背错总部的稿
排队降级 → 高峰别让用户干等 LLM 槽位
Watchdog v2 → 进了 LLM 仍失败时,让人接手,且别烧第二次 token
若你正在做垂直电商客服,建议至少亲手核对这三处是否与线上一致:allow_child_override 默认 false、T0 在 ai_inflight 前、降级公式用 active×P95。欢迎 Star、Issue 与 PR。
说明:实现以仓库 main 为准;敏感配置勿提交 Git。转载请注明出处与项目链接。
AtomGit 是由开放原子开源基金会联合 CSDN 等生态伙伴共同推出的新一代开源与人工智能协作平台。平台坚持“开放、中立、公益”的理念,把代码托管、模型共享、数据集托管、智能体开发体验和算力服务整合在一起,为开发者提供从开发、训练到部署的一站式体验。
更多推荐

所有评论(0)