降低成本、提升响应速度的关键技术

前言

LLM API 调用成本高、延迟大,而实际应用中很多查询是重复或相似的。通过合理的缓存策略,可以显著降低成本并提升用户体验。本文将深入探讨 LLM 应用的各种缓存方案。


缓存概述

为什么 LLM 需要缓存

┌─────────────────────────────────────────────────────────────────┐
│                    LLM 缓存的价值                                │
├─────────────────────────────────────────────────────────────────┤
│                                                                 │
│  成本节省 ──────────────────────────────────────────────────   │
│  │ • GPT-4 每次调用成本可观                                      │
│  │ • 重复查询浪费资源                                            │
│  │ • 缓存命中可节省 90%+ 成本                                    │
│                                                                 │
│  性能提升 ──────────────────────────────────────────────────   │
│  │ • API 延迟 1-5 秒                                            │
│  │ • 缓存命中延迟 < 50ms                                        │
│  │ • 用户体验显著改善                                            │
│                                                                 │
│  稳定性 ────────────────────────────────────────────────────   │
│  │ • 减少对外部 API 依赖                                         │
│  │ • API 故障时可提供降级服务                                    │
│                                                                 │
└─────────────────────────────────────────────────────────────────┘

缓存类型对比

缓存类型 命中条件 命中率 实现复杂度
精确匹配 完全相同
语义缓存 语义相似
模板缓存 模板相同
前缀缓存 前缀匹配

精确匹配缓存

基础实现

import hashlib
import json
from typing import Optional
from datetime import datetime, timedelta

class ExactMatchCache:
    """精确匹配缓存"""
    
    def __init__(self, ttl_hours: int = 24):
        self.cache = {}
        self.ttl = timedelta(hours=ttl_hours)
    
    def _make_key(self, messages: list, model: str) -> str:
        """生成缓存键"""
        content = json.dumps({
            "messages": messages,
            "model": model
        }, ensure_ascii=False, sort_keys=True)
        return hashlib.sha256(content.encode()).hexdigest()
    
    def get(self, messages: list, model: str) -> Optional[str]:
        """获取缓存"""
        key = self._make_key(messages, model)
        
        if key in self.cache:
            entry = self.cache[key]
            if datetime.now() - entry["timestamp"] < self.ttl:
                return entry["response"]
            else:
                del self.cache[key]
        
        return None
    
    def set(self, messages: list, model: str, response: str):
        """设置缓存"""
        key = self._make_key(messages, model)
        self.cache[key] = {
            "response": response,
            "timestamp": datetime.now()
        }

---

#### 进阶:语义缓存 (Semantic Cache)

精确匹配的命中率通常很低用户问如何学习 Python?”Python 学习路径是什么?”语义相同但精确匹配会失效语义缓存通过向量相似度解决这个问题

#### 核心流程
1.  **向量化**将用户的问题转换为 Embedding 向量
2.  **向量检索**在向量数据库中搜索相似度极高 > 0.95的历史问题
3.  **直接返回**如果找到直接返回历史答案

#### 使用 GPTCache 实现

