前言

在 AI 应用开发中,我们往往专注于功能实现,却忽略了一个关键问题:我们的 AI 到底消耗了多少资源? 每次对话用了多少 Token?产生了多少费用?响应延迟如何?哪些模型和接口被调用得最多?

本文将以一个完整的聊天应用为例,从零开始搭建一套 AI 使用数据监控系统,涵盖后端数据采集、API 设计和前端可视化面板的完整实现。

技术栈

  • 后端: Python + Flask + LangChain (ChatOpenAI / DashScope)
  • 数据库: MySQL
  • 前端: React 19 + Ant Design 5 + Zustand

一、整体架构

用户请求
  │
  ▼
Flask 后端 (server.py)
  │
  ├─ 调用 LLM (ChatOpenAI)
  │     │
  │     ▼
  │   提取 response_metadata (token 用量)
  │     │
  │     ▼
  ├─ log_usage() → 写入 MySQL (ai_usage_logs 表)
  │
  ├─ /api/monitoring/summary  ── 聚合统计
  ├─ /api/monitoring/daily    ── 每日趋势
  ├─ /api/monitoring/recent   ── 最近请求
  │
  ▼
React 前端 (MonitoringPanel.jsx)
  │
  ├─ 5 个统计卡片 (Tokens / 费用 / 请求数 / 延迟 / 系统状态)
  ├─ SVG 双轴趋势图 (面积图 + 柱状图)
  ├─ 模型分布 + 端点分布
  └─ 可筛选/排序的请求记录表格

二、后端实现

2.1 创建数据库表

首先在 MySQL 中创建使用日志表:

