这篇文档记录了如何用 Python + LangChain + RAG + LangGraph 从零搭建一个能自主调用工具、多步推理的智能设备故障诊断系统。
适合有 Python 基础、想入门 LLM 应用开发的同学。


项目是什么

一句话:一个能像运维工程师一样排查设备故障的 AI Agent。

它能做到:

  • 接收用户描述的故障现象(“制冰机报错 E03”)
  • 自主查询设备状态、读取日志、检索知识库
  • 综合分析后生成结构化的诊断报告

不是简单的问答机器人,而是一个有"手"(工具)有"脑"(LLM 推理)的 Agent。


技术栈一览

层级 技术 作用
LLM Claude / GPT-4o 大脑,负责推理和生成
框架 LangChain LLM 应用开发框架,统一接口
Agent LangGraph (ReAct) Agent 编排,自主决策调用工具
知识库 ChromaDB + Embedding 向量检索,让 AI 能查文档
API FastAPI + SSE 对外提供 REST 接口和流式输出
部署 Docker + docker-compose 一键部署

项目结构

ai-diagnostic-agent/
├── src/
│   ├── config/settings.py          # 配置管理
│   ├── llm/client.py               # LLM 客户端(双模型 + 降级)
│   ├── rag/
│   │   ├── loader.py               # 文档加载
│   │   ├── splitter.py             # 文本切分
│   │   ├── vectorstore.py          # 向量存储
│   │   ├── reranker.py             # CrossEncoder 重排序
│   │   ├── hybrid_search.py        # BM25 混合检索 + RRF 融合
│   │   └── chain.py                # RAG 检索链
│   ├── tools/
│   │   ├── knowledge_search.py     # 知识库检索工具
│   │   ├── log_reader.py           # SSH 日志读取工具
│   │   ├── device_status.py        # 设备状态查询工具
│   │   ├── diagnosis_report.py     # 诊断报告生成工具
│   │   ├── device_restart.py       # 设备服务重启工具
│   │   └── firmware_check.py       # 固件版本检查工具
│   ├── agent/
│   │   ├── diagnostic_agent.py     # ReAct 诊断 Agent(6 工具)
│   │   ├── repair_agent.py         # 维修建议 Agent
│   │   ├── monitor_agent.py        # 设备监控 Agent
│   │   └── supervisor.py           # Supervisor 多 Agent 编排
│   ├── api/
│   │   ├── app.py                  # FastAPI 入口
│   │   ├── limiter.py              # API 限流
│   │   ├── models.py               # 请求/响应模型
│   │   └── routes.py               # API 路由
│   └── main.py                     # CLI 入口
├── knowledge-base/
│   ├── device-manuals/             # 14 份设备手册
│   └── fault-cases/                # 25 个故障案例(5 类)
├── evaluation/                     # RAG 评估框架
├── frontend/                       # React + Vite + TypeScript 前端
├── scripts/build_knowledge_base.py # 知识库构建脚本
├── tests/                          # 133 个测试用例
├── Dockerfile
└── docker-compose.yml

Phase 1:LLM 基础接入

目标:让程序能调用 Claude/GPT,实现多轮对话。

核心概念

LangChain 把不同厂商的 LLM 统一成了相同的接口。不管你用 Claude 还是 GPT,调用方式都一样:

from langchain_anthropic import ChatAnthropic
from langchain_openai import ChatOpenAI

# 两个不同的模型,但接口完全一致
llm = ChatAnthropic(model="claude-sonnet-4-20250514", api_key="...")
llm = ChatOpenAI(model="gpt-4o", api_key="...")

# 调用方式一样
response = llm.invoke([HumanMessage(content="你好")])

关键代码:src/llm/client.py

