降低成本、提升响应速度的关键技术
前言
LLM API 调用成本高、延迟大,而实际应用中很多查询是重复或相似的。通过合理的缓存策略,可以显著降低成本并提升用户体验。本文将深入探讨 LLM 应用的各种缓存方案。
缓存概述
为什么 LLM 需要缓存
┌─────────────────────────────────────────────────────────────────┐
│ LLM 缓存的价值 │
├─────────────────────────────────────────────────────────────────┤
│ │
│ 成本节省 ────────────────────────────────────────────────── │
│ │ • GPT-4 每次调用成本可观 │
│ │ • 重复查询浪费资源 │
│ │ • 缓存命中可节省 90%+ 成本 │
│ │
│ 性能提升 ────────────────────────────────────────────────── │
│ │ • API 延迟 1-5 秒 │
│ │ • 缓存命中延迟 < 50ms │
│ │ • 用户体验显著改善 │
│ │
│ 稳定性 ──────────────────────────────────────────────────── │
│ │ • 减少对外部 API 依赖 │
│ │ • API 故障时可提供降级服务 │
│ │
└─────────────────────────────────────────────────────────────────┘
缓存类型对比
| 缓存类型 | 命中条件 | 命中率 | 实现复杂度 |
|---|---|---|---|
| 精确匹配 | 完全相同 | 低 | 低 |
| 语义缓存 | 语义相似 | 高 | 中 |
| 模板缓存 | 模板相同 | 中 | 低 |
| 前缀缓存 | 前缀匹配 | 中 | 中 |
精确匹配缓存
基础实现
import hashlib
import json
from typing import Optional
from datetime import datetime, timedelta
class ExactMatchCache:
"""精确匹配缓存"""
def __init__(self, ttl_hours: int = 24):
self.cache = {}
self.ttl = timedelta(hours=ttl_hours)
def _make_key(self, messages: list, model: str) -> str:
"""生成缓存键"""
content = json.dumps({
"messages": messages,
"model": model
}, ensure_ascii=False, sort_keys=True)
return hashlib.sha256(content.encode()).hexdigest()
def get(self, messages: list, model: str) -> Optional[str]:
"""获取缓存"""
key = self._make_key(messages, model)
if key in self.cache:
entry = self.cache[key]
if datetime.now() - entry["timestamp"] < self.ttl:
return entry["response"]
else:
del self.cache[key]
return None
def set(self, messages: list, model: str, response: str):
"""设置缓存"""
key = self._make_key(messages, model)
self.cache[key] = {
"response": response,
"timestamp": datetime.now()
}
---
#### 进阶:语义缓存 (Semantic Cache)
精确匹配的命中率通常很低。用户问“如何学习 Python?”和“Python 学习路径是什么?”语义相同,但精确匹配会失效。语义缓存通过向量相似度解决这个问题。
#### 核心流程
1. **向量化**:将用户的问题转换为 Embedding 向量。
2. **向量检索**:在向量数据库中搜索相似度极高(如 > 0.95)的历史问题。
3. **直接返回**:如果找到,直接返回历史答案。
#### 使用 GPTCache 实现
```python
from gptcache import cache
from gptcache.adapter import openai
from gptcache.embedding import Onnx
from gptcache.manager import CacheBase, VectorBase, get_data_manager
from gptcache.similarity_evaluation.distance import SearchDistanceEvaluation
# 1. 初始化 Embedding 和 存储
onnx = Onnx()
data_manager = get_data_manager(
CacheBase("sqlite"),
VectorBase("faiss", dimension=onnx.dimension)
)
# 2. 初始化缓存逻辑
cache.init(
embedding_handler=onnx.embed,
data_manager=data_manager,
similarity_evaluation=SearchDistanceEvaluation(),
)
# 3. 使用适配后的 OpenAI 客户端
# 如果语义相似度达标,它将直接从缓存返回,不消耗 OpenAI Token
response = openai.ChatCompletion.create(
model="gpt-4o",
messages=[{"role": "user", "content": "如何学习 Python?"}]
)
行业新标准:Prompt Caching (模型端缓存)
OpenAI 和 Anthropic 最近都推出了 Prompt Caching 功能。这与应用层缓存不同,它是发生在模型服务商内部的。
原理:KV Cache 复用
当你的 Prompt 包含大量重复的前缀(如长文档、复杂的 System Prompt)时,模型服务商会缓存这部分前缀的计算结果(KV Cache)。
- 成本:缓存部分通常有 50% 的折扣。
- 速度:首字延迟(TTFT)显著降低,因为不需要重新计算前缀。
最佳实践
- 将静态内容放在前面:将 System Prompt 和 Few-shot 示例放在消息列表的最前面。
- 保持前缀一致:避免在长 Prompt 的开头插入动态变量(如当前时间),否则缓存会失效。
分布式缓存:Redis 方案
在生产环境中,单机内存缓存无法满足需求。我们需要使用 Redis 来实现分布式缓存。
import redis
import json
class RedisLLMCache:
def __init__(self, host='localhost', port=6379):
self.client = redis.Redis(host=host, port=port, decode_responses=True)
def get_cache(self, prompt_hash):
return self.client.get(f"llm_cache:{prompt_hash}")
def set_cache(self, prompt_hash, response, expire_sec=3600):
self.client.setex(f"llm_cache:{prompt_hash}", expire_sec, response)
# 结合语义缓存:
# 1. 计算 Prompt Embedding
# 2. 在 Redis (使用 RedisVL) 中进行向量搜索
# 3. 命中则返回
缓存的风险与挑战
-
时效性问题:如果底层数据更新了(如天气、股价),缓存的答案就会变错。
- 对策:设置合理的 TTL,或者在 Prompt 中加入“当前日期”作为缓存键的一部分。
-
隐私泄露:如果缓存了包含用户私密信息的回答,并返回给了另一个用户,将造成严重后果。
-
对策:缓存键必须包含
user_id,或者只缓存通用的知识类查询。
-
对策:缓存键必须包含
-
语义偏差:语义缓存可能因为相似度阈值设置不当,返回了“看似相关但实际错误”的答案。
- 对策:设置极高的相似度阈值(如 0.98),并提供“不满意?重新生成”的按钮。
总结
缓存是 LLM 应用性能优化的“银弹”。
- 简单场景:使用精确匹配缓存。
- 高频交互:引入 GPTCache 进行语义缓存。
- 长文本/复杂 Agent:利用模型服务商的 Prompt Caching。
-
企业级应用:构建基于 Redis 的分布式语义缓存体系。 self.cache[key] = { “response”: response, “timestamp”: datetime.now() }
def stats(self) -> dict: “"”缓存统计””” return { “size”: len(self.cache), “memory_bytes”: sum( len(v[“response”].encode()) for v in self.cache.values() ) }
使用
cache = ExactMatchCache()
def cached_chat(messages: list, model: str = “gpt-4”) -> str: # 先检查缓存 cached = cache.get(messages, model) if cached: print(“缓存命中!”) return cached
# 调用 API
client = OpenAI()
response = client.chat.completions.create(
model=model,
messages=messages
)
result = response.choices[0].message.content
# 存入缓存
cache.set(messages, model, result)
return result ```
Redis 缓存实现
import redis
import json
import hashlib
from typing import Optional
class RedisLLMCache:
"""Redis LLM 缓存"""
def __init__(
self,
redis_url: str = "redis://localhost:6379",
prefix: str = "llm_cache",
ttl_seconds: int = 86400
):
self.redis = redis.from_url(redis_url)
self.prefix = prefix
self.ttl = ttl_seconds
self.hits = 0
self.misses = 0
def _make_key(self, messages: list, model: str, **kwargs) -> str:
"""生成缓存键"""
data = {
"messages": messages,
"model": model,
**kwargs
}
content = json.dumps(data, ensure_ascii=False, sort_keys=True)
hash_val = hashlib.sha256(content.encode()).hexdigest()
return f"{self.prefix}:{hash_val}"
def get(self, messages: list, model: str, **kwargs) -> Optional[str]:
"""获取缓存"""
key = self._make_key(messages, model, **kwargs)
result = self.redis.get(key)
if result:
self.hits += 1
return result.decode('utf-8')
self.misses += 1
return None
def set(self, messages: list, model: str, response: str, **kwargs):
"""设置缓存"""
key = self._make_key(messages, model, **kwargs)
self.redis.setex(key, self.ttl, response)
def invalidate(self, messages: list, model: str, **kwargs):
"""使缓存失效"""
key = self._make_key(messages, model, **kwargs)
self.redis.delete(key)
def clear_all(self):
"""清空所有缓存"""
keys = self.redis.keys(f"{self.prefix}:*")
if keys:
self.redis.delete(*keys)
def get_stats(self) -> dict:
"""获取统计信息"""
total = self.hits + self.misses
return {
"hits": self.hits,
"misses": self.misses,
"hit_rate": self.hits / total if total > 0 else 0,
"cache_size": len(self.redis.keys(f"{self.prefix}:*"))
}
# 装饰器方式使用
def with_cache(cache: RedisLLMCache):
"""缓存装饰器"""
def decorator(func):
def wrapper(messages: list, model: str = "gpt-4", **kwargs):
# 检查缓存
cached = cache.get(messages, model, **kwargs)
if cached:
return cached
# 调用原函数
result = func(messages, model, **kwargs)
# 存入缓存
cache.set(messages, model, result, **kwargs)
return result
return wrapper
return decorator
# 使用
cache = RedisLLMCache()
@with_cache(cache)
def chat(messages: list, model: str = "gpt-4", temperature: float = 0) -> str:
client = OpenAI()
response = client.chat.completions.create(
model=model,
messages=messages,
temperature=temperature
)
return response.choices[0].message.content
语义缓存
基于嵌入的语义缓存
import numpy as np
from typing import List, Optional, Tuple
from dataclasses import dataclass
@dataclass
class CacheEntry:
"""缓存条目"""
query: str
response: str
embedding: List[float]
timestamp: float
metadata: dict = None
class SemanticCache:
"""语义缓存 - 基于嵌入相似度"""
def __init__(
self,
similarity_threshold: float = 0.95,
max_entries: int = 10000
):
self.client = OpenAI()
self.threshold = similarity_threshold
self.max_entries = max_entries
self.entries: List[CacheEntry] = []
self.hits = 0
self.misses = 0
def _get_embedding(self, text: str) -> List[float]:
"""获取文本嵌入"""
response = self.client.embeddings.create(
model="text-embedding-3-small",
input=text
)
return response.data[0].embedding
def _cosine_similarity(self, a: List[float], b: List[float]) -> float:
"""余弦相似度"""
a = np.array(a)
b = np.array(b)
return np.dot(a, b) / (np.linalg.norm(a) * np.linalg.norm(b))
def _find_similar(self, embedding: List[float]) -> Optional[Tuple[CacheEntry, float]]:
"""查找相似条目"""
if not self.entries:
return None
best_entry = None
best_similarity = 0
for entry in self.entries:
similarity = self._cosine_similarity(embedding, entry.embedding)
if similarity > best_similarity:
best_similarity = similarity
best_entry = entry
if best_similarity >= self.threshold:
return (best_entry, best_similarity)
return None
def get(self, query: str) -> Optional[dict]:
"""获取语义相似的缓存"""
embedding = self._get_embedding(query)
result = self._find_similar(embedding)
if result:
entry, similarity = result
self.hits += 1
return {
"response": entry.response,
"original_query": entry.query,
"similarity": similarity,
"cached": True
}
self.misses += 1
return None
def set(self, query: str, response: str, metadata: dict = None):
"""添加缓存条目"""
import time
embedding = self._get_embedding(query)
entry = CacheEntry(
query=query,
response=response,
embedding=embedding,
timestamp=time.time(),
metadata=metadata
)
self.entries.append(entry)
# 限制缓存大小
if len(self.entries) > self.max_entries:
# 移除最旧的条目
self.entries = self.entries[-self.max_entries:]
def get_stats(self) -> dict:
"""统计信息"""
total = self.hits + self.misses
return {
"entries": len(self.entries),
"hits": self.hits,
"misses": self.misses,
"hit_rate": self.hits / total if total > 0 else 0
}
# 使用
semantic_cache = SemanticCache(similarity_threshold=0.92)
def smart_chat(query: str, model: str = "gpt-4") -> dict:
"""带语义缓存的聊天"""
# 检查语义缓存
cached = semantic_cache.get(query)
if cached:
return cached
# 调用 API
client = OpenAI()
response = client.chat.completions.create(
model=model,
messages=[{"role": "user", "content": query}]
)
result = response.choices[0].message.content
# 存入缓存
semantic_cache.set(query, result)
return {
"response": result,
"cached": False
}
# 测试
print(smart_chat("什么是机器学习?")) # 首次查询
print(smart_chat("机器学习是什么?")) # 语义相似,命中缓存
print(smart_chat("请解释一下机器学习")) # 语义相似,命中缓存
使用向量数据库的语义缓存
import chromadb
from chromadb.utils import embedding_functions
class ChromaSemanticCache:
"""使用 Chroma 的语义缓存"""
def __init__(
self,
collection_name: str = "llm_cache",
similarity_threshold: float = 0.9,
persist_directory: str = "./cache_db"
):
# 初始化 Chroma
self.client = chromadb.PersistentClient(path=persist_directory)
# 使用 OpenAI 嵌入
self.embedding_func = embedding_functions.OpenAIEmbeddingFunction(
model_name="text-embedding-3-small"
)
# 获取或创建集合
self.collection = self.client.get_or_create_collection(
name=collection_name,
embedding_function=self.embedding_func,
metadata={"hnsw:space": "cosine"}
)
self.threshold = similarity_threshold
def get(self, query: str) -> Optional[dict]:
"""查找语义相似的缓存"""
results = self.collection.query(
query_texts=[query],
n_results=1,
include=["documents", "metadatas", "distances"]
)
if results["distances"][0]:
# Chroma 返回的是距离,转换为相似度
distance = results["distances"][0][0]
similarity = 1 - distance
if similarity >= self.threshold:
return {
"response": results["metadatas"][0][0]["response"],
"original_query": results["documents"][0][0],
"similarity": similarity,
"cached": True
}
return None
def set(self, query: str, response: str, metadata: dict = None):
"""添加缓存条目"""
import uuid
doc_id = str(uuid.uuid4())
meta = {"response": response}
if metadata:
meta.update(metadata)
self.collection.add(
ids=[doc_id],
documents=[query],
metadatas=[meta]
)
def clear(self):
"""清空缓存"""
self.client.delete_collection(self.collection.name)
self.collection = self.client.create_collection(
name=self.collection.name,
embedding_function=self.embedding_func
)
def get_stats(self) -> dict:
"""统计信息"""
return {
"count": self.collection.count()
}
GPTCache 实战
安装和基础使用
pip install gptcache
from gptcache import cache
from gptcache.adapter import openai
from gptcache.embedding import Onnx
from gptcache.manager import CacheBase, VectorBase, get_data_manager
from gptcache.similarity_evaluation.distance import SearchDistanceEvaluation
# 初始化嵌入模型
onnx = Onnx()
# 配置缓存存储
data_manager = get_data_manager(
CacheBase("sqlite"),
VectorBase("faiss", dimension=onnx.dimension)
)
# 初始化缓存
cache.init(
embedding_func=onnx.to_embeddings,
data_manager=data_manager,
similarity_evaluation=SearchDistanceEvaluation()
)
# 设置相似度阈值
cache.set_openai_key()
# 使用带缓存的 OpenAI 调用
def cached_openai_chat(messages: list) -> str:
"""带 GPTCache 的 OpenAI 调用"""
response = openai.ChatCompletion.create(
model="gpt-4",
messages=messages
)
return response.choices[0].message.content
# 测试
print(cached_openai_chat([{"role": "user", "content": "什么是深度学习?"}]))
print(cached_openai_chat([{"role": "user", "content": "深度学习是什么"}])) # 应命中缓存
高级配置
from gptcache import cache
from gptcache.embedding import OpenAI as OpenAIEmbedding
from gptcache.manager import get_data_manager, CacheBase, VectorBase
from gptcache.similarity_evaluation import ExactMatchEvaluation
from gptcache.processor.pre import get_prompt
from gptcache.processor.post import first
# 使用 OpenAI 嵌入
embedding = OpenAIEmbedding()
# 使用 Redis 存储
cache_base = CacheBase(
"redis",
redis_config={
"host": "localhost",
"port": 6379,
"db": 0
}
)
# 使用 Milvus 向量存储
vector_base = VectorBase(
"milvus",
host="localhost",
port="19530",
dimension=embedding.dimension
)
data_manager = get_data_manager(cache_base, vector_base)
# 自定义预处理:只使用最后一条消息作为缓存键
def custom_pre_func(data, **params):
messages = data.get("messages", [])
if messages:
return messages[-1]["content"]
return ""
# 初始化缓存
cache.init(
pre_embedding_func=custom_pre_func,
embedding_func=embedding.to_embeddings,
data_manager=data_manager,
similarity_evaluation=ExactMatchEvaluation(),
post_process_messages_func=first
)
KV Cache 优化
Prompt 前缀缓存
from typing import Dict, List
import hashlib
class PrefixCache:
"""Prompt 前缀缓存"""
def __init__(self):
self.prefix_cache: Dict[str, dict] = {}
def _hash_prefix(self, messages: List[dict]) -> str:
"""计算消息前缀的哈希"""
content = json.dumps(messages, ensure_ascii=False)
return hashlib.sha256(content.encode()).hexdigest()[:16]
def get_cached_prefix(self, messages: List[dict]) -> tuple:
"""查找可复用的前缀"""
for i in range(len(messages), 0, -1):
prefix = messages[:i]
prefix_hash = self._hash_prefix(prefix)
if prefix_hash in self.prefix_cache:
return (
prefix_hash,
self.prefix_cache[prefix_hash],
messages[i:] # 剩余消息
)
return (None, None, messages)
def cache_prefix(self, messages: List[dict], context: dict):
"""缓存前缀"""
prefix_hash = self._hash_prefix(messages)
self.prefix_cache[prefix_hash] = context
# 模拟带前缀缓存的调用
class PrefixCachedClient:
"""支持前缀缓存的客户端"""
def __init__(self):
self.client = OpenAI()
self.prefix_cache = PrefixCache()
self.system_prompts_cache = {}
def chat(
self,
messages: List[dict],
model: str = "gpt-4",
system_prompt: str = None
) -> str:
"""带前缀缓存的聊天"""
# 构建完整消息
full_messages = []
if system_prompt:
# 系统提示通常可以缓存
if system_prompt not in self.system_prompts_cache:
self.system_prompts_cache[system_prompt] = {
"role": "system",
"content": system_prompt
}
full_messages.append(self.system_prompts_cache[system_prompt])
full_messages.extend(messages)
# 调用 API
response = self.client.chat.completions.create(
model=model,
messages=full_messages
)
return response.choices[0].message.content
def batch_chat(
self,
queries: List[str],
system_prompt: str,
model: str = "gpt-4"
) -> List[str]:
"""批量调用,复用系统提示"""
results = []
for query in queries:
messages = [{"role": "user", "content": query}]
result = self.chat(messages, model, system_prompt)
results.append(result)
return results
对话历史压缩缓存
class ConversationCache:
"""对话历史压缩缓存"""
def __init__(self, max_history: int = 10):
self.client = OpenAI()
self.max_history = max_history
self.summary_cache: Dict[str, str] = {} # session_id -> summary
def _create_summary(self, messages: List[dict]) -> str:
"""创建对话摘要"""
history_text = "\n".join([
f"{m['role']}: {m['content']}"
for m in messages
])
response = self.client.chat.completions.create(
model="gpt-4o-mini",
messages=[
{
"role": "system",
"content": "简洁地总结以下对话的要点,保留关键信息。"
},
{"role": "user", "content": history_text}
],
max_tokens=200
)
return response.choices[0].message.content
def get_context(
self,
session_id: str,
recent_messages: List[dict]
) -> List[dict]:
"""获取带摘要的上下文"""
context = []
# 添加历史摘要
if session_id in self.summary_cache:
context.append({
"role": "system",
"content": f"之前的对话摘要:{self.summary_cache[session_id]}"
})
# 添加最近消息
context.extend(recent_messages[-self.max_history:])
return context
def update_summary(
self,
session_id: str,
old_messages: List[dict]
):
"""更新摘要"""
if len(old_messages) > self.max_history:
messages_to_summarize = old_messages[:-self.max_history]
new_summary = self._create_summary(messages_to_summarize)
if session_id in self.summary_cache:
# 合并旧摘要
self.summary_cache[session_id] = f"{self.summary_cache[session_id]}\n{new_summary}"
else:
self.summary_cache[session_id] = new_summary
分布式缓存
多级缓存架构
from typing import Optional
from abc import ABC, abstractmethod
class CacheLayer(ABC):
"""缓存层抽象"""
@abstractmethod
def get(self, key: str) -> Optional[str]:
pass
@abstractmethod
def set(self, key: str, value: str, ttl: int = None):
pass
class LocalMemoryCache(CacheLayer):
"""本地内存缓存"""
def __init__(self, max_size: int = 1000):
from collections import OrderedDict
self.cache = OrderedDict()
self.max_size = max_size
def get(self, key: str) -> Optional[str]:
if key in self.cache:
# 移到末尾(LRU)
self.cache.move_to_end(key)
return self.cache[key]
return None
def set(self, key: str, value: str, ttl: int = None):
if key in self.cache:
self.cache.move_to_end(key)
else:
if len(self.cache) >= self.max_size:
self.cache.popitem(last=False)
self.cache[key] = value
class RedisCache(CacheLayer):
"""Redis 缓存"""
def __init__(self, redis_url: str = "redis://localhost:6379"):
import redis
self.redis = redis.from_url(redis_url)
def get(self, key: str) -> Optional[str]:
result = self.redis.get(key)
return result.decode() if result else None
def set(self, key: str, value: str, ttl: int = 3600):
self.redis.setex(key, ttl, value)
class MultiLevelCache:
"""多级缓存"""
def __init__(self, layers: List[CacheLayer]):
self.layers = layers
def get(self, key: str) -> Optional[str]:
for i, layer in enumerate(self.layers):
value = layer.get(key)
if value is not None:
# 回填上层缓存
for j in range(i):
self.layers[j].set(key, value)
return value
return None
def set(self, key: str, value: str, ttl: int = 3600):
for layer in self.layers:
layer.set(key, value, ttl)
# 使用
cache = MultiLevelCache([
LocalMemoryCache(max_size=100), # L1: 本地内存
RedisCache() # L2: Redis
])
缓存策略
缓存键设计
class CacheKeyBuilder:
"""缓存键构建器"""
@staticmethod
def build_key(
messages: List[dict],
model: str,
temperature: float = 0,
**kwargs
) -> str:
"""构建规范化的缓存键"""
# 规范化消息
normalized_messages = []
for msg in messages:
normalized = {
"role": msg["role"],
"content": msg["content"].strip().lower()
}
normalized_messages.append(normalized)
# 只在 temperature=0 时才缓存(确定性输出)
if temperature > 0:
return None
key_data = {
"messages": normalized_messages,
"model": model,
"temperature": temperature
}
# 添加其他相关参数
for k, v in kwargs.items():
if k in ["max_tokens", "top_p", "frequency_penalty"]:
key_data[k] = v
content = json.dumps(key_data, ensure_ascii=False, sort_keys=True)
return hashlib.sha256(content.encode()).hexdigest()
缓存失效策略
class CacheInvalidation:
"""缓存失效管理"""
def __init__(self, cache: RedisLLMCache):
self.cache = cache
self.version = "v1" # 缓存版本
def invalidate_by_pattern(self, pattern: str):
"""按模式失效缓存"""
keys = self.cache.redis.keys(f"{self.cache.prefix}:{pattern}*")
if keys:
self.cache.redis.delete(*keys)
def invalidate_by_time(self, before_timestamp: float):
"""失效指定时间之前的缓存"""
# 需要在缓存中存储时间戳
pass
def bump_version(self):
"""升级缓存版本(使所有旧缓存失效)"""
self.version = f"v{int(self.version[1:]) + 1}"
def get_versioned_key(self, key: str) -> str:
"""获取带版本的键"""
return f"{self.version}:{key}"
最佳实践
何时使用缓存
| 场景 | 适合缓存 | 原因 |
|---|---|---|
| 常见问题回答 | ✅ | 查询重复度高 |
| 文档摘要 | ✅ | 同一文档结果固定 |
| 翻译 | ✅ | 结果确定 |
| 创意写作 | ❌ | 需要多样性 |
| 实时数据分析 | ❌ | 数据时效性 |
| 对话聊天 | 部分 | 可缓存系统提示 |
缓存配置建议
# 推荐配置
CACHE_CONFIG = {
# 精确匹配缓存
"exact_match": {
"enabled": True,
"ttl_hours": 24,
"max_entries": 10000
},
# 语义缓存
"semantic": {
"enabled": True,
"similarity_threshold": 0.92, # 不要太低
"embedding_model": "text-embedding-3-small"
},
# 监控
"monitoring": {
"track_hit_rate": True,
"track_latency": True,
"alert_hit_rate_below": 0.3 # 命中率过低告警
}
}
总结
缓存是 LLM 应用优化的关键技术:
| 缓存类型 | 命中率 | 实现难度 | 推荐场景 |
|---|---|---|---|
| 精确匹配 | 低 | 简单 | 高重复查询 |
| 语义缓存 | 高 | 中等 | 通用场景 |
| 前缀缓存 | 中 | 中等 | 长上下文 |
| GPTCache | 高 | 简单 | 快速集成 |
实施建议:
- 先实现精确匹配缓存
- 评估命中率决定是否升级
- 监控缓存效果
- 设置合理的失效策略
参考资源
版权声明: 如无特别声明,本文版权归 sshipanoo 所有,转载请注明本文链接。
(采用 CC BY-NC-SA 4.0 许可协议进行授权)
本文标题:《 LLM应用开发——缓存策略详解 》
本文链接:http://localhost:3015/ai/%E7%BC%93%E5%AD%98%E7%AD%96%E7%95%A5.html
本文最后一次更新为 天前,文章中的某些内容可能已过时!