# LLM Application Development: RAG (Retrieval-Augmented Generation)

From Principles to a Production-Grade Implementation
## Preface

RAG (Retrieval-Augmented Generation) combines a retrieval system with a large language model, effectively mitigating LLM hallucination and the problem of stale knowledge. This article walks through RAG from principles to practice.
## RAG Overview

### Why RAG?
| LLM pain point | RAG solution |
|---|---|
| Knowledge cutoff date | Retrieve up-to-date documents |
| Hallucination | Ground answers in real documents |
| Weak domain knowledge | Inject a private knowledge base |
| No source attribution | Return the reference documents |
### RAG Workflow
```
User question
    ↓
Query processing (rewriting / expansion)
    ↓
Vector retrieval → top-K relevant documents
    ↓
Prompt construction (question + context)
    ↓
LLM answer generation
    ↓
Post-processing (citations, formatting)
```
## Data Processing: Chunking Strategies

Chunking is the most critical step in RAG. Chunks that are too small lose context; chunks that are too large introduce noise and may exceed the LLM's context window.

### 1. Recursive Character Splitting

The most common approach. It tries separators in priority order (paragraph `\n\n`, sentence `\n`, space ` `) until each chunk fits the size limit.
```python
from langchain.text_splitter import RecursiveCharacterTextSplitter

text_splitter = RecursiveCharacterTextSplitter(
    chunk_size=500,
    chunk_overlap=50,
    # Chinese full-width punctuation included so sentences stay intact
    separators=["\n\n", "\n", "。", "!", "?", " ", ""]
)
chunks = text_splitter.split_text(long_document)
```
### 2. Semantic Chunking

Split at points where the meaning shifts, rather than at a fixed character count.

- Principle: compute the embedding similarity between adjacent sentences; when the similarity drops sharply, treat it as a topic shift and split there, as in the sketch below.
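A minimal sketch of this idea, assuming a SentenceTransformer embedding model and an illustrative fixed similarity threshold (production implementations usually derive the threshold from the similarity distribution, e.g. a percentile):

```python
# Semantic chunking sketch: split where adjacent-sentence similarity drops.
# The model name and threshold below are illustrative assumptions.
import re
from typing import List
import numpy as np
from sentence_transformers import SentenceTransformer

def semantic_chunk(text: str, threshold: float = 0.6) -> List[str]:
    model = SentenceTransformer("BAAI/bge-large-zh-v1.5")
    # Naive sentence split on Chinese/English terminal punctuation
    sentences = [s for s in re.split(r'(?<=[。!?.!?])', text) if s.strip()]
    if len(sentences) <= 1:
        return sentences
    # Normalized embeddings, so the dot product is cosine similarity
    emb = model.encode(sentences, normalize_embeddings=True)
    chunks, current = [], [sentences[0]]
    for i in range(1, len(sentences)):
        sim = float(np.dot(emb[i - 1], emb[i]))
        if sim < threshold:  # sharp drop => treat as a semantic break
            chunks.append("".join(current))
            current = []
        current.append(sentences[i])
    chunks.append("".join(current))
    return chunks
```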
## Evaluation: The RAGAS Framework

How do you quantify RAG quality? RAGAS proposes three core metrics (the RAG triad):

- Faithfulness: is the answer fully grounded in the retrieved context? (guards against hallucination)
- Answer relevancy: does the answer directly address the user's question?
- Context precision: among the retrieved top-K documents, are the genuinely useful ones ranked first?
```python
from datasets import Dataset
from ragas import evaluate
from ragas.metrics import faithfulness, answer_relevancy, context_precision

# Prepare the evaluation data
data = {
    "question": ["What is RAG?"],
    "answer": ["RAG is retrieval-augmented generation..."],
    "contexts": [["RAG combines retrieval and generation..."]],
    "ground_truth": ["RAG is a technique that augments LLM output with an external knowledge base."],
}
dataset = Dataset.from_dict(data)

# Run the evaluation
result = evaluate(dataset, metrics=[faithfulness, answer_relevancy, context_precision])
print(result)
```
## Basic RAG Implementation

### Complete Pipeline
```python
from openai import OpenAI
from sentence_transformers import SentenceTransformer
import chromadb
from typing import List, Tuple

class SimpleRAG:
    def __init__(self,
                 embedding_model: str = "BAAI/bge-large-zh-v1.5",
                 llm_model: str = "gpt-4o-mini"):
        # Initialize components
        self.embedder = SentenceTransformer(embedding_model)
        self.client = OpenAI()
        self.llm_model = llm_model
        # Initialize the vector database
        self.chroma = chromadb.Client()
        self.collection = self.chroma.create_collection("documents")

    def add_documents(self, documents: List[str], ids: List[str] = None):
        """Add documents to the knowledge base."""
        if ids is None:
            ids = [f"doc_{i}" for i in range(len(documents))]
        embeddings = self.embedder.encode(documents).tolist()
        self.collection.add(
            documents=documents,
            embeddings=embeddings,
            ids=ids
        )
        print(f"Added {len(documents)} documents")

    def retrieve(self, query: str, top_k: int = 3) -> List[Tuple[str, float]]:
        """Retrieve relevant documents."""
        # Embed the query with the same model used for the documents;
        # passing query_texts would use Chroma's default embedder instead.
        query_embedding = self.embedder.encode([query]).tolist()
        results = self.collection.query(
            query_embeddings=query_embedding,
            n_results=top_k,
            include=["documents", "distances"]
        )
        docs = results['documents'][0]
        distances = results['distances'][0]
        return list(zip(docs, distances))

    def generate(self, query: str, context: str) -> str:
        """Generate an answer with the LLM."""
        prompt = f"""Answer the user's question based on the context below. If the context contains no relevant information, say that you cannot answer.

Context:
{context}

User question: {query}

Answer:"""
        response = self.client.chat.completions.create(
            model=self.llm_model,
            messages=[
                {"role": "system", "content": "You are a helpful assistant that answers questions based on the provided context."},
                {"role": "user", "content": prompt}
            ],
            temperature=0.7
        )
        return response.choices[0].message.content

    def query(self, question: str, top_k: int = 3) -> dict:
        """Run a RAG query."""
        # 1. Retrieve
        retrieved = self.retrieve(question, top_k)
        context = "\n\n".join([doc for doc, _ in retrieved])
        # 2. Generate
        answer = self.generate(question, context)
        return {
            "question": question,
            "answer": answer,
            "sources": retrieved
        }

# Usage example
rag = SimpleRAG()

# Add knowledge-base documents
documents = [
    "RAG (Retrieval-Augmented Generation) combines a retrieval system with a large language model. It augments the LLM's answers by retrieving relevant documents.",
    "A vector database is purpose-built for storing and searching high-dimensional vectors. Common options include Chroma, Milvus, and Pinecone.",
    "An embedding maps text to a vector representation. Good embeddings capture the text's semantics.",
    "LangChain is a framework for building LLM applications, with ready-made components for RAG, agents, and more.",
]
rag.add_documents(documents)

# Query
result = rag.query("What is RAG, and what is it for?")
print(f"Question: {result['question']}")
print(f"Answer: {result['answer']}")
print("\nSources:")
for doc, score in result['sources']:
    print(f"  [{score:.4f}] {doc[:50]}...")
```
## Document Processing

### Document Loading
```python
from pathlib import Path
import PyPDF2
import docx

class DocumentLoader:
    @staticmethod
    def load_txt(file_path: str) -> str:
        with open(file_path, 'r', encoding='utf-8') as f:
            return f.read()

    @staticmethod
    def load_pdf(file_path: str) -> str:
        text = ""
        with open(file_path, 'rb') as f:
            reader = PyPDF2.PdfReader(f)
            for page in reader.pages:
                text += page.extract_text() + "\n"
        return text

    @staticmethod
    def load_docx(file_path: str) -> str:
        doc = docx.Document(file_path)
        return "\n".join([para.text for para in doc.paragraphs])

    @staticmethod
    def load_file(file_path: str) -> str:
        path = Path(file_path)
        suffix = path.suffix.lower()
        loaders = {
            '.txt': DocumentLoader.load_txt,
            '.md': DocumentLoader.load_txt,
            '.pdf': DocumentLoader.load_pdf,
            '.docx': DocumentLoader.load_docx,
        }
        if suffix in loaders:
            return loaders[suffix](file_path)
        raise ValueError(f"Unsupported file format: {suffix}")
```
### Document Chunking Strategies
```python
from typing import List
import re

class TextSplitter:
    def __init__(self, chunk_size: int = 500, chunk_overlap: int = 50):
        self.chunk_size = chunk_size
        self.chunk_overlap = chunk_overlap

    def split_by_char(self, text: str) -> List[str]:
        """Split by character count."""
        chunks = []
        start = 0
        while start < len(text):
            end = start + self.chunk_size
            chunk = text[start:end]
            chunks.append(chunk.strip())
            start = end - self.chunk_overlap
        return [c for c in chunks if c]

    def split_by_sentence(self, text: str) -> List[str]:
        """Split by sentence, keeping semantic units intact."""
        # Split on Chinese/English sentence-ending punctuation
        sentences = re.split(r'([。!?\.\!\?])', text)
        # Re-attach each sentence to its punctuation mark
        sentences = [''.join(sentences[i:i+2])
                     for i in range(0, len(sentences)-1, 2)]
        chunks = []
        current_chunk = ""
        for sentence in sentences:
            if len(current_chunk) + len(sentence) <= self.chunk_size:
                current_chunk += sentence
            else:
                if current_chunk:
                    chunks.append(current_chunk.strip())
                current_chunk = sentence
        if current_chunk:
            chunks.append(current_chunk.strip())
        return chunks

    def split_by_paragraph(self, text: str) -> List[str]:
        """Split by paragraph."""
        paragraphs = text.split('\n\n')
        chunks = []
        current_chunk = ""
        for para in paragraphs:
            para = para.strip()
            if not para:
                continue
            if len(current_chunk) + len(para) <= self.chunk_size:
                current_chunk += para + "\n\n"
            else:
                if current_chunk:
                    chunks.append(current_chunk.strip())
                # Paragraph longer than chunk_size: fall back to char splitting
                if len(para) > self.chunk_size:
                    sub_chunks = self.split_by_char(para)
                    chunks.extend(sub_chunks)
                    current_chunk = ""
                else:
                    current_chunk = para + "\n\n"
        if current_chunk:
            chunks.append(current_chunk.strip())
        return chunks

# Usage example
splitter = TextSplitter(chunk_size=300, chunk_overlap=30)
text = "This is a long passage of text..." * 50
chunks = splitter.split_by_sentence(text)
print(f"Split into {len(chunks)} chunks")
```
## Advanced Retrieval Strategies

### Query Rewriting
```python
def rewrite_query(client, query: str) -> List[str]:
    """Use the LLM to rewrite a query into several variants."""
    prompt = f"""Rewrite the following query in 3 different ways to improve document retrieval.

Original query: {query}

Output the 3 rewritten queries directly, one per line:"""
    response = client.chat.completions.create(
        model="gpt-4o-mini",
        messages=[{"role": "user", "content": prompt}],
        temperature=0.7
    )
    rewritten = response.choices[0].message.content.strip().split('\n')
    return [q.strip() for q in rewritten if q.strip()]

# Usage example
queries = rewrite_query(client, "How to improve RAG quality")
# Possible output:
# ["How to optimize RAG system performance", "Ways to improve RAG retrieval quality", "RAG tuning tips"]
```
### Hybrid Retrieval
```python
from rank_bm25 import BM25Okapi
import jieba  # Chinese word segmentation for BM25
import numpy as np

class HybridRetriever:
    def __init__(self, documents: List[str], embedder):
        self.documents = documents
        self.embedder = embedder
        # Vector retrieval (normalized so dot product = cosine similarity)
        self.embeddings = embedder.encode(documents, normalize_embeddings=True)
        # BM25 keyword retrieval
        tokenized_docs = [list(jieba.cut(doc)) for doc in documents]
        self.bm25 = BM25Okapi(tokenized_docs)

    def search(self, query: str, top_k: int = 5,
               alpha: float = 0.5) -> List[Tuple[str, float]]:
        """Hybrid retrieval: combine vector search and BM25."""
        # Vector scores, min-max normalized to [0, 1]
        query_vec = self.embedder.encode([query], normalize_embeddings=True)[0]
        vector_scores = np.dot(self.embeddings, query_vec)
        vector_scores = (vector_scores - vector_scores.min()) / \
                        (vector_scores.max() - vector_scores.min() + 1e-8)
        # BM25 scores, min-max normalized to [0, 1]
        tokenized_query = list(jieba.cut(query))
        bm25_scores = np.array(self.bm25.get_scores(tokenized_query))
        bm25_scores = (bm25_scores - bm25_scores.min()) / \
                      (bm25_scores.max() - bm25_scores.min() + 1e-8)
        # Weighted fusion of the two score lists
        hybrid_scores = alpha * vector_scores + (1 - alpha) * bm25_scores
        # Take the top-K
        top_indices = np.argsort(hybrid_scores)[-top_k:][::-1]
        results = []
        for idx in top_indices:
            results.append((self.documents[idx], float(hybrid_scores[idx])))
        return results
```
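Example usage, reusing the `documents` list and embedder from earlier examples (the query string is illustrative); `alpha=1.0` is pure vector search, `alpha=0.0` is pure BM25:

```python
embedder = SentenceTransformer("BAAI/bge-large-zh-v1.5")
retriever = HybridRetriever(documents, embedder)
for doc, score in retriever.search("What is a vector database?", top_k=3, alpha=0.5):
    print(f"[{score:.3f}] {doc[:50]}...")
```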
### Reranking
```python
from sentence_transformers import CrossEncoder

class Reranker:
    def __init__(self, model_name: str = "BAAI/bge-reranker-large"):
        self.model = CrossEncoder(model_name)

    def rerank(self, query: str, documents: List[str],
               top_k: int = 3) -> List[Tuple[str, float]]:
        """Rerank with a CrossEncoder."""
        pairs = [[query, doc] for doc in documents]
        scores = self.model.predict(pairs)
        # Sort by score, descending
        doc_scores = list(zip(documents, scores))
        doc_scores.sort(key=lambda x: x[1], reverse=True)
        return doc_scores[:top_k]

# Usage example
reranker = Reranker()
# Retrieve top-20 candidates with vector search, then rerank down to top-3
candidates = retriever.search(query, top_k=20)
reranked = reranker.rerank(query, [doc for doc, _ in candidates], top_k=3)
```
## Production-Grade RAG Architecture
```python
class ProductionRAG:
    def __init__(self, config: dict):
        self.config = config
        self._init_components()

    def _init_components(self):
        """Initialize all components."""
        # Embedding model
        self.embedder = SentenceTransformer(
            self.config.get("embedding_model", "BAAI/bge-large-zh-v1.5")
        )
        # Vector database
        self.vector_db = self._init_vector_db()
        # Reranker
        self.reranker = Reranker()
        # LLM client
        self.llm_client = OpenAI()

    def _init_vector_db(self):
        """Initialize the vector database (e.g. Milvus or Qdrant)."""
        pass

    def query(self, question: str) -> dict:
        """Full RAG query flow."""
        # 1. Query processing
        processed_query = self._process_query(question)
        # 2. Retrieval
        candidates = self._retrieve(processed_query, top_k=20)
        # 3. Reranking
        reranked = self._rerank(question, candidates, top_k=5)
        # 4. Filter out low-relevance results
        filtered = self._filter_by_threshold(reranked, threshold=0.5)
        # 5. Build the prompt
        prompt = self._build_prompt(question, filtered)
        # 6. Generate the answer
        answer = self._generate(prompt)
        # 7. Post-process
        result = self._post_process(answer, filtered)
        return result

    def _process_query(self, query: str) -> str:
        """Query preprocessing (e.g. query rewriting, intent detection)."""
        return query

    def _retrieve(self, query: str, top_k: int) -> List[dict]:
        """Retrieval (e.g. the hybrid retrieval shown above)."""
        pass

    def _rerank(self, query: str, docs: List[dict], top_k: int) -> List[dict]:
        """Reranking."""
        texts = [doc['text'] for doc in docs]
        reranked = self.reranker.rerank(query, texts, top_k)
        return reranked

    def _filter_by_threshold(self, docs: List, threshold: float) -> List:
        """Drop results below the relevance threshold."""
        return [(doc, score) for doc, score in docs if score >= threshold]

    def _build_prompt(self, query: str, context_docs: List) -> str:
        """Build the prompt."""
        context = "\n\n".join([f"[{i+1}] {doc}"
                               for i, (doc, _) in enumerate(context_docs)])
        return f"""Answer the user's question based on the reference material below, citing the reference numbers in your answer.

References:
{context}

User question: {query}

Give an accurate, well-structured answer:"""

    def _generate(self, prompt: str) -> str:
        """LLM generation."""
        response = self.llm_client.chat.completions.create(
            model="gpt-4o",
            messages=[{"role": "user", "content": prompt}],
            temperature=0.3
        )
        return response.choices[0].message.content

    def _post_process(self, answer: str, sources: List) -> dict:
        """Post-processing."""
        return {
            "answer": answer,
            "sources": [{"text": doc[:200], "score": score}
                        for doc, score in sources]
        }
```
## FAQ

### Q1: What if the retrieved results are irrelevant?

- Adjust the chunking strategy to keep semantic units intact
- Use hybrid retrieval (vector + keyword)
- Add a reranking stage
- Choose a better-suited embedding model
### Q2: What if the answers hallucinate?

- Lower the LLM temperature
- Emphasize "answer only from the provided context" in the prompt
- Add a fact-checking step
### Q3: How do you handle long documents?

- Use an appropriate chunking strategy
- Consider hierarchical indexing (see the sketch after this list)
- Use an LLM that supports long contexts
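A minimal sketch of the hierarchical ("small-to-big") idea: index small child chunks for precise matching, but return their larger parent chunks as LLM context. All names here are illustrative, not a library API:

```python
# Hierarchical index sketch: search small child chunks, return parents.
from typing import Dict, List
import numpy as np

class HierarchicalIndex:
    def __init__(self, embedder):
        self.embedder = embedder
        self.children: List[str] = []
        self.child_to_parent: Dict[str, str] = {}

    def add(self, parent: str, child_chunks: List[str]):
        for child in child_chunks:
            self.children.append(child)
            self.child_to_parent[child] = parent

    def search(self, query: str, top_k: int = 3) -> List[str]:
        # Re-encoding every call keeps the sketch short; cache in practice
        child_emb = self.embedder.encode(self.children, normalize_embeddings=True)
        q = self.embedder.encode([query], normalize_embeddings=True)[0]
        top = np.argsort(child_emb @ q)[-top_k:][::-1]
        # Deduplicate parents while preserving rank order
        parents = []
        for i in top:
            p = self.child_to_parent[self.children[i]]
            if p not in parents:
                parents.append(p)
        return parents
```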
### Q4: How do you evaluate a RAG system?

- Retrieval: Recall@K, MRR (a minimal sketch follows this list)
- Generation: BLEU, ROUGE, human evaluation
- End to end: answer accuracy, user satisfaction
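Recall@K and MRR are straightforward to compute once each query has a labeled set of relevant documents. A minimal sketch:

```python
# Retrieval metrics sketch: Recall@K and MRR over labeled queries.
from typing import List, Set

def recall_at_k(retrieved: List[str], relevant: Set[str], k: int) -> float:
    """Fraction of relevant documents that appear in the top-K results."""
    return len(set(retrieved[:k]) & relevant) / len(relevant) if relevant else 0.0

def mrr(all_retrieved: List[List[str]], all_relevant: List[Set[str]]) -> float:
    """Mean reciprocal rank of the first relevant hit per query."""
    total = 0.0
    for retrieved, relevant in zip(all_retrieved, all_relevant):
        for rank, doc_id in enumerate(retrieved, start=1):
            if doc_id in relevant:
                total += 1.0 / rank
                break
    return total / len(all_retrieved)
```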
## Summary

| Component | Role | Optimization directions |
|---|---|---|
| Chunking | Preserve semantic units | Split by sentence/paragraph |
| Retrieval | Fetch relevant documents | Hybrid retrieval, multi-path recall |
| Reranking | Fine-grained ordering | CrossEncoder |
| Generation | Produce the answer | Prompt engineering, low temperature |