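A Complete Guide from Development to Production

Preface

Moving an LLM application from a development environment to production means weighing performance, cost, reliability, and more. This article covers deployment architecture, performance optimization, and operational best practices for LLM applications.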


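Core Techniques: Inference Engines and Performance Optimization

In production, running inference directly with the Transformers library is often far less efficient than using a dedicated serving engine. Modern inference engines such as vLLM and TGI add several key techniques.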

1. PagedAttention (vLLM)

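  • Problem: the KV cache (cached attention keys and values) takes up a large share of GPU memory and suffers from fragmentation.
  • Approach: borrowing from virtual memory management in operating systems, the KV cache is split into fixed-size "pages" (blocks) that are allocated on demand.
  • Effect: KV cache memory waste drops to a few percent, and the vLLM authors report 2-4x higher throughput than earlier serving systems.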
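
The paging idea can be sketched with a toy allocator. This is only an illustration of block tables and on-demand allocation, not vLLM's actual implementation; the class name `PagedKVCache`, the 16-token block size, and the pool of 8 blocks are assumptions made for the example.

```python
class PagedKVCache:
    """Toy KV-cache allocator: fixed-size blocks handed out on demand."""

    def __init__(self, num_blocks: int, block_size: int = 16):
        self.block_size = block_size
        self.free_blocks = list(range(num_blocks))  # pool of physical block ids
        self.block_tables = {}   # request id -> list of physical block ids
        self.num_tokens = {}     # request id -> tokens stored so far

    def append_token(self, request_id: str) -> None:
        """Reserve space for one more token, allocating a block only when needed."""
        table = self.block_tables.setdefault(request_id, [])
        count = self.num_tokens.get(request_id, 0)
        if count == len(table) * self.block_size:  # every owned block is full
            if not self.free_blocks:
                raise MemoryError("KV cache exhausted")
            table.append(self.free_blocks.pop())
        self.num_tokens[request_id] = count + 1

    def free(self, request_id: str) -> None:
        """Return all blocks of a finished request to the pool."""
        self.free_blocks.extend(self.block_tables.pop(request_id, []))
        self.num_tokens.pop(request_id, None)


cache = PagedKVCache(num_blocks=8, block_size=16)
for _ in range(20):          # a 20-token sequence needs ceil(20 / 16) = 2 blocks
    cache.append_token("req-a")
for _ in range(5):           # a 5-token sequence needs 1 block
    cache.append_token("req-b")
blocks_a = len(cache.block_tables["req-a"])
blocks_b = len(cache.block_tables["req-b"])
free_before = len(cache.free_blocks)
cache.free("req-a")
free_after = len(cache.free_blocks)
print(blocks_a, blocks_b, free_before, free_after)  # 2 1 5 7
```

Because blocks need not be contiguous, the only wasted space is the unfilled tail of each request's last block.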

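2. Continuous Batching

  • Problem: requests generate outputs of different lengths, so with traditional (static) batching the GPU sits partly idle until the longest request in the batch finishes.
  • Approach: at every token generation step, completed requests are removed from the batch and new requests are inserted.
  • Effect: much higher GPU utilization and throughput under concurrent load.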
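
The scheduling difference can be illustrated with a small simulation that only counts decoding steps. It is a sketch under simplifying assumptions (every step costs the same, prefill is ignored), and the function names and request lengths are made up for the example.

```python
from collections import deque


def static_batching_steps(lengths, batch_size):
    """Static batching: each batch runs until its longest request finishes."""
    steps = 0
    for i in range(0, len(lengths), batch_size):
        steps += max(lengths[i:i + batch_size])
    return steps


def continuous_batching_steps(lengths, batch_size):
    """Continuous batching: finished requests leave and waiting ones join at every step."""
    waiting = deque(lengths)
    active = []   # remaining tokens for each running request
    steps = 0
    while waiting or active:
        while waiting and len(active) < batch_size:
            active.append(waiting.popleft())
        active = [r - 1 for r in active]       # one token per request per step
        active = [r for r in active if r > 0]  # drop completed requests
        steps += 1
    return steps


lengths = [2, 8, 3, 8, 2, 3]   # output length (in tokens) of six requests
static_steps = static_batching_steps(lengths, batch_size=2)
continuous_steps = continuous_batching_steps(lengths, batch_size=2)
print(static_steps, continuous_steps)  # 19 13
```

With two batch slots the same 26 tokens take 19 steps under static batching but only 13 when freed slots are refilled immediately.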

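3. Speculative Decoding

  • Problem: autoregressive decoding in large models is limited by memory bandwidth (memory-bound) rather than by compute.
  • Approach: a small draft model quickly proposes several tokens, which the large model then verifies in a single parallel pass.
  • Effect: the output is the same as decoding with the large model alone (for sampling, the acceptance rule preserves its distribution), and reported speedups are roughly 1.5-2.5x, depending on how often the draft is accepted.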
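
A minimal sketch of the greedy variant follows. The two "models" are toy deterministic functions, not real networks, and all names are assumptions for the example; real implementations verify the draft in one batched forward pass and use a rejection-sampling rule when sampling.

```python
def target_next(seq):
    """Stand-in for the large model: next token is (sum of the sequence) mod 7."""
    return sum(seq) % 7


def draft_next(seq):
    """Stand-in for the draft model: agrees with the target unless the last token is 3."""
    return 0 if seq[-1] == 3 else sum(seq) % 7


def greedy_decode(prompt, n_new):
    """Baseline: one target-model call per generated token."""
    seq = list(prompt)
    for _ in range(n_new):
        seq.append(target_next(seq))
    return seq


def speculative_decode(prompt, n_new, k=4):
    seq = list(prompt)
    target_passes = 0
    while len(seq) - len(prompt) < n_new:
        # 1) The draft model proposes k tokens autoregressively.
        draft = []
        for _ in range(k):
            draft.append(draft_next(seq + draft))
        # 2) The target model checks every position (one parallel pass in a real system).
        target_passes += 1
        accepted = []
        for i in range(k):
            expected = target_next(seq + draft[:i])
            if draft[i] != expected:
                accepted.append(expected)  # first mismatch: keep the target's token
                break
            accepted.append(draft[i])
        else:
            accepted.append(target_next(seq + draft))  # all accepted: bonus token
        seq.extend(accepted)
    return seq[:len(prompt) + n_new], target_passes


prompt = [1, 2]
baseline = greedy_decode(prompt, 12)
output, passes = speculative_decode(prompt, 12)
print(output == baseline, passes)  # True 5
```

The output matches plain greedy decoding token for token, but the target model is invoked 5 times instead of 12 because each pass accepts several draft tokens.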

4. Quantization

  • AWQ / GPTQ: weight-only quantization (4-bit/8-bit) that reduces GPU memory usage.
  • FP8 inference: uses FP8 precision on newer GPUs such as the H100, balancing speed and accuracy.
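
The memory effect of lower bit widths is simple arithmetic. The sketch below assumes a hypothetical 7B-parameter model and counts weights only; quantization scales and zero-points, the KV cache, and activations add to these figures.

```python
def weight_memory_gib(num_params: float, bits_per_weight: int) -> float:
    """Memory needed for the weights alone, in GiB."""
    return num_params * bits_per_weight / 8 / 2**30


params = 7e9  # hypothetical 7B-parameter model
fp16 = weight_memory_gib(params, 16)
int8 = weight_memory_gib(params, 8)
int4 = weight_memory_gib(params, 4)
print(f"FP16: {fp16:.1f} GiB, INT8: {int8:.1f} GiB, INT4: {int4:.1f} GiB")
# FP16: 13.0 GiB, INT8: 6.5 GiB, INT4: 3.3 GiB
```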

Deployment Architecture

Basic Architecture Design

                    ┌─────────────────┐
                    │   Load Balancer │
                    └────────┬────────┘
                             │
         ┌───────────────────┼───────────────────┐
         ▼                   ▼                   ▼
┌─────────────────┐ ┌─────────────────┐ ┌─────────────────┐
│   API Gateway   │ │   API Gateway   │ │   API Gateway   │
└────────┬────────┘ └────────┬────────┘ └────────┬────────┘
         │                   │                   │
         └───────────────────┼───────────────────┘
                             │
              ┌──────────────┼──────────────┐
              ▼              ▼              ▼
       ┌──────────┐   ┌──────────┐   ┌──────────┐
       │ LLM App  │   │ LLM App  │   │ LLM App  │
       │ Instance │   │ Instance │   │ Instance │
       └────┬─────┘   └────┬─────┘   └────┬─────┘
            │              │              │
            └──────────────┼──────────────┘
                           │
         ┌─────────────────┼─────────────────┐
         ▼                 ▼                 ▼
  ┌────────────┐    ┌────────────┐    ┌────────────┐
  │   Redis    │    │ Vector DB  │    │  Database  │
  │   Cache    │    │  (Milvus)  │    │  (PgSQL)   │
  └────────────┘    └────────────┘    └────────────┘

API Service Implementation

from fastapi import FastAPI, HTTPException, BackgroundTasks
from fastapi.middleware.cors import CORSMiddleware
from pydantic import BaseModel, Field
from typing import Optional, List
import uvicorn
import asyncio
from openai import AsyncOpenAI

app = FastAPI(title="LLM Application API")

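# CORS configuration
# Note: wildcard origins combined with allow_credentials=True are unsafe in
# production; list the trusted origins explicitly there.
app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)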

# Request and response models
class ChatRequest(BaseModel):
    message: str = Field(..., min_length=1, max_length=10000)
    conversation_id: Optional[str] = None
    model: str = "gpt-4o-mini"
    temperature: float = Field(default=0.7, ge=0, le=2)
    max_tokens: int = Field(default=2000, ge=1, le=4000)

class ChatResponse(BaseModel):
    response: str
    conversation_id: str
    usage: dict

# Async OpenAI client (reads OPENAI_API_KEY from the environment)
client = AsyncOpenAI()

@app.post("/v1/chat", response_model=ChatResponse)
async def chat(request: ChatRequest):
    """Chat endpoint."""
    try:
        response = await client.chat.completions.create(
            model=request.model,
            messages=[
                {"role": "user", "content": request.message}
            ],
            temperature=request.temperature,
            max_tokens=request.max_tokens
        )
        
        return ChatResponse(
            response=response.choices[0].message.content or "",  # content can be None
            conversation_id=request.conversation_id or "new",
            usage={
                "prompt_tokens": response.usage.prompt_tokens,
                "completion_tokens": response.usage.completion_tokens,
                "total_tokens": response.usage.total_tokens
            }
        )
    except Exception as e:
        raise HTTPException(status_code=500, detail=str(e))

@app.get("/health")
async def health_check():
    """Health check."""
    return {"status": "healthy"}

if __name__ == "__main__":
    uvicorn.run(app, host="0.0.0.0", port=8000)

Streaming Responses

from fastapi.responses import StreamingResponse
from sse_starlette.sse import EventSourceResponse
import json

@app.post("/v1/chat/stream")
async def chat_stream(request: ChatRequest):
    """Streaming chat endpoint (Server-Sent Events)."""
    
    async def generate():
        try:
            stream = await client.chat.completions.create(
                model=request.model,
                messages=[{"role": "user", "content": request.message}],
                temperature=request.temperature,
                max_tokens=request.max_tokens,
                stream=True
            )
            
            async for chunk in stream:
                # Some chunks carry no choices, so guard before indexing
                if chunk.choices and chunk.choices[0].delta.content:
                    yield {
                        "event": "message",
                        "data": json.dumps({
                            "content": chunk.choices[0].delta.content,
                            "done": False
                        })
                    }
            
            yield {
                "event": "message",
                "data": json.dumps({"content": "", "done": True})
            }
            
        except Exception as e:
            yield {
                "event": "error",
                "data": json.dumps({"error": str(e)})
            }
    
    return EventSourceResponse(generate())

Docker Deployment

Dockerfile

# Multi-stage build
FROM python:3.11-slim AS builder

WORKDIR /app

# Install dependencies
COPY requirements.txt .
RUN pip install --no-cache-dir --user -r requirements.txt

# Final image
FROM python:3.11-slim

WORKDIR /app

# Copy installed dependencies from the builder stage
COPY --from=builder /root/.local /root/.local
ENV PATH=/root/.local/bin:$PATH

# Copy application code
COPY . .

# Environment variables
ENV PYTHONUNBUFFERED=1
ENV PYTHONDONTWRITEBYTECODE=1

# Expose the service port
EXPOSE 8000

# Health check (python:3.11-slim does not ship curl, so use the standard library)
HEALTHCHECK --interval=30s --timeout=10s --start-period=5s --retries=3 \
    CMD python -c "import urllib.request; urllib.request.urlopen('http://localhost:8000/health', timeout=5)" || exit 1

# Start command
CMD ["uvicorn", "main:app", "--host", "0.0.0.0", "--port", "8000", "--workers", "4"]

docker-compose.yml

version: '3.8'

services:
  llm-app:
    build: .
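    # No fixed host port: with 3 replicas a published port would conflict.
    # Nginx reaches the app over the Compose network.
    expose:
      - "8000"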
    environment:
      - OPENAI_API_KEY=${OPENAI_API_KEY}
      - REDIS_URL=redis://redis:6379
      - DATABASE_URL=postgresql://user:pass@postgres:5432/llm_app
    depends_on:
      - redis
      - postgres
    deploy:
      replicas: 3
      resources:
        limits:
          cpus: '2'
          memory: 4G
    restart: unless-stopped

  redis:
    image: redis:7-alpine
    ports:
      - "6379:6379"
    volumes:
      - redis_data:/data
    command: redis-server --appendonly yes

  postgres:
    image: postgres:15-alpine
    environment:
      POSTGRES_USER: user
      POSTGRES_PASSWORD: pass
      POSTGRES_DB: llm_app
    volumes:
      - postgres_data:/var/lib/postgresql/data
    ports:
      - "5432:5432"

  nginx:
    image: nginx:alpine
    ports:
      - "80:80"
    volumes:
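      # Mounted under conf.d because the file holds only upstream/server blocks,
      # which are valid inside the http context but not as the main nginx.conf
      - ./nginx.conf:/etc/nginx/conf.d/default.conf:ro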
    depends_on:
      - llm-app

volumes:
  redis_data:
  postgres_data:

Nginx Configuration

upstream llm_backend {
    least_conn;
    server llm-app:8000 weight=1;
    keepalive 32;
}

server {
    listen 80;
    server_name api.example.com;

    # Request size limit
    client_max_body_size 10M;

    # Timeouts (LLM responses can be slow)
    proxy_connect_timeout 60s;
    proxy_send_timeout 120s;
    proxy_read_timeout 120s;

    location / {
        proxy_pass http://llm_backend;
        proxy_http_version 1.1;
        proxy_set_header Host $host;
        proxy_set_header X-Real-IP $remote_addr;
        proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for;
        proxy_set_header Connection "";
        
        # SSE support: disable buffering so tokens are flushed immediately
        proxy_buffering off;
        proxy_cache off;
    }

    location /health {
        proxy_pass http://llm_backend;
        access_log off;
    }
}

Kubernetes Deployment

Deployment Configuration

apiVersion: apps/v1
kind: Deployment
metadata:
  name: llm-app
  labels:
    app: llm-app
spec:
  replicas: 3
  selector:
    matchLabels:
      app: llm-app
  template:
    metadata:
      labels:
        app: llm-app
    spec:
      containers:
      - name: llm-app
        image: your-registry/llm-app:latest
        ports:
        - containerPort: 8000
        env:
        - name: OPENAI_API_KEY
          valueFrom:
            secretKeyRef:
              name: llm-secrets
              key: openai-api-key
        resources:
          requests:
            memory: "1Gi"
            cpu: "500m"
          limits:
            memory: "4Gi"
            cpu: "2000m"
        livenessProbe:
          httpGet:
            path: /health
            port: 8000
          initialDelaySeconds: 10
          periodSeconds: 30
        readinessProbe:
          httpGet:
            path: /health
            port: 8000
          initialDelaySeconds: 5
          periodSeconds: 10
---
apiVersion: v1
kind: Service
metadata:
  name: llm-app-service
spec:
  selector:
    app: llm-app
  ports:
  - port: 80
    targetPort: 8000
  type: ClusterIP
---
apiVersion: autoscaling/v2
kind: HorizontalPodAutoscaler
metadata:
  name: llm-app-hpa
spec:
  scaleTargetRef:
    apiVersion: apps/v1
    kind: Deployment
    name: llm-app
  minReplicas: 3
  maxReplicas: 10
  metrics:
  - type: Resource
    resource:
      name: cpu
      target:
        type: Utilization
        averageUtilization: 70
  - type: Resource
    resource:
      name: memory
      target:
        type: Utilization
        averageUtilization: 80

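Performance Optimization

Caching Strategy

import hashlib
import json
from functools import wraps
from typing import Optional

import redis.asyncio as redis  # async client, so cache I/O does not block the event loop

# Redis connection
redis_client = redis.Redis(host='localhost', port=6379, db=0)

def cache_response(ttl: int = 3600):
    """Decorator that caches JSON-serializable responses in Redis."""
    def decorator(func):
        @wraps(func)
        async def wrapper(*args, **kwargs):
            # Build the cache key; default=str covers arguments such as Pydantic models
            payload = json.dumps(
                {"func": func.__qualname__, "args": args, "kwargs": kwargs},
                sort_keys=True,
                default=str
            )
            cache_key = "llm_cache:" + hashlib.sha256(payload.encode()).hexdigest()
            
            # Try the cache first
            cached = await redis_client.get(cache_key)
            if cached is not None:
                return json.loads(cached)
            
            # Call the wrapped function
            result = await func(*args, **kwargs)
            
            # Write the result to the cache
            await redis_client.setex(
                cache_key,
                ttl,
                json.dumps(result)
            )
            
            return result
        return wrapper
    return decorator

# Semantic cache
class SemanticCache:
    """Cache keyed on semantic similarity rather than exact match."""
    
    def __init__(self, embeddings, vectorstore, similarity_threshold: float = 0.95):
        self.embeddings = embeddings
        self.vectorstore = vectorstore
        self.threshold = similarity_threshold
    
    async def get(self, query: str) -> Optional[str]:
        """Return the cached response of a semantically similar query, if any."""
        # Relevance scores are normalized to [0, 1], higher meaning more similar;
        # the raw scores of similarity_search_with_score are often distances instead.
        results = await self.vectorstore.asimilarity_search_with_relevance_scores(query, k=1)
        
        if results and results[0][1] >= self.threshold:
            return results[0][0].metadata.get("response")
        return None
    
    async def set(self, query: str, response: str):
        """Cache a query together with its response."""
        from langchain_core.documents import Document
        doc = Document(
            page_content=query,
            metadata={"response": response}
        )
        await self.vectorstore.aadd_documents([doc])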
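
The threshold logic of a semantic cache can be seen in isolation with hand-written vectors. This is a sketch only: `ToySemanticCache` and the three-dimensional "embeddings" are invented for illustration, whereas a real deployment would embed the query text with a model and search a vector database.

```python
import math


def cosine_similarity(a, b):
    """Cosine of the angle between two vectors (1.0 means same direction)."""
    dot = sum(x * y for x, y in zip(a, b))
    norm = math.sqrt(sum(x * x for x in a)) * math.sqrt(sum(y * y for y in b))
    return dot / norm


class ToySemanticCache:
    """In-memory cache that returns a stored response for sufficiently similar queries."""

    def __init__(self, threshold: float = 0.95):
        self.threshold = threshold
        self.entries = []  # list of (embedding, response) pairs

    def get(self, embedding):
        best = max(
            self.entries,
            key=lambda entry: cosine_similarity(entry[0], embedding),
            default=None,
        )
        if best is not None and cosine_similarity(best[0], embedding) >= self.threshold:
            return best[1]
        return None

    def set(self, embedding, response):
        self.entries.append((embedding, response))


cache = ToySemanticCache(threshold=0.95)
cache.set([1.0, 0.0, 0.2], "cached answer")
hit = cache.get([0.9, 0.1, 0.2])    # near-duplicate query, similarity about 0.99
miss = cache.get([0.0, 1.0, 0.0])   # unrelated query, similarity 0.0
print(hit, miss)  # cached answer None
```

A high threshold such as 0.95 trades hit rate for safety: lowering it serves more requests from cache but raises the risk of returning an answer to a different question.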

Request Queuing and Rate Limiting

import asyncio

from fastapi import Request, HTTPException
from slowapi import Limiter, _rate_limit_exceeded_handler
from slowapi.util import get_remote_address
from slowapi.errors import RateLimitExceeded

# Rate limiter
limiter = Limiter(key_func=get_remote_address)
app.state.limiter = limiter
app.add_exception_handler(RateLimitExceeded, _rate_limit_exceeded_handler)

# Registered under its own path: a second handler on "/v1/chat" would never be
# reached, because FastAPI dispatches to the first matching route.
@app.post("/v1/chat/limited")
@limiter.limit("10/minute")  # 10 requests per minute per client IP
async def chat_limited(request: Request, chat_request: ChatRequest):
    return await chat(chat_request)

# Request queue
class RequestQueue:
    """Caps the number of LLM calls in flight; extra callers wait on the semaphore."""
    
    def __init__(self, max_concurrent: int = 10):
        self.semaphore = asyncio.Semaphore(max_concurrent)
    
    async def process(self, func, *args, **kwargs):
        async with self.semaphore:
            return await func(*args, **kwargs)

request_queue = RequestQueue(max_concurrent=20)

@app.post("/v1/chat/queued")
async def chat_queued(request: ChatRequest):
    """Chat endpoint with queued (concurrency-limited) processing."""
    return await request_queue.process(chat, request)

Batching Optimization

import asyncio
from typing import List

class BatchProcessor:
    """Groups incoming requests into small batches and dispatches each batch concurrently."""
    
    def __init__(self, batch_size: int = 10, max_wait: float = 0.1):
        self.batch_size = batch_size
        self.max_wait = max_wait
        self.queue = asyncio.Queue()
        self.processing = False
        self._worker = None  # keep a reference so the task is not garbage-collected
    
    async def add(self, request: dict) -> dict:
        """Queue a request and wait for its result."""
        future = asyncio.get_running_loop().create_future()
        await self.queue.put((request, future))
        
        if not self.processing:
            # Set the flag before scheduling so concurrent callers start only one worker
            self.processing = True
            self._worker = asyncio.create_task(self._process_batch())
        
        return await future
    
    async def _process_batch(self):
        """Collect and process batches until the queue is drained."""
        loop = asyncio.get_running_loop()
        try:
            while not self.queue.empty():
                batch = []
                futures = []
                
                # Collect a batch until it is full or max_wait has elapsed
                deadline = loop.time() + self.max_wait
                while len(batch) < self.batch_size:
                    try:
                        timeout = max(0, deadline - loop.time())
                        request, future = await asyncio.wait_for(
                            self.queue.get(),
                            timeout=timeout
                        )
                        batch.append(request)
                        futures.append(future)
                    except asyncio.TimeoutError:
                        break
                
                if batch:
                    # Process the batch; a failed call fails only its own future
                    results = await self._batch_call(batch)
                    for future, result in zip(futures, results):
                        if future.done():
                            continue  # the caller was cancelled in the meantime
                        if isinstance(result, BaseException):
                            future.set_exception(result)
                        else:
                            future.set_result(result)
        finally:
            self.processing = False
    
    async def _batch_call(self, requests: List[dict]) -> list:
        """Call the API for every request in the batch."""
        # asyncio.gather runs the calls concurrently; this is client-side
        # concurrency, not a single batched API request
        tasks = [self._single_call(req) for req in requests]
        return await asyncio.gather(*tasks, return_exceptions=True)
    
    async def _single_call(self, request: dict) -> dict:
        """Handle a single request."""
        response = await client.chat.completions.create(**request)
        return {"response": response.choices[0].message.content}

Cost Optimization

Token Monitoring and Control

import tiktoken
from dataclasses import dataclass
from typing import List

@dataclass
class UsageStats:
    prompt_tokens: int = 0
    completion_tokens: int = 0
    total_cost: float = 0.0

class TokenManager:
    """Tracks token usage and enforces a daily budget."""
    
    # Prices in USD per 1M tokens; they change over time, so check the provider's pricing page
    PRICING = {
        "gpt-4o": {"input": 2.50, "output": 10.00},
        "gpt-4o-mini": {"input": 0.15, "output": 0.60},
        "gpt-4-turbo": {"input": 10.00, "output": 30.00},
    }
    
    def __init__(self, daily_budget: float = 100.0):
        self.daily_budget = daily_budget
        # Note: nothing here resets the counters; reset them once per day in real use
        self.daily_usage = UsageStats()
        # gpt-4o and gpt-4o-mini use the o200k_base encoding (gpt-4-turbo uses cl100k_base)
        self.encoder = tiktoken.get_encoding("o200k_base")
    
    def count_tokens(self, text: str) -> int:
        """Count the tokens in a piece of text."""
        return len(self.encoder.encode(text))
    
    def estimate_cost(self, model: str, prompt_tokens: int, completion_tokens: int) -> float:
        """Estimate the cost in USD; unknown models fall back to gpt-4o-mini pricing."""
        pricing = self.PRICING.get(model, self.PRICING["gpt-4o-mini"])
        cost = (
            prompt_tokens * pricing["input"] / 1_000_000 +
            completion_tokens * pricing["output"] / 1_000_000
        )
        return cost
    
    def check_budget(self, estimated_cost: float) -> bool:
        """Return True if the estimated cost still fits into today's budget."""
        return (self.daily_usage.total_cost + estimated_cost) <= self.daily_budget
    
    def record_usage(self, model: str, prompt_tokens: int, completion_tokens: int):
        """Record actual usage."""
        self.daily_usage.prompt_tokens += prompt_tokens
        self.daily_usage.completion_tokens += completion_tokens
        self.daily_usage.total_cost += self.estimate_cost(
            model, prompt_tokens, completion_tokens
        )
    
    def truncate_context(self, messages: List[dict], max_tokens: int) -> List[dict]:
        """Truncate the conversation context to stay within a token limit."""
        total = 0
        result = []
        
        # Keep messages starting from the most recent one
        for msg in reversed(messages):
            tokens = self.count_tokens(msg["content"])
            if total + tokens <= max_tokens:
                result.insert(0, msg)
                total += tokens
            else:
                break
        
        return result

token_manager = TokenManager(daily_budget=50.0)

@app.post("/v1/chat/budgeted")
async def chat_with_budget(request: ChatRequest):
    """Chat endpoint with budget control."""
    # Estimate the cost (worst case: the completion uses all of max_tokens)
    prompt_tokens = token_manager.count_tokens(request.message)
    estimated_completion = request.max_tokens
    estimated_cost = token_manager.estimate_cost(
        request.model, prompt_tokens, estimated_completion
    )
    
    # Check the budget
    if not token_manager.check_budget(estimated_cost):
        raise HTTPException(
            status_code=429,
            detail="Daily budget exceeded"
        )
    
    # Run the request
    response = await client.chat.completions.create(
        model=request.model,
        messages=[{"role": "user", "content": request.message}],
        max_tokens=request.max_tokens
    )
    
    # Record actual usage
    token_manager.record_usage(
        request.model,
        response.usage.prompt_tokens,
        response.usage.completion_tokens
    )
    
    return {
        "response": response.choices[0].message.content,
        "usage": {
            "tokens": response.usage.total_tokens,
            "cost": token_manager.estimate_cost(
                request.model,
                response.usage.prompt_tokens,
                response.usage.completion_tokens
            )
        }
    }
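
As a worked example of the cost formula, the sketch below prices one hypothetical request (1,500 prompt tokens and 500 completion tokens) on two models, using the per-1M-token prices from the PRICING table above and the 50 USD daily budget. The function name `request_cost` and the token counts are assumptions for illustration.

```python
# Prices in USD per 1M tokens, copied from the PRICING table above.
PRICING = {
    "gpt-4o": {"input": 2.50, "output": 10.00},
    "gpt-4o-mini": {"input": 0.15, "output": 0.60},
}


def request_cost(model: str, prompt_tokens: int, completion_tokens: int) -> float:
    """Cost of a single request in USD."""
    price = PRICING[model]
    return (prompt_tokens * price["input"] + completion_tokens * price["output"]) / 1_000_000


# Hypothetical request: 1,500 prompt tokens and 500 completion tokens.
cost_4o = request_cost("gpt-4o", 1_500, 500)          # (3750 + 5000) / 1e6
cost_mini = request_cost("gpt-4o-mini", 1_500, 500)   # (225 + 300) / 1e6
requests_per_budget = int(50.0 // cost_4o)            # requests that fit into 50 USD
print(f"gpt-4o: ${cost_4o:.5f}, gpt-4o-mini: ${cost_mini:.6f}, "
      f"gpt-4o requests per 50 USD: {requests_per_budget}")
# gpt-4o: $0.00875, gpt-4o-mini: $0.000525, gpt-4o requests per 50 USD: 5714
```

At these prices the same request is roughly 17 times cheaper on gpt-4o-mini, which is the motivation for routing simple tasks to the smaller model.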

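Model Selection Strategy

class ModelRouter:
    """Routes each request to a model based on a rough task classification."""
    
    def __init__(self):
        self.models = {
            "simple": "gpt-4o-mini",      # simple tasks
            "complex": "gpt-4o",           # complex tasks
            "creative": "gpt-4o",          # creative tasks
        }
    
    async def classify_task(self, message: str) -> str:
        """Classify the task type with simple heuristics."""
        # Short messages are treated as simple tasks
        if len(message) < 50:
            return "simple"
        
        text = message.lower()
        complex_keywords = ["analyze", "compare", "evaluate", "design", "plan"]
        if any(kw in text for kw in complex_keywords):
            return "complex"
        
        # Never put an empty string in these lists: "" is a substring of every
        # message and would match unconditionally.
        creative_keywords = ["create", "write", "story"]
        if any(kw in text for kw in creative_keywords):
            return "creative"
        
        return "simple"
    
    async def route(self, message: str) -> str:
        """Pick a suitable model for the message."""
        task_type = await self.classify_task(message)
        return self.models[task_type]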

model_router = ModelRouter()

@app.post("/v1/chat/smart")
async def chat_smart(request: ChatRequest):
    """Chat endpoint with automatic model routing."""
    # Pick the model automatically (request.model is ignored here)
    model = await model_router.route(request.message)
    
    response = await client.chat.completions.create(
        model=model,
        messages=[{"role": "user", "content": request.message}],
        temperature=request.temperature,
        max_tokens=request.max_tokens
    )
    
    return {
        "response": response.choices[0].message.content,
        "model_used": model
    }

Monitoring and Alerting

Metrics Collection

from prometheus_client import Counter, Histogram, Gauge, generate_latest
from fastapi import Request, Response
import time

# Prometheus metrics
REQUEST_COUNT = Counter(
    'llm_requests_total',
    'Total LLM API requests',
    ['model', 'status']
)

REQUEST_LATENCY = Histogram(
    'llm_request_latency_seconds',
    'LLM request latency',
    ['model'],
    buckets=[0.1, 0.5, 1, 2, 5, 10, 30, 60]
)

TOKEN_USAGE = Counter(
    'llm_tokens_total',
    'Total tokens used',
    ['model', 'type']  # type: prompt/completion
)

ACTIVE_REQUESTS = Gauge(
    'llm_active_requests',
    'Currently active requests'
)

DAILY_COST = Gauge(
    'llm_daily_cost_dollars',
    'Daily API cost in dollars'
)

# Middleware
@app.middleware("http")
async def metrics_middleware(request: Request, call_next):
    ACTIVE_REQUESTS.inc()
    start_time = time.perf_counter()
    status = "error"  # assume failure until a non-error response is produced
    
    try:
        response = await call_next(request)
        # HTTPException is converted into a normal response, so check the status code too
        if response.status_code < 400:
            status = "success"
        return response
    finally:
        ACTIVE_REQUESTS.dec()
        latency = time.perf_counter() - start_time
        
        if "/chat" in request.url.path:
            REQUEST_COUNT.labels(model="default", status=status).inc()
            REQUEST_LATENCY.labels(model="default").observe(latency)

# Metrics endpoint
@app.get("/metrics")
async def metrics():
    return Response(
        generate_latest(),
        media_type="text/plain"
    )

Logging

import logging
import json
from datetime import datetime, timezone
from typing import Optional
from pythonjsonlogger import jsonlogger

# Configure JSON logging
logger = logging.getLogger("llm_app")
handler = logging.StreamHandler()
formatter = jsonlogger.JsonFormatter(
    fmt='%(asctime)s %(levelname)s %(name)s %(message)s'
)
handler.setFormatter(formatter)
logger.addHandler(handler)
logger.setLevel(logging.INFO)

class LLMLogger:
    """Structured logging for LLM calls."""
    
    @staticmethod
    def log_request(
        request_id: str,
        model: str,
        messages: list,
        params: dict
    ):
        logger.info("LLM Request", extra={
            "event": "llm_request",
            "request_id": request_id,
            "model": model,
            "message_count": len(messages),
            "params": params,
            # datetime.utcnow() is deprecated since Python 3.12
            "timestamp": datetime.now(timezone.utc).isoformat()
        })
    
    @staticmethod
    def log_response(
        request_id: str,
        model: str,
        latency: float,
        tokens: dict,
        success: bool,
        error: Optional[str] = None
    ):
        logger.info("LLM Response", extra={
            "event": "llm_response",
            "request_id": request_id,
            "model": model,
            "latency_ms": latency * 1000,
            "prompt_tokens": tokens.get("prompt", 0),
            "completion_tokens": tokens.get("completion", 0),
            "success": success,
            "error": error,
            "timestamp": datetime.now(timezone.utc).isoformat()
        })

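Alert Configuration

# Prometheus alerting rules (loaded by Prometheus through rule_files;
# Alertmanager only routes the alerts that these rules fire)
groups:
  - name: llm-alerts
    rules:
      - alert: HighErrorRate
        # Share of failed requests among all requests over the last 5 minutes
        expr: sum(rate(llm_requests_total{status="error"}[5m])) / sum(rate(llm_requests_total[5m])) > 0.1
        for: 2m
        labels:
          severity: critical
        annotations:
          summary: "High LLM error rate"
          description: "Error rate is {{ $value }}"

      - alert: HighLatency
        expr: histogram_quantile(0.95, sum by (le) (rate(llm_request_latency_seconds_bucket[5m]))) > 10
        for: 5m
        labels:
          severity: warning
        annotations:
          summary: "High LLM latency"
          description: "P95 latency is {{ $value }}s"

      - alert: BudgetWarning
        # 80 USD is 80% of a 100 USD daily budget; adjust it to the configured budget
        expr: llm_daily_cost_dollars > 80
        for: 1m
        labels:
          severity: warning
        annotations:
          summary: "API cost approaching daily limit"
          description: "Current daily cost: ${{ $value }}"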

Fault Tolerance and Degradation

Retry Mechanism

from tenacity import retry, stop_after_attempt, wait_exponential, retry_if_exception_type
from openai import APIError, RateLimitError, APIConnectionError

@retry(
    stop=stop_after_attempt(3),
    wait=wait_exponential(multiplier=1, min=2, max=60),
    retry=retry_if_exception_type((RateLimitError, APIConnectionError))
)
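async def call_llm_with_retry(messages: list, model: str = "gpt-4o-mini"):
    """LLM call with retries on rate-limit and connection errors.

    The OpenAI SDK also retries on its own (max_retries, 2 by default),
    so the two retry layers multiply.
    """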
    return await client.chat.completions.create(
        model=model,
        messages=messages
    )

Fallback Strategy

class FallbackChain:
    """Fallback chain: try providers in order and degrade on failure."""
    
    def __init__(self):
        # call() awaits the request, so an async client is required; the
        # module-level AsyncOpenAI client is reused for both OpenAI entries.
        self.providers = [
            {"name": "openai", "model": "gpt-4o", "client": client},
            {"name": "openai", "model": "gpt-4o-mini", "client": client},
            {"name": "local", "model": "fallback", "client": None},
        ]
    
    async def call(self, messages: list) -> dict:
        """Try each provider in turn; fall back to the next one on failure."""
        last_error = None
        
        for provider in self.providers:
            try:
                if provider["name"] == "local":
                    # Static local fallback response
                    return {
                        "response": "Sorry, the service is temporarily unavailable. Please try again later.",
                        "fallback": True
                    }
                
                response = await provider["client"].chat.completions.create(
                    model=provider["model"],
                    messages=messages
                )
                
                return {
                    "response": response.choices[0].message.content,
                    "model": provider["model"],
                    "fallback": False
                }
                
            except Exception as e:
                last_error = e
                logger.warning(f"Provider {provider['name']} ({provider['model']}) failed: {e}")
                continue
        
        raise last_error

fallback_chain = FallbackChain()

Circuit Breaker

from datetime import datetime, timedelta
from enum import Enum

class CircuitState(Enum):
    CLOSED = "closed"
    OPEN = "open"
    HALF_OPEN = "half_open"

class CircuitBreaker:
    """Circuit breaker: stops calling a failing dependency until a recovery timeout has passed."""
    
    def __init__(
        self,
        failure_threshold: int = 5,
        recovery_timeout: int = 60,
        half_open_requests: int = 3
    ):
        self.failure_threshold = failure_threshold
        self.recovery_timeout = recovery_timeout
        self.half_open_requests = half_open_requests
        
        self.state = CircuitState.CLOSED
        self.failures = 0
        self.last_failure_time = None
        self.half_open_successes = 0
    
    def can_execute(self) -> bool:
        """Check whether a request may be executed."""
        if self.state == CircuitState.CLOSED:
            return True
        
        if self.state == CircuitState.OPEN:
            if datetime.now() - self.last_failure_time > timedelta(seconds=self.recovery_timeout):
                self.state = CircuitState.HALF_OPEN
                self.half_open_successes = 0
                return True
            return False
        
        # HALF_OPEN state: let trial requests through
        return True
    
    def record_success(self):
        """Record a successful call."""
        if self.state == CircuitState.HALF_OPEN:
            self.half_open_successes += 1
            if self.half_open_successes >= self.half_open_requests:
                self.state = CircuitState.CLOSED
                self.failures = 0
        else:
            self.failures = 0
    
    def record_failure(self):
        """Record a failed call."""
        self.failures += 1
        self.last_failure_time = datetime.now()
        
        if self.failures >= self.failure_threshold:
            self.state = CircuitState.OPEN

circuit_breaker = CircuitBreaker()

async def call_with_circuit_breaker(messages: list):
    """LLM call guarded by the circuit breaker."""
    if not circuit_breaker.can_execute():
        raise HTTPException(
            status_code=503,
            detail="Service temporarily unavailable"
        )
    
    try:
        response = await client.chat.completions.create(
            model="gpt-4o-mini",
            messages=messages
        )
        circuit_breaker.record_success()
        return response
    except Exception as e:
        circuit_breaker.record_failure()
        raise

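Summary

Production deployment of an LLM application needs attention to the following areas:

Area            Key points
Architecture    Horizontal scaling, load balancing, asynchronous processing
Performance     Caching, batching, streaming responses
Cost            Token monitoring, model routing, budget control
Reliability     Retries, fallback, circuit breaking
Operations      Monitoring, logging, alerting

Production deployment is a continuous optimization process that has to be adjusted to actual business needs.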


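Copyright notice: unless otherwise stated, the copyright of this article belongs to sshipanoo. Please include a link to this article when reposting.

(Licensed under CC BY-NC-SA 4.0)

Article title: "LLM Application Development: Deployment and Optimization"

Article link: http://localhost:3015/ai/LLM%E5%BA%94%E7%94%A8%E9%83%A8%E7%BD%B2%E4%B8%8E%E4%BC%98%E5%8C%96.html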