```python
from gptcache import cache
from gptcache.adapter import openai
from gptcache.embedding import Onnx
from gptcache.manager import CacheBase, VectorBase, get_data_manager
from gptcache.similarity_evaluation.distance import SearchDistanceEvaluation

# 1. 初始化 Embedding 和 存储
onnx = Onnx()
data_manager = get_data_manager(
    CacheBase("sqlite"), 
    VectorBase("faiss", dimension=onnx.dimension)
)

# 2. 初始化缓存逻辑
cache.init(
    embedding_handler=onnx.embed,
    data_manager=data_manager,
    similarity_evaluation=SearchDistanceEvaluation(),
)

# 3. 使用适配后的 OpenAI 客户端
# 如果语义相似度达标,它将直接从缓存返回,不消耗 OpenAI Token
response = openai.ChatCompletion.create(
    model="gpt-4o",
    messages=[{"role": "user", "content": "如何学习 Python?"}]
)

行业新标准:Prompt Caching (模型端缓存)

OpenAI 和 Anthropic 最近都推出了 Prompt Caching 功能。这与应用层缓存不同,它是发生在模型服务商内部的。

原理:KV Cache 复用

当你的 Prompt 包含大量重复的前缀(如长文档、复杂的 System Prompt)时,模型服务商会缓存这部分前缀的计算结果(KV Cache)。

  • 成本:缓存部分通常有 50% 的折扣。
  • 速度:首字延迟(TTFT)显著降低,因为不需要重新计算前缀。

最佳实践

  • 将静态内容放在前面:将 System Prompt 和 Few-shot 示例放在消息列表的最前面。
  • 保持前缀一致:避免在长 Prompt 的开头插入动态变量(如当前时间),否则缓存会失效。

分布式缓存:Redis 方案

在生产环境中,单机内存缓存无法满足需求。我们需要使用 Redis 来实现分布式缓存。

import redis
import json

class RedisLLMCache:
    def __init__(self, host='localhost', port=6379):
        self.client = redis.Redis(host=host, port=port, decode_responses=True)
        
    def get_cache(self, prompt_hash):
        return self.client.get(f"llm_cache:{prompt_hash}")
        
    def set_cache(self, prompt_hash, response, expire_sec=3600):
        self.client.setex(f"llm_cache:{prompt_hash}", expire_sec, response)

# 结合语义缓存:
# 1. 计算 Prompt Embedding
# 2. 在 Redis (使用 RedisVL) 中进行向量搜索
# 3. 命中则返回

缓存的风险与挑战

  1. 时效性问题:如果底层数据更新了(如天气、股价),缓存的答案就会变错。
    • 对策:设置合理的 TTL,或者在 Prompt 中加入“当前日期”作为缓存键的一部分。
  2. 隐私泄露:如果缓存了包含用户私密信息的回答,并返回给了另一个用户,将造成严重后果。
    • 对策:缓存键必须包含 user_id,或者只缓存通用的知识类查询。
  3. 语义偏差:语义缓存可能因为相似度阈值设置不当,返回了“看似相关但实际错误”的答案。
    • 对策:设置极高的相似度阈值(如 0.98),并提供“不满意?重新生成”的按钮。

总结

缓存是 LLM 应用性能优化的“银弹”。

  • 简单场景:使用精确匹配缓存。
  • 高频交互:引入 GPTCache 进行语义缓存。
  • 长文本/复杂 Agent:利用模型服务商的 Prompt Caching
  • 企业级应用:构建基于 Redis 的分布式语义缓存体系。 self.cache[key] = { “response”: response, “timestamp”: datetime.now() }

    def stats(self) -> dict: “"”缓存统计””” return { “size”: len(self.cache), “memory_bytes”: sum( len(v[“response”].encode()) for v in self.cache.values() ) }

使用

cache = ExactMatchCache()

def cached_chat(messages: list, model: str = “gpt-4”) -> str: # 先检查缓存 cached = cache.get(messages, model) if cached: print(“缓存命中!”) return cached

# 调用 API
client = OpenAI()
response = client.chat.completions.create(
    model=model,
    messages=messages
)
result = response.choices[0].message.content

# 存入缓存
cache.set(messages, model, result)

return result ```

Redis 缓存实现

import redis
import json
import hashlib
from typing import Optional