CREATE TABLE IF NOT EXISTS ai_usage_logs (
    id INT AUTO_INCREMENT PRIMARY KEY,
    endpoint VARCHAR(100) NOT NULL COMMENT 'API 端点',
    model VARCHAR(50) NOT NULL COMMENT '模型名称',
    prompt_tokens INT DEFAULT 0,
    completion_tokens INT DEFAULT 0,
    total_tokens INT DEFAULT 0,
    latency_ms INT DEFAULT 0 COMMENT '响应耗时(ms)',
    cost DOUBLE DEFAULT 0 COMMENT '费用(CNY)',
    conversation_id VARCHAR(36) DEFAULT NULL COMMENT '关联对话',
    request_preview VARCHAR(200) DEFAULT NULL COMMENT '用户输入预览',
    response_preview VARCHAR(200) DEFAULT NULL COMMENT 'AI回复预览',
    created_at DATETIME DEFAULT CURRENT_TIMESTAMP,
    INDEX idx_model (model),
    INDEX idx_endpoint (endpoint),
    INDEX idx_created (created_at)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;

每条记录对应一次 LLM 调用,存储端点、模型、Token 用量、延迟、费用等关键指标。

2.2 使用数据追踪模块

创建 usage_tracker.py,封装数据记录和提取逻辑:

"""AI 使用数据追踪模块"""
import pymysql
from config import DB_CONFIG

# DashScope 模型定价 (CNY / 1K tokens)
MODEL_PRICING = {
    "qwen-plus": {"input": 0.004, "output": 0.012},
    "qwen-vl-plus": {"input": 0.008, "output": 0.02},
}


def calculate_cost(model, prompt_tokens, completion_tokens):
    """根据模型定价计算费用"""
    pricing = MODEL_PRICING.get(model, {"input": 0.004, "output": 0.012})
    return (prompt_tokens * pricing["input"] 
            + completion_tokens * pricing["output"]) / 1000


def log_usage(endpoint, model, prompt_tokens=0, completion_tokens=0,
             total_tokens=0, latency_ms=0, conversation_id=None,
             request_preview=None, response_preview=None):
    """记录一次 AI 调用的使用数据"""
    if total_tokens == 0 and (prompt_tokens > 0 or completion_tokens > 0):
        total_tokens = prompt_tokens + completion_tokens

    cost = calculate_cost(model, prompt_tokens, completion_tokens)
    conn = pymysql.connect(**DB_CONFIG)
    try:
        cursor = conn.cursor()
        cursor.execute(
            """INSERT INTO ai_usage_logs
               (endpoint, model, prompt_tokens, completion_tokens, 
                total_tokens, latency_ms, cost, conversation_id,
                request_preview, response_preview)
               VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s)""",
            (endpoint, model, prompt_tokens, completion_tokens,
             total_tokens, latency_ms, round(cost, 6), conversation_id,
             (request_preview or "")[:200],
             (response_preview or "")[:200])
        )
        conn.commit()
    finally:
        conn.close()


def extract_metadata(response):
    """从 LangChain 响应中提取 Token 使用数据"""
    metadata = {"prompt_tokens": 0, "completion_tokens": 0, 
                "total_tokens": 0, "model_name": ""}
    if hasattr(response, "response_metadata"):
        rm = response.response_metadata
        if isinstance(rm, dict):
            token_usage = rm.get("token_usage", {})
            metadata["prompt_tokens"] = token_usage.get("prompt_tokens", 0)
            metadata["completion_tokens"] = token_usage.get("completion_tokens", 0)
            metadata["total_tokens"] = token_usage.get("total_tokens", 0)
            metadata["model_name"] = rm.get("model_name", "")
    return metadata


def estimate_tokens(text):
    """估算 Token 数 (中文约 1.5 token/字, 英文约 0.25/字符)"""
    if not text:
        return 0
    chinese = sum(1 for c in text if '\u4e00' <= c <= '\u9fff')
    return int(chinese * 1.5 + (len(text) - chinese) * 0.25)

核心设计要点:

  • extract_metadata() — 从 LangChain 的 ChatOpenAI 响应对象中提取精确的 Token 用量
  • estimate_tokens() — 对于流式调用(DashScope 不返回 Token 统计),用字符估算替代
  • calculate_cost() — 基于模型定价自动计算每次调用费用
2.3 在 LLM 调用点注入追踪

流式调用(估算 Token):

# 流式调用 — DashScope 不返回 Token 统计,使用估算
start_time = time.time()
full_reply = ""

for chunk in llm.stream(history):
    if chunk.content:
        full_reply += chunk.content
        yield f"data: {json.dumps({'token': chunk.content})}\n\n"

# 流结束后记录
latency = int((time.time() - start_time) * 1000)
p_est = estimate_tokens(user_message)
c_est = estimate_tokens(full_reply)
log_usage(
    endpoint="/api/chat/stream",
    model="qwen-plus",
    prompt_tokens=p_est,
    completion_tokens=c_est,
    total_tokens=p_est + c_est,
    latency_ms=latency,
    request_preview=user_message,
    response_preview=full_reply[:200],
)

关键陷阱: 在流式端点中,如果不同路径(RAG、MCP、普通对话)共用 log_usage,务必确保引用的变量在所有路径中都存在。例如 history 只在普通对话路径定义,RAG/MCP 路径应使用 user_message 进行 Token 估算。

2.4 监控 API 端点

提供三个聚合查询接口,供前端面板调用:

# ==================== 监控 API ====================

@app.route("/api/monitoring/summary", methods=["GET"])
def monitoring_summary():
    """获取使用数据汇总"""
    days = int(request.args.get("days", 30))
    import pymysql
    conn = pymysql.connect(**_DB_CONFIG)
    try:
        cursor = conn.cursor(pymysql.cursors.DictCursor)

        # 总体统计
        cursor.execute(
            """SELECT
                COUNT(*) as total_requests,
                COALESCE(SUM(prompt_tokens), 0) as prompt_tokens,
                COALESCE(SUM(completion_tokens), 0) as completion_tokens,
                COALESCE(SUM(total_tokens), 0) as total_tokens,
                COALESCE(SUM(cost), 0) as total_cost,
                COALESCE(AVG(latency_ms), 0) as avg_latency,
                COALESCE(MAX(latency_ms), 0) as max_latency,
                COALESCE(MIN(latency_ms), 0) as min_latency
            FROM ai_usage_logs
            WHERE created_at >= DATE_SUB(NOW(), INTERVAL %s DAY)""",
            (days,),
        )
        overall = cursor.fetchone()

        # P95 延迟
        cursor.execute(
            """SELECT COALESCE(latency_ms, 0) as latency
            FROM ai_usage_logs
            WHERE created_at >= DATE_SUB(NOW(), INTERVAL %s DAY)
              AND latency_ms IS NOT NULL
            ORDER BY latency ASC""",
            (days,),
        )
        latency_rows = cursor.fetchall()
        p95_latency = 0
        if latency_rows:
            idx = int(len(latency_rows) * 0.95)
            p95_latency = latency_rows[min(idx, len(latency_rows) - 1)]["latency"]

        # 成功率 (有 response_preview 的视为成功)
        cursor.execute(
            """SELECT
                COUNT(*) as total,
                SUM(CASE WHEN response_preview IS NOT NULL AND response_preview != '' THEN 1 ELSE 0 END) as success
            FROM ai_usage_logs
            WHERE created_at >= DATE_SUB(NOW(), INTERVAL %s DAY)""",
            (days,),
        )
        sr = cursor.fetchone()
        success_rate = round((int(sr["success"]) / int(sr["total"])) * 100, 1) if int(sr["total"]) > 0 else 100.0

        # 按模型分组
        cursor.execute(
            """SELECT
                model,
                COUNT(*) as requests,
                COALESCE(SUM(prompt_tokens), 0) as prompt_tokens,
                COALESCE(SUM(completion_tokens), 0) as completion_tokens,
                COALESCE(SUM(total_tokens), 0) as tokens,
                COALESCE(SUM(cost), 0) as cost
            FROM ai_usage_logs
            WHERE created_at >= DATE_SUB(NOW(), INTERVAL %s DAY)
            GROUP BY model""",
            (days,),
        )
        model_rows = cursor.fetchall()
        models = {}
        for row in model_rows:
            models[row["model"]] = {
                "tokens": row["tokens"],
                "promptTokens": row["prompt_tokens"],
                "completionTokens": row["completion_tokens"],
                "requests": row["requests"],
                "cost": round(float(row["cost"]), 4),
            }

        # 按端点分组
        cursor.execute(
            """SELECT
                endpoint,
                COUNT(*) as requests,
                COALESCE(SUM(total_tokens), 0) as tokens,
                COALESCE(SUM(cost), 0) as cost
            FROM ai_usage_logs
            WHERE created_at >= DATE_SUB(NOW(), INTERVAL %s DAY)
            GROUP BY endpoint""",
            (days,),
        )
        ep_rows = cursor.fetchall()
        endpoints = {}
        for row in ep_rows:
            endpoints[row["endpoint"]] = {
                "tokens": row["tokens"],
                "requests": row["requests"],
                "cost": round(float(row["cost"]), 4),
            }

        # 今日统计
        cursor.execute(
            """SELECT
                COALESCE(SUM(total_tokens), 0) as tokens,
                COUNT(*) as requests,
                COALESCE(SUM(cost), 0) as cost
            FROM ai_usage_logs
            WHERE created_at >= CURDATE()"""
        )
        today = cursor.fetchone()

        # 昨日统计
        cursor.execute(
            """SELECT
                COALESCE(SUM(total_tokens), 0) as tokens
            FROM ai_usage_logs
            WHERE created_at >= DATE_SUB(CURDATE(), INTERVAL 1 DAY)
              AND created_at < CURDATE()"""
        )
        yesterday = cursor.fetchone()

        yd_tokens = yesterday["tokens"]
        td_tokens = int(today["tokens"])
        yd_tokens = int(yesterday["tokens"])
        tokens_change = round(((td_tokens - yd_tokens) / yd_tokens) * 100, 1) if yd_tokens > 0 else (100.0 if td_tokens > 0 else 0.0)

        # 最后请求时间
        cursor.execute("SELECT MAX(created_at) as last_at FROM ai_usage_logs")
        last_row = cursor.fetchone()
        last_request_at = str(last_row["last_at"]) if last_row["last_at"] else None

        # 本月费用
        cursor.execute(
            """SELECT COALESCE(SUM(cost), 0) as cost
            FROM ai_usage_logs
            WHERE created_at >= DATE_FORMAT(NOW(), '%%Y-%%m-01')"""
        )
        month_cost = float(cursor.fetchone()["cost"])

        return jsonify({
            "code": 200,
            "data": {
                "totalTokens": overall["total_tokens"],
                "promptTokens": overall["prompt_tokens"],
                "completionTokens": overall["completion_tokens"],
                "totalCost": round(float(overall["total_cost"]), 4),
                "totalRequests": overall["total_requests"],
                "avgLatencyMs": int(overall["avg_latency"]),
                "maxLatencyMs": int(overall["max_latency"]),
                "minLatencyMs": int(overall["min_latency"]),
                "p95LatencyMs": int(p95_latency),
                "successRate": success_rate,
                "todayTokens": td_tokens,
                "todayRequests": today["requests"],
                "todayCost": round(float(today["cost"]), 4),
                "yesterdayTokens": yd_tokens,
                "tokensChangeRate": tokens_change,
                "lastRequestAt": last_request_at,
                "monthCost": round(month_cost, 4),
                "models": models,
                "endpoints": endpoints,
            },
        })
    except Exception as e:
        return jsonify({"code": 500, "msg": str(e)})
    finally:
        conn.close()


@app.route("/api/monitoring/daily", methods=["GET"])
def monitoring_daily():
    """获取每日使用趋势"""
    days = int(request.args.get("days", 7))
    import pymysql
    conn = pymysql.connect(**_DB_CONFIG)
    try:
        cursor = conn.cursor(pymysql.cursors.DictCursor)
        cursor.execute(
            """SELECT
                DATE(created_at) as date,
                COALESCE(SUM(total_tokens), 0) as tokens,
                COUNT(*) as requests,
                COALESCE(SUM(cost), 0) as cost
            FROM ai_usage_logs
            WHERE created_at >= DATE_SUB(NOW(), INTERVAL %s DAY)
            GROUP BY DATE(created_at)
            ORDER BY date ASC""",
            (days,),
        )
        rows = cursor.fetchall()
        data = []
        for row in rows:
            data.append({
                "date": str(row["date"]),
                "tokens": row["tokens"],
                "requests": row["requests"],
                "cost": round(float(row["cost"]), 4),
            })
        return jsonify({"code": 200, "data": data})
    except Exception as e:
        return jsonify({"code": 500, "msg": str(e)})
    finally:
        conn.close()


@app.route("/api/monitoring/recent", methods=["GET"])
def monitoring_recent():
    """获取最近的 AI 调用记录"""
    limit = int(request.args.get("limit", 20))
    import pymysql
    conn = pymysql.connect(**_DB_CONFIG)
    try:
        cursor = conn.cursor(pymysql.cursors.DictCursor)
        cursor.execute(
            """SELECT
                id, endpoint, model, conversation_id,
                prompt_tokens, completion_tokens, total_tokens,
                latency_ms, cost, request_preview, response_preview,
                created_at
            FROM ai_usage_logs
            ORDER BY created_at DESC
            LIMIT %s""",
            (limit,),
        )
        rows = cursor.fetchall()
        data = []
        for row in rows:
            has_response = row["response_preview"] is not None and row["response_preview"] != ""
            data.append({
                "id": row["id"],
                "endpoint": row["endpoint"],
                "model": row["model"],
                "conversationId": row["conversation_id"],
                "promptTokens": row["prompt_tokens"],
                "completionTokens": row["completion_tokens"],
                "totalTokens": row["total_tokens"],
                "latencyMs": row["latency_ms"],
                "cost": round(float(row["cost"]), 4),
                "requestPreview": row["request_preview"],
                "responsePreview": row["response_preview"][:200] if row["response_preview"] else None,
                "status": "success" if has_response else "failed",
                "createdAt": str(row["created_at"]),
            })
        return jsonify({"code": 200, "data": data})
    except Exception as e:
        return jsonify({"code": 500, "msg": str(e)})
    finally:
        conn.close()

三、前端实现

3.1 API 服务层
// monitoringService.js

/**
 * Monitoring API Service
 * AI 使用数据监控
 */

const BASE_URL = process.env.REACT_APP_CHAT_API_URL || 'http://localhost:5000';

export class MonitoringService {

  async getSummary(days = 30) {
    const res = await fetch(`${BASE_URL}/api/monitoring/summary?days=${days}`);
    const data = await res.json();
    if (data.code === 200) return data.data;
    throw new Error(data.msg || '获取监控汇总失败');
  }

  async getDailyUsage(days = 7) {
    const res = await fetch(`${BASE_URL}/api/monitoring/daily?days=${days}`);
    const data = await res.json();
    if (data.code === 200) return data.data;
    throw new Error(data.msg || '获取每日趋势失败');
  }

  async getRecentRequests(limit = 20) {
    const res = await fetch(`${BASE_URL}/api/monitoring/recent?limit=${limit}`);
    const data = await res.json();
    if (data.code === 200) return data.data;
    throw new Error(data.msg || '获取最近记录失败');
  }
}

export const monitoringService = new MonitoringService();
3.2 监控面板组件

面板使用 antd Modal 弹窗 (960px 宽),包含以下区域:

1) 五个统计卡片

