Building Intelligent Dialogue Systems with Memory
Introduction
Large language models are inherently stateless: every request is handled independently. To build a coherent conversational experience, we need a memory-management mechanism. This article takes a close look at the main ways to implement conversation memory, along with best practices.
Memory Management Overview
Why Memory Is Needed
The stateless nature of LLMs creates several challenges (a short demonstration follows the table):
| Problem | Impact |
|---|---|
| Lost context | Earlier parts of the conversation cannot be referenced |
| Repetitive interaction | Users must re-supply the same information |
| Fragmented experience | The conversation cannot maintain a coherent thread |
| No personalization | User preferences and habits cannot be remembered |
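This statelessness is easy to demonstrate: two independent API calls share nothing unless the caller resends the history. A minimal sketch, assuming the `openai` SDK and an API key in the environment:

from openai import OpenAI

client = OpenAI()

# First, independent request: the user introduces themselves.
client.chat.completions.create(
    model="gpt-4",
    messages=[{"role": "user", "content": "My name is Xiao Ming."}]
)

# Second, independent request: no history is sent, so the model
# has no way of knowing the name given in the previous request.
response = client.chat.completions.create(
    model="gpt-4",
    messages=[{"role": "user", "content": "What is my name?"}]
)
print(response.choices[0].message.content)  # cannot recall "Xiao Ming"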
Memory Types
┌─────────────────────────────────────────────────────────────┐
│                  Conversation Memory Types                  │
├─────────────────────────────────────────────────────────────┤
│                                                             │
│  ┌──────────────┐  ┌──────────────┐  ┌──────────────┐       │
│  │  Short-term  │  │   Working    │  │  Long-term   │       │
│  ├──────────────┤  ├──────────────┤  ├──────────────┤       │
│  │ • Current    │  │ • Context    │  │ • User       │       │
│  │   session    │  │   window     │  │   profile    │       │
│  │ • Temporary  │  │ • Active     │  │ • History    │       │
│  │   storage    │  │   reasoning  │  │   summaries  │       │
│  │ • Fast       │  │ • Token      │  │ • Durable    │       │
│  │   access     │  │   limits     │  │   storage    │       │
│  └──────────────┘  └──────────────┘  └──────────────┘       │
│                                                             │
└─────────────────────────────────────────────────────────────┘
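These tiers can also be modeled explicitly in code. A minimal sketch — the class and field names below are illustrative, not from any library:

from dataclasses import dataclass, field

@dataclass
class LayeredMemory:
    """Illustrative container for the three memory tiers."""
    short_term: list = field(default_factory=list)  # raw messages from the current session
    working: list = field(default_factory=list)     # the trimmed slice actually sent to the model
    long_term: dict = field(default_factory=dict)   # user profile / summaries, persisted elsewhere

The sections below implement each tier in turn, from simple in-memory buffers to summaries and persistent stores.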
Basic Implementation Approaches
1. Simple Message History
The most basic memory implementation: store the full conversation history:
from openai import OpenAI

class SimpleConversation:
    def __init__(self, system_prompt: str = "You are a helpful assistant"):
        self.client = OpenAI()
        self.messages = [
            {"role": "system", "content": system_prompt}
        ]

    def chat(self, user_input: str) -> str:
        # Append the user message
        self.messages.append({
            "role": "user",
            "content": user_input
        })
        # Call the API
        response = self.client.chat.completions.create(
            model="gpt-4",
            messages=self.messages
        )
        # Extract the assistant reply
        assistant_message = response.choices[0].message.content
        # Store the assistant reply
        self.messages.append({
            "role": "assistant",
            "content": assistant_message
        })
        return assistant_message

    def clear_history(self):
        """Clear the history, keeping only the system prompt."""
        self.messages = [self.messages[0]]
# Usage example
conv = SimpleConversation()
print(conv.chat("My name is Xiao Ming"))
print(conv.chat("What is my name?"))  # correctly answers "Xiao Ming"
2. Sliding-Window Memory
Cap the number of retained messages to avoid exceeding the context limit:
from collections import deque
from openai import OpenAI

class SlidingWindowMemory:
    def __init__(
        self,
        max_messages: int = 20,
        system_prompt: str = "You are a helpful assistant"
    ):
        self.client = OpenAI()
        self.system_prompt = {"role": "system", "content": system_prompt}
        self.max_messages = max_messages
        self.history = deque(maxlen=max_messages)  # old messages fall off automatically

    def get_messages(self) -> list:
        """Build the current message list."""
        return [self.system_prompt] + list(self.history)

    def add_message(self, role: str, content: str):
        """Append a message to the history."""
        self.history.append({"role": role, "content": content})

    def chat(self, user_input: str) -> str:
        self.add_message("user", user_input)
        response = self.client.chat.completions.create(
            model="gpt-4",
            messages=self.get_messages()
        )
        assistant_message = response.choices[0].message.content
        self.add_message("assistant", assistant_message)
        return assistant_message
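A brief usage sketch (illustrative): with a window of 4 messages, only the two most recent exchanges survive:

memory = SlidingWindowMemory(max_messages=4)
memory.chat("My name is Xiao Ming")
memory.chat("I live in Shanghai")
memory.chat("What do you know about me?")  # the first exchange has been evicted
print(len(memory.history))  # 4 - the deque never exceeds max_messages

Note that because the deque evicts single messages, the window can start with an orphaned assistant message; if strict user/assistant pairing matters, evict in pairs as the next example does.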
3. Token-Limited Memory
Limit by token count rather than message count:
import tiktoken
from openai import OpenAI

class TokenLimitedMemory:
    def __init__(
        self,
        max_tokens: int = 4000,
        model: str = "gpt-4",
        system_prompt: str = "You are a helpful assistant"
    ):
        self.client = OpenAI()
        self.model = model
        self.max_tokens = max_tokens
        self.system_prompt = {"role": "system", "content": system_prompt}
        self.history = []
        # Initialize the tokenizer
        self.encoding = tiktoken.encoding_for_model(model)

    def count_tokens(self, messages: list) -> int:
        """Estimate the token count of a message list."""
        total = 0
        for msg in messages:
            # Each message carries fixed overhead
            total += 4  # markers for role + content
            total += len(self.encoding.encode(msg["content"]))
            total += 2  # end-of-message markers
        return total

    def trim_history(self):
        """Trim the history to fit within the token budget."""
        messages = [self.system_prompt] + self.history
        while self.count_tokens(messages) > self.max_tokens and len(self.history) > 2:
            # Drop the oldest exchange (user + assistant)
            self.history.pop(0)
            self.history.pop(0)
            messages = [self.system_prompt] + self.history

    def chat(self, user_input: str) -> str:
        # Append the user message
        self.history.append({"role": "user", "content": user_input})
        # Trim to fit the budget
        self.trim_history()
        # API call
        response = self.client.chat.completions.create(
            model=self.model,
            messages=[self.system_prompt] + self.history
        )
        assistant_message = response.choices[0].message.content
        self.history.append({"role": "assistant", "content": assistant_message})
        return assistant_message
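Usage mirrors the previous classes; the difference is that eviction is driven by the token budget rather than message count. An illustrative sketch:

memory = TokenLimitedMemory(max_tokens=500)
memory.chat("My name is Xiao Ming and I work in Shanghai")
memory.chat("Recommend a weekend hiking route")
# The history is trimmed in whole exchanges, so what remains stays coherent
print(memory.count_tokens(memory.history))  # approximate tokens in the retained history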
Memory Compression and Summarization
Conversation Summarization Strategy
When the conversation grows long, compress the history into a summary:
from openai import OpenAI

class SummaryMemory:
    def __init__(
        self,
        summarize_after: int = 10,  # start summarizing after 10 turns
        keep_recent: int = 4,       # keep the 4 most recent turns
        system_prompt: str = "You are a helpful assistant"
    ):
        self.client = OpenAI()
        self.system_prompt = system_prompt
        self.summarize_after = summarize_after
        self.keep_recent = keep_recent
        self.history = []
        self.summary = ""  # running summary of older history

    def create_summary(self, messages: list) -> str:
        """Generate a summary of the given messages."""
        summary_prompt = f"""Concisely summarize the key points of the conversation below, including:
- The main topics discussed
- Key information provided by the user
- Any conclusions or decisions reached

Conversation:
{self._format_messages(messages)}

Summary:"""
        response = self.client.chat.completions.create(
            model="gpt-4",
            messages=[{"role": "user", "content": summary_prompt}],
            max_tokens=500
        )
        return response.choices[0].message.content

    def _format_messages(self, messages: list) -> str:
        """Format messages as plain text."""
        lines = []
        for msg in messages:
            role = "User" if msg["role"] == "user" else "Assistant"
            lines.append(f"{role}: {msg['content']}")
        return "\n".join(lines)

    def maybe_summarize(self):
        """Check whether summarization is needed."""
        # Count turns (one turn = user message + assistant message)
        turns = len(self.history) // 2
        if turns >= self.summarize_after:
            # Number of recent messages to keep
            keep_count = self.keep_recent * 2
            # Messages to fold into the summary
            to_summarize = self.history[:-keep_count]
            # Create the summary
            new_summary = self.create_summary(to_summarize)
            # Merge with the existing summary
            if self.summary:
                self.summary = f"{self.summary}\n\nRecent developments: {new_summary}"
            else:
                self.summary = new_summary
            # Keep only the most recent messages
            self.history = self.history[-keep_count:]

    def get_messages(self) -> list:
        """Build the message list to send to the API."""
        messages = []
        # System prompt (with the summary folded in)
        if self.summary:
            system_content = f"""{self.system_prompt}

Summary of earlier conversation:
{self.summary}"""
        else:
            system_content = self.system_prompt
        messages.append({"role": "system", "content": system_content})
        messages.extend(self.history)
        return messages

    def chat(self, user_input: str) -> str:
        self.history.append({"role": "user", "content": user_input})
        # Check whether summarization is needed
        self.maybe_summarize()
        response = self.client.chat.completions.create(
            model="gpt-4",
            messages=self.get_messages()
        )
        assistant_message = response.choices[0].message.content
        self.history.append({"role": "assistant", "content": assistant_message})
        return assistant_message
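Usage is identical to the earlier classes; summarization only kicks in once the turn threshold is crossed. An illustrative sketch:

memory = SummaryMemory(summarize_after=10, keep_recent=4)
memory.chat("Hello, my name is Xiao Ming")
# ... after ten turns, older exchanges are folded into memory.summary ...
print(memory.summary)  # empty until the threshold is reached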
Progressive Summarization
Keep the summary incrementally updated:
from openai import OpenAI

class ProgressiveSummaryMemory:
    """Progressive summary memory - updates the summary after each turn."""

    def __init__(self, system_prompt: str = "You are a helpful assistant"):
        self.client = OpenAI()
        self.system_prompt = system_prompt
        self.current_summary = ""
        self.buffer = []       # buffer holding the most recent turns
        self.buffer_size = 4   # keep the last 2 turns (4 messages)

    def update_summary(self):
        """Update the running summary."""
        if len(self.buffer) < self.buffer_size:
            return
        # Pop the oldest exchange and fold it into the summary
        old_messages = self.buffer[:2]
        self.buffer = self.buffer[2:]
        prompt = f"""Current summary:
{self.current_summary or "(none)"}

New exchange:
{self._format_messages(old_messages)}

Update the summary to incorporate the key information from the new exchange. Keep it concise; record only important facts and context.

Updated summary:"""
        response = self.client.chat.completions.create(
            model="gpt-4",
            messages=[{"role": "user", "content": prompt}],
            max_tokens=300
        )
        self.current_summary = response.choices[0].message.content

    def _format_messages(self, messages: list) -> str:
        lines = []
        for msg in messages:
            role = "User" if msg["role"] == "user" else "Assistant"
            lines.append(f"{role}: {msg['content']}")
        return "\n".join(lines)

    def chat(self, user_input: str) -> str:
        self.buffer.append({"role": "user", "content": user_input})
        # Build the message list
        messages = [{"role": "system", "content": self.system_prompt}]
        if self.current_summary:
            messages.append({
                "role": "system",
                "content": f"Summary of the conversation so far: {self.current_summary}"
            })
        messages.extend(self.buffer)
        response = self.client.chat.completions.create(
            model="gpt-4",
            messages=messages
        )
        assistant_message = response.choices[0].message.content
        self.buffer.append({"role": "assistant", "content": assistant_message})
        # Update the summary
        self.update_summary()
        return assistant_message
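The trade-off versus batch summarization is one extra LLM call per turn in exchange for a summary that is never stale. An illustrative usage sketch:

memory = ProgressiveSummaryMemory()
memory.chat("My name is Xiao Ming")
memory.chat("I work in Shanghai")
memory.chat("Recommend a restaurant near my office")
print(memory.current_summary)  # already contains the name and location facts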
Persistent Storage
Redis Storage
Use Redis to store session history, supporting distributed deployments:
import json
import redis
from datetime import timedelta
from openai import OpenAI

class RedisConversationMemory:
    def __init__(
        self,
        session_id: str,
        redis_url: str = "redis://localhost:6379",
        ttl_hours: int = 24,
        max_messages: int = 50
    ):
        self.client = OpenAI()
        self.session_id = session_id
        self.redis = redis.from_url(redis_url)
        self.ttl = timedelta(hours=ttl_hours)
        self.max_messages = max_messages
        self.key = f"conversation:{session_id}"

    def _get_history(self) -> list:
        """Load the history from Redis."""
        data = self.redis.get(self.key)
        if data:
            return json.loads(data)
        return []

    def _save_history(self, history: list):
        """Save the history to Redis with a TTL."""
        self.redis.setex(
            self.key,
            self.ttl,
            json.dumps(history, ensure_ascii=False)
        )

    def add_message(self, role: str, content: str):
        """Append a message."""
        history = self._get_history()
        history.append({"role": role, "content": content})
        # Cap the number of stored messages
        if len(history) > self.max_messages:
            history = history[-self.max_messages:]
        self._save_history(history)

    def get_messages(self, system_prompt: str = "") -> list:
        """Build the message list."""
        messages = []
        if system_prompt:
            messages.append({"role": "system", "content": system_prompt})
        messages.extend(self._get_history())
        return messages

    def clear(self):
        """Delete the stored history."""
        self.redis.delete(self.key)

    def chat(self, user_input: str, system_prompt: str = "You are a helpful assistant") -> str:
        self.add_message("user", user_input)
        response = self.client.chat.completions.create(
            model="gpt-4",
            messages=self.get_messages(system_prompt)
        )
        assistant_message = response.choices[0].message.content
        self.add_message("assistant", assistant_message)
        return assistant_message
# Usage example
memory = RedisConversationMemory(session_id="user_123")
response = memory.chat("Hello!")
Database Storage
Use SQLAlchemy for database persistence:
from sqlalchemy import create_engine, Column, Integer, String, Text, DateTime, JSON
from sqlalchemy.orm import declarative_base, sessionmaker
from datetime import datetime
from typing import Optional
from openai import OpenAI

Base = declarative_base()

class ConversationMessage(Base):
    __tablename__ = "conversation_messages"

    id = Column(Integer, primary_key=True)
    session_id = Column(String(64), index=True)
    role = Column(String(20))
    content = Column(Text)
    # Note: "metadata" is a reserved attribute name on SQLAlchemy declarative
    # models, so the Python attribute is named meta (the column keeps the name).
    meta = Column("metadata", JSON, default=dict)
    created_at = Column(DateTime, default=datetime.utcnow)

class ConversationSummary(Base):
    __tablename__ = "conversation_summaries"

    id = Column(Integer, primary_key=True)
    session_id = Column(String(64), unique=True)
    summary = Column(Text)
    updated_at = Column(DateTime, default=datetime.utcnow)

class DatabaseConversationMemory:
    def __init__(
        self,
        session_id: str,
        database_url: str = "sqlite:///conversations.db",
        max_messages: int = 50
    ):
        self.client = OpenAI()
        self.session_id = session_id
        self.max_messages = max_messages
        # Initialize the database
        self.engine = create_engine(database_url)
        Base.metadata.create_all(self.engine)
        Session = sessionmaker(bind=self.engine)
        self.db = Session()

    def add_message(self, role: str, content: str, metadata: dict = None):
        """Insert a message row."""
        msg = ConversationMessage(
            session_id=self.session_id,
            role=role,
            content=content,
            meta=metadata or {}
        )
        self.db.add(msg)
        self.db.commit()

    def get_recent_messages(self, limit: int = None) -> list:
        """Fetch the most recent messages."""
        limit = limit or self.max_messages
        messages = (
            self.db.query(ConversationMessage)
            .filter(ConversationMessage.session_id == self.session_id)
            .order_by(ConversationMessage.created_at.desc())
            .limit(limit)
            .all()
        )
        # Reverse into chronological order (oldest first)
        messages = list(reversed(messages))
        return [{"role": m.role, "content": m.content} for m in messages]

    def get_summary(self) -> Optional[str]:
        """Fetch the session summary."""
        summary = (
            self.db.query(ConversationSummary)
            .filter(ConversationSummary.session_id == self.session_id)
            .first()
        )
        return summary.summary if summary else None

    def update_summary(self, summary_text: str):
        """Create or update the session summary."""
        summary = (
            self.db.query(ConversationSummary)
            .filter(ConversationSummary.session_id == self.session_id)
            .first()
        )
        if summary:
            summary.summary = summary_text
            summary.updated_at = datetime.utcnow()
        else:
            summary = ConversationSummary(
                session_id=self.session_id,
                summary=summary_text
            )
            self.db.add(summary)
        self.db.commit()

    def chat(self, user_input: str, system_prompt: str = "You are a helpful assistant") -> str:
        self.add_message("user", user_input)
        # Build the message list
        messages = [{"role": "system", "content": system_prompt}]
        # Include the summary, if any
        summary = self.get_summary()
        if summary:
            messages.append({
                "role": "system",
                "content": f"Summary of earlier conversation: {summary}"
            })
        # Append recent messages
        messages.extend(self.get_recent_messages())
        response = self.client.chat.completions.create(
            model="gpt-4",
            messages=messages
        )
        assistant_message = response.choices[0].message.content
        self.add_message("assistant", assistant_message)
        return assistant_message
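A brief usage sketch (this creates conversations.db in the working directory):

# Usage example
memory = DatabaseConversationMemory(session_id="user_123")
print(memory.chat("Hello! My name is Xiao Ming"))
memory.update_summary("The user introduced themselves as Xiao Ming.")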
LangChain Memory Components
ConversationBufferMemory
The simplest buffer memory:
from langchain_openai import ChatOpenAI
from langchain.memory import ConversationBufferMemory
from langchain.chains import ConversationChain

# Create the memory component
memory = ConversationBufferMemory()

# Create the conversation chain
llm = ChatOpenAI(model="gpt-4")
conversation = ConversationChain(
    llm=llm,
    memory=memory,
    verbose=True
)

# Chat
response1 = conversation.predict(input="My name is Xiao Ming")
response2 = conversation.predict(input="What is my name?")

# Inspect the memory
print(memory.buffer)
ConversationBufferWindowMemory
Sliding-window memory:
from langchain.memory import ConversationBufferWindowMemory

# Keep only the 5 most recent turns
memory = ConversationBufferWindowMemory(k=5)
conversation = ConversationChain(
    llm=ChatOpenAI(model="gpt-4"),
    memory=memory
)
ConversationSummaryMemory
Summary memory:
from langchain.memory import ConversationSummaryMemory

# Use an LLM to generate the summary
memory = ConversationSummaryMemory(llm=ChatOpenAI(model="gpt-4"))
conversation = ConversationChain(
    llm=ChatOpenAI(model="gpt-4"),
    memory=memory
)

# Inspect the summary after chatting
print(memory.buffer)  # shows the current summary
ConversationSummaryBufferMemory
A hybrid of summary and buffer:
from langchain.memory import ConversationSummaryBufferMemory

# Start summarizing past 2000 tokens, keeping the most recent messages verbatim
memory = ConversationSummaryBufferMemory(
    llm=ChatOpenAI(model="gpt-4"),
    max_token_limit=2000
)
conversation = ConversationChain(
    llm=ChatOpenAI(model="gpt-4"),
    memory=memory
)
ConversationEntityMemory
Entity memory: remembers facts about entities:
from langchain.memory import ConversationEntityMemory
from langchain.chains.conversation.prompt import ENTITY_MEMORY_CONVERSATION_TEMPLATE

memory = ConversationEntityMemory(llm=ChatOpenAI(model="gpt-4"))
conversation = ConversationChain(
    llm=ChatOpenAI(model="gpt-4"),
    memory=memory,
    # Entity memory requires a prompt that accepts its "entities" variable
    prompt=ENTITY_MEMORY_CONVERSATION_TEMPLATE,
    verbose=True
)

# Chat
conversation.predict(input="Xiao Ming is a software engineer who loves Python programming")
conversation.predict(input="Xiao Hong is Xiao Ming's colleague; she specializes in front-end development")

# Inspect the entity store
print(memory.entity_store.store)
# Output: {'Xiao Ming': 'Software engineer who loves Python programming', 'Xiao Hong': "Xiao Ming's colleague, specializes in front-end development"}
Vector-Store Memory
Similarity-Based Memory Retrieval
from langchain.memory import VectorStoreRetrieverMemory
from langchain_openai import OpenAIEmbeddings
from langchain_community.vectorstores import FAISS

# Create the vector store
embeddings = OpenAIEmbeddings()
vectorstore = FAISS.from_texts(
    [""],  # seed with an empty document to initialize the index
    embedding=embeddings,
    metadatas=[{"empty": True}]
)

# Create the retriever
retriever = vectorstore.as_retriever(search_kwargs={"k": 5})

# Create the vector memory
memory = VectorStoreRetrieverMemory(retriever=retriever)

# Save exchanges
memory.save_context(
    {"input": "My favorite food is sushi"},
    {"output": "Sushi is a great choice!"}
)
memory.save_context(
    {"input": "I like hiking on weekends"},
    {"output": "Outdoor exercise is very healthy!"}
)
memory.save_context(
    {"input": "I work in Beijing"},
    {"output": "Beijing is a city full of opportunities"}
)

# Retrieve relevant memories
relevant = memory.load_memory_variables({"prompt": "my eating habits"})
print(relevant)  # returns the exchange about sushi
Custom Vector Memory
from typing import List, Dict
import numpy as np
from openai import OpenAI

class VectorMemory:
    def __init__(self, top_k: int = 5):
        self.client = OpenAI()
        self.memories: List[Dict] = []
        self.embeddings: List[List[float]] = []
        self.top_k = top_k

    def _get_embedding(self, text: str) -> List[float]:
        """Embed a piece of text."""
        response = self.client.embeddings.create(
            model="text-embedding-3-small",
            input=text
        )
        return response.data[0].embedding

    def _cosine_similarity(self, a: List[float], b: List[float]) -> float:
        """Compute cosine similarity."""
        a = np.array(a)
        b = np.array(b)
        return np.dot(a, b) / (np.linalg.norm(a) * np.linalg.norm(b))

    def add(self, content: str, metadata: Dict = None):
        """Store a memory."""
        embedding = self._get_embedding(content)
        self.memories.append({
            "content": content,
            "metadata": metadata or {}
        })
        self.embeddings.append(embedding)

    def search(self, query: str) -> List[Dict]:
        """Search for relevant memories."""
        if not self.memories:
            return []
        query_embedding = self._get_embedding(query)
        # Score every stored memory against the query
        similarities = [
            (i, self._cosine_similarity(query_embedding, emb))
            for i, emb in enumerate(self.embeddings)
        ]
        # Sort and return the top-k
        similarities.sort(key=lambda x: x[1], reverse=True)
        return [
            {**self.memories[i], "similarity": sim}
            for i, sim in similarities[:self.top_k]
        ]

    def get_relevant_context(self, query: str) -> str:
        """Return the relevant memories as a context string."""
        results = self.search(query)
        return "\n".join([r["content"] for r in results])

# Usage example
memory = VectorMemory(top_k=3)
memory.add("The user likes pasta")
memory.add("The user's birthday is March 15")
memory.add("The user works in Shanghai")
memory.add("The user has a cat named Xiaohua")

relevant = memory.search("the user's pet")
print(relevant)
Multi-Session Management
Session Manager
from typing import Dict, Optional
import uuid
from datetime import datetime, timedelta

class SessionManager:
    def __init__(
        self,
        session_timeout: int = 3600,  # session timeout in seconds
        max_sessions: int = 1000      # maximum number of sessions
    ):
        self.sessions: Dict[str, Dict] = {}
        self.session_timeout = timedelta(seconds=session_timeout)
        self.max_sessions = max_sessions

    def create_session(self, user_id: Optional[str] = None) -> str:
        """Create a new session."""
        self._cleanup_expired()
        session_id = str(uuid.uuid4())
        self.sessions[session_id] = {
            "user_id": user_id,
            "messages": [],
            "metadata": {},
            "created_at": datetime.now(),
            "last_access": datetime.now()
        }
        return session_id

    def get_session(self, session_id: str) -> Optional[Dict]:
        """Fetch a session."""
        if session_id not in self.sessions:
            return None
        session = self.sessions[session_id]
        # Expire stale sessions
        if datetime.now() - session["last_access"] > self.session_timeout:
            del self.sessions[session_id]
            return None
        # Refresh the last-access timestamp
        session["last_access"] = datetime.now()
        return session

    def add_message(self, session_id: str, role: str, content: str) -> bool:
        """Append a message to a session."""
        session = self.get_session(session_id)
        if not session:
            return False
        session["messages"].append({
            "role": role,
            "content": content,
            "timestamp": datetime.now().isoformat()
        })
        return True

    def get_messages(self, session_id: str) -> list:
        """Fetch a session's messages."""
        session = self.get_session(session_id)
        if not session:
            return []
        return session["messages"]

    def delete_session(self, session_id: str):
        """Delete a session."""
        if session_id in self.sessions:
            del self.sessions[session_id]

    def _cleanup_expired(self):
        """Evict expired sessions."""
        now = datetime.now()
        expired = [
            sid for sid, session in self.sessions.items()
            if now - session["last_access"] > self.session_timeout
        ]
        for sid in expired:
            del self.sessions[sid]
        # If still over the cap, evict the least recently used sessions
        if len(self.sessions) > self.max_sessions:
            sorted_sessions = sorted(
                self.sessions.items(),
                key=lambda x: x[1]["last_access"]
            )
            for sid, _ in sorted_sessions[:len(self.sessions) - self.max_sessions]:
                del self.sessions[sid]
FastAPI Integration Example
from fastapi import FastAPI, HTTPException
from pydantic import BaseModel
from typing import Optional
from openai import OpenAI

app = FastAPI()

# Global session manager
session_manager = SessionManager()

class ChatRequest(BaseModel):
    message: str
    session_id: Optional[str] = None

class ChatResponse(BaseModel):
    response: str
    session_id: str

@app.post("/chat", response_model=ChatResponse)
async def chat(request: ChatRequest):
    client = OpenAI()
    # Get or create the session
    if request.session_id:
        session = session_manager.get_session(request.session_id)
        if not session:
            raise HTTPException(status_code=404, detail="Session not found")
        session_id = request.session_id
    else:
        session_id = session_manager.create_session()
    # Append the user message
    session_manager.add_message(session_id, "user", request.message)
    # Build the message history
    messages = [
        {"role": "system", "content": "You are a helpful assistant"}
    ]
    messages.extend([
        {"role": m["role"], "content": m["content"]}
        for m in session_manager.get_messages(session_id)
    ])
    # Call the LLM
    response = client.chat.completions.create(
        model="gpt-4",
        messages=messages
    )
    assistant_message = response.choices[0].message.content
    # Store the assistant reply
    session_manager.add_message(session_id, "assistant", assistant_message)
    return ChatResponse(
        response=assistant_message,
        session_id=session_id
    )

@app.delete("/session/{session_id}")
async def delete_session(session_id: str):
    session_manager.delete_session(session_id)
    return {"status": "deleted"}
Best Practices
Choosing a Memory Strategy
┌─────────────────────────────────────────────────────────────────┐
│                 Memory Strategy Selection Guide                 │
├─────────────────────────────────────────────────────────────────┤
│                                                                 │
│  Scenario           Recommended Strategy   Rationale            │
│  ─────────────────────────────────────────────────────────────  │
│  Simple Q&A         Buffer Memory          Simple and adequate  │
│  Long dialogues     Summary Memory         Saves tokens         │
│  Customer support   Entity Memory          Remembers user facts │
│  Knowledge Q&A      Vector Memory          Semantic retrieval   │
│  High concurrency   Redis + Summary        Distributed + compact│
│                                                                 │
└─────────────────────────────────────────────────────────────────┘
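The guide above can be encoded directly. A minimal sketch, where the dispatch table and helper are illustrative and the classes referenced are the ones built earlier in this article:

# Illustrative dispatch from scenario to the memory classes defined above
MEMORY_BY_SCENARIO = {
    "simple_qa": SimpleConversation,
    "long_dialogue": SummaryMemory,
    "high_concurrency": RedisConversationMemory,
}

def make_memory(scenario: str, **kwargs):
    """Hypothetical factory: pick a memory implementation by scenario."""
    if scenario not in MEMORY_BY_SCENARIO:
        raise ValueError(f"Unknown scenario: {scenario}")
    return MEMORY_BY_SCENARIO[scenario](**kwargs)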
Design Principles
| Principle | Description |
|---|---|
| Store on demand | Keep only necessary information; avoid redundancy |
| Clean up promptly | Set expiration times and purge stale data automatically |
| Tiered storage | Hot data in memory, cold data in a database |
| Security | Encrypt sensitive information at rest |
| Recoverability | Persist conversations that must survive restarts |
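The tiered-storage principle, for instance, can be sketched as an in-process cache in front of the Redis store from earlier; every name here is illustrative:

class TieredMemory:
    """Illustrative write-through cache over RedisConversationMemory."""

    def __init__(self, redis_memory: RedisConversationMemory):
        self.hot: list = []       # hot tier: in-process message list
        self.cold = redis_memory  # cold tier: persistent Redis store

    def add_message(self, role: str, content: str):
        self.hot.append({"role": role, "content": content})
        self.cold.add_message(role, content)  # write through to the cold tier

    def get_messages(self) -> list:
        # Serve from the hot tier when populated; rebuild from Redis after a restart
        return self.hot if self.hot else self.cold.get_messages()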
Performance Optimization Tips
# 1. Batch operations
import json

def batch_add_messages(messages: list, redis_client, session_id: str):
    """Add messages to Redis in a single pipeline round trip."""
    pipe = redis_client.pipeline()
    for msg in messages:
        pipe.rpush(f"conversation:{session_id}", json.dumps(msg))
    pipe.execute()

# 2. Asynchronous summarization
import asyncio
from openai import OpenAI

async def async_summarize(messages: list, client: OpenAI) -> str:
    """Generate a summary without blocking the event loop."""
    loop = asyncio.get_running_loop()
    return await loop.run_in_executor(
        None,
        lambda: client.chat.completions.create(
            model="gpt-4",
            messages=[{
                "role": "user",
                "content": f"Summarize the following conversation: {messages}"
            }]
        ).choices[0].message.content
    )

# 3. Cache embedding vectors
from functools import lru_cache

@lru_cache(maxsize=1000)
def cached_embedding(text: str) -> tuple:
    """Cache embeddings for repeated texts."""
    client = OpenAI()
    response = client.embeddings.create(
        model="text-embedding-3-small",
        input=text
    )
    return tuple(response.data[0].embedding)
Summary
Conversation memory management is a core capability when building intelligent dialogue systems:
| Approach | Pros | Cons | Best For |
|---|---|---|---|
| Simple buffer | Easy to implement | High token usage | Short conversations |
| Sliding window | Controls cost | Loses early information | General conversations |
| Summary memory | Preserves key points | Extra API calls | Long conversations |
| Vector memory | Semantic retrieval | More complex to build | Knowledge Q&A |
| Entity memory | Structured storage | Extraction accuracy varies | User profiling |
Choosing the right memory strategy requires weighing:
- The conversation scenario and length
- The token cost budget
- How much information must be retained
- How much system complexity you can tolerate