class RedisLLMCache:
    """Redis LLM 缓存"""
    
    def __init__(
        self,
        redis_url: str = "redis://localhost:6379",
        prefix: str = "llm_cache",
        ttl_seconds: int = 86400
    ):
        self.redis = redis.from_url(redis_url)
        self.prefix = prefix
        self.ttl = ttl_seconds
        self.hits = 0
        self.misses = 0
    
    def _make_key(self, messages: list, model: str, **kwargs) -> str:
        """生成缓存键"""
        data = {
            "messages": messages,
            "model": model,
            **kwargs
        }
        content = json.dumps(data, ensure_ascii=False, sort_keys=True)
        hash_val = hashlib.sha256(content.encode()).hexdigest()
        return f"{self.prefix}:{hash_val}"
    
    def get(self, messages: list, model: str, **kwargs) -> Optional[str]:
        """获取缓存"""
        key = self._make_key(messages, model, **kwargs)
        result = self.redis.get(key)
        
        if result:
            self.hits += 1
            return result.decode('utf-8')
        
        self.misses += 1
        return None
    
    def set(self, messages: list, model: str, response: str, **kwargs):
        """设置缓存"""
        key = self._make_key(messages, model, **kwargs)
        self.redis.setex(key, self.ttl, response)
    
    def invalidate(self, messages: list, model: str, **kwargs):
        """使缓存失效"""
        key = self._make_key(messages, model, **kwargs)
        self.redis.delete(key)
    
    def clear_all(self):
        """清空所有缓存"""
        keys = self.redis.keys(f"{self.prefix}:*")
        if keys:
            self.redis.delete(*keys)
    
    def get_stats(self) -> dict:
        """获取统计信息"""
        total = self.hits + self.misses
        return {
            "hits": self.hits,
            "misses": self.misses,
            "hit_rate": self.hits / total if total > 0 else 0,
            "cache_size": len(self.redis.keys(f"{self.prefix}:*"))
        }

# 装饰器方式使用
def with_cache(cache: RedisLLMCache):
    """缓存装饰器"""
    def decorator(func):
        def wrapper(messages: list, model: str = "gpt-4", **kwargs):
            # 检查缓存
            cached = cache.get(messages, model, **kwargs)
            if cached:
                return cached
            
            # 调用原函数
            result = func(messages, model, **kwargs)
            
            # 存入缓存
            cache.set(messages, model, result, **kwargs)
            
            return result
        return wrapper
    return decorator

# 使用
cache = RedisLLMCache()

@with_cache(cache)
def chat(messages: list, model: str = "gpt-4", temperature: float = 0) -> str:
    client = OpenAI()
    response = client.chat.completions.create(
        model=model,
        messages=messages,
        temperature=temperature
    )
    return response.choices[0].message.content

语义缓存

基于嵌入的语义缓存

import numpy as np
from typing import List, Optional, Tuple
from dataclasses import dataclass

@dataclass
class CacheEntry:
    """缓存条目"""
    query: str
    response: str
    embedding: List[float]
    timestamp: float
    metadata: dict = None

class SemanticCache:
    """语义缓存 - 基于嵌入相似度"""
    
    def __init__(
        self,
        similarity_threshold: float = 0.95,
        max_entries: int = 10000
    ):
        self.client = OpenAI()
        self.threshold = similarity_threshold
        self.max_entries = max_entries
        self.entries: List[CacheEntry] = []
        self.hits = 0
        self.misses = 0
    
    def _get_embedding(self, text: str) -> List[float]:
        """获取文本嵌入"""
        response = self.client.embeddings.create(
            model="text-embedding-3-small",
            input=text
        )
        return response.data[0].embedding
    
    def _cosine_similarity(self, a: List[float], b: List[float]) -> float:
        """余弦相似度"""
        a = np.array(a)
        b = np.array(b)
        return np.dot(a, b) / (np.linalg.norm(a) * np.linalg.norm(b))
    
    def _find_similar(self, embedding: List[float]) -> Optional[Tuple[CacheEntry, float]]:
        """查找相似条目"""
        if not self.entries:
            return None
        
        best_entry = None
        best_similarity = 0
        
        for entry in self.entries:
            similarity = self._cosine_similarity(embedding, entry.embedding)
            if similarity > best_similarity:
                best_similarity = similarity
                best_entry = entry
        
        if best_similarity >= self.threshold:
            return (best_entry, best_similarity)
        
        return None
    
    def get(self, query: str) -> Optional[dict]:
        """获取语义相似的缓存"""
        embedding = self._get_embedding(query)
        result = self._find_similar(embedding)
        
        if result:
            entry, similarity = result
            self.hits += 1
            return {
                "response": entry.response,
                "original_query": entry.query,
                "similarity": similarity,
                "cached": True
            }
        
        self.misses += 1
        return None
    
    def set(self, query: str, response: str, metadata: dict = None):
        """添加缓存条目"""
        import time
        
        embedding = self._get_embedding(query)
        
        entry = CacheEntry(
            query=query,
            response=response,
            embedding=embedding,
            timestamp=time.time(),
            metadata=metadata
        )
        
        self.entries.append(entry)
        
        # 限制缓存大小
        if len(self.entries) > self.max_entries:
            # 移除最旧的条目
            self.entries = self.entries[-self.max_entries:]
    
    def get_stats(self) -> dict:
        """统计信息"""
        total = self.hits + self.misses
        return {
            "entries": len(self.entries),
            "hits": self.hits,
            "misses": self.misses,
            "hit_rate": self.hits / total if total > 0 else 0
        }