<Row gutter={[12, 12]} align="stretch">
  {/* 总 Tokens / 总费用 / 请求次数 / 平均延迟 / 系统状态 */}
  {statCards.map(card => (
    <Col flex="1" key={card.key}>
      <div className={styles.statCard} style={{ background: card.gradient }}>
        <div className={styles.statLabel}>{card.title}</div>
        <div className={styles.statValue}>{card.value}</div>
        <div className={styles.statSub}>{card.sub}</div>
      </div>
    </Col>
  ))}
</Row>

每个卡片使用渐变背景色 + 环比变化率标签 + 今日子数据,系统状态卡片额外展示成功率、API 在线状态、月预算进度条。

2) SVG 双轴趋势图

使用纯 SVG 绘制面积图 + 柱状图组合:

  • 左 Y 轴 = Tokens(平滑曲线 + 面积填充)
  • 右 Y 轴 = 请求次数(半透明柱状图)
  • Catmull-Rom 曲线平滑算法,hover 显示详情
function DualChart({ data }) {
  // 平滑曲线路径 (Catmull-Rom → Cubic Bezier)
  function smoothPath(points) {
    let d = `M${points[0].x},${points[0].y}`;
    for (let i = 0; i < points.length - 1; i++) {
      const tension = 0.3;
      // 计算 Catmull-Rom 控制点...
      d += ` C${cp1x},${cp1y} ${cp2x},${cp2y} ${p2.x},${p2.y}`;
    }
    return d;
  }
  // SVG: 网格线 + 柱状图 + 面积 + 曲线 + 数据点
}

