架构设计:高性能、可扩展的监控架构

深入了解 AI Observability Agent 的系统架构和设计理念

整体架构图

┌─────────────────────────────────────────────────────────────────────────────┐
│                              AI Observability Agent                          │
├─────────────────────────────────────────────────────────────────────────────┤
│                                                                             │
│  ┌─────────────────┐  ┌─────────────────┐  ┌─────────────────────┐  ┌───────┐ │
│  │ 系统采集器      │  │ 服务抓取器      │  │ OTLP 接收器        │  │ 插件  │ │
│  │ System Collectors│  │ Service Scrapers│  │ (gRPC/HTTP)        │  │ 系统  │ │
│  └────────┬────────┘  └────────┬────────┘  └────────┬────────────┘  └──┬────┘ │
│           │                    │                     │                   │      │
│           └────────────────────┼─────────────────────┼───────────────────┘      │
│                                │                     │                            │
│  ┌─────────────────────────────────────────────────────────────────────────────┐ │
│  │                          数据处理层                                          │ │
│  │  ┌─────────────┐  ┌─────────────┐  ┌─────────────────────────┐  ┌────────┐ │ │
│  │  │ 指标转换    │  │ 成本计算    │  │ 质量评估               │  │ 缓存   │ │ │
│  │  │ (OTLP→Prom) │  │ 引擎        │  │ (规则引擎)             │  │ 批处理 │ │ │
│  │  └─────────────┘  └─────────────┘  └─────────────────────────┘  └────────┘ │ │
│  └─────────────────────────────────────────────────────────────────────────────┘ │
│                                │                     │                            │
│  ┌─────────────────────────────────────────────────────────────────────────────┐ │
│  │                          数据上报层                                          │ │
│  │  ┌─────────────┐  ┌─────────────┐  ┌─────────────────────────┐  ┌────────┐ │ │
│  │  │ Remote      │  │ 重试策略    │  │ 多端点故障转移         │  │ 本地   │ │ │
│  │  │ Write 客户端 │  │ (指数退避)  │  │ (健康检查)             │  │ 持久化 │ │ │
│  │  └─────────────┘  └─────────────┘  └─────────────────────────┘  └────────┘ │ │
│  └─────────────────────────────────────────────────────────────────────────────┘ │
│                                │                     │                            │
└────────────────────────────────┼─────────────────────┼────────────────────────────┘
                                 ↓                     ↓
                          Prometheus/VictoriaMetrics  磁盘存储

核心模块介绍

1. 数据采集层

1.1 系统采集器 (System Collectors)

系统采集器负责采集主机或容器的系统指标:

#[async_trait]
pub trait Collector: Send + Sync {
    fn name(&self) -> &str;
    async fn collect(&self) -> Result<Vec<Sample>, CollectorError>;
    fn enabled(&self) -> bool;
    fn set_enabled(&mut self, enabled: bool);
}

特点

  • 插件式架构:每个采集器独立实现 Collector trait
  • 并发采集:各采集器并行执行,互不影响
  • 动态控制:支持运行时启用/禁用采集器
  • 跨平台:支持 Linux、macOS、Windows

内置采集器

  • CPU 采集器:/proc/stat (Linux)、sysctl (macOS)、WMI (Windows)
  • 内存采集器:/proc/meminfosysctl、WMI
  • 磁盘采集器:/proc/diskstats、I/O Kit (macOS)、WMI
  • 网络采集器:/proc/net/devnetstat、WMI
  • 负载采集器:/proc/loadavgsysctl、WMI
1.2 服务抓取器 (Service Scrapers)

服务抓取器定期抓取配置的 HTTP metrics 端点:

pub struct ServiceScraper {
    id: String,
    url: String,
    interval: Duration,
    timeout: Duration,
    auth: Option<AuthConfig>,
    extra_labels: HashMap<String, String>,
    client: reqwest::Client,
}

功能

  • HTTP 抓取:使用 reqwest 异步客户端
  • 认证支持:HTTP Basic Auth、Bearer Token
  • 自定义标签:为每个 target 注入自定义标签
  • 超时控制:可配置的抓取超时时间
  • Prometheus Text 解析:解析标准 Prometheus text exposition 格式
1.3 OTLP 接收器 (OTLP Receiver)