def create_llm(provider: str = None):
    """工厂函数:根据 provider 创建对应的 LLM 实例"""
    provider = provider or settings.DEFAULT_LLM_PROVIDER

    if provider == "claude":
        kwargs = {
            "model": settings.CLAUDE_MODEL,
            "api_key": settings.ANTHROPIC_API_KEY,
            "max_tokens": 4096,
        }
        if settings.ANTHROPIC_BASE_URL:
            kwargs["base_url"] = settings.ANTHROPIC_BASE_URL  # 支持中转站
        return ChatAnthropic(**kwargs)

    elif provider == "openai":
        kwargs = {
            "model": settings.OPENAI_MODEL,
            "api_key": settings.OPENAI_API_KEY,
        }
        if settings.OPENAI_BASE_URL:
            kwargs["base_url"] = settings.OPENAI_BASE_URL
        return ChatOpenAI(**kwargs)

消息类型

LangChain 用三种消息类型来表示对话:

from langchain_core.messages import SystemMessage, HumanMessage, AIMessage

messages = [
    SystemMessage(content="你是一个设备诊断助手"),  # 系统提示词,定义 AI 的角色
    HumanMessage(content="制冰机报错了"),           # 用户输入
    AIMessage(content="请问是什么错误码?"),         # AI 回复
    HumanMessage(content="E03"),                    # 用户继续
]

# 把整个消息列表传给 LLM,它就能理解上下文
response = llm.invoke(messages)

流式输出

用户体验的关键 —— 不用等 AI 想完再一次性显示,而是逐字输出:

for chunk in llm.stream(messages):
    print(chunk.content, end="", flush=True)  # 逐 token 打印

学到了什么

  • LangChain 的 ChatModel 抽象:统一接口,切换模型只需改一行
  • 消息历史管理:多轮对话的本质就是维护一个消息列表
  • 流式输出:stream() 方法,逐 token 返回
  • 中转站配置:国内访问 Claude/GPT 的实际方案

Phase 2:RAG 知识库构建

目标:让 AI 能查阅设备手册和故障案例,而不是只靠自己的知识瞎猜。

RAG 是什么

RAG = Retrieval-Augmented Generation(检索增强生成)

核心思路:先搜索,再回答。

用户提问 → 从知识库检索相关文档 → 把文档塞进 Prompt → LLM 基于文档生成回答

为什么需要 RAG?因为 LLM 不知道你公司的设备手册内容。RAG 让它能"开卷考试"。

Step 1:文档加载 (loader.py)

把各种格式的文件统一加载成 LangChain 的 Document 对象:

from langchain_core.documents import Document

# Document 就两个字段:内容 + 元数据
doc = Document(
    page_content="制冰机错误码 E03 表示...",
    metadata={
        "source": "设备手册.md",
        "filename": "设备手册.md",
        "file_type": "markdown",
    }
)

支持 .md.pdf.txt 三种格式。PDF 按页加载,每页一个 Document。

Step 2:文本切分 (splitter.py)

为什么要切分?因为一篇 5000 字的文档直接做 Embedding 效果很差。切成小块后,检索更精确。

from langchain_text_splitters import RecursiveCharacterTextSplitter, MarkdownHeaderTextSplitter

# 通用切分器:按段落 → 换行 → 句号 → 空格的优先级切分
text_splitter = RecursiveCharacterTextSplitter(
    chunk_size=1000,      # 每块最多 1000 字符
    chunk_overlap=200,    # 相邻块重叠 200 字符,避免切断语义
    separators=["\n\n", "\n", "。", ";", " ", ""],
)

对 Markdown 文件,我们用了二次切分策略:

# 第一次:按标题切分(保持章节完整性)
md_splitter = MarkdownHeaderTextSplitter(
    headers_to_split_on=[("#", "h1"), ("##", "h2"), ("###", "h3")]
)

# 第二次:对过长的 section 再按字符切分
final_chunks = text_splitter.split_documents(md_chunks)

Step 3:向量存储 (vectorstore.py)

Embedding 就是把文本变成一个高维向量(一串数字)。语义相似的文本,向量距离近。

# 本地 Embedding(免费离线)
from langchain_community.embeddings import HuggingFaceEmbeddings
embedding = HuggingFaceEmbeddings(model_name="all-MiniLM-L6-v2")

# 存入 ChromaDB
from langchain_chroma import Chroma
vectorstore = Chroma.from_documents(
    documents=chunks,           # 切分后的文档块
    embedding=embedding,        # Embedding 模型
    persist_directory="./chroma_data",  # 持久化到磁盘
)

