28. 用 Camel-AI 与 OWL 编排思想,验证「运营数字员工」POC
本文基于仓库 ai-ops-assistant-lab 的 V1~V4 演进实践,说明如何用 Camel-AI 搭建多 Agent「数字员工」,并用 OWL 编排思想 把运营同学的自然语言问题,收敛为可审计的 SQL → 洞察 → 报告 链路。
一、我们在验证什么?
游戏运营团队每天面对大量重复性分析:「最近 7 天流失怎么样?」「DAU 和 GMV 趋势如何?」传统做法是运营提需求 → 数据同学写 SQL → 等数 → 写报告,周期长、口径难对齐。
数字员工 在这里的含义很具体:不是单一聊天机器人,而是一组分工明确的 AI Agent——理解问题、选指标、查数、写洞察、出报告——像一条小型流水线上的「虚拟同事」。
本仓库的 POC 目标可以概括为四句话:
| 目标 | POC 要证明的事 |
|---|---|
| 链路可行 | 自然语言 → 可控取数 → 分析 → Markdown 报告,离线 Mock 也能一键跑通 |
| 架构可演进 | 同一业务域下,从「LLM 直写 SQL」逐步收敛到「指标语义层 + 确定性编译」 |
| 过程可观测 | 每个阶段有 trace / Hook 时间线,方便审计与排错 |
| 试错成本低 | 默认 Mock LLM + Mock Doris,无需 API Key 即可演示 |
适用场景:数据平台、运营分析、AI 应用工程师,以及想把 Text-to-SQL 治理到语义层的团队。
二、Camel-AI 与 OWL:各自扮演什么角色?
Camel-AI:数字员工的「大脑」
Camel-AI 是本仓库 V1~V3 的 Agent 运行时。每个「数字员工」本质上是一个 ChatAgent:
# agents/_camel_runtime.py 中的调用链
ModelFactory.create(...) # 按平台/模型名创建 LLM
ChatAgent(system_message=...) # 固定系统提示 = 角色设定
BaseMessage.make_user_message(...) # 结构化 JSON 作为用户输入
agent.step(msg) # 单轮推理,返回 JSON
仓库约定统一的 Agent 接口:
def run(payload: Dict[str, Any]) -> Dict[str, Any]:
# 输入、输出都是 JSON-serializable dict
# use_mock_agents=True 时走确定性 Mock,不调 API
这让每个数字员工既可以接真实大模型,也可以在 POC 阶段完全离线演示。
OWL:数字员工的「班组长」
OWL 是 Camel 生态里的多 Agent 编排框架,核心是 Workforce 动态任务分解。本仓库没有直接依赖 OWL 包(requirements.txt 里标注为可选),而是采用 OWL 编排思想:
- 显式阶段:Pipeline 每一步清晰可见
- 状态传递:
context/OpsWorkflowState在阶段间流转 - 可观测 trace:
--json可打印全链路中间结果 - Camel
TaskAPI 对齐:用Task/TaskState包装用户问题与最终结果
# workflow/ops_workflow.py
task = Task(content=user_question)
task.set_state(TaskState.RUNNING)
# ... 五阶段执行 ...
task.update_result(markdown)
官方 OWL Workforce 适合动态拆任务;POC 阶段用固定 DAG 更易理解数据流。注释里已说明:后续可将每个阶段注册为 Worker,接口不变。
三、整体架构:V1 → V4 的四次进化
同一业务问题「最近 7 天用户流失情况如何?」,在四个版本里走的路径不同,但数字员工的分工逻辑一脉相承。
四、V1:用 Camel Agent 证明闭环(MVP POC)
V1 回答第一个 POC 问题:运营能否用一句话拿到报告?
五阶段固定 Pipeline
用户问题
│
▼
Query Understanding Agent → 意图 / 时间范围 / 关注指标
│
▼
SQL Generation Agent → SQL + mock_plan
│
▼
SQL Tool(Doris / Mock) → 行集结果
│
▼
Analysis Agent → 结构化洞察
│
▼
Report Agent → Markdown 分析报告
OpsWorkflow 按顺序调用各 Agent,并把中间结果写入 trace:
def run(self, user_question: str) -> Dict[str, Any]:
task = Task(content=user_question)
task.set_state(TaskState.RUNNING)
self.trace = {}
context: Dict[str, Any] = {"user_question": user_question}
understanding = query_understanding_agent.run(
{"user_question": user_question}
)
self.trace["query_understanding"] = understanding
# ... sql_generation → sql_execution → analysis → report ...
task.update_result(markdown)
return {
"task_id": task.id or "root",
"task_state": task.state.value,
"steps": STEP_ORDER,
"markdown_report": markdown,
"trace": self.trace,
}
Mock 模式:零密钥 POC
无 OPENAI_API_KEY 时,query_understanding_agent 用关键词 + 正则做确定性路由:
churn_kw = any(k in question for k in ("流失", "留存", "沉默", "流失率", "退坑"))
if churn_kw or "churn" in q:
return {
"intent": "churn_analysis",
"intent_label": "用户流失与活跃变化",
"time_range_days": days,
"metrics": ["churn_users_7d_window", "dau", "retention_d1_pct"],
...
}
POC 收获:端到端闭环成立;Agent 边界、Prompt、Tool 分层清晰。
POC 暴露的问题:LLM 可直接写 SQL → 易幻觉列名、漏分区、口径不一致。
五、V2:OWL Workflow + Schema 治理
V2 引入 OWL Workflow 命名与 StageCache,把 SQL 生成拆成三个专职数字员工:
Query Understanding → Schema Retriever(catalog.yaml)
→ SQL Planner → SQL Optimizer → SQL Execution
→ Insight → Report
OWL 编排的两项工程能力
1. 结构化状态 OpsWorkflowState 集中保存各阶段产物。
2. 阶段缓存 StageCache.memo:相同输入签名跳过重复 LLM 调用。
intent, hit_i = self.cache.memo(
"intent",
{"question": state.user_question},
lambda: query_understanding_agent.run({"user_question": state.user_question}),
)
catalog.yaml 把表/列/分区语义注入 Planner Prompt,成为第一道硬约束。
POC 收获:可连接真实 Doris;Schema 显著降低「瞎写 SQL」概率;缓存降低重复调用成本。
六、V3:指标语义层——禁止 LLM 直连 SQL
V3 是 POC 的治理分水岭:LLM 只选指标与意图,SQL 由 Compiler 确定性拼装。
Query Understanding → Metric Agent(registry 白名单)
→ Semantic Planner → SQL Compiler → Execution
→ Insight → Report
指标注册表示例:
# metrics/registry.yaml
active_user:
sql_template: "dau"
source_table: game_daily_metrics
dimensions: [dt]
grain: day
sql_compiler.compile_query_plan 只做字符串拼装 + 白名单校验,不接受模型自由发挥:
def compile_query_plan(plan: Dict[str, Any]) -> Dict[str, Any]:
"""
将 Semantic Planner 的 QueryPlan 编译为唯一可执行 SQL。
"""
if not plan.get("ok"):
return {"ok": False, "sql": "", ...}
table = plan["source_table"]
if not _safe_ident(table):
return _reject("INVALID_TABLE_NAME")
POC 收获:SQL 可审计、可版本管理;业务术语通过 term_map.yaml 对齐指标 ID;数字员工职责更清晰——模型负责理解,编译器负责落库。
七、V4:从 CLI POC 到「运营工作台」
V4 不再用 Camel ChatAgent 流水线,而是用 Skill + Hook + HITL 产品化,但复用 V3 的指标与编译逻辑。
Agent Runtime (Session + Workflow + HITL)
│
Skill Engine Hook Engine (SSE 时间线)
│
Model Adapter (mock / openai / anthropic / ...)
│
metrics + semantic_layer + Doris/Mock
典型交互流程:
- 用户输入「最近七天活跃度怎么样?」
- Workflow 依次执行 Skill:
understand-intent→select-metrics→compile-and-query - 右侧 Hooks 面板展示各 Skill 推理步骤(SSE)
- HITL 闸门:数据就绪后暂停,运营可「确认指标 / 修改指标」
- 确认后
analyze-insight→generate-report,中间面板渲染 Markdown
POC 收获:CLI 验证了链路,V4 验证了运营日常协作形态——过程透明、人在回路、多模型可切换。
八、数字员工分工一览
| 数字员工(Agent/Skill) | 职责 | Camel-AI 参与 |
|---|---|---|
| Query Understanding | NL → 意图、时间窗、指标候选 | V1~V3 ChatAgent |
| SQL Generation / Planner / Optimizer | 生成或规划 SQL | V1~V2 ChatAgent |
| Metric Agent | 从 registry 白名单选指标 | V3 ChatAgent |
| SQL Compiler | 确定性拼装 SQL | 纯 Python,非 LLM |
| SQL Execution | 查 Doris / Mock | Tool 层 |
| Insight / Analysis | 数据 → 业务洞察 | ChatAgent |
| Report | 洞察 → Markdown 报告 | ChatAgent |
| Skill 脚本(V4) | 同上,声明式 SKILL.md |
ModelAdapter 替代 Camel |
OWL 编排层贯穿 V1~V3:Task 生命周期 + 阶段顺序 + trace/cache_hits;V4 用 Workflow YAML + HITL 闸门延续同一思想。
九、如何 5 分钟跑通 POC
CLI(V1,无需 API Key):
cd ai-ops-assistant-v1
pip install -r requirements.txt
python main.py "最近7天用户流失情况如何?"
python main.py --json # 查看各数字员工的中间输出
Web UI(V4):
# 终端 1
cd ai-ops-assistant-v4/backend && pip install -r requirements.txt
python -m uvicorn main:app --reload --port 8000
# 终端 2
cd ai-ops-assistant-v4/frontend && npm install && npm run dev
# 打开 http://localhost:5173
Mock 数据覆盖 14 个交易日(game_daily_metrics、order_daily_summary),支持「最近 7 天」「最近 14 天」等常见问法。
十、POC 结论与下一步
已验证
- Camel-AI
ChatAgent适合把运营分析拆成多个专职数字员工,统一run(dict) → dict接口便于编排。 - OWL 编排思想(显式阶段 + Task 状态 + trace)让 POC 可观测、可调试,比黑盒端到端更易落地。
- 渐进式治理有效:V1 证闭环 → V2 加 Schema → V3 加指标 Compiler → V4 加 HITL 产品形态。
- Mock 优先大幅降低演示与评审成本,适合向业务方展示「数字员工」能力边界。
尚未实现、可作为 V5+ 方向
| 方向 | 说明 |
|---|---|
| 接入 OWL Workforce | 在保持指标 Compiler 前提下,按问题动态拆分 Worker |
| 语义层产品化 | 指标版本、血缘、口径审批流 |
| 评估回归 | 黄金问题集 + SQL/结果 diff |
| 协作推送 | 飞书/钉钉 Bot,Skill 化定时报告 |
十一、写给想复现这套 POC 的工程师
三条实践建议:
- 先固定 Pipeline,再谈动态编排。 POC 阶段用 V1 式 DAG 把数据流画清楚,比一上来接
Workforce更容易说服业务方。 - Agent 输出一律结构化 JSON。
_camel_runtime.extract_json_object处理了模型不守规矩的情况;下游用_normalize_*兜底,避免 KeyError 级联失败。 - 越早引入语义层,数字员工越「靠谱」。 V3 的
registry.yaml+sql_compiler是 POC 从「能跑」到「敢用」的关键转折;V4 的 HITL 则是从「敢用」到「敢上线协作」的转折。
AtomGit 是由开放原子开源基金会联合 CSDN 等生态伙伴共同推出的新一代开源与人工智能协作平台。平台坚持“开放、中立、公益”的理念,把代码托管、模型共享、数据集托管、智能体开发体验和算力服务整合在一起,为开发者提供从开发、训练到部署的一站式体验。
更多推荐


所有评论(0)