OTLP 接收器接收 OpenTelemetry 协议数据:

pub struct OtlpReceiver {
    grpc_endpoint: SocketAddr,      // gRPC 接收端点 (默认 :4317)
    http_endpoint: SocketAddr,      // HTTP 接收端点 (默认 :4318)
    metrics_converter: OtlpToPromConverter,
}

支持的协议

  • gRPC:端口 4317,高性能二进制协议
  • HTTP:端口 4318,基于 JSON 的协议

指标类型

  • Gauge:单值指标
  • Counter:累加计数器
  • Histogram:直方图
  • Summary:分位数
1.4 插件系统 (Plugin System)

插件系统支持通过插件扩展采集能力:

支持的插件类型

  • HTTP 插件:从 HTTP 端点获取 Prometheus 格式指标
  • Exec 插件:执行命令获取输出
  • Script 插件:执行脚本获取输出

2. 数据处理层

2.1 指标转换 (Metrics Converter)

负责将不同格式的指标转换为 Prometheus 格式:

  • OTLP→Prom:将 OTLP 指标转换为 Prometheus Sample
  • 格式标准化:统一标签格式和指标命名
  • 标签处理:注入额外标签,处理高基数标签
2.2 成本计算引擎 (Cost Calculator)

根据 Token 使用量计算 AI API 成本:

pub struct CostCalculator {
    pricing_table: HashMap<String, ModelPricing>,
}

pub struct ModelPricing {
    input_cost_per_1k: f64,
    output_cost_per_1k: f64,
    cache_read_cost_per_1k: Option<f64>,
    cache_write_cost_per_1k: Option<f64>,
}

功能

  • 内置定价表:支持 20+ 主流 AI 模型
  • 自定义定价:支持用户自定义模型定价
  • 实时计算:每次采集后立即计算成本
  • 成本聚合:按模型、提供商、时间维度聚合
2.3 质量评估 (Quality Scoring)

评估 AI 服务质量:

pub struct QualityScorer {
    rules: Vec<QualityRule>,
}

pub enum QualityRule {
    ResponseTimeThreshold { max_seconds: f64 },
    TokenEfficiency { max_tokens_per_request: u64 },
    ErrorRateThreshold { max_rate: f64 },
}

规则类型

  • 响应时间规则:检查平均响应时间
  • Token 效率规则:检查 output/input 比率
  • 错误率规则:检查错误请求百分比

评分算法

  • 每个规则返回 0-100 分
  • 最终得分 = Σ(规则分数 × 权重) / Σ权重
2.4 缓存与批处理 (Cache & Batching)
pub struct Batcher {
    buffer: Arc<Mutex<Vec<Sample>>>,
    capacity: usize,
    max_samples_per_send: usize,
    batch_send_deadline: Duration,
    last_send: Arc<Mutex<Instant>>,
}

功能

  • 缓冲机制:临时存储采集的指标
  • 批量发送:按配置的批次大小发送
  • 背压控制:超过容量时淘汰最旧数据
  • 定时发送:即使数据量不足也定期发送

3. 数据上报层

3.1 Remote Write 客户端 (Remote Write Client)

负责将数据推送到 Prometheus 或兼容的存储后端:

pub struct RemoteWriteClient {
    endpoint: String,
    auth: Option<AuthConfig>,
    http_client: reqwest::Client,
    batcher: Arc<Batcher>,
    retry_policy: Arc<RwLock<RetryPolicy>>,
    max_shards: usize,
}

HTTP 请求头

Header Value
Content-Type application/x-protobuf
Content-Encoding snappy
X-Prometheus-Remote-Write-Version 0.1.0

数据编码流程

  1. Sample → TimeSeries(按 metric_name + labels 分组)
  2. TimeSeries → WriteRequest(protobuf 编码)
  3. WriteRequest → snappy 压缩
  4. HTTP POST 发送
3.2 重试策略 (Retry Policy)
pub struct RetryPolicy {
    max_retries: u32,
    min_backoff: Duration,
    max_backoff: Duration,
    current_retry: u32,
}

特点

  • 指数退避backoff = min_backoff × 2^retry + jitter
  • 抖动:随机 jitter 防止重试风暴
  • 上限:backoff 不超过 max_backoff
  • 重置:成功后重置计数器