3) 模型分布 + 端点分布

左右并排展示,使用 antd Progress 进度条 + 百分比,直观显示各模型和端点的使用占比。

4) 最近请求表格

<Table
  dataSource={recentRequests}
  columns={[
    { title: '时间', sorter: true },
    { title: '端点', filters: [...] },  // 可筛选
    { title: '模型', filters: [...] },
    { title: 'Input/Output' },
    { title: '延迟', sorter: true, render: colorByLatency },
    { title: '状态', render: successOrFailedTag },
    { title: '操作', render: detailPopover },
  ]}
  pagination={{ pageSize: 8 }}
/>

支持按端点、模型筛选,按时间、Token、延迟排序,点击「详情」查看完整请求/响应内容。

3.3 浮动按钮入口

在聊天页面右下角添加浮动按钮,点击打开监控弹窗:

<div className={styles.monitoringFab} onClick={() => setMonitoringOpen(true)}>
        <DashboardOutlined />
      </div>
      <MonitoringPanel open={monitoringOpen} onClose={() => setMonitoringOpen(false)} />
    </div>
.monitoringFab {
  position: fixed;
  bottom: 28px;
  right: 28px;
  width: 48px;
  height: 48px;
  border-radius: 50%;
  background: linear-gradient(135deg, #1677ff, #4096ff);
  color: #fff;
  display: flex;
  align-items: center;
  justify-content: center;
  font-size: 20px;
  cursor: pointer;
  box-shadow: 0 4px 16px rgba(22, 119, 255, 0.4);
  z-index: 100;
  transition: all 0.25s ease;
  user-select: none;

  &:hover {
    transform: scale(1.1);
    box-shadow: 0 6px 24px rgba(22, 119, 255, 0.5);
  }

  &:active {
    transform: scale(0.95);
  }
}

五、最终效果

监控面板展示以下数据:

区域 内容
统计卡片 总 Tokens (含环比) / 总费用 / 请求次数 / 平均延迟 / 系统状态 + 预算进度
趋势图 每日 Token 使用趋势 (面积图) + 请求次数 (柱状图),双 Y 轴
分布 按模型分布 + 按端点分布的进度条
请求列表 可筛选/排序的最近请求,含状态、详情弹窗

总结

本文实现了一套轻量但完整的 AI 使用监控系统,核心思路是:

  1. 拦截 LLM 响应 — 从 response_metadata 提取精确数据,流式调用用估算
  2. 结构化存储 — 每次调用记录端点、模型、Token、延迟、费用到 MySQL
  3. 聚合 API — 提供多维度统计查询,支持时间范围、模型/端点分组、环比计算
  4. 可视化面板 - 渐变卡片 + SVG 图表 + 可操作表格,集成在聊天界面中

整个过程不需要 LangSmith 平台账号,纯自建实现。

Logo

AtomGit 是由开放原子开源基金会联合 CSDN 等生态伙伴共同推出的新一代开源与人工智能协作平台。平台坚持“开放、中立、公益”的理念,把代码托管、模型共享、数据集托管、智能体开发体验和算力服务整合在一起,为开发者提供从开发、训练到部署的一站式体验。

更多推荐