# 使用
semantic_cache = SemanticCache(similarity_threshold=0.92)

def smart_chat(query: str, model: str = "gpt-4") -> dict:
    """带语义缓存的聊天"""
    # 检查语义缓存
    cached = semantic_cache.get(query)
    if cached:
        return cached
    
    # 调用 API
    client = OpenAI()
    response = client.chat.completions.create(
        model=model,
        messages=[{"role": "user", "content": query}]
    )
    result = response.choices[0].message.content
    
    # 存入缓存
    semantic_cache.set(query, result)
    
    return {
        "response": result,
        "cached": False
    }

# 测试
print(smart_chat("什么是机器学习?"))  # 首次查询
print(smart_chat("机器学习是什么?"))  # 语义相似,命中缓存
print(smart_chat("请解释一下机器学习"))  # 语义相似,命中缓存

使用向量数据库的语义缓存

import chromadb
from chromadb.utils import embedding_functions

class ChromaSemanticCache:
    """使用 Chroma 的语义缓存"""
    
    def __init__(
        self,
        collection_name: str = "llm_cache",
        similarity_threshold: float = 0.9,
        persist_directory: str = "./cache_db"
    ):
        # 初始化 Chroma
        self.client = chromadb.PersistentClient(path=persist_directory)
        
        # 使用 OpenAI 嵌入
        self.embedding_func = embedding_functions.OpenAIEmbeddingFunction(
            model_name="text-embedding-3-small"
        )
        
        # 获取或创建集合
        self.collection = self.client.get_or_create_collection(
            name=collection_name,
            embedding_function=self.embedding_func,
            metadata={"hnsw:space": "cosine"}
        )
        
        self.threshold = similarity_threshold
    
    def get(self, query: str) -> Optional[dict]:
        """查找语义相似的缓存"""
        results = self.collection.query(
            query_texts=[query],
            n_results=1,
            include=["documents", "metadatas", "distances"]
        )
        
        if results["distances"][0]:
            # Chroma 返回的是距离,转换为相似度
            distance = results["distances"][0][0]
            similarity = 1 - distance
            
            if similarity >= self.threshold:
                return {
                    "response": results["metadatas"][0][0]["response"],
                    "original_query": results["documents"][0][0],
                    "similarity": similarity,
                    "cached": True
                }
        
        return None
    
    def set(self, query: str, response: str, metadata: dict = None):
        """添加缓存条目"""
        import uuid
        
        doc_id = str(uuid.uuid4())
        
        meta = {"response": response}
        if metadata:
            meta.update(metadata)
        
        self.collection.add(
            ids=[doc_id],
            documents=[query],
            metadatas=[meta]
        )
    
    def clear(self):
        """清空缓存"""
        self.client.delete_collection(self.collection.name)
        self.collection = self.client.create_collection(
            name=self.collection.name,
            embedding_function=self.embedding_func
        )
    
    def get_stats(self) -> dict:
        """统计信息"""
        return {
            "count": self.collection.count()
        }

GPTCache 实战

安装和基础使用

pip install gptcache
from gptcache import cache
from gptcache.adapter import openai
from gptcache.embedding import Onnx
from gptcache.manager import CacheBase, VectorBase, get_data_manager
from gptcache.similarity_evaluation.distance import SearchDistanceEvaluation