# 检索:输入问题,返回最相似的 5 个文档块
results = vectorstore.similarity_search_with_score("制冰机 E03 错误", k=5)

Step 4:RAG 检索链 (chain.py)

把检索和生成串起来:

def rag_query(question: str) -> dict:
    # 1. 检索相关文档
    results = search_with_scores(question, k=5)

    # 2. 格式化为上下文
    context = format_context(results)

    # 3. 构造 Prompt(把文档塞进去)
    messages = [
        SystemMessage(content=f"根据以下文档回答问题:\n{context}"),
        HumanMessage(content=question),
    ]

    # 4. LLM 生成回答
    response = llm.invoke(messages)

    return {"answer": response.content, "sources": [...]}

一键构建知识库

# 把 knowledge-base/ 目录下的文档全部加载、切分、Embedding、存入 ChromaDB
python scripts/build_knowledge_base.py --local

学到了什么

  • RAG 的完整流程:加载 → 切分 → Embedding → 存储 → 检索 → 生成
  • chunk_size 和 chunk_overlap 的权衡:太大检索不精确,太小丢失上下文
  • Embedding 选型:本地模型(免费离线)vs OpenAI API(效果好但要钱)
  • ChromaDB:轻量级向量数据库,开发阶段够用

Phase 3:Agent & Function Calling

目标:让 AI 不只是回答问题,而是能自主调用工具来收集信息、分析问题。

Agent vs Chain

这是最关键的概念区分:

Chain(链):固定流程
  用户提问 → 检索文档 → 生成回答
  每次都走一样的路径,不管问题是什么。

Agent(智能体):动态决策
  用户提问 → AI 思考 → 决定调用哪个工具 → 观察结果 → 继续思考 → ...
  根据问题不同,走不同的路径。

ReAct 模式

ReAct = Reasoning + Acting(推理 + 行动)

Thought: 用户说制冰机报错 E03,我需要先查设备状态
Action:  调用 query_device_status("BAR-001")
Observation: 制冰机状态 error,错误码 E03

Thought: 我需要查日志看看具体错误信息
Action:  调用 fetch_device_logs("middleware", keyword="E03")
Observation: [2026-04-09 14:25] ICE_MACHINE: Error E03 - 水泵压力异常

Thought: 我需要查知识库看看 E03 的处理方法
Action:  调用 search_knowledge_base("制冰机 E03 水泵压力")
Observation: 根据设备手册,E03 表示...

Thought: 信息够了,生成诊断报告
Action:  调用 generate_diagnosis_report(...)
Final Answer: 📋 诊断报告...

定义工具

@tool 装饰器定义工具。Agent 通过工具的 namedescription 来决定什么时候调用它:

from langchain_core.tools import tool

@tool
def search_knowledge_base(query: str) -> str:
    """搜索设备知识库,查找设备手册、操作规范、故障案例等技术文档。
    当需要查询设备的技术参数、操作流程、日志规范、错误码含义、
    硬件接口说明等信息时使用此工具。

    Args:
        query: 搜索关键词,例如"制冰机故障"、"日志错误码"
    """
    results = search_with_scores(query, k=5)
    # ... 格式化返回

description 写得好不好,直接影响 Agent 的智能程度。要写清楚"什么场景下该用这个工具"。

六个工具

工具 作用 数据来源
query_device_status 查设备实时状态 bar_middleware 真实 API
fetch_device_logs 读取设备日志 本地文件 / SSH(可配置)
search_knowledge_base 检索知识库(混合检索+重排序) ChromaDB + BM25 + CrossEncoder
generate_diagnosis_report 生成诊断报告 格式化输出
restart_device_service 重启设备服务 SSH 命令白名单
check_firmware_version 检查固件版本 middleware API

日志读取的双模式设计 (log_reader.py)

fetch_device_logs 支持两种读取模式,通过 .env 中的 LOG_READ_MODE 切换:

# .env
LOG_READ_MODE=local   # 同机部署,直接读本地文件
# LOG_READ_MODE=ssh   # 跨机部署,通过 SSH 远程读取