3.3 多端点故障转移 (Multi-endpoint Failover)

支持配置多个 Remote Write 端点,实现故障转移:

工作原理

  1. 按优先级排序端点列表
  2. 优先使用最高优先级的健康端点
  3. 当前端点发送失败后自动标记为不健康
  4. 自动切换到下一个优先级的健康端点
  5. 端点恢复健康后自动切回

健康检查

  • 定期检查端点健康状态
  • 失败一定次数后标记为不健康
  • 恢复后重新标记为健康
3.4 本地持久化 (Local Persistence)

当网络发生故障时,将数据持久化到磁盘:

功能

  • 数据写入:网络故障时将数据写入磁盘文件
  • 数据恢复:网络恢复后自动重发持久化数据
  • 过期清理:定期清理过期数据
  • 空间管理:限制单个文件大小和总存储空间

数据流设计

┌─────────────┐     ┌──────────────┐     ┌──────────────┐     ┌────────────────┐
│  数据源      │────→│  采集/接收    │────→│  处理/转换    │────→│  缓冲/批处理   │
│  (系统/服务/AI)│     │  (Collectors) │     │  (Converter)  │     │  (Batcher)    │
└─────────────┘     └──────────────┘     └──────────────┘     └────────────────┘
                                                                     │
                                                                     ↓
┌────────────────┐     ┌──────────────┐     ┌──────────────┐     ┌────────────────┐
│  远程存储      │←─────│  网络发送    │←─────│  编码/压缩    │←─────│  重试/故障转移 │
│  (Prometheus)  │     │  (HTTP)      │     │  (Protobuf)   │     │  (Failover)   │
└────────────────┘     └──────────────┘     └──────────────┘     └────────────────┘
                                                                     │
                                                                     ↓
                                                         ┌────────────────┐
                                                         │  本地持久化    │
                                                         │  (Disk)        │
                                                         └────────────────┘

数据流步骤

  1. 采集阶段

    • 系统采集器读取系统信息
    • 服务抓取器抓取 HTTP 端点
    • OTLP 接收器接收 OpenTelemetry 数据
    • 插件系统执行自定义采集
  2. 处理阶段

    • 指标转换:统一格式
    • 成本计算:计算 AI 成本
    • 质量评估:评估服务质量
    • 标签处理:注入额外标签
  3. 缓冲阶段

    • 数据推入 Batcher 缓冲区
    • 按容量或时间触发批量发送
  4. 编码阶段

    • 按 metric_name + labels 分组为 TimeSeries
    • Protobuf 编码 WriteRequest
    • Snappy 压缩
  5. 发送阶段

    • HTTP POST 发送到 Remote Write 端点
    • 失败时指数退避重试
    • 网络故障时写入本地存储
    • 多端点故障转移

异步并发模型

Agent 基于 tokio 异步运行时构建,采用多线程并发模型:

核心并发策略

  1. 采集并发

    • 每个采集器独立 tokio::spawn
    • 采集失败不影响其他采集器
  2. 抓取并发

    • 多个 ServiceScraper 并发抓取
    • 超时控制防止阻塞
  3. 发送并发

    • 分片并发发送(max_shards 控制)
    • Semaphore 限制并发数
  4. 后台任务

    • flush 循环:定期检查缓冲区
    • 健康检查:定期检查端点状态
    • 过期清理:定期清理本地存储

线程模型

┌─────────────────────┐
│ tokio::Runtime      │
│ (多线程调度器)      │
├─────────────────────┤
│                     │
│  ┌────────────────┐ │
│  │ 主线程          │ │
│  │ (信号处理)      │ │
│  └────────────────┘ │
│                     │
│  ┌────────────────┐ │
│  │ 采集任务线程池   │ │
│  │ (采集器/抓取器)  │ │
│  └────────────────┘ │
│                     │
│  ┌────────────────┐ │
│  │ 发送任务线程池   │ │
│  │ (Remote Write)  │ │
│  └────────────────┘ │
│                     │
│  ┌────────────────┐ │
│  │ HTTP 服务线程池  │ │
│  │ (API 接口)       │ │
│  └────────────────┘ │
└─────────────────────┘

容错与高可用设计