# 初始化嵌入模型
onnx = Onnx()

# 配置缓存存储
data_manager = get_data_manager(
    CacheBase("sqlite"),
    VectorBase("faiss", dimension=onnx.dimension)
)

# 初始化缓存
cache.init(
    embedding_func=onnx.to_embeddings,
    data_manager=data_manager,
    similarity_evaluation=SearchDistanceEvaluation()
)

# 设置相似度阈值
cache.set_openai_key()

# 使用带缓存的 OpenAI 调用
def cached_openai_chat(messages: list) -> str:
    """带 GPTCache 的 OpenAI 调用"""
    response = openai.ChatCompletion.create(
        model="gpt-4",
        messages=messages
    )
    return response.choices[0].message.content

# 测试
print(cached_openai_chat([{"role": "user", "content": "什么是深度学习?"}]))
print(cached_openai_chat([{"role": "user", "content": "深度学习是什么"}]))  # 应命中缓存

高级配置

from gptcache import cache
from gptcache.embedding import OpenAI as OpenAIEmbedding
from gptcache.manager import get_data_manager, CacheBase, VectorBase
from gptcache.similarity_evaluation import ExactMatchEvaluation
from gptcache.processor.pre import get_prompt
from gptcache.processor.post import first

# 使用 OpenAI 嵌入
embedding = OpenAIEmbedding()

# 使用 Redis 存储
cache_base = CacheBase(
    "redis",
    redis_config={
        "host": "localhost",
        "port": 6379,
        "db": 0
    }
)

# 使用 Milvus 向量存储
vector_base = VectorBase(
    "milvus",
    host="localhost",
    port="19530",
    dimension=embedding.dimension
)

data_manager = get_data_manager(cache_base, vector_base)

# 自定义预处理:只使用最后一条消息作为缓存键
def custom_pre_func(data, **params):
    messages = data.get("messages", [])
    if messages:
        return messages[-1]["content"]
    return ""

# 初始化缓存
cache.init(
    pre_embedding_func=custom_pre_func,
    embedding_func=embedding.to_embeddings,
    data_manager=data_manager,
    similarity_evaluation=ExactMatchEvaluation(),
    post_process_messages_func=first
)

KV Cache 优化

Prompt 前缀缓存

from typing import Dict, List
import hashlib

class PrefixCache:
    """Prompt 前缀缓存"""
    
    def __init__(self):
        self.prefix_cache: Dict[str, dict] = {}
    
    def _hash_prefix(self, messages: List[dict]) -> str:
        """计算消息前缀的哈希"""
        content = json.dumps(messages, ensure_ascii=False)
        return hashlib.sha256(content.encode()).hexdigest()[:16]
    
    def get_cached_prefix(self, messages: List[dict]) -> tuple:
        """查找可复用的前缀"""
        for i in range(len(messages), 0, -1):
            prefix = messages[:i]
            prefix_hash = self._hash_prefix(prefix)
            
            if prefix_hash in self.prefix_cache:
                return (
                    prefix_hash,
                    self.prefix_cache[prefix_hash],
                    messages[i:]  # 剩余消息
                )
        
        return (None, None, messages)
    
    def cache_prefix(self, messages: List[dict], context: dict):
        """缓存前缀"""
        prefix_hash = self._hash_prefix(messages)
        self.prefix_cache[prefix_hash] = context

# 模拟带前缀缓存的调用
class PrefixCachedClient:
    """支持前缀缓存的客户端"""
    
    def __init__(self):
        self.client = OpenAI()
        self.prefix_cache = PrefixCache()
        self.system_prompts_cache = {}
    
    def chat(
        self,
        messages: List[dict],
        model: str = "gpt-4",
        system_prompt: str = None
    ) -> str:
        """带前缀缓存的聊天"""
        
        # 构建完整消息
        full_messages = []
        
        if system_prompt:
            # 系统提示通常可以缓存
            if system_prompt not in self.system_prompts_cache:
                self.system_prompts_cache[system_prompt] = {
                    "role": "system",
                    "content": system_prompt
                }
            full_messages.append(self.system_prompts_cache[system_prompt])
        
        full_messages.extend(messages)
        
        # 调用 API
        response = self.client.chat.completions.create(
            model=model,
            messages=full_messages
        )
        
        return response.choices[0].message.content
    
    def batch_chat(
        self,
        queries: List[str],
        system_prompt: str,
        model: str = "gpt-4"
    ) -> List[str]:
        """批量调用,复用系统提示"""
        results = []
        
        for query in queries:
            messages = [{"role": "user", "content": query}]
            result = self.chat(messages, model, system_prompt)
            results.append(result)
        
        return results