本地模式直接读文件,简单高效:

def _read_local(log_type: str, lines: int, keyword: str) -> str:
    log_path = Path(LOG_PATHS[log_type])
    with open(log_path, "r", encoding="utf-8", errors="ignore") as f:
        all_lines = f.readlines()
    if keyword:
        all_lines = [l for l in all_lines if keyword.lower() in l.lower()]
    return "".join(all_lines[-lines:])

SSH 模式通过 subprocess 调用 ssh 命令远程读取:

def _read_ssh(log_type: str, lines: int, keyword: str) -> str:
    remote_cmd = f"tail -n {lines} {LOG_PATHS[log_type]}"
    ssh_cmd = ["ssh", f"{settings.DEVICE_SSH_USER}@{settings.DEVICE_SSH_HOST}", remote_cmd]
    result = subprocess.run(ssh_cmd, capture_output=True, text=True, timeout=15)
    return result.stdout

工具入口根据配置自动选择:

@tool
def fetch_device_logs(log_type="syslog", lines=50, keyword="") -> str:
    if settings.LOG_READ_MODE == "local":
        return _read_local(log_type, lines, keyword)
    else:
        return _read_ssh(log_type, lines, keyword)

同机部署时用 local 模式:不需要 SSH 配置,读取更快,少一个故障点。

创建 Agent

用 LangGraph 的 create_react_agent 一行搞定:

from langgraph.prebuilt import create_react_agent

TOOLS = [search_knowledge_base, fetch_device_logs, query_device_status, generate_diagnosis_report]

agent = create_react_agent(
    model=llm,                      # LLM 作为大脑
    tools=TOOLS,                    # 注册工具
    prompt=AGENT_SYSTEM_PROMPT,     # 系统提示词
)

# 执行
result = agent.invoke({
    "messages": [HumanMessage(content="制冰机报错 E03,怎么回事?")]
})

Agent 会自动完成:选择工具 → 填充参数 → 调用 → 解析结果 → 决定下一步 → 循环直到得出结论。

学到了什么

  • Agent 和 Chain 的本质区别:固定流程 vs 动态决策
  • ReAct 模式:Thought → Action → Observation 循环
  • 工具定义:@tool 装饰器,description 是关键
  • LangGraph:比 LangChain 的 AgentExecutor 更灵活的 Agent 编排框架

Phase 4:生产化

目标:把 CLI 工具包装成可部署的 API 服务。

FastAPI 服务

from fastapi import FastAPI
from fastapi.middleware.cors import CORSMiddleware

app = FastAPI(title="设备故障诊断 Agent API")

# CORS 跨域(允许前端调用)
app.add_middleware(CORSMiddleware, allow_origins=["*"], allow_methods=["*"], allow_headers=["*"])

API 路由设计

方法 路径 功能
POST /api/chat 普通对话(非流式)
POST /api/chat/stream 普通对话(SSE 流式)
POST /api/diagnose Agent 诊断(工具调用)
POST /api/diagnose/stream Agent 诊断(SSE 流式)
POST /api/rag/query RAG 知识库检索
GET /api/sessions 列出活跃会话
DELETE /api/session/{id} 清除会话
GET /health 健康检查

SSE 流式输出

SSE(Server-Sent Events)是实现流式输出的标准方式,比 WebSocket 更轻量:

from fastapi.responses import StreamingResponse

@router.post("/chat/stream")
async def chat_stream(req: ChatRequest):
    session = _get_session(req.session_id, req.provider)

    async def event_generator():
        for token in session.stream_chat(req.message):
            # 每个 token 作为一个 SSE event 发送
            yield f"data: {json.dumps({'token': token}, ensure_ascii=False)}\n\n"
        yield f"data: {json.dumps({'done': True})}\n\n"

    return StreamingResponse(event_generator(), media_type="text/event-stream")

前端用 EventSource API 接收:

const source = new EventSource('/api/chat/stream');
source.onmessage = (event) => {
    const data = JSON.parse(event.data);
    if (data.token) appendToChat(data.token);
    if (data.done) source.close();
};

Agent 诊断接口