1. 采集器隔离

  • 每个采集器独立运行,互不影响
  • 单个采集器失败只记录警告日志
  • 采集失败不会导致整个 Agent 崩溃

2. 网络容错

  • 重试机制:指数退避 + 抖动
  • 故障转移:多端点自动切换
  • 本地持久化:网络故障数据不丢失
  • 背压控制:防止内存无限增长

3. 资源控制

  • 内存限制:Batcher 容量上限
  • CPU 控制:异步 IO,避免阻塞
  • 网络限制:连接池 + 超时控制

4. 优雅关闭

  • 收到 SIGINT/SIGTERM 信号时
  • 调用 flush_all() 将缓冲区数据全部发送
  • 等待所有任务完成后退出

性能优化策略

1. 内存优化

  • 零拷贝:使用 bytes 库实现零拷贝操作
  • 批量处理:减少内存分配次数
  • 压缩传输:Snappy 压缩减小网络传输量
  • 容量限制:Batcher 防止内存无限增长

2. 网络优化

  • HTTP 连接复用:reqwest 内部连接池
  • 批量发送:减少网络请求次数
  • 并发发送:分片并发提高吞吐量
  • 压缩传输:Snappy 压缩减小传输体积

3. 计算优化

  • 异步 IO:非阻塞网络操作
  • 并发采集:多采集器并行执行
  • 高效编码:prost 实现高性能 Protobuf 编码
  • 缓存机制:避免重复计算

4. 存储优化

  • 本地持久化:磁盘存储格式优化
  • 过期清理:定期清理过期数据
  • 空间限制:防止磁盘空间耗尽

扩展性设计

1. 插件式架构

通过实现 Collector trait 即可添加新的采集器:

#[async_trait]
impl Collector for MyCollector {
    fn name(&self) -> &str { "my_collector" }
    async fn collect(&self) -> Result<Vec<Sample>, CollectorError> { ... }
    fn enabled(&self) -> bool { self.enabled }
    fn set_enabled(&mut self, enabled: bool) { self.enabled = enabled; }
}

2. 配置扩展性

  • YAML 配置:支持复杂配置结构
  • 环境变量:支持通过环境变量覆盖配置
  • API 控制:运行时动态修改配置

3. 协议扩展性

  • OTLP 协议:支持 OpenTelemetry 生态
  • Prometheus 协议:兼容 Prometheus 生态
  • 自定义插件:支持扩展其他协议

4. 未来扩展方向

  • Service Discovery:集成 Kubernetes/Consul 服务发现
  • 配置热重载:SIGHUP 信号触发配置重载
  • Kubernetes CRD:通过自定义资源管理 Agent 配置
  • 高级告警:集成 Alertmanager
  • 多租户支持:支持多租户隔离

技术栈

类别 技术 说明
语言 Rust 2021 Edition 内存安全、零成本抽象、无 GC
异步运行时 tokio 多线程调度器、IO 驱动
HTTP 客户端 reqwest 异步 HTTP,支持 TLS
HTTP 服务端 axum 轻量级异步 Web 框架
序列化 serde + serde_yaml YAML 配置解析
Protobuf prost Remote Write 协议编码
压缩 snap snappy 压缩算法
日志 tracing + tracing-subscriber 结构化异步日志
错误处理 thiserror 派生宏错误类型
gRPC tonic OTLP gRPC 服务端
OpenTelemetry opentelemetry-proto OTLP 协议定义

总结

AI Observability Agent 的架构设计体现了以下核心原则:

  1. 高性能:Rust + tokio 异步模型,低资源消耗
  2. 可靠性:多层容错机制,数据不丢失
  3. 可扩展性:插件式架构,易于扩展
  4. 灵活性:多协议支持,适应不同场景
  5. 易用性:开箱即用,配置简单

这种架构设计使 Agent 能够在各种环境下稳定运行,为 AI 时代的监控需求提供了强大的支持。

下一步

Logo

AtomGit 是由开放原子开源基金会联合 CSDN 等生态伙伴共同推出的新一代开源与人工智能协作平台。平台坚持“开放、中立、公益”的理念,把代码托管、模型共享、数据集托管、智能体开发体验和算力服务整合在一起,为开发者提供从开发、训练到部署的一站式体验。

更多推荐