Building a Reliable Monitoring System for LLM Applications

Introduction

The nondeterminism and high cost of LLM applications make monitoring especially important. A solid observability setup helps us track model performance, troubleshoot problems, and optimize spend. This article walks through best practices for monitoring LLM applications.

Monitoring Overview

The Three Pillars of Observability

┌─────────────────────────────────────────────────────────────────┐
│              The Three Pillars of LLM Observability             │
├─────────────────────────────────────────────────────────────────┤
│                                                                 │
│   ┌────────────┐      ┌────────────┐      ┌────────────┐        │
│   │    Logs    │      │  Metrics   │      │   Traces   │        │
│   └─────┬──────┘      └─────┬──────┘      └─────┬──────┘        │
│         │                   │                   │               │
│   • Request/response  • Latency/throughput  • Call chains       │
│     content           • Token usage         • Dependencies      │
│   • Error details     • Cost totals         • Bottlenecks       │
│   • User feedback                                               │
│                                                                 │
└─────────────────────────────────────────────────────────────────┘

LLM-Specific Monitoring Needs

| Item | What to track |
| --- | --- |
| Token usage | Input/output token counts |
| Response latency | TTFT and total latency |
| Output quality | Hallucinations, topic drift |
| Cost tracking | Real-time spend |
| Error rate | API errors, timeouts |
| Cache hit rate | Cache effectiveness |
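
A per-request record covering the table above can serve as the backbone for all of the logging and metrics that follow. A minimal sketch (the field names are illustrative, not from any library):

```python
from dataclasses import dataclass
from typing import Optional

@dataclass
class LLMRequestRecord:
    """One monitored LLM call; fields mirror the monitoring needs above."""
    request_id: str
    model: str
    input_tokens: int = 0
    output_tokens: int = 0
    ttft_ms: Optional[float] = None   # time to first token (streaming only)
    latency_ms: float = 0.0
    cost_usd: float = 0.0
    cache_hit: bool = False
    error: Optional[str] = None       # API error / timeout, if any
```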

Logging Best Practices

Structured Logging

import logging
import json
from datetime import datetime

class LLMLogger:
    """Structured logger tailored to LLM calls"""
    
    def __init__(self, name: str = "llm_app"):
        self.logger = logging.getLogger(name)
        self.logger.setLevel(logging.INFO)
        
        # Emit one JSON object per log line
        handler = logging.StreamHandler()
        handler.setFormatter(self.JsonFormatter())
        self.logger.addHandler(handler)
    
    class JsonFormatter(logging.Formatter):
        def format(self, record):
            log_data = {
                "timestamp": datetime.utcnow().isoformat(),
                "level": record.levelname,
                "message": record.getMessage(),
                "logger": record.name
            }
            # extra={"extra": {...}} at the call site shows up as record.extra
            if hasattr(record, 'extra'):
                log_data.update(record.extra)
            return json.dumps(log_data, ensure_ascii=False)
    
    def log_request(
        self,
        request_id: str,
        model: str,
        messages: list,
        params: dict = None
    ):
        """Log an outgoing request"""
        self.logger.info(
            "LLM Request",
            extra={
                "extra": {
                    "event": "llm_request",
                    "request_id": request_id,
                    "model": model,
                    "messages": messages,
                    "params": params or {}
                }
            }
        )
    def log_response(
        self,
        request_id: str,
        model: str,
        response: str,
        usage: dict,
        latency_ms: float
    ):
        """Log a completed response"""
        self.logger.info(
            "LLM Response",
            extra={
                "extra": {
                    "event": "llm_response",
                    "request_id": request_id,
                    "model": model,
                    "response_length": len(response),
                    "input_tokens": usage.get("prompt_tokens"),
                    "output_tokens": usage.get("completion_tokens"),
                    "total_tokens": usage.get("total_tokens"),
                    "latency_ms": latency_ms
                }
            }
        )
    
    def log_error(
        self,
        request_id: str,
        error_type: str,
        error_message: str,
        context: dict = None
    ):
        """Log a failed call"""
        self.logger.error(
            f"LLM Error: {error_type}",
            extra={
                "extra": {
                    "event": "llm_error",
                    "request_id": request_id,
                    "error_type": error_type,
                    "error_message": error_message,
                    "context": context or {}
                }
            }
        )

Usage

import time
import uuid
from openai import OpenAI

logger = LLMLogger()

def monitored_chat(messages: list, model: str = "gpt-4o") -> str:
    """Chat completion with full request/response/error logging"""
    request_id = str(uuid.uuid4())
    client = OpenAI()
    
    # Log the request
    logger.log_request(request_id, model, messages)
    
    start_time = time.time()
    
    try:
        response = client.chat.completions.create(
            model=model,
            messages=messages
        )
        
        latency_ms = (time.time() - start_time) * 1000
        result = response.choices[0].message.content
        
        # Log the response
        logger.log_response(
            request_id=request_id,
            model=model,
            response=result,
            usage={
                "prompt_tokens": response.usage.prompt_tokens,
                "completion_tokens": response.usage.completion_tokens,
                "total_tokens": response.usage.total_tokens
            },
            latency_ms=latency_ms
        )
        
        return result
        
    except Exception as e:
        logger.log_error(
            request_id=request_id,
            error_type=type(e).__name__,
            error_message=str(e)
        )
        raise

---

Advanced: End-to-End Tracing

In a complex RAG or Agent system, a single user request can trigger multiple LLM calls, vector searches, and tool executions. Flat logs cannot untangle those dependencies.

Using LangSmith / LangFuse

These are observability platforms built specifically for LLM applications.

Core features:

  • Visual traces: every step's input, output, latency, and token usage at a glance
  • Debug and replay: edit a prompt directly in the web UI, rerun it, and compare results
  • Dataset collection: save notably good or bad production answers to a test set with one click

```python
# LangSmith auto-instrumentation example
import os
from langchain_openai import ChatOpenAI

# Setting these environment variables is enough for LangChain to record every trace
os.environ["LANGCHAIN_TRACING_V2"] = "true"
os.environ["LANGCHAIN_API_KEY"] = "your_api_key"

llm = ChatOpenAI()
llm.invoke("Hello, please introduce yourself")
```

Semantic Monitoring: Beyond Traditional Metrics

Traditional monitoring only asks "did the request succeed?"; an LLM application also has to ask "was the answer correct?".

1. Hallucination Detection

Use tools such as Arize Phoenix or Giskard to scan responses in the background for factual errors.

2. Topic Drift Monitoring

If your customer-service bot starts chatting with users about politics or games, that is "topic drift".

  • Implementation: periodically run embedding-based clustering over incoming requests and watch for large clusters that fall outside the intended business scope.

3. Negative Feedback Tracking

Treat the user's "thumbs down" as your highest-priority monitoring signal.

def log_user_feedback(request_id: str, score: int, comment: str = None):
    """Record user feedback and trigger an alert"""
    # Intended policy: page the team (e.g., via Slack) after repeated
    # thumbs-downs; this simplified version alerts on any negative score.
    # alert_dev_team is a placeholder for your alerting hook.
    if score < 0:
        alert_dev_team(f"Request {request_id} received negative feedback: {comment}")

Performance Metrics: TTFT and P99

For streaming output, a single "total latency" number no longer tells the story.

  • TTFT (Time To First Token): first-token latency, the metric that most closely matches the user's perceived speed (see the sketch after this list).
  • TPS (Tokens Per Second): generation speed.
  • P99 latency: the slowest 1% of requests, usually caused by long contexts or network jitter.
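
A minimal sketch of measuring TTFT against the OpenAI streaming API; how you report the number (a log field, or the Prometheus histogram introduced in the next section) is up to you:

```python
import time
from openai import OpenAI

def streaming_chat_with_ttft(messages: list, model: str = "gpt-4o"):
    """Return (full_text, ttft_seconds) for a streamed chat completion."""
    client = OpenAI()
    start = time.time()
    ttft = None
    chunks = []
    stream = client.chat.completions.create(
        model=model, messages=messages, stream=True
    )
    for chunk in stream:
        if not chunk.choices:               # e.g., a trailing usage-only chunk
            continue
        delta = chunk.choices[0].delta.content
        if delta:
            if ttft is None:
                ttft = time.time() - start  # time to first visible token
            chunks.append(delta)
    return "".join(chunks), ttft
```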

Sensitive Data Handling

import re
from typing import List

class LogSanitizer:
    """Masks PII before it reaches the logs"""
    
    def __init__(self):
        self.patterns = [
            # Email addresses
            (r'\b[\w.-]+@[\w.-]+\.\w+\b', '[EMAIL]'),
            # Chinese mobile numbers
            (r'\b1[3-9]\d{9}\b', '[PHONE]'),
            # Chinese national ID numbers
            (r'\b\d{17}[\dXx]\b', '[ID_CARD]'),
            # OpenAI-style API keys
            (r'\bsk-[a-zA-Z0-9]{48}\b', '[API_KEY]'),
            # Credit card numbers
            (r'\b\d{4}[- ]?\d{4}[- ]?\d{4}[- ]?\d{4}\b', '[CARD]'),
        ]
    
    def sanitize(self, text: str) -> str:
        """Mask all known PII patterns in a string"""
        result = text
        for pattern, replacement in self.patterns:
            result = re.sub(pattern, replacement, result)
        return result
    
    def sanitize_messages(self, messages: List[dict]) -> List[dict]:
        """Mask PII in a list of chat messages"""
        sanitized = []
        for msg in messages:
            sanitized.append({
                "role": msg["role"],
                "content": self.sanitize(msg["content"])
            })
        return sanitized

# Usage
sanitizer = LogSanitizer()

def safe_log(messages: list):
    """Log messages only after masking"""
    safe_messages = sanitizer.sanitize_messages(messages)
    logger.log_request("xxx", "gpt-4o", safe_messages)
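
A quick check of what the masking does (expected output shown as a comment):

```python
sample = "Contact me at alice@example.com or 13812345678, key sk-" + "a" * 48
print(sanitizer.sanitize(sample))
# -> Contact me at [EMAIL] or [PHONE], key [API_KEY]
```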

Metrics Collection

Prometheus Metrics

from prometheus_client import Counter, Histogram, Gauge, start_http_server
from openai import OpenAI
import time

# Metric definitions
LLM_REQUESTS = Counter(
    'llm_requests_total',
    'Total LLM API requests',
    ['model', 'status']
)

LLM_LATENCY = Histogram(
    'llm_request_latency_seconds',
    'LLM request latency',
    ['model'],
    buckets=[0.1, 0.5, 1, 2, 5, 10, 30, 60]
)

LLM_TOKENS = Counter(
    'llm_tokens_total',
    'Total tokens used',
    ['model', 'token_type']
)

# Note: prometheus_client appends `_total` to counter names, so this is
# exposed as `llm_cost_dollars_total` on the /metrics endpoint
LLM_COST = Counter(
    'llm_cost_dollars',
    'Total cost in dollars',
    ['model']
)

LLM_CACHE_HITS = Counter(
    'llm_cache_hits_total',
    'Cache hit count',
    ['cache_type']
)

class MetricsCollector:
    """Records request-level metrics"""
    
    def __init__(self, port: int = 9090):
        # Expose /metrics for Prometheus to scrape (pick a port that does not
        # clash with the Prometheus server's own 9090)
        start_http_server(port)
        
        # USD per 1M tokens
        self.pricing = {
            "gpt-4o": {"input": 5, "output": 15},
            "gpt-4o-mini": {"input": 0.15, "output": 0.60}
        }
    
    def record_request(
        self,
        model: str,
        status: str,
        latency: float,
        input_tokens: int,
        output_tokens: int
    ):
        """Record the metrics for one request"""
        # Request count
        LLM_REQUESTS.labels(model=model, status=status).inc()
        
        # Latency
        LLM_LATENCY.labels(model=model).observe(latency)
        
        # Token usage
        LLM_TOKENS.labels(model=model, token_type="input").inc(input_tokens)
        LLM_TOKENS.labels(model=model, token_type="output").inc(output_tokens)
        
        # Cost
        if model in self.pricing:
            price = self.pricing[model]
            cost = (input_tokens * price["input"] + 
                    output_tokens * price["output"]) / 1_000_000
            LLM_COST.labels(model=model).inc(cost)
    
    def record_cache_hit(self, cache_type: str):
        """Record a cache hit"""
        LLM_CACHE_HITS.labels(cache_type=cache_type).inc()

# Usage
metrics = MetricsCollector()

def instrumented_chat(messages: list, model: str = "gpt-4o") -> str:
    """Chat completion instrumented with metrics"""
    start = time.time()
    client = OpenAI()
    
    try:
        response = client.chat.completions.create(
            model=model,
            messages=messages
        )
        
        latency = time.time() - start
        
        metrics.record_request(
            model=model,
            status="success",
            latency=latency,
            input_tokens=response.usage.prompt_tokens,
            output_tokens=response.usage.completion_tokens
        )
        
        return response.choices[0].message.content
        
    except Exception as e:
        latency = time.time() - start
        metrics.record_request(
            model=model,
            status="error",
            latency=latency,
            input_tokens=0,
            output_tokens=0
        )
        raise
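
To drive the cache-hit counter, pair record_cache_hit with whatever cache sits in front of the model. A sketch with a naive in-process exact-match cache (hypothetical; a production system would more likely use Redis or a semantic cache):

```python
import json

_response_cache: dict = {}

def cached_chat(messages: list, model: str = "gpt-4o") -> str:
    """Exact-match cache in front of instrumented_chat."""
    key = (model, json.dumps(messages, sort_keys=True, ensure_ascii=False))
    if key in _response_cache:
        metrics.record_cache_hit(cache_type="exact")
        return _response_cache[key]
    result = instrumented_chat(messages, model)
    _response_cache[key] = result
    return result
```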

Custom Metrics Dashboard

from dataclasses import dataclass, field
from datetime import datetime, timedelta
from typing import Dict, List
from collections import defaultdict

@dataclass
class MetricPoint:
    timestamp: datetime
    value: float
    labels: Dict[str, str] = field(default_factory=dict)

class MetricsDashboard:
    """In-memory metrics store with a text dashboard"""
    
    def __init__(self):
        self.metrics: Dict[str, List[MetricPoint]] = defaultdict(list)
    
    def record(self, name: str, value: float, labels: dict = None):
        """Record one data point"""
        point = MetricPoint(
            timestamp=datetime.now(),
            value=value,
            labels=labels or {}
        )
        self.metrics[name].append(point)
    
    def get_summary(self, hours: int = 24) -> dict:
        """Summarize the last N hours"""
        cutoff = datetime.now() - timedelta(hours=hours)
        
        summary = {}
        for name, points in self.metrics.items():
            recent = [p for p in points if p.timestamp > cutoff]
            if recent:
                values = [p.value for p in recent]
                summary[name] = {
                    "count": len(values),
                    "sum": sum(values),
                    "avg": sum(values) / len(values),
                    "min": min(values),
                    "max": max(values)
                }
        
        return summary
    
    def print_dashboard(self):
        """Print a plain-text dashboard"""
        summary = self.get_summary()
        
        print("\n" + "=" * 50)
        print("          LLM Monitoring Dashboard")
        print("=" * 50)
        
        for name, stats in summary.items():
            print(f"\n📊 {name}")
            print(f"   total: {stats['sum']:.2f}")
            print(f"   avg:   {stats['avg']:.4f}")
            print(f"   min:   {stats['min']:.4f}")
            print(f"   max:   {stats['max']:.4f}")
        
        print("\n" + "=" * 50)
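
get_summary only reports min/avg/max, while the P99 discussed earlier is a tail statistic. A minimal nearest-rank percentile helper that could extend the summary:

```python
import math

def percentile(values: list, p: float) -> float:
    """Nearest-rank percentile, e.g. percentile(latencies, 99) for P99."""
    if not values:
        return 0.0
    ordered = sorted(values)
    rank = max(1, math.ceil(p / 100 * len(ordered)))  # 1-based nearest rank
    return ordered[rank - 1]
```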

Distributed Tracing

OpenTelemetry Integration

from opentelemetry import trace
from opentelemetry.sdk.trace import TracerProvider
from opentelemetry.sdk.trace.export import BatchSpanProcessor
from opentelemetry.exporter.otlp.proto.grpc.trace_exporter import OTLPSpanExporter
from opentelemetry.sdk.resources import Resource
from functools import wraps
from openai import OpenAI

# Initialize the tracer provider
resource = Resource.create({"service.name": "llm-service"})
provider = TracerProvider(resource=resource)

# Configure the exporter (ships spans to Jaeger/Tempo)
exporter = OTLPSpanExporter(endpoint="localhost:4317", insecure=True)
provider.add_span_processor(BatchSpanProcessor(exporter))

trace.set_tracer_provider(provider)
tracer = trace.get_tracer(__name__)

def trace_llm(func):
    """Tracing decorator for LLM calls"""
    @wraps(func)
    def wrapper(*args, **kwargs):
        with tracer.start_as_current_span(
            f"llm_call_{func.__name__}",
            kind=trace.SpanKind.CLIENT
        ) as span:
            # Record input attributes
            if args:
                span.set_attribute("llm.messages_count", len(args[0]))
            
            model = kwargs.get("model", "gpt-4o")
            span.set_attribute("llm.model", model)
            
            try:
                result = func(*args, **kwargs)
                
                # Record output attributes
                if hasattr(result, 'usage'):
                    span.set_attribute("llm.input_tokens", result.usage.prompt_tokens)
                    span.set_attribute("llm.output_tokens", result.usage.completion_tokens)
                
                span.set_status(trace.Status(trace.StatusCode.OK))
                return result
                
            except Exception as e:
                span.set_status(
                    trace.Status(trace.StatusCode.ERROR, str(e))
                )
                span.record_exception(e)
                raise
    
    return wrapper

# Usage
@trace_llm
def chat_completion(messages: list, model: str = "gpt-4o"):
    client = OpenAI()
    return client.chat.completions.create(
        model=model,
        messages=messages
    )
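
Spans nest: wrapping the retrieval and generation steps of a pipeline in child spans makes the whole call tree visible in Jaeger/Tempo. A sketch (retrieve_documents is a hypothetical retriever):

```python
def traced_rag_answer(question: str) -> str:
    """RAG pipeline whose steps appear as child spans of one parent."""
    with tracer.start_as_current_span("rag_pipeline") as span:
        span.set_attribute("rag.question_length", len(question))

        with tracer.start_as_current_span("vector_search"):
            docs = retrieve_documents(question)  # hypothetical retriever

        with tracer.start_as_current_span("llm_generate"):
            response = chat_completion(
                [{"role": "system", "content": f"Context: {docs}"},
                 {"role": "user", "content": question}],
                model="gpt-4o",
            )
        return response.choices[0].message.content
```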

Call-Chain Tracing

import time
import uuid
from datetime import datetime
from contextvars import ContextVar
from openai import OpenAI

# Per-request tracing context
trace_context: ContextVar[dict] = ContextVar('trace_context', default={})

class TraceContext:
    """Helpers for managing the tracing context"""
    
    @staticmethod
    def start_trace(name: str = None) -> str:
        """Start a new trace"""
        trace_id = str(uuid.uuid4())
        trace_context.set({
            "trace_id": trace_id,
            "span_id": trace_id[:8],
            "name": name,
            "spans": []
        })
        return trace_id
    
    @staticmethod
    def get_trace_id() -> str:
        """Return the current trace ID"""
        ctx = trace_context.get()
        return ctx.get("trace_id", "")
    
    @staticmethod
    def add_span(name: str, data: dict):
        """Append a span and make it the current parent"""
        ctx = trace_context.get()
        span_id = str(uuid.uuid4())[:8]
        
        span = {
            "span_id": span_id,
            "parent_id": ctx.get("span_id"),
            "name": name,
            "timestamp": datetime.now().isoformat(),
            "data": data
        }
        
        ctx["spans"].append(span)
        ctx["span_id"] = span_id

class TracedLLMClient:
    """LLM client that records a span per pipeline stage"""
    
    def __init__(self):
        self.client = OpenAI()
    
    def chat(self, messages: list, model: str = "gpt-4o") -> dict:
        """Chat completion with pre/LLM/post spans"""
        trace_id = TraceContext.start_trace("chat_completion")
        
        # Preprocessing span
        TraceContext.add_span("preprocess", {
            "message_count": len(messages),
            "model": model
        })
        
        # LLM call span
        start = time.time()
        response = self.client.chat.completions.create(
            model=model,
            messages=messages
        )
        latency = time.time() - start
        
        TraceContext.add_span("llm_call", {
            "model": model,
            "latency_ms": latency * 1000,
            "input_tokens": response.usage.prompt_tokens,
            "output_tokens": response.usage.completion_tokens
        })
        
        # Postprocessing span
        result = response.choices[0].message.content
        TraceContext.add_span("postprocess", {
            "response_length": len(result)
        })
        
        return {
            "response": result,
            "trace_id": trace_id,
            "trace": trace_context.get()
        }
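
Calling the traced client returns the answer together with the recorded span tree:

```python
traced_client = TracedLLMClient()
result = traced_client.chat([{"role": "user", "content": "Hello"}])

print(result["trace_id"])
for span in result["trace"]["spans"]:
    # e.g. "llm_call {'model': 'gpt-4o', 'latency_ms': ...}"
    print(span["name"], span["data"])
```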

LangSmith Integration

Configuration

pip install langsmith
export LANGCHAIN_TRACING_V2=true
export LANGCHAIN_API_KEY=your_api_key
export LANGCHAIN_PROJECT=my_project

Usage

from langsmith.wrappers import wrap_openai
from openai import OpenAI

# Wrap the OpenAI client
client = wrap_openai(OpenAI())

# Every call is traced automatically
response = client.chat.completions.create(
    model="gpt-4o",
    messages=[{"role": "user", "content": "Hello!"}]
)

# Attach custom metadata
from langsmith import traceable

@traceable(
    name="qa_chain",
    metadata={"version": "1.0"}
)
def qa_chain(question: str) -> str:
    """Q&A chain with tracing"""
    # RAG retrieval (retrieve_documents is application code)
    docs = retrieve_documents(question)
    
    # LLM call
    response = client.chat.completions.create(
        model="gpt-4o",
        messages=[
            {"role": "system", "content": f"Context: {docs}"},
            {"role": "user", "content": question}
        ]
    )
    
    return response.choices[0].message.content

# Evaluation
from langsmith.evaluation import evaluate

results = evaluate(
    qa_chain,
    data="my_dataset",
    evaluators=["correctness", "helpfulness"]
)

LangFuse Integration

Configuration

from langfuse import Langfuse
from langfuse.decorators import observe, langfuse_context

# Initialize the client
langfuse = Langfuse(
    public_key="pk-xxx",
    secret_key="sk-xxx",
    host="https://cloud.langfuse.com"
)

@observe()
def rag_pipeline(query: str) -> str:
    """RAG pipeline traced by LangFuse"""
    
    # Attach custom trace attributes
    langfuse_context.update_current_trace(
        user_id="user_123",
        session_id="session_456",
        metadata={"source": "web"}
    )
    
    # Retrieval (retrieve/generate are application code)
    docs = retrieve(query)
    
    langfuse_context.update_current_observation(
        input=query,
        metadata={"doc_count": len(docs)}
    )
    
    # Generation
    response = generate(query, docs)
    
    # Record a score
    langfuse_context.score_current_trace(
        name="relevance",
        value=0.9
    )
    
    return response

# Manual tracing
def manual_trace_example():
    """Manually created trace"""
    trace = langfuse.trace(
        name="chat_session",
        user_id="user_123"
    )
    
    # Record a generation
    generation = trace.generation(
        name="llm_call",
        model="gpt-4o",
        input=[{"role": "user", "content": "Hello"}],
        output="Hi there!"
    )
    
    # End the generation
    generation.end(
        usage={
            "input": 10,
            "output": 5
        }
    )
    
    # Flush buffered events
    langfuse.flush()
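
The thumbs-down signal from the semantic-monitoring section maps naturally onto LangFuse scores. A sketch against the v2-style SDK (the exact method surface varies between SDK versions, so treat this as an assumption to verify):

```python
def record_thumbs_down(trace_id: str, comment: str = None):
    """Attach negative user feedback to an existing trace (v2-style API)."""
    langfuse.score(
        trace_id=trace_id,
        name="user_feedback",
        value=0,        # our own convention: 0 = thumbs down, 1 = thumbs up
        comment=comment,
    )
    langfuse.flush()
```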

Alert Configuration

Alert Rules

from dataclasses import dataclass
from datetime import datetime
from typing import Callable, List
from enum import Enum

class AlertSeverity(Enum):
    INFO = "info"
    WARNING = "warning"
    CRITICAL = "critical"

@dataclass
class AlertRule:
    name: str
    condition: Callable[[dict], bool]
    severity: AlertSeverity
    message_template: str

class AlertManager:
    """Evaluates alert rules and dispatches notifications"""
    
    def __init__(self):
        self.rules: List[AlertRule] = []
        self.handlers: List[Callable] = []
    
    def add_rule(self, rule: AlertRule):
        """Register an alert rule"""
        self.rules.append(rule)
    
    def add_handler(self, handler: Callable):
        """Register an alert handler"""
        self.handlers.append(handler)
    
    def check(self, metrics: dict):
        """Evaluate all rules against a metrics snapshot"""
        for rule in self.rules:
            if rule.condition(metrics):
                alert = {
                    "name": rule.name,
                    "severity": rule.severity.value,
                    "message": rule.message_template.format(**metrics),
                    "timestamp": datetime.now().isoformat()
                }
                
                for handler in self.handlers:
                    handler(alert)

# Configure alert rules
alert_manager = AlertManager()

# Error-rate alert
alert_manager.add_rule(AlertRule(
    name="high_error_rate",
    condition=lambda m: m.get("error_rate", 0) > 0.05,
    severity=AlertSeverity.CRITICAL,
    message_template="Error rate too high: {error_rate:.2%}"
))

# Latency alert
alert_manager.add_rule(AlertRule(
    name="high_latency",
    condition=lambda m: m.get("p99_latency", 0) > 10,
    severity=AlertSeverity.WARNING,
    message_template="P99 latency too high: {p99_latency:.2f}s"
))

# Cost alert
alert_manager.add_rule(AlertRule(
    name="daily_cost_exceeded",
    condition=lambda m: m.get("daily_cost", 0) > 100,
    severity=AlertSeverity.WARNING,
    message_template="Daily cost exceeded: ${daily_cost:.2f}"
))

# Alert handlers
def slack_handler(alert: dict):
    """Send to Slack (stubbed with print here)"""
    print(f"[{alert['severity'].upper()}] {alert['message']}")

def email_handler(alert: dict):
    """Send an email for critical alerts (stubbed with print here)"""
    if alert["severity"] == "critical":
        print(f"Sending email alert: {alert['message']}")

alert_manager.add_handler(slack_handler)
alert_manager.add_handler(email_handler)
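
With the rules registered, a periodic job can feed aggregated metrics into check. A hypothetical snapshot to show the flow (in practice you would compute these values from your metrics store on a schedule):

```python
current_metrics = {
    "error_rate": 0.08,    # 8%  -> trips high_error_rate (critical)
    "p99_latency": 12.5,   # sec -> trips high_latency (warning)
    "daily_cost": 42.0,    # USD -> below the $100 threshold, no alert
}
alert_manager.check(current_metrics)
# [CRITICAL] Error rate too high: 8.00%
# Sending email alert: Error rate too high: 8.00%
# [WARNING] P99 latency too high: 12.50s
```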

Grafana Dashboards

Example Prometheus Queries

# Request rate
rate(llm_requests_total[5m])

# Error rate
rate(llm_requests_total{status="error"}[5m]) 
/ rate(llm_requests_total[5m])

# P99 latency
histogram_quantile(0.99, 
  rate(llm_request_latency_seconds_bucket[5m]))

# Token usage
sum(rate(llm_tokens_total[1h])) by (model, token_type)

# Hourly cost (prometheus_client exposes the counter as llm_cost_dollars_total)
sum(increase(llm_cost_dollars_total[1h])) by (model)

# Cache hit rate
sum(rate(llm_cache_hits_total[5m])) 
/ sum(rate(llm_requests_total[5m]))

Dashboard Panel Configuration

{
  "panels": [
    {
      "title": "Request rate",
      "type": "timeseries",
      "targets": [{
        "expr": "rate(llm_requests_total[5m])",
        "legendFormat": ""
      }]
    },
    {
      "title": "Error rate",
      "type": "gauge",
      "targets": [{
        "expr": "rate(llm_requests_total{status='error'}[5m]) / rate(llm_requests_total[5m])"
      }],
      "thresholds": [
        {"value": 0, "color": "green"},
        {"value": 0.01, "color": "yellow"},
        {"value": 0.05, "color": "red"}
      ]
    },
    {
      "title": "Latency distribution",
      "type": "heatmap",
      "targets": [{
        "expr": "rate(llm_request_latency_seconds_bucket[5m])"
      }]
    },
    {
      "title": "Daily cost",
      "type": "stat",
      "targets": [{
        "expr": "sum(increase(llm_cost_dollars_total[24h]))"
      }],
      "unit": "currencyUSD"
    }
  ]
}

A Complete Monitoring Solution

import time
import uuid
from openai import OpenAI

class LLMObservability:
    """End-to-end observability wrapper for LLM calls"""
    
    def __init__(self):
        self.logger = LLMLogger()
        self.metrics = MetricsDashboard()
        self.alert_manager = AlertManager()
        
        # Set up alerting
        self._setup_alerts()
    
    def _setup_alerts(self):
        """Register default alert rules"""
        self.alert_manager.add_rule(AlertRule(
            name="high_latency",
            condition=lambda m: m.get("avg_latency", 0) > 5,
            severity=AlertSeverity.WARNING,
            message_template="Average latency too high: {avg_latency:.2f}s"
        ))
    
    def wrap_client(self, client):
        """Monkey-patch an OpenAI client so every call is logged and measured"""
        original_create = client.chat.completions.create
        
        def wrapped_create(*args, **kwargs):
            request_id = str(uuid.uuid4())
            model = kwargs.get("model", "gpt-4o")
            messages = kwargs.get("messages", [])
            
            # Log the request
            self.logger.log_request(request_id, model, messages)
            
            start = time.time()
            try:
                response = original_create(*args, **kwargs)
                latency = time.time() - start
                
                # Log the response
                self.logger.log_response(
                    request_id, model,
                    response.choices[0].message.content,
                    {
                        "prompt_tokens": response.usage.prompt_tokens,
                        "completion_tokens": response.usage.completion_tokens,
                        "total_tokens": response.usage.total_tokens
                    },
                    latency * 1000
                )
                
                # Record metrics
                self.metrics.record("latency", latency, {"model": model})
                self.metrics.record("tokens", response.usage.total_tokens, {"model": model})
                
                return response
                
            except Exception as e:
                latency = time.time() - start
                self.logger.log_error(request_id, type(e).__name__, str(e))
                self.metrics.record("errors", 1, {"model": model})
                raise
        
        client.chat.completions.create = wrapped_create
        return client

# Usage
observability = LLMObservability()
client = observability.wrap_client(OpenAI())

# Every call is now monitored automatically
response = client.chat.completions.create(
    model="gpt-4o",
    messages=[{"role": "user", "content": "Hello"}]
)

# Print the dashboard
observability.metrics.print_dashboard()

Best Practices

| Practice | Why it matters |
| --- | --- |
| Structured logging | Easy to query and analyze |
| Request IDs | End-to-end correlation |
| Sensitive-data masking | Compliance requirement |
| Token monitoring | Cost control |
| Latency monitoring | User experience |
| Automatic alerting | Timely response |
| Distributed tracing | Essential for complex systems |

Summary

The core elements of LLM application monitoring:

  1. Logs: structured, traceable, sanitized
  2. Metrics: tokens, latency, cost, error rate
  3. Traces: call chains, dependencies
  4. Alerts: threshold-triggered, multi-level notification

A practical build-out path:

  1. L1 Basic monitoring: log requests/responses; track token usage and cost.
  2. L2 Tracing: adopt LangFuse or LangSmith to untangle Agent internals.
  3. L3 Semantic monitoring: build an automated quality-evaluation pipeline that watches for hallucinations and safety risks.
  4. L4 Closed loop: turn problems surfaced by monitoring into test cases that drive continuous prompt and model iteration.

Recommended tools:

  • Logs: ELK / Loki
  • Metrics: Prometheus + Grafana
  • Tracing: LangSmith / LangFuse / Jaeger
  • APM: Datadog / New Relic

An LLM application without monitoring is like driving in the dark. Only with a solid observability stack can we truly stay in control of an AI system's behavior.