对话历史压缩缓存

class ConversationCache:
    """对话历史压缩缓存"""
    
    def __init__(self, max_history: int = 10):
        self.client = OpenAI()
        self.max_history = max_history
        self.summary_cache: Dict[str, str] = {}  # session_id -> summary
    
    def _create_summary(self, messages: List[dict]) -> str:
        """创建对话摘要"""
        history_text = "\n".join([
            f"{m['role']}: {m['content']}"
            for m in messages
        ])
        
        response = self.client.chat.completions.create(
            model="gpt-4o-mini",
            messages=[
                {
                    "role": "system",
                    "content": "简洁地总结以下对话的要点,保留关键信息。"
                },
                {"role": "user", "content": history_text}
            ],
            max_tokens=200
        )
        
        return response.choices[0].message.content
    
    def get_context(
        self,
        session_id: str,
        recent_messages: List[dict]
    ) -> List[dict]:
        """获取带摘要的上下文"""
        context = []
        
        # 添加历史摘要
        if session_id in self.summary_cache:
            context.append({
                "role": "system",
                "content": f"之前的对话摘要:{self.summary_cache[session_id]}"
            })
        
        # 添加最近消息
        context.extend(recent_messages[-self.max_history:])
        
        return context
    
    def update_summary(
        self,
        session_id: str,
        old_messages: List[dict]
    ):
        """更新摘要"""
        if len(old_messages) > self.max_history:
            messages_to_summarize = old_messages[:-self.max_history]
            
            new_summary = self._create_summary(messages_to_summarize)
            
            if session_id in self.summary_cache:
                # 合并旧摘要
                self.summary_cache[session_id] = f"{self.summary_cache[session_id]}\n{new_summary}"
            else:
                self.summary_cache[session_id] = new_summary

分布式缓存

多级缓存架构

from typing import Optional
from abc import ABC, abstractmethod

class CacheLayer(ABC):
    """缓存层抽象"""
    
    @abstractmethod
    def get(self, key: str) -> Optional[str]:
        pass
    
    @abstractmethod
    def set(self, key: str, value: str, ttl: int = None):
        pass

class LocalMemoryCache(CacheLayer):
    """本地内存缓存"""
    
    def __init__(self, max_size: int = 1000):
        from collections import OrderedDict
        self.cache = OrderedDict()
        self.max_size = max_size
    
    def get(self, key: str) -> Optional[str]:
        if key in self.cache:
            # 移到末尾(LRU)
            self.cache.move_to_end(key)
            return self.cache[key]
        return None
    
    def set(self, key: str, value: str, ttl: int = None):
        if key in self.cache:
            self.cache.move_to_end(key)
        else:
            if len(self.cache) >= self.max_size:
                self.cache.popitem(last=False)
        self.cache[key] = value

class RedisCache(CacheLayer):
    """Redis 缓存"""
    
    def __init__(self, redis_url: str = "redis://localhost:6379"):
        import redis
        self.redis = redis.from_url(redis_url)
    
    def get(self, key: str) -> Optional[str]:
        result = self.redis.get(key)
        return result.decode() if result else None
    
    def set(self, key: str, value: str, ttl: int = 3600):
        self.redis.setex(key, ttl, value)

