# Building a Reliable Monitoring System for LLM Applications

## Preface

LLM applications are non-deterministic and expensive to run, which makes monitoring especially important. A well-built observability stack helps us track model performance, troubleshoot problems, and optimize costs. This article walks through best practices for monitoring LLM applications.


## Monitoring Overview

### The Three Pillars of Observability

```
┌──────────────────────────────────────────────────────────┐
│         The Three Pillars of LLM Observability           │
├──────────────────────────────────────────────────────────┤
│                                                          │
│  ┌──────────┐      ┌──────────┐      ┌──────────┐        │
│  │   Logs   │      │ Metrics  │      │  Traces  │        │
│  └────┬─────┘      └────┬─────┘      └────┬─────┘        │
│       │                 │                 │              │
│  • Request/response   • Latency/TPS     • Call chains    │
│  • Error details      • Token usage     • Dependencies   │
│  • User feedback      • Cost tracking   • Bottlenecks    │
│                                                          │
└──────────────────────────────────────────────────────────┘
```

### LLM-Specific Monitoring Needs

| Monitoring Item | Description |
|---|---|
| Token usage | Input/output token counts |
| Response latency | TTFT and total latency |
| Output quality | Hallucination, topic drift |
| Cost tracking | Real-time cost monitoring |
| Error rate | API errors, timeouts |
| Cache hit rate | Cache effectiveness |
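
Taken together, these dimensions suggest a per-request telemetry record. A sketch of what such a record might hold (the field names are illustrative, not from any particular library):

```python
from dataclasses import dataclass
from typing import Optional

@dataclass
class LLMCallRecord:
    """One row of per-request telemetry covering the dimensions above."""
    request_id: str
    model: str
    input_tokens: int = 0
    output_tokens: int = 0
    ttft_ms: Optional[float] = None   # time to first token (streaming)
    total_latency_ms: float = 0.0
    cost_usd: float = 0.0
    error: Optional[str] = None       # API error / timeout, if any
    cache_hit: bool = False
```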

## Logging Best Practices

### Structured Logging

```python
import logging
import json
from datetime import datetime, timezone

class LLMLogger:
    """Structured, JSON-formatted logging for LLM calls."""

    def __init__(self, name: str = "llm_app"):
        self.logger = logging.getLogger(name)
        self.logger.setLevel(logging.INFO)

        # Emit records as JSON
        handler = logging.StreamHandler()
        handler.setFormatter(self.JsonFormatter())
        self.logger.addHandler(handler)

    class JsonFormatter(logging.Formatter):
        def format(self, record):
            log_data = {
                "timestamp": datetime.now(timezone.utc).isoformat(),
                "level": record.levelname,
                "message": record.getMessage(),
                "logger": record.name
            }
            # Merge in structured fields passed via extra={"extra": {...}}
            if hasattr(record, 'extra'):
                log_data.update(record.extra)
            return json.dumps(log_data, ensure_ascii=False)

    def log_request(
        self,
        request_id: str,
        model: str,
        messages: list,
        params: dict = None
    ):
        """Log an outgoing request."""
        self.logger.info(
            "LLM Request",
            extra={
                "extra": {
                    "event": "llm_request",
                    "request_id": request_id,
                    "model": model,
                    "messages": messages,
                    "params": params or {}
                }
            }
        )


    def log_response(
        self,
        request_id: str,
        model: str,
        response: str,
        usage: dict,
        latency_ms: float
    ):
        """Log a response."""
        self.logger.info(
            "LLM Response",
            extra={
                "extra": {
                    "event": "llm_response",
                    "request_id": request_id,
                    "model": model,
                    "response_length": len(response),
                    "input_tokens": usage.get("prompt_tokens"),
                    "output_tokens": usage.get("completion_tokens"),
                    "total_tokens": usage.get("total_tokens"),
                    "latency_ms": latency_ms
                }
            }
        )

    def log_error(
        self,
        request_id: str,
        error_type: str,
        error_message: str,
        context: dict = None
    ):
        """Log an error."""
        self.logger.error(
            f"LLM Error: {error_type}",
            extra={
                "extra": {
                    "event": "llm_error",
                    "request_id": request_id,
                    "error_type": error_type,
                    "error_message": error_message,
                    "context": context or {}
                }
            }
        )
```

### Usage

```python
import time
import uuid

from openai import OpenAI

logger = LLMLogger()

def monitored_chat(messages: list, model: str = "gpt-4o") -> str:
    """Chat completion with request/response/error logging."""
    request_id = str(uuid.uuid4())
    client = OpenAI()

    # Log the request
    logger.log_request(request_id, model, messages)

    start_time = time.time()

    try:
        response = client.chat.completions.create(
            model=model,
            messages=messages
        )

        latency_ms = (time.time() - start_time) * 1000
        result = response.choices[0].message.content

        # Log the response
        logger.log_response(
            request_id=request_id,
            model=model,
            response=result,
            usage={
                "prompt_tokens": response.usage.prompt_tokens,
                "completion_tokens": response.usage.completion_tokens,
                "total_tokens": response.usage.total_tokens
            },
            latency_ms=latency_ms
        )

        return result

    except Exception as e:
        logger.log_error(
            request_id=request_id,
            error_type=type(e).__name__,
            error_message=str(e)
        )
        raise
```

### Handling Sensitive Information

```python
import re
from typing import List

class LogSanitizer:
    """Redact sensitive data before it reaches the logs."""

    def __init__(self):
        self.patterns = [
            # Email addresses
            (r'\b[\w.-]+@[\w.-]+\.\w+\b', '[EMAIL]'),
            # Chinese mobile numbers
            (r'\b1[3-9]\d{9}\b', '[PHONE]'),
            # Chinese national ID numbers
            (r'\b\d{17}[\dXx]\b', '[ID_CARD]'),
            # API keys
            (r'\bsk-[a-zA-Z0-9]{48}\b', '[API_KEY]'),
            # Credit card numbers
            (r'\b\d{4}[- ]?\d{4}[- ]?\d{4}[- ]?\d{4}\b', '[CARD]'),
        ]

    def sanitize(self, text: str) -> str:
        """Apply all redaction patterns to a string."""
        result = text
        for pattern, replacement in self.patterns:
            result = re.sub(pattern, replacement, result)
        return result

    def sanitize_messages(self, messages: List[dict]) -> List[dict]:
        """Redact an entire message list."""
        sanitized = []
        for msg in messages:
            sanitized.append({
                "role": msg["role"],
                "content": self.sanitize(msg["content"])
            })
        return sanitized

# Usage
sanitizer = LogSanitizer()

def safe_log(messages: list):
    """Log only sanitized messages."""
    safe_messages = sanitizer.sanitize_messages(messages)
    logger.log_request("xxx", "gpt-4o", safe_messages)
```
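
If you want redaction applied by default, one option (a sketch, not part of the original logger) is to subclass `LLMLogger` so that every request is sanitized before it is written:

```python
class SanitizingLLMLogger(LLMLogger):
    """LLMLogger variant that redacts message content before logging."""

    def __init__(self, name: str = "llm_app"):
        super().__init__(name)
        self.sanitizer = LogSanitizer()

    def log_request(self, request_id: str, model: str, messages: list, params: dict = None):
        # Sanitize first so raw PII never reaches any log sink
        super().log_request(
            request_id, model,
            self.sanitizer.sanitize_messages(messages),
            params
        )
```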

## Metrics Collection

### Prometheus Metrics

```python
import time

from openai import OpenAI
from prometheus_client import Counter, Histogram, start_http_server

# Metric definitions
LLM_REQUESTS = Counter(
    'llm_requests_total',
    'Total LLM API requests',
    ['model', 'status']
)

LLM_LATENCY = Histogram(
    'llm_request_latency_seconds',
    'LLM request latency',
    ['model'],
    buckets=[0.1, 0.5, 1, 2, 5, 10, 30, 60]
)

LLM_TOKENS = Counter(
    'llm_tokens_total',
    'Total tokens used',
    ['model', 'token_type']
)

LLM_COST = Counter(
    'llm_cost_dollars',
    'Total cost in dollars',
    ['model']
)

LLM_CACHE_HITS = Counter(
    'llm_cache_hits_total',
    'Cache hit count',
    ['cache_type']
)

class MetricsCollector:
    """Collects request-level metrics and exposes them to Prometheus."""

    def __init__(self, port: int = 9090):
        # Start the Prometheus metrics endpoint
        start_http_server(port)

        # USD per 1M tokens
        self.pricing = {
            "gpt-4o": {"input": 5, "output": 15},
            "gpt-4o-mini": {"input": 0.15, "output": 0.60}
        }

    def record_request(
        self,
        model: str,
        status: str,
        latency: float,
        input_tokens: int,
        output_tokens: int
    ):
        """Record the metrics for one request."""
        # Request count
        LLM_REQUESTS.labels(model=model, status=status).inc()

        # Latency
        LLM_LATENCY.labels(model=model).observe(latency)

        # Token usage
        LLM_TOKENS.labels(model=model, token_type="input").inc(input_tokens)
        LLM_TOKENS.labels(model=model, token_type="output").inc(output_tokens)

        # Cost
        if model in self.pricing:
            price = self.pricing[model]
            cost = (input_tokens * price["input"] +
                    output_tokens * price["output"]) / 1_000_000
            LLM_COST.labels(model=model).inc(cost)

    def record_cache_hit(self, cache_type: str):
        """Record a cache hit."""
        LLM_CACHE_HITS.labels(cache_type=cache_type).inc()

# Usage
metrics = MetricsCollector()

def instrumented_chat(messages: list, model: str = "gpt-4o") -> str:
    """Chat completion with metrics instrumentation."""
    start = time.time()
    client = OpenAI()

    try:
        response = client.chat.completions.create(
            model=model,
            messages=messages
        )

        latency = time.time() - start

        metrics.record_request(
            model=model,
            status="success",
            latency=latency,
            input_tokens=response.usage.prompt_tokens,
            output_tokens=response.usage.completion_tokens
        )

        return response.choices[0].message.content

    except Exception as e:
        latency = time.time() - start
        metrics.record_request(
            model=model,
            status="error",
            latency=latency,
            input_tokens=0,
            output_tokens=0
        )
        raise
```

### A Custom Metrics Dashboard

```python
from dataclasses import dataclass, field
from datetime import datetime, timedelta
from typing import Dict, List
from collections import defaultdict

@dataclass
class MetricPoint:
    timestamp: datetime
    value: float
    labels: Dict[str, str] = field(default_factory=dict)

class MetricsDashboard:
    """A minimal in-memory metrics store with a text dashboard."""

    def __init__(self):
        self.metrics: Dict[str, List[MetricPoint]] = defaultdict(list)

    def record(self, name: str, value: float, labels: dict = None):
        """Record one metric point."""
        point = MetricPoint(
            timestamp=datetime.now(),
            value=value,
            labels=labels or {}
        )
        self.metrics[name].append(point)

    def get_summary(self, hours: int = 24) -> dict:
        """Summarize metrics over the last N hours."""
        cutoff = datetime.now() - timedelta(hours=hours)

        summary = {}
        for name, points in self.metrics.items():
            recent = [p for p in points if p.timestamp > cutoff]
            if recent:
                values = [p.value for p in recent]
                summary[name] = {
                    "count": len(values),
                    "sum": sum(values),
                    "avg": sum(values) / len(values),
                    "min": min(values),
                    "max": max(values)
                }

        return summary

    def print_dashboard(self):
        """Print a text dashboard to stdout."""
        summary = self.get_summary()

        print("\n" + "=" * 50)
        print("          LLM Monitoring Dashboard")
        print("=" * 50)

        for name, stats in summary.items():
            print(f"\n📊 {name}")
            print(f"   total: {stats['sum']:.2f}")
            print(f"   avg:   {stats['avg']:.4f}")
            print(f"   min:   {stats['min']:.4f}")
            print(f"   max:   {stats['max']:.4f}")

        print("\n" + "=" * 50)
```

## Distributed Tracing
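
In a complex RAG or Agent system, a single user request can trigger multiple LLM calls, vector retrievals, and tool executions. Flat logs cannot untangle these dependencies; traces can.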

### OpenTelemetry Integration

```python
from functools import wraps

from openai import OpenAI
from opentelemetry import trace
from opentelemetry.sdk.trace import TracerProvider
from opentelemetry.sdk.trace.export import BatchSpanProcessor
from opentelemetry.exporter.otlp.proto.grpc.trace_exporter import OTLPSpanExporter
from opentelemetry.sdk.resources import Resource

# Initialize the tracer
resource = Resource.create({"service.name": "llm-service"})
provider = TracerProvider(resource=resource)

# Configure the exporter (ships spans to Jaeger/Tempo)
exporter = OTLPSpanExporter(endpoint="localhost:4317", insecure=True)
provider.add_span_processor(BatchSpanProcessor(exporter))

trace.set_tracer_provider(provider)
tracer = trace.get_tracer(__name__)

def trace_llm(func):
    """Decorator that traces an LLM call."""
    @wraps(func)
    def wrapper(*args, **kwargs):
        with tracer.start_as_current_span(
            f"llm_call_{func.__name__}",
            kind=trace.SpanKind.CLIENT
        ) as span:
            # Record the input
            if args:
                span.set_attribute("llm.messages_count", len(args[0]))

            model = kwargs.get("model", "gpt-4o")
            span.set_attribute("llm.model", model)

            try:
                result = func(*args, **kwargs)

                # Record the output
                if hasattr(result, 'usage'):
                    span.set_attribute("llm.input_tokens", result.usage.prompt_tokens)
                    span.set_attribute("llm.output_tokens", result.usage.completion_tokens)

                span.set_status(trace.Status(trace.StatusCode.OK))
                return result

            except Exception as e:
                span.set_status(
                    trace.Status(trace.StatusCode.ERROR, str(e))
                )
                span.record_exception(e)
                raise

    return wrapper

# Usage
@trace_llm
def chat_completion(messages: list, model: str = "gpt-4o"):
    client = OpenAI()
    return client.chat.completions.create(
        model=model,
        messages=messages
    )
```
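
If no collector is running yet, spans can be printed to stdout for local debugging using the SDK's built-in console exporter:

```python
# Local debugging without a collector: print finished spans to stdout.
from opentelemetry.sdk.trace.export import ConsoleSpanExporter, SimpleSpanProcessor

provider.add_span_processor(SimpleSpanProcessor(ConsoleSpanExporter()))
```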

### Call-Chain Tracing

```python
import time
import uuid
from contextvars import ContextVar
from datetime import datetime

from openai import OpenAI

# Tracing context (one per task/request)
trace_context: ContextVar[dict] = ContextVar('trace_context', default={})

class TraceContext:
    """Manage a lightweight, hand-rolled trace context."""

    @staticmethod
    def start_trace(name: str = None) -> str:
        """Start a new trace."""
        trace_id = str(uuid.uuid4())
        trace_context.set({
            "trace_id": trace_id,
            "span_id": trace_id[:8],
            "name": name,
            "spans": []
        })
        return trace_id

    @staticmethod
    def get_trace_id() -> str:
        """Get the current trace ID."""
        ctx = trace_context.get()
        return ctx.get("trace_id", "")

    @staticmethod
    def add_span(name: str, data: dict):
        """Append a span to the current trace."""
        ctx = trace_context.get()
        span_id = str(uuid.uuid4())[:8]

        span = {
            "span_id": span_id,
            "parent_id": ctx.get("span_id"),
            "name": name,
            "timestamp": datetime.now().isoformat(),
            "data": data
        }

        ctx["spans"].append(span)
        ctx["span_id"] = span_id

class TracedLLMClient:
    """An LLM client that records a span per pipeline stage."""

    def __init__(self):
        self.client = OpenAI()

    def chat(self, messages: list, model: str = "gpt-4o") -> dict:
        """Chat completion with tracing."""
        trace_id = TraceContext.start_trace("chat_completion")

        # Preprocessing span
        TraceContext.add_span("preprocess", {
            "message_count": len(messages),
            "model": model
        })

        # LLM call span
        start = time.time()
        response = self.client.chat.completions.create(
            model=model,
            messages=messages
        )
        latency = time.time() - start

        TraceContext.add_span("llm_call", {
            "model": model,
            "latency_ms": latency * 1000,
            "input_tokens": response.usage.prompt_tokens,
            "output_tokens": response.usage.completion_tokens
        })

        # Postprocessing span
        result = response.choices[0].message.content
        TraceContext.add_span("postprocess", {
            "response_length": len(result)
        })

        return {
            "response": result,
            "trace_id": trace_id,
            "trace": trace_context.get()
        }
```
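
### Purpose-Built Platforms: LangSmith / LangFuse

In practice you rarely need to hand-roll all of this: LangSmith and LangFuse are observability platforms designed specifically for LLM applications.

Core features:

- **Visual traces**: the input, output, latency, and token usage of every step, laid out clearly.
- **Debugging and replay**: edit a prompt directly in the web UI, re-run it, and compare results.
- **Dataset collection**: save good (or bad) production answers into a test set with one click.

```python
# LangSmith auto-instrumentation example: with just these environment
# variables set, LangChain records every trace automatically.
import os
from langchain_openai import ChatOpenAI

os.environ["LANGCHAIN_TRACING_V2"] = "true"
os.environ["LANGCHAIN_API_KEY"] = "your_api_key"

llm = ChatOpenAI()
llm.invoke("Hello, please introduce yourself")
```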

## LangSmith Integration

### Setup

```bash
pip install langsmith

export LANGCHAIN_TRACING_V2=true
export LANGCHAIN_API_KEY=your_api_key
export LANGCHAIN_PROJECT=my_project
```

### Usage

```python
from openai import OpenAI
from langsmith.wrappers import wrap_openai

# Wrap the OpenAI client
client = wrap_openai(OpenAI())

# Every call is now traced automatically
response = client.chat.completions.create(
    model="gpt-4o",
    messages=[{"role": "user", "content": "Hello!"}]
)

# Attach custom metadata
from langsmith import traceable

@traceable(
    name="qa_chain",
    metadata={"version": "1.0"}
)
def qa_chain(question: str) -> str:
    """A traced question-answering chain."""
    # RAG retrieval (retrieve_documents is assumed to be defined elsewhere)
    docs = retrieve_documents(question)

    # LLM call
    response = client.chat.completions.create(
        model="gpt-4o",
        messages=[
            {"role": "system", "content": f"Context: {docs}"},
            {"role": "user", "content": question}
        ]
    )

    return response.choices[0].message.content

# Evaluation
from langsmith.evaluation import evaluate

results = evaluate(
    qa_chain,
    data="my_dataset",
    evaluators=["correctness", "helpfulness"]
)
```
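
User feedback can also be attached to a traced run. A sketch using the LangSmith SDK's feedback API (the run ID placeholder must come from a real traced run):

```python
from langsmith import Client

ls_client = Client()
ls_client.create_feedback(
    run_id="<run-id-from-a-trace>",  # placeholder, not a real ID
    key="user_score",
    score=0,                          # e.g., a thumbs-down
    comment="answer was off-topic"
)
```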

## LangFuse Integration

### Setup

```python
from langfuse import Langfuse
from langfuse.decorators import observe, langfuse_context

# Initialization
langfuse = Langfuse(
    public_key="pk-xxx",
    secret_key="sk-xxx",
    host="https://cloud.langfuse.com"
)

@observe()
def rag_pipeline(query: str) -> str:
    """A RAG pipeline traced with LangFuse."""

    # Add custom attributes
    langfuse_context.update_current_trace(
        user_id="user_123",
        session_id="session_456",
        metadata={"source": "web"}
    )

    # Retrieval (retrieve and generate are assumed to be defined elsewhere)
    docs = retrieve(query)

    langfuse_context.update_current_observation(
        input=query,
        metadata={"doc_count": len(docs)}
    )

    # Generation
    response = generate(query, docs)

    # Record a score
    langfuse_context.score_current_trace(
        name="relevance",
        value=0.9
    )

    return response

# Manual tracing
def manual_trace_example():
    """Manual tracing example."""
    trace = langfuse.trace(
        name="chat_session",
        user_id="user_123"
    )

    # Record a generation
    generation = trace.generation(
        name="llm_call",
        model="gpt-4o",
        input=[{"role": "user", "content": "Hello"}],
        output="Hi there!"
    )

    # End the generation
    generation.end(
        usage={
            "input": 10,
            "output": 5
        }
    )

    # Flush buffered events
    langfuse.flush()
```
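
## Semantic Monitoring: Beyond Traditional Metrics

Traditional monitoring only asks "did the request succeed?". An LLM application also has to ask "was the answer correct?".

### 1. Hallucination Detection

Use tools such as Arize Phoenix or Giskard to scan responses in the background and automatically flag likely factual errors.

### 2. Topic-Drift Monitoring

If your customer-service bot starts chatting with users about politics or video games, that is topic drift.

- Implementation: periodically run embedding-based clustering over incoming requests and check whether a large share has drifted outside the intended business domain; see the sketch below.

A minimal drift check, assuming the OpenAI embeddings API; the seed examples and threshold are illustrative and should be calibrated on real traffic:

```python
import numpy as np
from openai import OpenAI

client = OpenAI()

def embed(texts: list) -> np.ndarray:
    resp = client.embeddings.create(model="text-embedding-3-small", input=texts)
    return np.array([d.embedding for d in resp.data])

# Centroid of known on-topic queries (illustrative seed set)
ON_TOPIC_SEEDS = ["How do I reset my password?", "Where is my order?"]
_centroid = embed(ON_TOPIC_SEEDS).mean(axis=0)

def is_off_topic(query: str, threshold: float = 0.3) -> bool:
    """Flag queries whose cosine similarity to the domain centroid is low."""
    v = embed([query])[0]
    sim = float(v @ _centroid / (np.linalg.norm(v) * np.linalg.norm(_centroid)))
    return sim < threshold
```

### 3. Negative-Feedback Tracking

Treat the user's thumbs-down as the highest-priority monitoring signal:

```python
def log_user_feedback(request_id: str, score: int, comment: str = None):
    """Record user feedback and alert on negative scores."""
    # alert_dev_team is assumed to exist (e.g., a Slack webhook); in practice
    # you might only page after, say, 5 consecutive thumbs-downs.
    if score < 0:
        alert_dev_team(f"Request {request_id} received negative feedback: {comment}")
```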

## Alert Configuration

### Alert Rules

```python
from dataclasses import dataclass
from datetime import datetime
from enum import Enum
from typing import Callable, List

class AlertSeverity(Enum):
    INFO = "info"
    WARNING = "warning"
    CRITICAL = "critical"

@dataclass
class AlertRule:
    name: str
    condition: Callable[[dict], bool]
    severity: AlertSeverity
    message_template: str

class AlertManager:
    """Evaluates alert rules and dispatches notifications."""

    def __init__(self):
        self.rules: List[AlertRule] = []
        self.handlers: List[Callable] = []

    def add_rule(self, rule: AlertRule):
        """Register an alert rule."""
        self.rules.append(rule)

    def add_handler(self, handler: Callable):
        """Register an alert handler."""
        self.handlers.append(handler)

    def check(self, metrics: dict):
        """Evaluate all rules against a metrics snapshot."""
        for rule in self.rules:
            if rule.condition(metrics):
                alert = {
                    "name": rule.name,
                    "severity": rule.severity.value,
                    "message": rule.message_template.format(**metrics),
                    "timestamp": datetime.now().isoformat()
                }

                for handler in self.handlers:
                    handler(alert)

# Configure alert rules
alert_manager = AlertManager()

# Error-rate alert
alert_manager.add_rule(AlertRule(
    name="high_error_rate",
    condition=lambda m: m.get("error_rate", 0) > 0.05,
    severity=AlertSeverity.CRITICAL,
    message_template="Error rate too high: {error_rate:.2%}"
))

# Latency alert
alert_manager.add_rule(AlertRule(
    name="high_latency",
    condition=lambda m: m.get("p99_latency", 0) > 10,
    severity=AlertSeverity.WARNING,
    message_template="P99 latency too high: {p99_latency:.2f}s"
))

# Cost alert
alert_manager.add_rule(AlertRule(
    name="daily_cost_exceeded",
    condition=lambda m: m.get("daily_cost", 0) > 100,
    severity=AlertSeverity.WARNING,
    message_template="Daily cost limit exceeded: ${daily_cost:.2f}"
))

# Alert handlers
def slack_handler(alert: dict):
    """Send to Slack (stubbed as a print)."""
    print(f"[{alert['severity'].upper()}] {alert['message']}")

def email_handler(alert: dict):
    """Send an email (stubbed as a print)."""
    if alert["severity"] == "critical":
        print(f"Sending email alert: {alert['message']}")

alert_manager.add_handler(slack_handler)
alert_manager.add_handler(email_handler)
```
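
A quick smoke test of the rules above, using a hypothetical metrics snapshot:

```python
# Hypothetical snapshot: triggers the error-rate and latency rules
alert_manager.check({
    "error_rate": 0.08,    # 8% > 5% threshold -> critical
    "p99_latency": 12.5,   # seconds, > 10s threshold -> warning
    "daily_cost": 42.0     # under the $100 limit -> no alert
})
```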

## Grafana Dashboards

### Example Prometheus Queries

```promql
# Request rate
rate(llm_requests_total[5m])

# Error rate
rate(llm_requests_total{status="error"}[5m])
/ rate(llm_requests_total[5m])

# P99 latency
histogram_quantile(0.99,
  rate(llm_request_latency_seconds_bucket[5m]))

# Token usage
sum(rate(llm_tokens_total[1h])) by (model, token_type)

# Hourly cost
sum(increase(llm_cost_dollars[1h])) by (model)

# Cache hit rate
sum(rate(llm_cache_hits_total[5m]))
/ sum(rate(llm_requests_total[5m]))
```

### Dashboard Panel Configuration

```json
{
  "panels": [
    {
      "title": "Request Rate",
      "type": "timeseries",
      "targets": [{
        "expr": "rate(llm_requests_total[5m])",
        "legendFormat": "{{model}}"
      }]
    },
    {
      "title": "Error Rate",
      "type": "gauge",
      "targets": [{
        "expr": "rate(llm_requests_total{status='error'}[5m]) / rate(llm_requests_total[5m])"
      }],
      "thresholds": [
        {"value": 0, "color": "green"},
        {"value": 0.01, "color": "yellow"},
        {"value": 0.05, "color": "red"}
      ]
    },
    {
      "title": "Latency Distribution",
      "type": "heatmap",
      "targets": [{
        "expr": "rate(llm_request_latency_seconds_bucket[5m])"
      }]
    },
    {
      "title": "Daily Cost",
      "type": "stat",
      "targets": [{
        "expr": "sum(increase(llm_cost_dollars[24h]))"
      }],
      "unit": "currencyUSD"
    }
  ]
}
```

## A Complete Monitoring Solution

```python
import time
import uuid

from openai import OpenAI

class LLMObservability:
    """A complete observability wrapper for LLM calls."""

    def __init__(self):
        self.logger = LLMLogger()
        self.metrics = MetricsDashboard()
        self.alert_manager = AlertManager()

        # Configure alerting
        self._setup_alerts()

    def _setup_alerts(self):
        """Configure alert rules."""
        self.alert_manager.add_rule(AlertRule(
            name="high_latency",
            condition=lambda m: m.get("avg_latency", 0) > 5,
            severity=AlertSeverity.WARNING,
            message_template="Average latency too high: {avg_latency:.2f}s"
        ))

    def wrap_client(self, client):
        """Monkey-patch an OpenAI client so every call is monitored."""
        original_create = client.chat.completions.create

        def wrapped_create(*args, **kwargs):
            request_id = str(uuid.uuid4())
            model = kwargs.get("model", "gpt-4o")
            messages = kwargs.get("messages", [])

            # Log the request
            self.logger.log_request(request_id, model, messages)

            start = time.time()
            try:
                response = original_create(*args, **kwargs)
                latency = time.time() - start

                # Log the response
                self.logger.log_response(
                    request_id, model,
                    response.choices[0].message.content,
                    {
                        "prompt_tokens": response.usage.prompt_tokens,
                        "completion_tokens": response.usage.completion_tokens,
                        "total_tokens": response.usage.total_tokens
                    },
                    latency * 1000
                )

                # Record metrics
                self.metrics.record("latency", latency, {"model": model})
                self.metrics.record("tokens", response.usage.total_tokens, {"model": model})

                return response

            except Exception as e:
                latency = time.time() - start
                self.logger.log_error(request_id, type(e).__name__, str(e))
                self.metrics.record("errors", 1, {"model": model})
                raise

        client.chat.completions.create = wrapped_create
        return client

# Usage
observability = LLMObservability()
client = observability.wrap_client(OpenAI())

# Every call is now monitored automatically
response = client.chat.completions.create(
    model="gpt-4o",
    messages=[{"role": "user", "content": "Hello"}]
)

# Inspect the dashboard
observability.metrics.print_dashboard()
```

## Best Practices

| Practice | Rationale |
|---|---|
| Structured logging | Easy to query and analyze |
| Request IDs | End-to-end tracing |
| Sensitive-data sanitization | Compliance requirements |
| Token monitoring | Cost control |
| Latency monitoring | User experience |
| Automated alerting | Timely response |
| Distributed tracing | Complex systems |

## Summary

The core elements of LLM application monitoring:

1. **Logs**: structured, traceable, sanitized
2. **Metrics**: tokens, latency, cost, error rate
3. **Traces**: call chains and dependencies
4. **Alerts**: threshold-triggered, with multi-level notification

Recommended tools:

- Logs: ELK / Loki
- Metrics: Prometheus + Grafana
- Traces: LangSmith / LangFuse / Jaeger
- APM: Datadog / New Relic
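
A pragmatic build-out path:

1. **Level 1 (basic monitoring)**: log requests and responses; count token usage and cost.
2. **Level 2 (tracing)**: bring in LangFuse or LangSmith to untangle the internal logic of Agents.
3. **Level 3 (semantic monitoring)**: build an automated quality-evaluation pipeline that watches for hallucinations and safety risks.
4. **Level 4 (closing the loop)**: automatically turn problems surfaced by monitoring into test cases that drive continuous iteration on prompts and models.

An LLM application without monitoring is like driving in the dark. Only with a complete observability stack in place can we truly stay in control of an AI system's behavior.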
