构建有记忆的智能对话系统

前言

大语言模型本身是无状态的,每次请求都是独立的。为了构建连贯的对话体验,我们需要实现记忆管理机制。本文将深入探讨对话记忆的各种实现方式和最佳实践。


记忆管理概述

为什么需要记忆

LLM 的无状态特性带来的挑战:

问题            影响
─────────────────────────────────────────
上下文丢失      无法引用之前的对话内容
重复交互        用户需要反复提供相同信息
体验割裂        无法维持连贯的对话逻辑
个性化缺失      无法记住用户偏好和习惯

记忆类型

┌─────────────────────────────────────────────────────────────┐
│                     对话记忆类型                              │
├─────────────────────────────────────────────────────────────┤
│                                                             │
│  ┌──────────────┐  ┌──────────────┐  ┌──────────────┐      │
│  │   短期记忆    │  │   工作记忆    │  │   长期记忆    │      │
│  │  Short-term  │  │   Working    │  │  Long-term   │      │
│  ├──────────────┤  ├──────────────┤  ├──────────────┤      │
│  │ • 当前会话    │  │ • 上下文窗口  │  │ • 用户画像    │      │
│  │ • 临时存储    │  │ • 活跃思考    │  │ • 历史摘要    │      │
│  │ • 快速访问    │  │ • Token限制  │  │ • 持久存储    │      │
│  └──────────────┘  └──────────────┘  └──────────────┘      │
│                                                             │
└─────────────────────────────────────────────────────────────┘
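
这三类记忆落到代码层面,通常会收敛到非常相似的最小接口。下面是一个示意性的接口草图(Protocol 名称和方法签名是本文为说明而假设的,并非后文各实现的统一约定):

from typing import Protocol

class ConversationMemory(Protocol):
    """示意:一个记忆组件通常至少需要的两个能力"""

    def add_message(self, role: str, content: str) -> None:
        """写入一条消息(存到内存、Redis 还是数据库由实现决定)"""
        ...

    def get_messages(self) -> list:
        """构建发送给模型的消息列表(需满足上下文窗口 / Token 限制)"""
        ...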

基础实现方案

1. 简单消息历史

最基础的记忆实现——保存完整对话历史:

from openai import OpenAI

class SimpleConversation:
    def __init__(self, system_prompt: str = "你是一个有帮助的助手"):
        self.client = OpenAI()
        self.messages = [
            {"role": "system", "content": system_prompt}
        ]
    
    def chat(self, user_input: str) -> str:
        # 添加用户消息
        self.messages.append({
            "role": "user",
            "content": user_input
        })
        
        # 调用 API
        response = self.client.chat.completions.create(
            model="gpt-4",
            messages=self.messages
        )
        
        # 获取助手回复
        assistant_message = response.choices[0].message.content
        
        # 保存助手回复
        self.messages.append({
            "role": "assistant",
            "content": assistant_message
        })
        
        return assistant_message
    
    def clear_history(self):
        """清空对话历史,只保留系统提示"""
        self.messages = [self.messages[0]]

# 使用示例
conv = SimpleConversation()
print(conv.chat("我叫小明"))
print(conv.chat("我叫什么名字?"))  # 能正确回答"小明"

2. 滑动窗口记忆

限制保留的消息数量,避免超出上下文限制:

from collections import deque
from typing import Optional

class SlidingWindowMemory:
    def __init__(
        self, 
        max_messages: int = 20,
        system_prompt: str = "你是一个有帮助的助手"
    ):
        self.client = OpenAI()
        self.system_prompt = {"role": "system", "content": system_prompt}
        self.max_messages = max_messages
        self.history = deque(maxlen=max_messages)
    
    def get_messages(self) -> list:
        """获取当前消息列表"""
        return [self.system_prompt] + list(self.history)
    
    def add_message(self, role: str, content: str):
        """添加消息到历史"""
        self.history.append({"role": role, "content": content})
    
    def chat(self, user_input: str) -> str:
        self.add_message("user", user_input)
        
        response = self.client.chat.completions.create(
            model="gpt-4",
            messages=self.get_messages()
        )
        
        assistant_message = response.choices[0].message.content
        self.add_message("assistant", assistant_message)
        
        return assistant_message
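
使用方式与 SimpleConversation 基本一致,下面是一个简单示意(窗口大小仅作演示):

# 使用示例:只保留最近 6 条消息
memory = SlidingWindowMemory(max_messages=6)
print(memory.chat("我在学习 Python"))
print(memory.chat("推荐一个适合练手的小项目"))
# 消息数超过 6 条后,最早的消息会被 deque 自动丢弃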

3. Token 限制记忆

基于 Token 数量而非消息数量进行限制:

import tiktoken

class TokenLimitedMemory:
    def __init__(
        self, 
        max_tokens: int = 4000,
        model: str = "gpt-4",
        system_prompt: str = "你是一个有帮助的助手"
    ):
        self.client = OpenAI()
        self.model = model
        self.max_tokens = max_tokens
        self.system_prompt = {"role": "system", "content": system_prompt}
        self.history = []
        
        # 初始化分词器
        self.encoding = tiktoken.encoding_for_model(model)
    
    def count_tokens(self, messages: list) -> int:
        """计算消息列表的 Token 数量"""
        total = 0
        for msg in messages:
            # 每条消息有固定开销
            total += 4  # role + content 的标记
            total += len(self.encoding.encode(msg["content"]))
        total += 2  # 消息结束标记
        return total
    
    def trim_history(self):
        """修剪历史以符合 Token 限制"""
        messages = [self.system_prompt] + self.history
        
        while self.count_tokens(messages) > self.max_tokens and len(self.history) > 2:
            # 移除最早的一对对话(用户+助手)
            self.history.pop(0)
            self.history.pop(0)
            messages = [self.system_prompt] + self.history
    
    def chat(self, user_input: str) -> str:
        # 添加用户消息
        self.history.append({"role": "user", "content": user_input})
        
        # 修剪以符合限制
        self.trim_history()
        
        # API 调用
        response = self.client.chat.completions.create(
            model=self.model,
            messages=[self.system_prompt] + self.history
        )
        
        assistant_message = response.choices[0].message.content
        self.history.append({"role": "assistant", "content": assistant_message})
        
        return assistant_message
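
下面是一个简单的使用示意(max_tokens 数值仅作演示,实际应结合模型上下文窗口与回复预留空间设置):

# 使用示例:把历史控制在约 1000 token 以内
memory = TokenLimitedMemory(max_tokens=1000)
print(memory.chat("帮我对比一下 REST 和 GraphQL 的适用场景"))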

记忆压缩与摘要

对话摘要策略

当对话变长时,使用摘要压缩历史信息:

class SummaryMemory:
    def __init__(
        self,
        summarize_after: int = 10,  # 10轮对话后开始摘要
        keep_recent: int = 4,        # 保留最近4轮对话
        system_prompt: str = "你是一个有帮助的助手"
    ):
        self.client = OpenAI()
        self.system_prompt = system_prompt
        self.summarize_after = summarize_after
        self.keep_recent = keep_recent
        self.history = []
        self.summary = ""  # 历史摘要
    
    def create_summary(self, messages: list) -> str:
        """生成对话摘要"""
        summary_prompt = f"""请简洁地总结以下对话的要点,包括:
- 讨论的主要话题
- 用户提供的关键信息
- 达成的结论或决定

对话内容:
{self._format_messages(messages)}

摘要:"""

        response = self.client.chat.completions.create(
            model="gpt-4",
            messages=[{"role": "user", "content": summary_prompt}],
            max_tokens=500
        )
        return response.choices[0].message.content
    
    def _format_messages(self, messages: list) -> str:
        """格式化消息为文本"""
        lines = []
        for msg in messages:
            role = "用户" if msg["role"] == "user" else "助手"
            lines.append(f"{role}: {msg['content']}")
        return "\n".join(lines)
    
    def maybe_summarize(self):
        """检查是否需要摘要"""
        # 计算对话轮数(一轮 = 用户消息 + 助手消息)
        turns = len(self.history) // 2
        
        if turns >= self.summarize_after:
            # 需要保留的最近消息数量
            keep_count = self.keep_recent * 2
            
            # 需要摘要的消息
            to_summarize = self.history[:-keep_count]
            
            # 创建摘要
            new_summary = self.create_summary(to_summarize)
            
            # 合并到现有摘要
            if self.summary:
                self.summary = f"{self.summary}\n\n最新进展:{new_summary}"
            else:
                self.summary = new_summary
            
            # 只保留最近的消息
            self.history = self.history[-keep_count:]
    
    def get_messages(self) -> list:
        """构建发送给 API 的消息列表"""
        messages = []
        
        # 系统提示(包含摘要)
        if self.summary:
            system_content = f"""{self.system_prompt}

之前的对话摘要:
{self.summary}"""
        else:
            system_content = self.system_prompt
        
        messages.append({"role": "system", "content": system_content})
        messages.extend(self.history)
        
        return messages
    
    def chat(self, user_input: str) -> str:
        self.history.append({"role": "user", "content": user_input})
        
        # 检查是否需要摘要
        self.maybe_summarize()
        
        response = self.client.chat.completions.create(
            model="gpt-4",
            messages=self.get_messages()
        )
        
        assistant_message = response.choices[0].message.content
        self.history.append({"role": "assistant", "content": assistant_message})
        
        return assistant_message
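
一个简单的使用示意(对话内容为演示用的假设场景):

memory = SummaryMemory(summarize_after=10, keep_recent=4)
print(memory.chat("帮我规划一次三天的杭州行程"))
print(memory.chat("第二天想加一个博物馆"))
print(memory.summary)  # 轮数达到 summarize_after 之前,这里仍是空字符串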

渐进式摘要

保持摘要的增量更新:

class ProgressiveSummaryMemory:
    """渐进式摘要记忆 - 每轮对话后更新摘要"""
    
    def __init__(self, system_prompt: str = "你是一个有帮助的助手"):
        self.client = OpenAI()
        self.system_prompt = system_prompt
        self.current_summary = ""
        self.buffer = []  # 缓冲区存储最近几轮对话
        self.buffer_size = 4  # 缓冲最多 4 条消息(2 轮),超出后把最早一轮并入摘要
    
    def update_summary(self):
        """更新运行摘要"""
        if len(self.buffer) < self.buffer_size:
            return
        
        # 取出最早的对话加入摘要
        old_messages = self.buffer[:2]
        self.buffer = self.buffer[2:]
        
        prompt = f"""当前摘要:
{self.current_summary if self.current_summary else "(无)"}

新的对话:
{self._format_messages(old_messages)}

请更新摘要,整合新对话的关键信息。保持简洁,只记录重要的事实和上下文。

更新后的摘要:"""

        response = self.client.chat.completions.create(
            model="gpt-4",
            messages=[{"role": "user", "content": prompt}],
            max_tokens=300
        )
        
        self.current_summary = response.choices[0].message.content
    
    def _format_messages(self, messages: list) -> str:
        lines = []
        for msg in messages:
            role = "用户" if msg["role"] == "user" else "助手"
            lines.append(f"{role}: {msg['content']}")
        return "\n".join(lines)
    
    def chat(self, user_input: str) -> str:
        self.buffer.append({"role": "user", "content": user_input})
        
        # 构建消息
        messages = [{"role": "system", "content": self.system_prompt}]
        
        if self.current_summary:
            messages.append({
                "role": "system",
                "content": f"对话历史摘要:{self.current_summary}"
            })
        
        messages.extend(self.buffer)
        
        response = self.client.chat.completions.create(
            model="gpt-4",
            messages=messages
        )
        
        assistant_message = response.choices[0].message.content
        self.buffer.append({"role": "assistant", "content": assistant_message})
        
        # 更新摘要
        self.update_summary()
        
        return assistant_message

持久化存储

Redis 存储

使用 Redis 存储会话历史,支持分布式场景:

import json
import redis
from datetime import timedelta
from typing import Optional

class RedisConversationMemory:
    def __init__(
        self,
        session_id: str,
        redis_url: str = "redis://localhost:6379",
        ttl_hours: int = 24,
        max_messages: int = 50
    ):
        self.client = OpenAI()
        self.session_id = session_id
        self.redis = redis.from_url(redis_url)
        self.ttl = timedelta(hours=ttl_hours)
        self.max_messages = max_messages
        self.key = f"conversation:{session_id}"
    
    def _get_history(self) -> list:
        """从 Redis 获取历史"""
        data = self.redis.get(self.key)
        if data:
            return json.loads(data)
        return []
    
    def _save_history(self, history: list):
        """保存历史到 Redis"""
        self.redis.setex(
            self.key,
            self.ttl,
            json.dumps(history, ensure_ascii=False)
        )
    
    def add_message(self, role: str, content: str):
        """添加消息"""
        history = self._get_history()
        history.append({"role": role, "content": content})
        
        # 限制消息数量
        if len(history) > self.max_messages:
            history = history[-self.max_messages:]
        
        self._save_history(history)
    
    def get_messages(self, system_prompt: str = "") -> list:
        """获取消息列表"""
        messages = []
        if system_prompt:
            messages.append({"role": "system", "content": system_prompt})
        messages.extend(self._get_history())
        return messages
    
    def clear(self):
        """清空历史"""
        self.redis.delete(self.key)
    
    def chat(self, user_input: str, system_prompt: str = "你是一个有帮助的助手") -> str:
        self.add_message("user", user_input)
        
        response = self.client.chat.completions.create(
            model="gpt-4",
            messages=self.get_messages(system_prompt)
        )
        
        assistant_message = response.choices[0].message.content
        self.add_message("assistant", assistant_message)
        
        return assistant_message

# 使用示例
memory = RedisConversationMemory(session_id="user_123")
response = memory.chat("你好!")
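
这里把整段历史序列化成一个 JSON 字符串,配合 setex 一次写入并刷新过期时间,读写都只需一次往返;如果写入非常频繁,也可以改用 Redis List 逐条追加(可参考文末"性能优化建议"中的批量写入示例)。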

数据库存储

使用 SQLAlchemy 进行数据库持久化:

from sqlalchemy import create_engine, Column, Integer, String, Text, DateTime, JSON
from sqlalchemy.orm import declarative_base, sessionmaker
from datetime import datetime
from typing import Optional

Base = declarative_base()

class ConversationMessage(Base):
    __tablename__ = "conversation_messages"
    
    id = Column(Integer, primary_key=True)
    session_id = Column(String(64), index=True)
    role = Column(String(20))
    content = Column(Text)
    meta = Column("metadata", JSON, default=dict)  # metadata 是声明式模型的保留属性名,这里用 meta 映射到同名列
    created_at = Column(DateTime, default=datetime.utcnow)

class ConversationSummary(Base):
    __tablename__ = "conversation_summaries"
    
    id = Column(Integer, primary_key=True)
    session_id = Column(String(64), unique=True)
    summary = Column(Text)
    updated_at = Column(DateTime, default=datetime.utcnow)

class DatabaseConversationMemory:
    def __init__(
        self,
        session_id: str,
        database_url: str = "sqlite:///conversations.db",
        max_messages: int = 50
    ):
        self.client = OpenAI()
        self.session_id = session_id
        self.max_messages = max_messages
        
        # 初始化数据库
        self.engine = create_engine(database_url)
        Base.metadata.create_all(self.engine)
        Session = sessionmaker(bind=self.engine)
        self.db = Session()
    
    def add_message(self, role: str, content: str, metadata: dict = None):
        """添加消息到数据库"""
        msg = ConversationMessage(
            session_id=self.session_id,
            role=role,
            content=content,
            meta=metadata or {}
        )
        self.db.add(msg)
        self.db.commit()
    
    def get_recent_messages(self, limit: int = None) -> list:
        """获取最近的消息"""
        limit = limit or self.max_messages
        messages = (
            self.db.query(ConversationMessage)
            .filter(ConversationMessage.session_id == self.session_id)
            .order_by(ConversationMessage.created_at.desc())
            .limit(limit)
            .all()
        )
        # 反转顺序(从旧到新)
        messages = list(reversed(messages))
        return [{"role": m.role, "content": m.content} for m in messages]
    
    def get_summary(self) -> Optional[str]:
        """获取会话摘要"""
        summary = (
            self.db.query(ConversationSummary)
            .filter(ConversationSummary.session_id == self.session_id)
            .first()
        )
        return summary.summary if summary else None
    
    def update_summary(self, summary_text: str):
        """更新会话摘要"""
        summary = (
            self.db.query(ConversationSummary)
            .filter(ConversationSummary.session_id == self.session_id)
            .first()
        )
        
        if summary:
            summary.summary = summary_text
            summary.updated_at = datetime.utcnow()
        else:
            summary = ConversationSummary(
                session_id=self.session_id,
                summary=summary_text
            )
            self.db.add(summary)
        
        self.db.commit()
    
    def chat(self, user_input: str, system_prompt: str = "你是一个有帮助的助手") -> str:
        self.add_message("user", user_input)
        
        # 构建消息
        messages = [{"role": "system", "content": system_prompt}]
        
        # 添加摘要
        summary = self.get_summary()
        if summary:
            messages.append({
                "role": "system",
                "content": f"之前的对话摘要:{summary}"
            })
        
        # 添加最近消息
        messages.extend(self.get_recent_messages())
        
        response = self.client.chat.completions.create(
            model="gpt-4",
            messages=messages
        )
        
        assistant_message = response.choices[0].message.content
        self.add_message("assistant", assistant_message)
        
        return assistant_message
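
一个简单的使用示意:同一个 session_id 在进程重启后仍能取回历史:

memory = DatabaseConversationMemory(session_id="user_456")
print(memory.chat("帮我记一下,下周三要交季度报告"))
print(memory.chat("我刚才让你记了什么?"))  # 历史来自数据库,而非进程内存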

LangChain Memory 组件

ConversationBufferMemory

最简单的缓冲记忆:

from langchain_openai import ChatOpenAI
from langchain.memory import ConversationBufferMemory
from langchain.chains import ConversationChain

# 创建记忆组件
memory = ConversationBufferMemory()

# 创建对话链
llm = ChatOpenAI(model="gpt-4")
conversation = ConversationChain(
    llm=llm,
    memory=memory,
    verbose=True
)

# 对话
response1 = conversation.predict(input="我的名字是小明")
response2 = conversation.predict(input="我叫什么名字?")

# 查看记忆
print(memory.buffer)

ConversationBufferWindowMemory

滑动窗口记忆:

from langchain.memory import ConversationBufferWindowMemory

# 只保留最近 5 轮对话
memory = ConversationBufferWindowMemory(k=5)

conversation = ConversationChain(
    llm=ChatOpenAI(model="gpt-4"),
    memory=memory
)

ConversationSummaryMemory

摘要记忆:

from langchain.memory import ConversationSummaryMemory

# 使用 LLM 生成摘要
memory = ConversationSummaryMemory(llm=ChatOpenAI(model="gpt-4"))

conversation = ConversationChain(
    llm=ChatOpenAI(model="gpt-4"),
    memory=memory
)

# 对话后查看摘要
print(memory.buffer)  # 显示当前摘要

ConversationSummaryBufferMemory

摘要 + 缓冲的混合模式:

from langchain.memory import ConversationSummaryBufferMemory

# 超过 2000 token 时开始摘要,保留最近的消息
memory = ConversationSummaryBufferMemory(
    llm=ChatOpenAI(model="gpt-4"),
    max_token_limit=2000
)

conversation = ConversationChain(
    llm=ChatOpenAI(model="gpt-4"),
    memory=memory
)

ConversationEntityMemory

实体记忆——记住关于实体的信息:

from langchain.memory import ConversationEntityMemory

memory = ConversationEntityMemory(llm=ChatOpenAI(model="gpt-4"))

conversation = ConversationChain(
    llm=ChatOpenAI(model="gpt-4"),
    memory=memory,
    verbose=True
)

# 对话
conversation.predict(input="小明是一名软件工程师,他喜欢Python编程")
conversation.predict(input="小红是小明的同事,她擅长前端开发")

# 查看实体存储
print(memory.entity_store.store)
# 输出: {'小明': '软件工程师,喜欢Python编程', '小红': '小明的同事,擅长前端开发'}

向量存储记忆

基于相似度的记忆检索

from langchain.memory import VectorStoreRetrieverMemory
from langchain_openai import OpenAIEmbeddings
from langchain_community.vectorstores import FAISS

# 创建向量存储
embeddings = OpenAIEmbeddings()
vectorstore = FAISS.from_texts(
    [""],  # 初始化
    embedding=embeddings,
    metadatas=[{"empty": True}]
)

# 创建检索器
retriever = vectorstore.as_retriever(search_kwargs={"k": 5})

# 创建向量记忆
memory = VectorStoreRetrieverMemory(retriever=retriever)

# 保存对话
memory.save_context(
    {"input": "我最喜欢的食物是寿司"},
    {"output": "寿司确实是很棒的选择!"}
)
memory.save_context(
    {"input": "我周末喜欢去爬山"},
    {"output": "户外运动很健康!"}
)
memory.save_context(
    {"input": "我在北京工作"},
    {"output": "北京是个充满机会的城市"}
)

# 检索相关记忆
relevant = memory.load_memory_variables({"prompt": "我的饮食习惯"})
print(relevant)  # 会返回关于寿司的对话
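
检索到的记忆最终还是要注入提示词才能发挥作用。下面是一个把 VectorStoreRetrieverMemory 接入 ConversationChain 的示意(提示词模板为本文假设的示例,memory_key 默认为 "history"):

from langchain.chains import ConversationChain
from langchain.prompts import PromptTemplate

template = """以下是与当前输入相关的历史片段:
{history}

用户输入:{input}
回复:"""

conversation = ConversationChain(
    llm=ChatOpenAI(model="gpt-4"),
    memory=memory,  # 上面创建的 VectorStoreRetrieverMemory
    prompt=PromptTemplate(input_variables=["history", "input"], template=template)
)
print(conversation.predict(input="根据你对我的了解,推荐一家餐厅"))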

自定义向量记忆

from typing import List, Dict
import numpy as np
from openai import OpenAI

class VectorMemory:
    def __init__(self, top_k: int = 5):
        self.client = OpenAI()
        self.memories: List[Dict] = []
        self.embeddings: List[List[float]] = []
        self.top_k = top_k
    
    def _get_embedding(self, text: str) -> List[float]:
        """获取文本嵌入"""
        response = self.client.embeddings.create(
            model="text-embedding-3-small",
            input=text
        )
        return response.data[0].embedding
    
    def _cosine_similarity(self, a: List[float], b: List[float]) -> float:
        """计算余弦相似度"""
        a = np.array(a)
        b = np.array(b)
        return np.dot(a, b) / (np.linalg.norm(a) * np.linalg.norm(b))
    
    def add(self, content: str, metadata: Dict = None):
        """添加记忆"""
        embedding = self._get_embedding(content)
        self.memories.append({
            "content": content,
            "metadata": metadata or {}
        })
        self.embeddings.append(embedding)
    
    def search(self, query: str) -> List[Dict]:
        """搜索相关记忆"""
        if not self.memories:
            return []
        
        query_embedding = self._get_embedding(query)
        
        # 计算相似度
        similarities = [
            (i, self._cosine_similarity(query_embedding, emb))
            for i, emb in enumerate(self.embeddings)
        ]
        
        # 排序并返回 top-k
        similarities.sort(key=lambda x: x[1], reverse=True)
        
        return [
            {**self.memories[i], "similarity": sim}
            for i, sim in similarities[:self.top_k]
        ]
    
    def get_relevant_context(self, query: str) -> str:
        """获取相关上下文字符串"""
        results = self.search(query)
        return "\n".join([r["content"] for r in results])

# 使用示例
memory = VectorMemory(top_k=3)
memory.add("用户喜欢吃意大利面")
memory.add("用户的生日是3月15日")
memory.add("用户在上海工作")
memory.add("用户养了一只叫小花的猫")

relevant = memory.search("用户的宠物")
print(relevant)
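
结合对话调用时,一种常见做法是把检索到的片段拼进系统提示再请求模型。下面是一个示意草图(chat_with_memory 为本文假设的辅助函数):

def chat_with_memory(memory: VectorMemory, user_input: str) -> str:
    """示意:先检索相关记忆,再注入系统提示"""
    client = OpenAI()
    context = memory.get_relevant_context(user_input)
    response = client.chat.completions.create(
        model="gpt-4",
        messages=[
            {"role": "system", "content": f"你是一个有帮助的助手。已知的用户信息:\n{context}"},
            {"role": "user", "content": user_input},
        ],
    )
    return response.choices[0].message.content

print(chat_with_memory(memory, "给我的宠物买点什么礼物好?"))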

多会话管理

会话管理器

from typing import Dict, Optional
import uuid
from datetime import datetime, timedelta

class SessionManager:
    def __init__(
        self,
        session_timeout: int = 3600,  # 会话超时时间(秒)
        max_sessions: int = 1000      # 最大会话数
    ):
        self.sessions: Dict[str, Dict] = {}
        self.session_timeout = timedelta(seconds=session_timeout)
        self.max_sessions = max_sessions
    
    def create_session(self, user_id: str = None) -> str:
        """创建新会话"""
        self._cleanup_expired()
        
        session_id = str(uuid.uuid4())
        self.sessions[session_id] = {
            "user_id": user_id,
            "messages": [],
            "metadata": {},
            "created_at": datetime.now(),
            "last_access": datetime.now()
        }
        
        return session_id
    
    def get_session(self, session_id: str) -> Optional[Dict]:
        """获取会话"""
        if session_id not in self.sessions:
            return None
        
        session = self.sessions[session_id]
        
        # 检查是否过期
        if datetime.now() - session["last_access"] > self.session_timeout:
            del self.sessions[session_id]
            return None
        
        # 更新最后访问时间
        session["last_access"] = datetime.now()
        return session
    
    def add_message(self, session_id: str, role: str, content: str) -> bool:
        """添加消息到会话"""
        session = self.get_session(session_id)
        if not session:
            return False
        
        session["messages"].append({
            "role": role,
            "content": content,
            "timestamp": datetime.now().isoformat()
        })
        return True
    
    def get_messages(self, session_id: str) -> list:
        """获取会话消息"""
        session = self.get_session(session_id)
        if not session:
            return []
        return session["messages"]
    
    def delete_session(self, session_id: str):
        """删除会话"""
        if session_id in self.sessions:
            del self.sessions[session_id]
    
    def _cleanup_expired(self):
        """清理过期会话"""
        now = datetime.now()
        expired = [
            sid for sid, session in self.sessions.items()
            if now - session["last_access"] > self.session_timeout
        ]
        for sid in expired:
            del self.sessions[sid]
        
        # 如果会话数超过限制,删除最旧的
        if len(self.sessions) > self.max_sessions:
            sorted_sessions = sorted(
                self.sessions.items(),
                key=lambda x: x[1]["last_access"]
            )
            for sid, _ in sorted_sessions[:len(self.sessions) - self.max_sessions]:
                del self.sessions[sid]

FastAPI 集成示例

from fastapi import FastAPI, HTTPException
from pydantic import BaseModel
from typing import Optional
from openai import OpenAI

app = FastAPI()

# 全局会话管理器
session_manager = SessionManager()

class ChatRequest(BaseModel):
    message: str
    session_id: Optional[str] = None

class ChatResponse(BaseModel):
    response: str
    session_id: str

@app.post("/chat", response_model=ChatResponse)
async def chat(request: ChatRequest):
    client = OpenAI()
    
    # 获取或创建会话
    if request.session_id:
        session = session_manager.get_session(request.session_id)
        if not session:
            raise HTTPException(status_code=404, detail="Session not found")
        session_id = request.session_id
    else:
        session_id = session_manager.create_session()
    
    # 添加用户消息
    session_manager.add_message(session_id, "user", request.message)
    
    # 获取历史消息
    messages = [
        {"role": "system", "content": "你是一个有帮助的助手"}
    ]
    messages.extend([
        {"role": m["role"], "content": m["content"]}
        for m in session_manager.get_messages(session_id)
    ])
    
    # 调用 LLM
    response = client.chat.completions.create(
        model="gpt-4",
        messages=messages
    )
    
    assistant_message = response.choices[0].message.content
    
    # 保存助手回复
    session_manager.add_message(session_id, "assistant", assistant_message)
    
    return ChatResponse(
        response=assistant_message,
        session_id=session_id
    )

@app.delete("/session/{session_id}")
async def delete_session(session_id: str):
    session_manager.delete_session(session_id)
    return {"status": "deleted"}

最佳实践

记忆策略选择

┌─────────────────────────────────────────────────────────────────┐
│                    记忆策略选择指南                               │
├─────────────────────────────────────────────────────────────────┤
│                                                                 │
│  场景                    推荐策略              原因              │
│  ─────────────────────────────────────────────────────────────  │
│  简单问答              Buffer Memory        实现简单,足够使用   │
│  长对话                Summary Memory       节省 Token          │
│  客服系统              Entity Memory        记住用户信息        │
│  知识问答              Vector Memory        语义相关检索        │
│  高并发               Redis + Summary       分布式 + 压缩       │
│                                                                 │
└─────────────────────────────────────────────────────────────────┘
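
落到代码上,可以用一个简单的工厂函数按场景返回对应实现。下面是一个示意草图(场景名称与映射关系仅作演示,各个类均来自本文前面的实现):

def create_memory(scenario: str, session_id: str = None):
    """示意:按场景选择记忆策略"""
    if scenario == "简单问答":
        return SimpleConversation()
    if scenario == "长对话":
        return SummaryMemory()
    if scenario == "知识问答":
        return VectorMemory(top_k=3)
    if scenario == "高并发":
        return RedisConversationMemory(session_id=session_id or str(uuid.uuid4()))
    raise ValueError(f"未知场景: {scenario}")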

设计原则

原则            说明
─────────────────────────────────────────
按需存储        只存储必要信息,避免冗余
及时清理        设置过期时间,自动清理旧数据
分层存储        热数据用内存,冷数据用数据库(见下方示意)
安全考虑        敏感信息加密存储
可恢复性        关键对话需要持久化
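
以"分层存储"为例,常见做法是进程内缓存未命中时再回退到数据库。下面是一个示意草图(TieredMemory 为本文假设的类名,复用前面实现的 DatabaseConversationMemory):

class TieredMemory:
    """示意:同一会话的热数据放进程内,完整历史落数据库"""

    def __init__(self, db_memory: DatabaseConversationMemory):
        self.hot = None          # 进程内缓存(热数据)
        self.db = db_memory      # 持久化层(冷数据)

    def get_history(self) -> list:
        if self.hot is None:
            # 缓存未命中时从数据库回填
            self.hot = self.db.get_recent_messages()
        return self.hot

    def add_message(self, role: str, content: str):
        self.get_history().append({"role": role, "content": content})
        self.db.add_message(role, content)  # 同步落库,保证可恢复性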

性能优化建议

# 1. 批量操作
def batch_add_messages(messages: list, redis_client, session_id: str):
    """批量添加消息到 Redis"""
    pipe = redis_client.pipeline()
    for msg in messages:
        pipe.rpush(f"conversation:{session_id}", json.dumps(msg))
    pipe.execute()

# 2. 异步摘要
import asyncio

async def async_summarize(messages: list, client: OpenAI) -> str:
    """异步生成摘要"""
    loop = asyncio.get_running_loop()
    return await loop.run_in_executor(
        None,
        lambda: client.chat.completions.create(
            model="gpt-4",
            messages=[{
                "role": "user",
                "content": f"请总结以下对话:{messages}"
            }]
        ).choices[0].message.content
    )

# 3. 缓存嵌入向量
from functools import lru_cache

@lru_cache(maxsize=1000)
def cached_embedding(text: str) -> tuple:
    """缓存嵌入向量"""
    client = OpenAI()
    response = client.embeddings.create(
        model="text-embedding-3-small",
        input=text
    )
    return tuple(response.data[0].embedding)

总结

对话记忆管理是构建智能对话系统的核心能力:

方案          优点            缺点              适用场景
──────────────────────────────────────────────────────────
简单缓冲      实现简单        Token 消耗大      短对话
滑动窗口      控制成本        丢失早期信息      一般对话
摘要记忆      保留要点        额外 API 调用     长对话
向量记忆      语义检索        实现复杂          知识问答
实体记忆      结构化存储      提取准确度有限    用户画像

选择合适的记忆策略,需要综合考虑:

  • 对话场景和长度
  • Token 成本预算
  • 信息保留需求
  • 系统复杂度容忍度