最核心的接口 —— 调用 Agent 执行完整诊断流程:

@router.post("/diagnose")
async def diagnose(req: DiagnoseRequest):
    agent = create_diagnostic_agent(req.provider)
    result = agent.invoke({
        "messages": [HumanMessage(content=req.question)],
    })

    # 提取推理步骤和使用的工具
    tools_used = []
    steps = []
    for msg in result["messages"]:
        if hasattr(msg, "tool_calls") and msg.tool_calls:
            for tc in msg.tool_calls:
                tools_used.append(tc["name"])
                steps.append({"type": "tool_call", "tool": tc["name"], "args": tc["args"]})

    return DiagnoseResponse(result=final_content, tools_used=tools_used, steps=steps)

Docker 部署

FROM python:3.11-slim
WORKDIR /app
RUN apt-get update && apt-get install -y openssh-client  # SSH 客户端
COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt
COPY src/ src/
COPY scripts/ scripts/
COPY knowledge-base/ knowledge-base/
EXPOSE 8000
CMD ["uvicorn", "src.api.app:app", "--host", "0.0.0.0", "--port", "8000"]
# 一键启动
docker-compose up -d

# 测试
curl http://localhost:8000/health
curl -X POST http://localhost:8000/api/diagnose \
  -H "Content-Type: application/json" \
  -d '{"question": "制冰机报错 E03"}'

学到了什么

  • FastAPI:自动生成 Swagger 文档,Pydantic 数据校验
  • SSE 流式输出:比 WebSocket 更适合"AI 逐字输出"的场景
  • 会话管理:通过 session_id 维护多轮对话
  • Docker 部署:Dockerfile + docker-compose 一键部署

整体数据流

把四个 Phase 串起来,看一次完整的诊断请求是怎么走的:

用户: "制冰机报错 E03,怎么处理?"
  │
  ▼
FastAPI 接收请求 (routes.py)
  │
  ▼
创建 ReAct Agent (diagnostic_agent.py)
  │  ├── LLM: Claude/GPT (client.py)
  │  └── Tools: 4 个工具
  │
  ▼
Agent 开始推理循环:
  │
  ├── Thought: 先查设备状态
  │   └── Action: query_device_status("制冰机")
  │       └── HTTP → middleware:8003 → 返回真实状态
  │
  ├── Thought: 查日志看详情
  │   └── Action: fetch_device_logs("middleware", keyword="E03")
  │       └── SSH → 设备 → 返回日志
  │
  ├── Thought: 查知识库找解决方案
  │   └── Action: search_knowledge_base("制冰机 E03")
  │       └── ChromaDB 向量检索 → 返回相关文档
  │
  ├── Thought: 信息够了,生成报告
  │   └── Action: generate_diagnosis_report(...)
  │       └── 返回结构化报告
  │
  └── Final Answer: 📋 诊断报告
      │
      ▼
  FastAPI 返回 JSON 响应

真实 API 接入:从模拟数据到生产环境

为什么要接真实 API

  • Agent 拿到的是设备的真实运行状态,诊断结果有实际意义
  • 简历上可以写"对接真实 IoT 设备 API"
  • 展示了从原型到落地的工程化能力

middleware 接口体系

middleware 是现有的设备中间件(Python + FastAPI),运行在同一台主机。它对每个硬件模块都提供了统一的 /machine/status 接口:

GET /ice/machine/status        → 制冰机状态(错误码、冰量、满载等)
GET /coffee/machine/status     → 咖啡机状态(错误码、运行中/空闲等)
GET /cup/machine/status        → 杯子机状态
GET /lid/machine/status        → 扣盖机状态
GET /power/machine/status      → 电源状态(各路开关)
GET /RobotArm/Dev/status       → 机械臂状态
GET /agent/material/all        → 所有物料数据
GET /mid/version               → 中间件版本

所有接口返回统一格式:

{
    "success": true,
    "msg": "",
    "data": {
        "status_list": [
            {"code": 0, "name": "制冰机", "status": "normal", "error_code": "", "full": 1, "inventory": 5000}
        ]
    }
}

改造 device_status.py

核心变化:把 MOCK_DEVICES 字典换成真实的 HTTP 调用。