class MultiLevelCache:
    """多级缓存"""
    
    def __init__(self, layers: List[CacheLayer]):
        self.layers = layers
    
    def get(self, key: str) -> Optional[str]:
        for i, layer in enumerate(self.layers):
            value = layer.get(key)
            if value is not None:
                # 回填上层缓存
                for j in range(i):
                    self.layers[j].set(key, value)
                return value
        return None
    
    def set(self, key: str, value: str, ttl: int = 3600):
        for layer in self.layers:
            layer.set(key, value, ttl)

# 使用
cache = MultiLevelCache([
    LocalMemoryCache(max_size=100),  # L1: 本地内存
    RedisCache()                      # L2: Redis
])

缓存策略

缓存键设计

class CacheKeyBuilder:
    """缓存键构建器"""
    
    @staticmethod
    def build_key(
        messages: List[dict],
        model: str,
        temperature: float = 0,
        **kwargs
    ) -> str:
        """构建规范化的缓存键"""
        # 规范化消息
        normalized_messages = []
        for msg in messages:
            normalized = {
                "role": msg["role"],
                "content": msg["content"].strip().lower()
            }
            normalized_messages.append(normalized)
        
        # 只在 temperature=0 时才缓存(确定性输出)
        if temperature > 0:
            return None
        
        key_data = {
            "messages": normalized_messages,
            "model": model,
            "temperature": temperature
        }
        
        # 添加其他相关参数
        for k, v in kwargs.items():
            if k in ["max_tokens", "top_p", "frequency_penalty"]:
                key_data[k] = v
        
        content = json.dumps(key_data, ensure_ascii=False, sort_keys=True)
        return hashlib.sha256(content.encode()).hexdigest()

缓存失效策略

class CacheInvalidation:
    """缓存失效管理"""
    
    def __init__(self, cache: RedisLLMCache):
        self.cache = cache
        self.version = "v1"  # 缓存版本
    
    def invalidate_by_pattern(self, pattern: str):
        """按模式失效缓存"""
        keys = self.cache.redis.keys(f"{self.cache.prefix}:{pattern}*")
        if keys:
            self.cache.redis.delete(*keys)
    
    def invalidate_by_time(self, before_timestamp: float):
        """失效指定时间之前的缓存"""
        # 需要在缓存中存储时间戳
        pass
    
    def bump_version(self):
        """升级缓存版本(使所有旧缓存失效)"""
        self.version = f"v{int(self.version[1:]) + 1}"
    
    def get_versioned_key(self, key: str) -> str:
        """获取带版本的键"""
        return f"{self.version}:{key}"

最佳实践

何时使用缓存

场景 适合缓存 原因
常见问题回答 查询重复度高
文档摘要 同一文档结果固定
翻译 结果确定
创意写作 需要多样性
实时数据分析 数据时效性
对话聊天 部分 可缓存系统提示

缓存配置建议

# 推荐配置
CACHE_CONFIG = {
    # 精确匹配缓存
    "exact_match": {
        "enabled": True,
        "ttl_hours": 24,
        "max_entries": 10000
    },
    
    # 语义缓存
    "semantic": {
        "enabled": True,
        "similarity_threshold": 0.92,  # 不要太低
        "embedding_model": "text-embedding-3-small"
    },
    
    # 监控
    "monitoring": {
        "track_hit_rate": True,
        "track_latency": True,
        "alert_hit_rate_below": 0.3  # 命中率过低告警
    }
}

总结

缓存是 LLM 应用优化的关键技术:

缓存类型 命中率 实现难度 推荐场景
精确匹配 简单 高重复查询
语义缓存 中等 通用场景
前缀缓存 中等 长上下文
GPTCache 简单 快速集成

实施建议:

  1. 先实现精确匹配缓存
  2. 评估命中率决定是否升级
  3. 监控缓存效果
  4. 设置合理的失效策略

参考资源

版权声明: 如无特别声明,本文版权归 sshipanoo 所有,转载请注明本文链接。

(采用 CC BY-NC-SA 4.0 许可协议进行授权)

本文标题:《 LLM应用开发——缓存策略详解 》

本文链接:http://localhost:3015/ai/%E7%BC%93%E5%AD%98%E7%AD%96%E7%95%A5.html

本文最后一次更新为 天前,文章中的某些内容可能已过时!