AI Observability Agent 架构设计:高性能、可扩展的监控架构
·
架构设计:高性能、可扩展的监控架构
深入了解 AI Observability Agent 的系统架构和设计理念
整体架构图
┌─────────────────────────────────────────────────────────────────────────────┐
│ AI Observability Agent │
├─────────────────────────────────────────────────────────────────────────────┤
│ │
│ ┌─────────────────┐ ┌─────────────────┐ ┌─────────────────────┐ ┌───────┐ │
│ │ 系统采集器 │ │ 服务抓取器 │ │ OTLP 接收器 │ │ 插件 │ │
│ │ System Collectors│ │ Service Scrapers│ │ (gRPC/HTTP) │ │ 系统 │ │
│ └────────┬────────┘ └────────┬────────┘ └────────┬────────────┘ └──┬────┘ │
│ │ │ │ │ │
│ └────────────────────┼─────────────────────┼───────────────────┘ │
│ │ │ │
│ ┌─────────────────────────────────────────────────────────────────────────────┐ │
│ │ 数据处理层 │ │
│ │ ┌─────────────┐ ┌─────────────┐ ┌─────────────────────────┐ ┌────────┐ │ │
│ │ │ 指标转换 │ │ 成本计算 │ │ 质量评估 │ │ 缓存 │ │ │
│ │ │ (OTLP→Prom) │ │ 引擎 │ │ (规则引擎) │ │ 批处理 │ │ │
│ │ └─────────────┘ └─────────────┘ └─────────────────────────┘ └────────┘ │ │
│ └─────────────────────────────────────────────────────────────────────────────┘ │
│ │ │ │
│ ┌─────────────────────────────────────────────────────────────────────────────┐ │
│ │ 数据上报层 │ │
│ │ ┌─────────────┐ ┌─────────────┐ ┌─────────────────────────┐ ┌────────┐ │ │
│ │ │ Remote │ │ 重试策略 │ │ 多端点故障转移 │ │ 本地 │ │ │
│ │ │ Write 客户端 │ │ (指数退避) │ │ (健康检查) │ │ 持久化 │ │ │
│ │ └─────────────┘ └─────────────┘ └─────────────────────────┘ └────────┘ │ │
│ └─────────────────────────────────────────────────────────────────────────────┘ │
│ │ │ │
└────────────────────────────────┼─────────────────────┼────────────────────────────┘
↓ ↓
Prometheus/VictoriaMetrics 磁盘存储
核心模块介绍
1. 数据采集层
1.1 系统采集器 (System Collectors)
系统采集器负责采集主机或容器的系统指标:
#[async_trait]
pub trait Collector: Send + Sync {
fn name(&self) -> &str;
async fn collect(&self) -> Result<Vec<Sample>, CollectorError>;
fn enabled(&self) -> bool;
fn set_enabled(&mut self, enabled: bool);
}
特点:
- 插件式架构:每个采集器独立实现
Collectortrait - 并发采集:各采集器并行执行,互不影响
- 动态控制:支持运行时启用/禁用采集器
- 跨平台:支持 Linux、macOS、Windows
内置采集器:
- CPU 采集器:
/proc/stat(Linux)、sysctl(macOS)、WMI (Windows) - 内存采集器:
/proc/meminfo、sysctl、WMI - 磁盘采集器:
/proc/diskstats、I/O Kit (macOS)、WMI - 网络采集器:
/proc/net/dev、netstat、WMI - 负载采集器:
/proc/loadavg、sysctl、WMI
1.2 服务抓取器 (Service Scrapers)
服务抓取器定期抓取配置的 HTTP metrics 端点:
pub struct ServiceScraper {
id: String,
url: String,
interval: Duration,
timeout: Duration,
auth: Option<AuthConfig>,
extra_labels: HashMap<String, String>,
client: reqwest::Client,
}
功能:
- HTTP 抓取:使用 reqwest 异步客户端
- 认证支持:HTTP Basic Auth、Bearer Token
- 自定义标签:为每个 target 注入自定义标签
- 超时控制:可配置的抓取超时时间
- Prometheus Text 解析:解析标准 Prometheus text exposition 格式
1.3 OTLP 接收器 (OTLP Receiver)
OTLP 接收器接收 OpenTelemetry 协议数据:
pub struct OtlpReceiver {
grpc_endpoint: SocketAddr, // gRPC 接收端点 (默认 :4317)
http_endpoint: SocketAddr, // HTTP 接收端点 (默认 :4318)
metrics_converter: OtlpToPromConverter,
}
支持的协议:
- gRPC:端口 4317,高性能二进制协议
- HTTP:端口 4318,基于 JSON 的协议
指标类型:
- Gauge:单值指标
- Counter:累加计数器
- Histogram:直方图
- Summary:分位数
1.4 插件系统 (Plugin System)
插件系统支持通过插件扩展采集能力:
支持的插件类型:
- HTTP 插件:从 HTTP 端点获取 Prometheus 格式指标
- Exec 插件:执行命令获取输出
- Script 插件:执行脚本获取输出
2. 数据处理层
2.1 指标转换 (Metrics Converter)
负责将不同格式的指标转换为 Prometheus 格式:
- OTLP→Prom:将 OTLP 指标转换为 Prometheus Sample
- 格式标准化:统一标签格式和指标命名
- 标签处理:注入额外标签,处理高基数标签
2.2 成本计算引擎 (Cost Calculator)
根据 Token 使用量计算 AI API 成本:
pub struct CostCalculator {
pricing_table: HashMap<String, ModelPricing>,
}
pub struct ModelPricing {
input_cost_per_1k: f64,
output_cost_per_1k: f64,
cache_read_cost_per_1k: Option<f64>,
cache_write_cost_per_1k: Option<f64>,
}
功能:
- 内置定价表:支持 20+ 主流 AI 模型
- 自定义定价:支持用户自定义模型定价
- 实时计算:每次采集后立即计算成本
- 成本聚合:按模型、提供商、时间维度聚合
2.3 质量评估 (Quality Scoring)
评估 AI 服务质量:
pub struct QualityScorer {
rules: Vec<QualityRule>,
}
pub enum QualityRule {
ResponseTimeThreshold { max_seconds: f64 },
TokenEfficiency { max_tokens_per_request: u64 },
ErrorRateThreshold { max_rate: f64 },
}
规则类型:
- 响应时间规则:检查平均响应时间
- Token 效率规则:检查 output/input 比率
- 错误率规则:检查错误请求百分比
评分算法:
- 每个规则返回 0-100 分
- 最终得分 = Σ(规则分数 × 权重) / Σ权重
2.4 缓存与批处理 (Cache & Batching)
pub struct Batcher {
buffer: Arc<Mutex<Vec<Sample>>>,
capacity: usize,
max_samples_per_send: usize,
batch_send_deadline: Duration,
last_send: Arc<Mutex<Instant>>,
}
功能:
- 缓冲机制:临时存储采集的指标
- 批量发送:按配置的批次大小发送
- 背压控制:超过容量时淘汰最旧数据
- 定时发送:即使数据量不足也定期发送
3. 数据上报层
3.1 Remote Write 客户端 (Remote Write Client)
负责将数据推送到 Prometheus 或兼容的存储后端:
pub struct RemoteWriteClient {
endpoint: String,
auth: Option<AuthConfig>,
http_client: reqwest::Client,
batcher: Arc<Batcher>,
retry_policy: Arc<RwLock<RetryPolicy>>,
max_shards: usize,
}
HTTP 请求头:
| Header | Value |
|---|---|
| Content-Type | application/x-protobuf |
| Content-Encoding | snappy |
| X-Prometheus-Remote-Write-Version | 0.1.0 |
数据编码流程:
- Sample → TimeSeries(按 metric_name + labels 分组)
- TimeSeries → WriteRequest(protobuf 编码)
- WriteRequest → snappy 压缩
- HTTP POST 发送
3.2 重试策略 (Retry Policy)
pub struct RetryPolicy {
max_retries: u32,
min_backoff: Duration,
max_backoff: Duration,
current_retry: u32,
}
特点:
- 指数退避:
backoff = min_backoff × 2^retry + jitter - 抖动:随机 jitter 防止重试风暴
- 上限:backoff 不超过
max_backoff - 重置:成功后重置计数器
3.3 多端点故障转移 (Multi-endpoint Failover)
支持配置多个 Remote Write 端点,实现故障转移:
工作原理:
- 按优先级排序端点列表
- 优先使用最高优先级的健康端点
- 当前端点发送失败后自动标记为不健康
- 自动切换到下一个优先级的健康端点
- 端点恢复健康后自动切回
健康检查:
- 定期检查端点健康状态
- 失败一定次数后标记为不健康
- 恢复后重新标记为健康
3.4 本地持久化 (Local Persistence)
当网络发生故障时,将数据持久化到磁盘:
功能:
- 数据写入:网络故障时将数据写入磁盘文件
- 数据恢复:网络恢复后自动重发持久化数据
- 过期清理:定期清理过期数据
- 空间管理:限制单个文件大小和总存储空间
数据流设计
┌─────────────┐ ┌──────────────┐ ┌──────────────┐ ┌────────────────┐
│ 数据源 │────→│ 采集/接收 │────→│ 处理/转换 │────→│ 缓冲/批处理 │
│ (系统/服务/AI)│ │ (Collectors) │ │ (Converter) │ │ (Batcher) │
└─────────────┘ └──────────────┘ └──────────────┘ └────────────────┘
│
↓
┌────────────────┐ ┌──────────────┐ ┌──────────────┐ ┌────────────────┐
│ 远程存储 │←─────│ 网络发送 │←─────│ 编码/压缩 │←─────│ 重试/故障转移 │
│ (Prometheus) │ │ (HTTP) │ │ (Protobuf) │ │ (Failover) │
└────────────────┘ └──────────────┘ └──────────────┘ └────────────────┘
│
↓
┌────────────────┐
│ 本地持久化 │
│ (Disk) │
└────────────────┘
数据流步骤
-
采集阶段:
- 系统采集器读取系统信息
- 服务抓取器抓取 HTTP 端点
- OTLP 接收器接收 OpenTelemetry 数据
- 插件系统执行自定义采集
-
处理阶段:
- 指标转换:统一格式
- 成本计算:计算 AI 成本
- 质量评估:评估服务质量
- 标签处理:注入额外标签
-
缓冲阶段:
- 数据推入 Batcher 缓冲区
- 按容量或时间触发批量发送
-
编码阶段:
- 按 metric_name + labels 分组为 TimeSeries
- Protobuf 编码 WriteRequest
- Snappy 压缩
-
发送阶段:
- HTTP POST 发送到 Remote Write 端点
- 失败时指数退避重试
- 网络故障时写入本地存储
- 多端点故障转移
异步并发模型
Agent 基于 tokio 异步运行时构建,采用多线程并发模型:
核心并发策略
-
采集并发:
- 每个采集器独立 tokio::spawn
- 采集失败不影响其他采集器
-
抓取并发:
- 多个 ServiceScraper 并发抓取
- 超时控制防止阻塞
-
发送并发:
- 分片并发发送(max_shards 控制)
- Semaphore 限制并发数
-
后台任务:
- flush 循环:定期检查缓冲区
- 健康检查:定期检查端点状态
- 过期清理:定期清理本地存储
线程模型
┌─────────────────────┐
│ tokio::Runtime │
│ (多线程调度器) │
├─────────────────────┤
│ │
│ ┌────────────────┐ │
│ │ 主线程 │ │
│ │ (信号处理) │ │
│ └────────────────┘ │
│ │
│ ┌────────────────┐ │
│ │ 采集任务线程池 │ │
│ │ (采集器/抓取器) │ │
│ └────────────────┘ │
│ │
│ ┌────────────────┐ │
│ │ 发送任务线程池 │ │
│ │ (Remote Write) │ │
│ └────────────────┘ │
│ │
│ ┌────────────────┐ │
│ │ HTTP 服务线程池 │ │
│ │ (API 接口) │ │
│ └────────────────┘ │
└─────────────────────┘
容错与高可用设计
1. 采集器隔离
- 每个采集器独立运行,互不影响
- 单个采集器失败只记录警告日志
- 采集失败不会导致整个 Agent 崩溃
2. 网络容错
- 重试机制:指数退避 + 抖动
- 故障转移:多端点自动切换
- 本地持久化:网络故障数据不丢失
- 背压控制:防止内存无限增长
3. 资源控制
- 内存限制:Batcher 容量上限
- CPU 控制:异步 IO,避免阻塞
- 网络限制:连接池 + 超时控制
4. 优雅关闭
- 收到 SIGINT/SIGTERM 信号时
- 调用
flush_all()将缓冲区数据全部发送 - 等待所有任务完成后退出
性能优化策略
1. 内存优化
- 零拷贝:使用 bytes 库实现零拷贝操作
- 批量处理:减少内存分配次数
- 压缩传输:Snappy 压缩减小网络传输量
- 容量限制:Batcher 防止内存无限增长
2. 网络优化
- HTTP 连接复用:reqwest 内部连接池
- 批量发送:减少网络请求次数
- 并发发送:分片并发提高吞吐量
- 压缩传输:Snappy 压缩减小传输体积
3. 计算优化
- 异步 IO:非阻塞网络操作
- 并发采集:多采集器并行执行
- 高效编码:prost 实现高性能 Protobuf 编码
- 缓存机制:避免重复计算
4. 存储优化
- 本地持久化:磁盘存储格式优化
- 过期清理:定期清理过期数据
- 空间限制:防止磁盘空间耗尽
扩展性设计
1. 插件式架构
通过实现 Collector trait 即可添加新的采集器:
#[async_trait]
impl Collector for MyCollector {
fn name(&self) -> &str { "my_collector" }
async fn collect(&self) -> Result<Vec<Sample>, CollectorError> { ... }
fn enabled(&self) -> bool { self.enabled }
fn set_enabled(&mut self, enabled: bool) { self.enabled = enabled; }
}
2. 配置扩展性
- YAML 配置:支持复杂配置结构
- 环境变量:支持通过环境变量覆盖配置
- API 控制:运行时动态修改配置
3. 协议扩展性
- OTLP 协议:支持 OpenTelemetry 生态
- Prometheus 协议:兼容 Prometheus 生态
- 自定义插件:支持扩展其他协议
4. 未来扩展方向
- Service Discovery:集成 Kubernetes/Consul 服务发现
- 配置热重载:SIGHUP 信号触发配置重载
- Kubernetes CRD:通过自定义资源管理 Agent 配置
- 高级告警:集成 Alertmanager
- 多租户支持:支持多租户隔离
技术栈
| 类别 | 技术 | 说明 |
|---|---|---|
| 语言 | Rust 2021 Edition | 内存安全、零成本抽象、无 GC |
| 异步运行时 | tokio | 多线程调度器、IO 驱动 |
| HTTP 客户端 | reqwest | 异步 HTTP,支持 TLS |
| HTTP 服务端 | axum | 轻量级异步 Web 框架 |
| 序列化 | serde + serde_yaml | YAML 配置解析 |
| Protobuf | prost | Remote Write 协议编码 |
| 压缩 | snap | snappy 压缩算法 |
| 日志 | tracing + tracing-subscriber | 结构化异步日志 |
| 错误处理 | thiserror | 派生宏错误类型 |
| gRPC | tonic | OTLP gRPC 服务端 |
| OpenTelemetry | opentelemetry-proto | OTLP 协议定义 |
总结
AI Observability Agent 的架构设计体现了以下核心原则:
- 高性能:Rust + tokio 异步模型,低资源消耗
- 可靠性:多层容错机制,数据不丢失
- 可扩展性:插件式架构,易于扩展
- 灵活性:多协议支持,适应不同场景
- 易用性:开箱即用,配置简单
这种架构设计使 Agent 能够在各种环境下稳定运行,为 AI 时代的监控需求提供了强大的支持。
下一步
- OTLP 协议支持 - OpenTelemetry 集成详解
- AI 采集器 - Claude Code、OpenAI、LiteLLM 监控
- Remote Write - 高效数据推送详解
AtomGit 是由开放原子开源基金会联合 CSDN 等生态伙伴共同推出的新一代开源与人工智能协作平台。平台坚持“开放、中立、公益”的理念,把代码托管、模型共享、数据集托管、智能体开发体验和算力服务整合在一起,为开发者提供从开发、训练到部署的一站式体验。
更多推荐

所有评论(0)