import httpx
from src.config.settings import settings

# 各模块的状态接口映射
MODULE_STATUS_ENDPOINTS = {
    "制冰机": "/ice/machine/status",
    "咖啡机": "/coffee/machine/status",
    "杯子机": "/cup/machine/status",
    # ... 10+ 个模块
}

def _fetch(path: str) -> dict | None:
    """请求 middleware 接口"""
    url = f"{settings.MIDDLEWARE_BASE_URL}{path}"
    try:
        resp = httpx.get(url, timeout=5.0)
        return resp.json()
    except Exception:
        return None

@tool
def query_device_status(module: str = "all") -> str:
    """查询饮吧设备各硬件模块的实时运行状态..."""

    # 先检查中间件是否在线
    version_data = _fetch("/mid/version")
    if version_data is None:
        return "无法连接到 middleware,中间件可能未启动。"

    # 逐个查询各模块
    for name, path in MODULE_STATUS_ENDPOINTS.items():
        data = _fetch(path)
        # 格式化输出...

配置

# .env
MIDDLEWARE_BASE_URL=http://localhost:8003  # 同机部署

因为诊断 Agent 和 middleware 部署在同一台机器上,直接 localhost 调用,不需要走网络。


踩过的坑

1. 代理冲突

国内开发经常开 Clash 等代理,但调用中转站 API 时代理会导致 SSL 错误。解决方案:在 settings.py 统一清除代理环境变量。

2. Extended Thinking 格式

Claude 的 extended thinking 模式下,response.content 不是字符串而是列表。需要 extract_text() 函数做兼容处理。

3. 流式输出降级

流式输出可能因为模型配置问题失败,所以做了降级逻辑:流式失败 → 自动切换为非流式调用。

4. 工具 description 的重要性

Agent 选择工具完全依赖 description。一开始写得太简单,Agent 经常选错工具。后来把"什么场景下该用"写清楚,效果好了很多。


快速上手

# 1. 克隆项目
git clone https://github.com/评论区获取项目链接 qaq
cd ai-diagnostic-agent

# 2. 创建虚拟环境
python -m venv .venv
.venv/Scripts/activate  # Windows
# source .venv/bin/activate  # Mac/Linux

# 3. 安装依赖
pip install -r requirements.txt

# 4. 配置环境变量
cp .env.example .env
# 编辑 .env,填入 API Key

# 5. 构建知识库
python scripts/build_knowledge_base.py --local

# 6. 启动 CLI
python -m src.main

# 7. 或者启动 API 服务
uvicorn src.api.app:app --reload --port 8000
# 访问 http://localhost:8000/docs 查看 Swagger 文档

下一步可以做什么(已全部完成 ✅)

以下功能在项目后续迭代中均已实现:

  • ✅ 接入真实设备 API(device_status.py 已对接 middleware 真实接口)
  • ✅ 添加更多工具(device_restart.py 重启服务、firmware_check.py 固件检查)
  • ✅ 前端界面(React + Vite + TypeScript + SSE 流式,含模型选择/多Agent开关/报告导出)
  • ✅ 对话记忆持久化(Redis 会话存储,自动降级到内存)
  • ✅ 多 Agent 协作(Supervisor + 诊断/维修/监控 Agent,支持链式调用)
  • ✅ 评估体系(evaluation/run_eval.py,Recall@K + MRR + 回答质量)
  • ✅ RAG 检索优化(CrossEncoder 重排序 + BM25 混合检索 + RRF 融合)
  • ✅ API 限流(slowapi,按端点分级限制)

详细文档见:

  • docs/usage-and-learning-guide.md — 完整使用指南
  • docs/api-guide.md — API 集成指南
  • docs/deployment-guide.md — 部署最佳实践
Logo

AtomGit 是由开放原子开源基金会联合 CSDN 等生态伙伴共同推出的新一代开源与人工智能协作平台。平台坚持“开放、中立、公益”的理念,把代码托管、模型共享、数据集托管、智能体开发体验和算力服务整合在一起,为开发者提供从开发、训练到部署的一站式体验。

更多推荐