"用户反馈AI助手回答质量下降,但不知道是哪出了问题——是Prompt改了?模型升级了?还是RAG检索出了问题?"这是AI应用运维中最令人头疼的场景。
传统的应用可观测性(Logs、Metrics、Traces)在面对LLM应用时显得力不从心。LLM调用的非确定性、多步骤推理链、外部工具调用,都需要新的可观测性范式。## 一、LLM应用可观测性的特殊挑战### 1.1 与传统应用的根本差异| 维度 | 传统应用 | LLM应用 ||------|---------|---------|| 输出确定性 | 确定性(相同输入→相同输出) | 非确定性(相同输入→不同输出) || 质量度量 | 二进制(正确/错误) | 连续谱(好/一般/差) || 调试方式 | 堆栈跟踪 + 日志 | 需要理解自然语言推理链 || 性能瓶颈 | CPU/内存/IO | Token生成速度 + 上下文窗口 || 成本模型 | 固定(服务器成本) | 可变(按Token计费) || 安全边界 | 明确的输入验证 | 模糊的自然语言攻击面 |### 1.2 核心观测维度一个LLM应用的观测需要覆盖四个维度:“用户反馈AI助手回答质量下降,但不知道是哪出了问题——是Prompt改了?模型升级了?还是RAG检索出了问题?“这是AI应用运维中最令人头疼的场景。
传统的应用可观测性(Logs、Metrics、Traces)在面对LLM应用时显得力不从心。LLM调用的非确定性、多步骤推理链、外部工具调用,都需要新的可观测性范式。## 一、LLM应用可观测性的特殊挑战### 1.1 与传统应用的根本差异| 维度 | 传统应用 | LLM应用 ||------|---------|---------|| 输出确定性 | 确定性(相同输入→相同输出) | 非确定性(相同输入→不同输出) || 质量度量 | 二进制(正确/错误) | 连续谱(好/一般/差) || 调试方式 | 堆栈跟踪 + 日志 | 需要理解自然语言推理链 || 性能瓶颈 | CPU/内存/IO | Token生成速度 + 上下文窗口 || 成本模型 | 固定(服务器成本) | 可变(按Token计费) || 安全边界 | 明确的输入验证 | 模糊的自然语言攻击面 |### 1.2 核心观测维度一个LLM应用的观测需要覆盖四个维度:text┌─────────────────────────────────────────────────────┐│ LLM 可观测性四维模型 │├───────────────┬───────────────┬─────────────────────┤│ 质量观测 │ 性能观测 │ 成本观测 ││ • 回答准确性 │ • TTFT │ • Token消耗 ││ • 幻觉检测 │ • 端到端延迟 │ • 模型调用费用 ││ • 用户满意度 │ • 吞吐量 │ • 缓存命中率 ││ • 任务完成率 │ • 错误率 │ • ROI分析 │├───────────────┴───────────────┴─────────────────────┤│ 安全观测 ││ • Prompt注入检测 • 敏感信息泄露 • 滥用检测 │└─────────────────────────────────────────────────────┘text## 二、Trace体系:LLM调用的全链路追踪### 2.1 Span设计借鉴OpenTelemetry的Span模型,为LLM应用定义专用的Span类型:pythonfrom dataclasses import dataclass, fieldfrom datetime import datetimefrom typing import Optional, Dict, Any, List@dataclassclass LLMSpan: """LLM调用的追踪单元""" span_id: str trace_id: str parent_span_id: Optional[str] # 调用信息 span_type: str # 'llm_call', 'retrieval', 'tool_call', 'agent_step' model_name: Optional[str] = None provider: Optional[str] = None # 'openai', 'anthropic', 'self-hosted' # 输入输出 input_messages: List[Dict] = field(default_factory=list) output_content: Optional[str] = None input_tokens: int = 0 output_tokens: int = 0 # 时间信息 start_time: Optional[datetime] = None end_time: Optional[datetime] = None first_token_time: Optional[datetime] = None # TTFT # 元数据 tags: Dict[str, str] = field(default_factory=dict) error: Optional[str] = None @property def duration_ms(self) -> float: if self.start_time and self.end_time: return (self.end_time - self.start_time).total_seconds() * 1000 return 0 @property def ttft_ms(self) -> float: if self.start_time and self.first_token_time: return (self.first_token_time - self.start_time).total_seconds() * 1000 return 0text### 2.2 自动埋点装饰器pythonimport functoolsimport uuidimport timeclass LLMTracer: def __init__(self): self.spans: Dict[str, LLMSpan] = {} self.current_trace_id: Optional[str] = None def trace(self, span_type: str, model_name: str = None): """装饰器:自动为LLM调用创建追踪Span""" def decorator(func): @functools.wraps(func) async def wrapper(*args, **kwargs): span = LLMSpan( span_id=str(uuid.uuid4()), trace_id=self.current_trace_id or str(uuid.uuid4()), parent_span_id=self._current_span_id(), span_type=span_type, model_name=model_name ) span.start_time = datetime.now() self.spans[span.span_id] = span try: result = await func(*args, **kwargs) # 自动提取Token使用量 if hasattr(result, 'usage'): span.input_tokens = result.usage.input_tokens span.output_tokens = result.usage.output_tokens span.output_content = str(result)[:1000] # 截断存储 return result except Exception as e: span.error = str(e) raise finally: span.end_time = datetime.now() await self._export_span(span) return wrapper return decorator# 使用示例tracer = LLMTracer()@tracer.trace(span_type="llm_call", model_name="claude-sonnet-4")async def call_llm(messages): return await anthropic_client.messages.create( model="claude-sonnet-4-20250514", messages=messages )text### 2.3 Agent执行链的追踪Agent的执行涉及多步推理和工具调用,需要追踪完整的决策链:pythonclass AgentTracer: def __init__(self, tracer: LLMTracer): self.tracer = tracer async def trace_agent_run(self, agent, task: str) -> AgentTrace: trace_id = str(uuid.uuid4()) self.tracer.current_trace_id = trace_id trace = AgentTrace(trace_id=trace_id, task=task, steps=[]) async for step in agent.run(task): agent_step = AgentStep( step_number=len(trace.steps) + 1, thought=step.thought, action=step.action, action_input=step.action_input, observation=step.observation, spans=self.tracer.get_trace_spans(trace_id) ) trace.steps.append(agent_step) # 计算总体指标 trace.total_duration_ms = sum(s.duration_ms for s in trace.all_spans()) trace.total_tokens = sum(s.input_tokens + s.output_tokens for s in trace.all_spans()) trace.total_llm_calls = len([s for s in trace.all_spans() if s.span_type == "llm_call"]) return tracetext## 三、Metrics体系:量化LLM应用的健康状态### 3.1 核心指标定义pythonclass LLMMetricsCollector: def __init__(self): # 性能指标 self.ttft_p50 = Histogram() # TTFT P50 self.ttft_p99 = Histogram() # TTFT P99 self.latency_p50 = Histogram() # 端到端延迟P50 self.latency_p99 = Histogram() # 端到端延迟P99 self.throughput = Gauge() # 每秒请求数 # 质量指标 self.error_rate = Gauge() # 错误率 self.retry_rate = Gauge() # 重试率 self.hallucination_rate = Gauge() # 幻觉率 # 成本指标 self.tokens_per_request = Histogram() # 每次请求Token数 self.cost_per_request = Histogram() # 每次请求成本 self.cache_hit_rate = Gauge() # 缓存命中率 # 安全指标 self.blocked_requests = Counter() # 被拦截的请求 self.pii_leak_events = Counter() # PII泄露事件 def record_llm_call(self, span: LLMSpan): """记录一次LLM调用的指标""" self.ttft_p50.observe(span.ttft_ms) self.ttft_p99.observe(span.ttft_ms) self.latency_p50.observe(span.duration_ms) self.latency_p99.observe(span.duration_ms) self.tokens_per_request.observe(span.input_tokens + span.output_tokens) if span.error: self.error_rate.inc()text### 3.2 业务级质量指标技术指标之外,还需要业务视角的质量指标:pythonclass BusinessQualityMetrics: def __init__(self): self.metrics = {} def calculate_task_completion(self, conversation: Conversation) -> float: """计算任务完成率""" # 判断用户意图是否得到满足 signals = [ conversation.user_did_not_rephrase(), # 用户没有重新提问 conversation.user_did_not_escalate(), # 用户没有要求转人工 conversation.user_continued_engagement(), # 用户继续使用 not conversation.ended_with_error(), # 没有以错误结束 ] return sum(signals) / len(signals) def calculate_helpfulness_score(self, conversation: Conversation) -> float: """计算有用性得分""" base_score = 0.5 # 正面信号加分 if conversation.user_copied_response(): base_score += 0.2 if conversation.user_shared_response(): base_score += 0.15 if conversation.user_gave_thumbs_up(): base_score += 0.15 # 负面信号减分 if conversation.user_regenerated(): base_score -= 0.2 if conversation.user_gave_thumbs_down(): base_score -= 0.3 return max(0.0, min(1.0, base_score))text## 四、日志体系:结构化与非结构化的融合### 4.1 结构化日志格式pythonimport structloglogger = structlog.get_logger()class LLMLogger: @staticmethod def log_llm_call( trace_id: str, model: str, messages: List[Dict], response: str, usage: Dict, duration_ms: float ): logger.info( "llm_call_completed", trace_id=trace_id, model=model, message_count=len(messages), input_tokens=usage.get("input_tokens", 0), output_tokens=usage.get("output_tokens", 0), total_tokens=usage.get("total_tokens", 0), duration_ms=duration_ms, first_message_role=messages[0].get("role") if messages else None, last_message_role=messages[-1].get("role") if messages else None, response_preview=response[:200] # 截断存储 ) @staticmethod def log_rag_retrieval( trace_id: str, query: str, retrieved_docs: int, retrieval_time_ms: float, top_scores: List[float] ): logger.info( "rag_retrieval_completed", trace_id=trace_id, query_preview=query[:100], docs_retrieved=retrieved_docs, retrieval_time_ms=retrieval_time_ms, top_score=max(top_scores) if top_scores else 0, avg_score=sum(top_scores) / len(top_scores) if top_scores else 0 )text### 4.2 日志的查询与分析有了结构化日志,可以用SQL-like查询进行问题排查:sql-- 查找TTFT异常的LLM调用SELECT trace_id, model, input_tokens, ttft_msFROM llm_callsWHERE ttft_ms > 3000 AND timestamp > NOW() - INTERVAL '1 hour'ORDER BY ttft_ms DESCLIMIT 20;-- 查找成本最高的用户会话SELECT trace_id, SUM(input_tokens + output_tokens) as total_tokens, SUM(cost) as total_costFROM llm_callsWHERE timestamp > NOW() - INTERVAL '24 hours'GROUP BY trace_idORDER BY total_cost DESCLIMIT 10;-- 查找错误率突增的模型SELECT model, COUNT(*) as total_calls, SUM(CASE WHEN error IS NOT NULL THEN 1 ELSE 0 END) as errors, SUM(CASE WHEN error IS NOT NULL THEN 1 ELSE 0 END) * 100.0 / COUNT(*) as error_rateFROM llm_callsWHERE timestamp > NOW() - INTERVAL '1 hour'GROUP BY modelHAVING error_rate > 5ORDER BY error_rate DESC;text## 五、诊断工具:从告警到根因### 5.1 智能告警规则pythonclass LLMAlertManager: def __init__(self): self.rules = [ LatencySpikeRule(threshold_ms=5000, window_minutes=5), ErrorRateRule(threshold=0.05, window_minutes=5), CostAnomalyRule(z_score_threshold=3.0), QualityDegradationRule(satisfaction_drop_threshold=0.1), HallucinationSpikeRule(rate_threshold=0.15), ] async def evaluate(self, metrics: LLMMetrics) -> List[Alert]: alerts = [] for rule in self.rules: if await rule.check(metrics): alerts.append(rule.create_alert(metrics)) return alertsclass CostAnomalyRule: def __init__(self, z_score_threshold=3.0): self.threshold = z_score_threshold async def check(self, metrics: LLMMetrics) -> bool: # 获取最近1小时的成本数据 recent_costs = metrics.get_cost_series(minutes=60) historical_mean = metrics.get_cost_mean(days=7, hour=datetime.now().hour) historical_std = metrics.get_cost_std(days=7, hour=datetime.now().hour) current_cost = sum(recent_costs) z_score = (current_cost - historical_mean) / historical_std return abs(z_score) > self.thresholdtext### 5.2 根因分析辅助pythonclass RootCauseAnalyzer: async def analyze(self, trace_id: str) -> RootCauseReport: trace = await self.load_trace(trace_id) findings = [] # 检查1:TTFT是否异常 for span in trace.spans: if span.ttft_ms > 3000: findings.append(Finding( severity="HIGH", category="latency", description=f"TTFT异常:{span.ttft_ms}ms(模型:{span.model_name})", suggestion="检查模型服务是否过载,考虑切换到更快的模型或增加并发" )) # 检查2:Token消耗是否异常 avg_tokens = statistics.mean([s.input_tokens for s in trace.spans]) if avg_tokens > 5000: findings.append(Finding( severity="MEDIUM", category="cost", description=f"输入Token过多:平均{avg_tokens}/请求", suggestion="检查Prompt是否过于冗长,启用Prompt压缩或选择性上下文注入" )) # 检查3:是否有重试循环 retry_count = sum(1 for s in trace.spans if s.tags.get("is_retry")) if retry_count > 2: findings.append(Finding( severity="HIGH", category="reliability", description=f"检测到重试循环:{retry_count}次重试", suggestion="检查工具调用是否有幂等性问题,增加超时和熔断机制" )) return RootCauseReport(trace_id=trace_id, findings=findings)text## 六、开源工具生态2026年主流的LLM可观测性工具:| 工具 | 特点 | 适用场景 ||------|------|---------|| LangSmith | LangChain官方,深度集成 | LangChain/LangGraph用户 || Arize Phoenix | 开源,支持自托管 | 需要数据隐私的团队 || Helicone | 轻量级,专注网关层 | API代理层观测 || Weights & Biases | 实验追踪+生产监控 | 从训练到推理的全流程 || OpenLLMetry | OpenTelemetry标准 | 希望统一观测标准的团队 |## 七、最佳实践总结1. 从第一天就建立观测体系:不要等出了问题再补2. Trace是核心:所有指标和日志都应该关联到Trace3. 关注业务指标:技术指标最终要服务于用户体验4. 建立基线:正常状态长什么样,才能识别异常5. 采样策略:高流量场景下使用智能采样,保留异常Trace6. 隐私与成本平衡:不要记录完整的用户输入输出,使用截断和脱敏—LLM应用的可观测性不是锦上添花,而是生产就绪的必要条件。投入观测体系建设,就是在为你的AI应用购买一份"质量保险”——当问题发生时,你能快速定位、精准修复,而不是在黑暗中摸索。text┌─────────────────────────────────────────────────────┐│ LLM 可观测性四维模型 │├───────────────┬───────────────┬─────────────────────┤│ 质量观测 │ 性能观测 │ 成本观测 ││ • 回答准确性 │ • TTFT │ • Token消耗 ││ • 幻觉检测 │ • 端到端延迟 │ • 模型调用费用 ││ • 用户满意度 │ • 吞吐量 │ • 缓存命中率 ││ • 任务完成率 │ • 错误率 │ • ROI分析 │├───────────────┴───────────────┴─────────────────────┤│ 安全观测 ││ • Prompt注入检测 • 敏感信息泄露 • 滥用检测 │└─────────────────────────────────────────────────────┘text## 二、Trace体系:LLM调用的全链路追踪### 2.1 Span设计借鉴OpenTelemetry的Span模型,为LLM应用定义专用的Span类型:pythonfrom dataclasses import dataclass, fieldfrom datetime import datetimefrom typing import Optional, Dict, Any, List@dataclassclass LLMSpan: """LLM调用的追踪单元""" span_id: str trace_id: str parent_span_id: Optional[str] # 调用信息 span_type: str # 'llm_call', 'retrieval', 'tool_call', 'agent_step' model_name: Optional[str] = None provider: Optional[str] = None # 'openai', 'anthropic', 'self-hosted' # 输入输出 input_messages: List[Dict] = field(default_factory=list) output_content: Optional[str] = None input_tokens: int = 0 output_tokens: int = 0 # 时间信息 start_time: Optional[datetime] = None end_time: Optional[datetime] = None first_token_time: Optional[datetime] = None # TTFT # 元数据 tags: Dict[str, str] = field(default_factory=dict) error: Optional[str] = None @property def duration_ms(self) -> float: if self.start_time and self.end_time: return (self.end_time - self.start_time).total_seconds() * 1000 return 0 @property def ttft_ms(self) -> float: if self.start_time and self.first_token_time: return (self.first_token_time - self.start_time).total_seconds() * 1000 return 0text### 2.2 自动埋点装饰器pythonimport functoolsimport uuidimport timeclass LLMTracer: def __init__(self): self.spans: Dict[str, LLMSpan] = {} self.current_trace_id: Optional[str] = None def trace(self, span_type: str, model_name: str = None): """装饰器:自动为LLM调用创建追踪Span""" def decorator(func): @functools.wraps(func) async def wrapper(*args, **kwargs): span = LLMSpan( span_id=str(uuid.uuid4()), trace_id=self.current_trace_id or str(uuid.uuid4()), parent_span_id=self._current_span_id(), span_type=span_type, model_name=model_name ) span.start_time = datetime.now() self.spans[span.span_id] = span try: result = await func(*args, **kwargs) # 自动提取Token使用量 if hasattr(result, 'usage'): span.input_tokens = result.usage.input_tokens span.output_tokens = result.usage.output_tokens span.output_content = str(result)[:1000] # 截断存储 return result except Exception as e: span.error = str(e) raise finally: span.end_time = datetime.now() await self._export_span(span) return wrapper return decorator# 使用示例tracer = LLMTracer()@tracer.trace(span_type="llm_call", model_name="claude-sonnet-4")async def call_llm(messages): return await anthropic_client.messages.create( model="claude-sonnet-4-20250514", messages=messages )text### 2.3 Agent执行链的追踪Agent的执行涉及多步推理和工具调用,需要追踪完整的决策链:pythonclass AgentTracer: def __init__(self, tracer: LLMTracer): self.tracer = tracer async def trace_agent_run(self, agent, task: str) -> AgentTrace: trace_id = str(uuid.uuid4()) self.tracer.current_trace_id = trace_id trace = AgentTrace(trace_id=trace_id, task=task, steps=[]) async for step in agent.run(task): agent_step = AgentStep( step_number=len(trace.steps) + 1, thought=step.thought, action=step.action, action_input=step.action_input, observation=step.observation, spans=self.tracer.get_trace_spans(trace_id) ) trace.steps.append(agent_step) # 计算总体指标 trace.total_duration_ms = sum(s.duration_ms for s in trace.all_spans()) trace.total_tokens = sum(s.input_tokens + s.output_tokens for s in trace.all_spans()) trace.total_llm_calls = len([s for s in trace.all_spans() if s.span_type == "llm_call"]) return tracetext## 三、Metrics体系:量化LLM应用的健康状态### 3.1 核心指标定义pythonclass LLMMetricsCollector: def __init__(self): # 性能指标 self.ttft_p50 = Histogram() # TTFT P50 self.ttft_p99 = Histogram() # TTFT P99 self.latency_p50 = Histogram() # 端到端延迟P50 self.latency_p99 = Histogram() # 端到端延迟P99 self.throughput = Gauge() # 每秒请求数 # 质量指标 self.error_rate = Gauge() # 错误率 self.retry_rate = Gauge() # 重试率 self.hallucination_rate = Gauge() # 幻觉率 # 成本指标 self.tokens_per_request = Histogram() # 每次请求Token数 self.cost_per_request = Histogram() # 每次请求成本 self.cache_hit_rate = Gauge() # 缓存命中率 # 安全指标 self.blocked_requests = Counter() # 被拦截的请求 self.pii_leak_events = Counter() # PII泄露事件 def record_llm_call(self, span: LLMSpan): """记录一次LLM调用的指标""" self.ttft_p50.observe(span.ttft_ms) self.ttft_p99.observe(span.ttft_ms) self.latency_p50.observe(span.duration_ms) self.latency_p99.observe(span.duration_ms) self.tokens_per_request.observe(span.input_tokens + span.output_tokens) if span.error: self.error_rate.inc()text### 3.2 业务级质量指标技术指标之外,还需要业务视角的质量指标:pythonclass BusinessQualityMetrics: def __init__(self): self.metrics = {} def calculate_task_completion(self, conversation: Conversation) -> float: """计算任务完成率""" # 判断用户意图是否得到满足 signals = [ conversation.user_did_not_rephrase(), # 用户没有重新提问 conversation.user_did_not_escalate(), # 用户没有要求转人工 conversation.user_continued_engagement(), # 用户继续使用 not conversation.ended_with_error(), # 没有以错误结束 ] return sum(signals) / len(signals) def calculate_helpfulness_score(self, conversation: Conversation) -> float: """计算有用性得分""" base_score = 0.5 # 正面信号加分 if conversation.user_copied_response(): base_score += 0.2 if conversation.user_shared_response(): base_score += 0.15 if conversation.user_gave_thumbs_up(): base_score += 0.15 # 负面信号减分 if conversation.user_regenerated(): base_score -= 0.2 if conversation.user_gave_thumbs_down(): base_score -= 0.3 return max(0.0, min(1.0, base_score))text## 四、日志体系:结构化与非结构化的融合### 4.1 结构化日志格式pythonimport structloglogger = structlog.get_logger()class LLMLogger: @staticmethod def log_llm_call( trace_id: str, model: str, messages: List[Dict], response: str, usage: Dict, duration_ms: float ): logger.info( "llm_call_completed", trace_id=trace_id, model=model, message_count=len(messages), input_tokens=usage.get("input_tokens", 0), output_tokens=usage.get("output_tokens", 0), total_tokens=usage.get("total_tokens", 0), duration_ms=duration_ms, first_message_role=messages[0].get("role") if messages else None, last_message_role=messages[-1].get("role") if messages else None, response_preview=response[:200] # 截断存储 ) @staticmethod def log_rag_retrieval( trace_id: str, query: str, retrieved_docs: int, retrieval_time_ms: float, top_scores: List[float] ): logger.info( "rag_retrieval_completed", trace_id=trace_id, query_preview=query[:100], docs_retrieved=retrieved_docs, retrieval_time_ms=retrieval_time_ms, top_score=max(top_scores) if top_scores else 0, avg_score=sum(top_scores) / len(top_scores) if top_scores else 0 )text### 4.2 日志的查询与分析有了结构化日志,可以用SQL-like查询进行问题排查:sql-- 查找TTFT异常的LLM调用SELECT trace_id, model, input_tokens, ttft_msFROM llm_callsWHERE ttft_ms > 3000 AND timestamp > NOW() - INTERVAL '1 hour'ORDER BY ttft_ms DESCLIMIT 20;-- 查找成本最高的用户会话SELECT trace_id, SUM(input_tokens + output_tokens) as total_tokens, SUM(cost) as total_costFROM llm_callsWHERE timestamp > NOW() - INTERVAL '24 hours'GROUP BY trace_idORDER BY total_cost DESCLIMIT 10;-- 查找错误率突增的模型SELECT model, COUNT(*) as total_calls, SUM(CASE WHEN error IS NOT NULL THEN 1 ELSE 0 END) as errors, SUM(CASE WHEN error IS NOT NULL THEN 1 ELSE 0 END) * 100.0 / COUNT(*) as error_rateFROM llm_callsWHERE timestamp > NOW() - INTERVAL '1 hour'GROUP BY modelHAVING error_rate > 5ORDER BY error_rate DESC;text## 五、诊断工具:从告警到根因### 5.1 智能告警规则pythonclass LLMAlertManager: def __init__(self): self.rules = [ LatencySpikeRule(threshold_ms=5000, window_minutes=5), ErrorRateRule(threshold=0.05, window_minutes=5), CostAnomalyRule(z_score_threshold=3.0), QualityDegradationRule(satisfaction_drop_threshold=0.1), HallucinationSpikeRule(rate_threshold=0.15), ] async def evaluate(self, metrics: LLMMetrics) -> List[Alert]: alerts = [] for rule in self.rules: if await rule.check(metrics): alerts.append(rule.create_alert(metrics)) return alertsclass CostAnomalyRule: def __init__(self, z_score_threshold=3.0): self.threshold = z_score_threshold async def check(self, metrics: LLMMetrics) -> bool: # 获取最近1小时的成本数据 recent_costs = metrics.get_cost_series(minutes=60) historical_mean = metrics.get_cost_mean(days=7, hour=datetime.now().hour) historical_std = metrics.get_cost_std(days=7, hour=datetime.now().hour) current_cost = sum(recent_costs) z_score = (current_cost - historical_mean) / historical_std return abs(z_score) > self.thresholdtext### 5.2 根因分析辅助pythonclass RootCauseAnalyzer: async def analyze(self, trace_id: str) -> RootCauseReport: trace = await self.load_trace(trace_id) findings = [] # 检查1:TTFT是否异常 for span in trace.spans: if span.ttft_ms > 3000: findings.append(Finding( severity="HIGH", category="latency", description=f"TTFT异常:{span.ttft_ms}ms(模型:{span.model_name})", suggestion="检查模型服务是否过载,考虑切换到更快的模型或增加并发" )) # 检查2:Token消耗是否异常 avg_tokens = statistics.mean([s.input_tokens for s in trace.spans]) if avg_tokens > 5000: findings.append(Finding( severity="MEDIUM", category="cost", description=f"输入Token过多:平均{avg_tokens}/请求", suggestion="检查Prompt是否过于冗长,启用Prompt压缩或选择性上下文注入" )) # 检查3:是否有重试循环 retry_count = sum(1 for s in trace.spans if s.tags.get("is_retry")) if retry_count > 2: findings.append(Finding( severity="HIGH", category="reliability", description=f"检测到重试循环:{retry_count}次重试", suggestion="检查工具调用是否有幂等性问题,增加超时和熔断机制" )) return RootCauseReport(trace_id=trace_id, findings=findings)text## 六、开源工具生态2026年主流的LLM可观测性工具:| 工具 | 特点 | 适用场景 ||------|------|---------|| LangSmith | LangChain官方,深度集成 | LangChain/LangGraph用户 || Arize Phoenix | 开源,支持自托管 | 需要数据隐私的团队 || Helicone | 轻量级,专注网关层 | API代理层观测 || Weights & Biases | 实验追踪+生产监控 | 从训练到推理的全流程 || OpenLLMetry | OpenTelemetry标准 | 希望统一观测标准的团队 |## 七、最佳实践总结1. 从第一天就建立观测体系:不要等出了问题再补2. Trace是核心:所有指标和日志都应该关联到Trace3. 关注业务指标:技术指标最终要服务于用户体验4. 建立基线:正常状态长什么样,才能识别异常5. 采样策略:高流量场景下使用智能采样,保留异常Trace6. 隐私与成本平衡:不要记录完整的用户输入输出,使用截断和脱敏—LLM应用的可观测性不是锦上添花,而是生产就绪的必要条件。投入观测体系建设,就是在为你的AI应用购买一份"质量保险”——当问题发生时,你能快速定位、精准修复,而不是在黑暗中摸索。

Logo

AtomGit 是由开放原子开源基金会联合 CSDN 等生态伙伴共同推出的新一代开源与人工智能协作平台。平台坚持“开放、中立、公益”的理念,把代码托管、模型共享、数据集托管、智能体开发体验和算力服务整合在一起,为开发者提供从开发、训练到部署的一站式体验。

更多推荐