图结构增强的检索增强生成技术
前言
传统 RAG 基于向量相似度检索,难以捕捉实体间的复杂关系。GraphRAG 将知识图谱与 RAG 结合,通过图结构表示实体关系,实现更精准的知识检索和推理。本文深入介绍 GraphRAG 的原理与实现。
GraphRAG 概述
为什么需要 GraphRAG
┌─────────────────────────────────────────────────────────────────┐
│ 传统 RAG vs GraphRAG │
├─────────────────────────────────────────────────────────────────┤
│ │
│ 传统 RAG: │
│ ┌─────────┐ ┌─────────┐ ┌─────────┐ │
│ │ 文档块1 │ │ 文档块2 │ │ 文档块3 │ │
│ └────┬────┘ └────┬────┘ └────┬────┘ │
│ │ │ │ │
│ └──────────────┼──────────────┘ │
│ │ │
│ ┌───────┴───────┐ │
│ │ 向量相似度 │ 只能捕捉语义相似 │
│ └───────────────┘ │
│ │
│ GraphRAG: │
│ ┌─────────────────────────────────┐ │
│ │ 知识图谱 │ │
│ │ [实体A]──关系──[实体B] │ │
│ │ │ │ │ │
│ │ ▼ ▼ │ │
│ │ [实体C]──关系──[实体D] │ │
│ └─────────────────────────────────┘ │
│ │ │
│ 捕捉实体关系 + 多跳推理 │
│ │
└─────────────────────────────────────────────────────────────────┘
GraphRAG 优势
| 特性 | 传统 RAG | GraphRAG |
|---|---|---|
| 检索方式 | 向量相似度 | 图遍历 + 向量 |
| 关系理解 | 隐式 | 显式 |
| 多跳推理 | 困难 | 原生支持 |
| 全局视图 | 缺乏 | 社区摘要 |
| 可解释性 | 低 | 高 |
知识图谱基础
三元组与实体关系
```python
from dataclasses import dataclass, field
from typing import Dict, List

@dataclass
class Entity:
    """实体"""
    id: str
    name: str
    type: str
    properties: Dict = field(default_factory=dict)

@dataclass
class Relation:
    """关系"""
    source: str  # 源实体 ID
    target: str  # 目标实体 ID
    type: str    # 关系类型
    properties: Dict = field(default_factory=dict)

@dataclass
class Triple:
    """三元组 (主语, 谓语, 宾语)"""
    subject: Entity
    predicate: str
    object: Entity

class KnowledgeGraph:
    def __init__(self):
        self.entities: Dict[str, Entity] = {}
        self.relations: List[tuple] = []

    def add_triple(self, subject: str, predicate: str, obj: str):
        """添加三元组"""
        self.relations.append((subject, predicate, obj))
```
---
#### GraphRAG 核心算法:Leiden 社区发现
Microsoft GraphRAG 的核心创新在于它不仅仅是检索“点”,而是检索“社区”。
#### 1. 索引阶段 (Indexing)
- **实体提取**:从文本中提取实体和关系。
- **图构建**:构建全局知识图谱。
- **社区检测 (Leiden Algorithm)**:使用 Leiden 算法将图划分为多个层级的“社区”(Communities)。
- **社区摘要**:为每个社区生成摘要。这是 GraphRAG 能够回答“全局性问题”的关键。
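上面四个索引阶段的数据流可以用一个极简的可运行示意来串起来。下面是假设性实现:用规则切分代替 LLM 实体抽取,用 networkx 的模块度社区发现近似 Leiden,用成员列表近似 LLM 生成的社区摘要,仅演示各阶段之间如何衔接:

```python
# GraphRAG 索引流水线极简示意(假设性实现,非官方代码):
# 抽取 -> 建图 -> 社区检测 -> 社区摘要
import networkx as nx
from networkx.algorithms.community import greedy_modularity_communities

def extract_triples(text: str) -> list:
    """示意用的"抽取器":假设文本每行已是 '主语 关系 宾语' 格式"""
    triples = []
    for line in text.strip().splitlines():
        s, p, o = line.split()
        triples.append((s, p, o))
    return triples

def build_index(corpus: list) -> dict:
    # 1. 实体/关系提取
    triples = [t for doc in corpus for t in extract_triples(doc)]
    # 2. 图构建
    graph = nx.Graph()
    for s, p, o in triples:
        graph.add_edge(s, o, relation=p)
    # 3. 社区检测(此处以模块度贪心算法近似 Leiden)
    communities = list(greedy_modularity_communities(graph))
    # 4. 社区摘要(此处以成员列表近似 LLM 摘要)
    summaries = {
        i: "社区成员: " + ", ".join(sorted(c))
        for i, c in enumerate(communities)
    }
    return {"graph": graph, "communities": communities, "summaries": summaries}

index = build_index([
    "GPT-4 developed_by OpenAI\nGPT-4 based_on Transformer",
    "Claude developed_by Anthropic",
])
print(len(index["summaries"]))  # 社区数量
```

真实系统中,第 1 步和第 4 步各需要一轮 LLM 调用,这正是 GraphRAG 索引成本高的来源。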
#### 2. 检索阶段 (Retrieval)
#### Global Search (全局搜索)
适用于回答“这组文档的主题是什么?”这类宏观问题。
- **逻辑**:检索所有相关社区的摘要,由 LLM 汇总生成最终答案。
#### Local Search (局部搜索)
适用于回答关于特定实体及其关系的问题。
- **逻辑**:结合向量检索(找到相关实体)和图遍历(找到关联实体和关系),将这些信息作为上下文发给 LLM。
```python
# GraphRAG 检索逻辑伪代码
class GraphRAG:
    def local_search(self, query: str):
        # 1. 向量检索找到核心实体
        entities = self.vector_db.search(query)
        # 2. 图数据库查询关联关系 (1-hop, 2-hop)
        subgraph = self.graph_db.get_neighbors(entities)
        # 3. 结合上下文生成
        context = f"实体信息: {entities}\n关系网络: {subgraph}"
        return self.llm.generate(query, context)

    def global_search(self, query: str):
        # 1. 检索预先生成的社区摘要
        community_summaries = self.graph_db.get_community_summaries()
        # 2. 汇总生成
        return self.llm.generate(query, community_summaries)
```
工业级实现:Neo4j + LangChain
在生产环境中,我们通常使用 Neo4j 作为图数据库。
Cypher 语句生成
LLM 可以根据自然语言生成 Cypher 查询语句,直接从图数据库中提取数据。
```python
from langchain_community.graphs import Neo4jGraph
from langchain.chains import GraphCypherQAChain
from langchain_openai import ChatOpenAI

# 1. 连接 Neo4j
graph = Neo4jGraph(url="bolt://localhost:7687", username="neo4j", password="password")

# 2. 创建 Cypher QA 链
chain = GraphCypherQAChain.from_llm(
    ChatOpenAI(temperature=0),
    graph=graph,
    verbose=True
)

# 3. 查询
result = chain.invoke({"query": "谁是《盗梦空间》的导演?他还有哪些作品?"})
print(result["result"])
```
进阶:LightRAG 与混合 RAG
LightRAG 是最近提出的优化方案,它试图在保持 GraphRAG 强大能力的同时,降低索引成本。
- 双层索引:同时维护低层级(具体实体)和高层级(抽象概念)的索引。
- 增量更新:支持在不重新构建整个图的情况下添加新文档。
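双层索引的思路可以用一个极简的内存结构来示意。下面是假设性实现(并非 LightRAG 的官方 API):低层级索引记录具体实体到文档的映射,高层级索引记录抽象概念到实体的映射,新文档只需增量写入受影响的条目:

```python
# 双层索引极简示意(假设性实现):
# 低层级: 实体 -> 文档;高层级: 概念 -> 实体。查询按 概念 -> 实体 -> 文档 下钻。
from collections import defaultdict

class DualLevelIndex:
    def __init__(self):
        self.low = defaultdict(set)   # 实体 -> 文档 id
        self.high = defaultdict(set)  # 概念 -> 实体

    def add_document(self, doc_id: str, entities: dict):
        """增量更新:entities 为 {实体名: 所属概念},无需重建整个索引"""
        for entity, concept in entities.items():
            self.low[entity].add(doc_id)
            self.high[concept].add(entity)

    def query(self, concept: str) -> set:
        """按抽象概念检索文档"""
        docs = set()
        for entity in self.high.get(concept, set()):
            docs |= self.low[entity]
        return docs

idx = DualLevelIndex()
idx.add_document("d1", {"GPT-4": "大语言模型", "OpenAI": "公司"})
idx.add_document("d2", {"Claude": "大语言模型"})
print(idx.query("大语言模型"))  # {'d1', 'd2'}
```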
混合 RAG (Hybrid RAG):
在实际应用中,最强的方案通常是:
最终答案 = 向量检索结果 + 图检索结果 + 社区摘要
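多路结果的合并有多种做法;一种常见且不依赖各路分数量纲的方法是 Reciprocal Rank Fusion (RRF),只利用每路结果中的名次。下面是一个通用的 RRF 草图(`k=60` 是文献中的常用默认值,示例数据为虚构):

```python
# 用 Reciprocal Rank Fusion (RRF) 合并多路检索结果:
# 每个文档的融合分数 = Σ 1 / (k + 该路中的名次)
def rrf_fuse(result_lists: list, k: int = 60) -> list:
    scores = {}
    for results in result_lists:
        for rank, doc_id in enumerate(results, start=1):
            scores[doc_id] = scores.get(doc_id, 0.0) + 1.0 / (k + rank)
    # 按融合分数从高到低返回文档 id
    return sorted(scores, key=scores.get, reverse=True)

vector_hits = ["docA", "docB", "docC"]   # 向量检索结果
graph_hits = ["docB", "docD"]            # 图检索结果
community_hits = ["docB", "docA"]        # 社区摘要命中
fused = rrf_fuse([vector_hits, graph_hits, community_hits])
print(fused[0])  # docB 在三路中都靠前,融合后排第一
```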
总结
GraphRAG 是 RAG 技术的“下一站”。
- 传统 RAG 解决了“有没有”的问题。
- GraphRAG 解决了“为什么”和“全局观”的问题。
虽然 GraphRAG 的索引成本较高(需要多次 LLM 调用来提取关系和生成摘要),但对于需要深度推理和处理复杂关系的场景(如金融审计、法律分析、科研综述),它往往是更合适的选择。

下面给出一个更完整的内存版 KnowledgeGraph 实现,包含邻居查询与多跳路径搜索:

```python
from collections import deque
from typing import Dict, List

class KnowledgeGraph:
    """简单知识图谱"""

    def __init__(self):
        self.entities: Dict[str, Entity] = {}
        self.relations: List[Relation] = []

    def add_entity(self, entity: Entity):
        """添加实体"""
        self.entities[entity.id] = entity

    def add_relation(self, relation: Relation):
        """添加关系"""
        self.relations.append(relation)

    def get_neighbors(self, entity_id: str) -> List[tuple]:
        """获取邻居实体"""
        neighbors = []
        for rel in self.relations:
            if rel.source == entity_id:
                neighbors.append((rel.type, self.entities.get(rel.target)))
            elif rel.target == entity_id:
                neighbors.append((f"reverse_{rel.type}", self.entities.get(rel.source)))
        return neighbors

    def find_path(
        self,
        start_id: str,
        end_id: str,
        max_hops: int = 3
    ) -> List[List[str]]:
        """查找两实体间的路径(BFS)"""
        if start_id == end_id:
            return [[start_id]]

        queue = deque([(start_id, [start_id])])
        visited = {start_id}
        paths = []

        while queue:
            current, path = queue.popleft()
            # 路径形如 [节点, 关系, 节点, ...],跳数 = (len(path) - 1) // 2
            if (len(path) - 1) // 2 >= max_hops:
                continue
            for rel_type, neighbor in self.get_neighbors(current):
                if neighbor is None:
                    continue
                new_path = path + [f"--{rel_type}-->", neighbor.id]
                if neighbor.id == end_id:
                    paths.append(new_path)
                elif neighbor.id not in visited:
                    visited.add(neighbor.id)
                    queue.append((neighbor.id, new_path))
        return paths

# 示例
kg = KnowledgeGraph()

# 添加实体
kg.add_entity(Entity("gpt4", "GPT-4", "Model"))
kg.add_entity(Entity("openai", "OpenAI", "Company"))
kg.add_entity(Entity("transformer", "Transformer", "Architecture"))

# 添加关系
kg.add_relation(Relation("gpt4", "openai", "developed_by"))
kg.add_relation(Relation("gpt4", "transformer", "based_on"))
```
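手写 BFS 适合演示原理;实际项目中可以把三元组导入 networkx,直接复用其现成的图算法(最短路径、连通分量等)。下面是一个假设性示例,数据与上文一致:

```python
# 把三元组导入 networkx,复用现成的最短路径算法(示意)
import networkx as nx

triples = [
    ("gpt4", "developed_by", "openai"),
    ("gpt4", "based_on", "transformer"),
]
G = nx.DiGraph()
for s, p, o in triples:
    G.add_edge(s, o, relation=p)

# 无向化后查找 openai 与 transformer 之间的最短路径
path = nx.shortest_path(G.to_undirected(), "openai", "transformer")
print(path)  # ['openai', 'gpt4', 'transformer']
```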
#### 使用 Neo4j 存储
```python
from neo4j import GraphDatabase

class Neo4jKnowledgeGraph:
    """Neo4j 知识图谱"""

    def __init__(self, uri: str, user: str, password: str):
        self.driver = GraphDatabase.driver(uri, auth=(user, password))

    def close(self):
        self.driver.close()

    def create_entity(self, entity: Entity):
        """创建实体节点"""
        with self.driver.session() as session:
            query = """
            MERGE (e:Entity {id: $id})
            SET e.name = $name, e.type = $type
            """
            session.run(query, id=entity.id, name=entity.name, type=entity.type)

    def create_relation(self, relation: Relation):
        """创建关系(关系类型无法参数化,只能拼接进语句)"""
        with self.driver.session() as session:
            query = f"""
            MATCH (a:Entity {{id: $source}})
            MATCH (b:Entity {{id: $target}})
            MERGE (a)-[r:{relation.type}]->(b)
            """
            session.run(query, source=relation.source, target=relation.target)

    def query_subgraph(self, entity_id: str, hops: int = 2) -> dict:
        """查询以某实体为中心的子图"""
        with self.driver.session() as session:
            query = f"""
            MATCH path = (e:Entity {{id: $id}})-[*1..{hops}]-(related)
            RETURN path
            """
            result = session.run(query, id=entity_id)
            nodes = set()
            edges = []
            for record in result:
                path = record["path"]
                for node in path.nodes:
                    nodes.add((node["id"], node["name"], node["type"]))
                for rel in path.relationships:
                    edges.append((
                        rel.start_node["id"],
                        rel.type,
                        rel.end_node["id"]
                    ))
            return {"nodes": list(nodes), "edges": edges}

    def find_shortest_path(self, start_id: str, end_id: str) -> list:
        """查找最短路径"""
        with self.driver.session() as session:
            query = """
            MATCH path = shortestPath(
                (a:Entity {id: $start})-[*]-(b:Entity {id: $end})
            )
            RETURN path
            """
            result = session.run(query, start=start_id, end=end_id)
            for record in result:
                path = record["path"]
                return [
                    {"node": node["name"], "type": node["type"]}
                    for node in path.nodes
                ]
            return []

# 使用
neo4j_kg = Neo4jKnowledgeGraph(
    "bolt://localhost:7687",
    "neo4j",
    "password"
)
```
知识抽取
使用 LLM 抽取实体和关系 (Pydantic 结构化输出)
在生产环境中,使用简单的 JSON Prompt 容易导致解析失败。推荐使用 LangChain 的 with_structured_output 配合 Pydantic 模型。
```python
from typing import List
from pydantic import BaseModel, Field
from langchain_openai import ChatOpenAI

# 1. 定义数据模型
class Entity(BaseModel):
    name: str = Field(description="实体的名称,如 'OpenAI'")
    type: str = Field(description="实体的类型,如 'Organization', 'Person', 'Technology'")
    description: str = Field(description="实体的简短描述")

class Relation(BaseModel):
    source: str = Field(description="源实体的名称")
    target: str = Field(description="目标实体的名称")
    type: str = Field(description="关系类型,如 'developed_by', 'works_at'")
    description: str = Field(description="关系的详细描述")

class KnowledgeGraph(BaseModel):
    entities: List[Entity]
    relations: List[Relation]

# 2. 配置抽取器
class KnowledgeExtractor:
    def __init__(self):
        self.llm = ChatOpenAI(model="gpt-4o", temperature=0)
        # 结构化输出可以显著降低 JSON 解析失败的概率
        self.structured_llm = self.llm.with_structured_output(KnowledgeGraph)

    def extract(self, text: str) -> KnowledgeGraph:
        prompt = f"""你是一个专业的知识图谱构建专家。请从以下文本中提取所有的实体和它们之间的关系。

文本内容:
{text}

要求:
1. 确保实体名称的一致性(如 'OpenAI' 和 'OpenAI Inc' 应统一)。
2. 关系必须是有明确方向的逻辑连接。
3. 描述应包含文本中的关键细节。
"""
        return self.structured_llm.invoke(prompt)

# 使用示例
extractor = KnowledgeExtractor()
text = "OpenAI 开发了 GPT-4,Sam Altman 是其首席执行官。"
graph_data = extractor.extract(text)

for entity in graph_data.entities:
    print(f"实体: {entity.name} [{entity.type}]")
```
实体对齐与消歧 (Entity Resolution)
在构建大规模图谱时,同一个实体可能以不同名称出现。
```python
import networkx as nx

def resolve_entities(graph: nx.Graph, threshold: float = 0.85):
    """
    简单的实体对齐逻辑(伪代码):
    1. 计算实体名称的相似度
    2. 如果相似度高于阈值,则合并节点
    """
    nodes = list(graph.nodes(data=True))
    for i in range(len(nodes)):
        for j in range(i + 1, len(nodes)):
            name_i, name_j = nodes[i][1]['name'], nodes[j][1]['name']
            # calculate_similarity 可用编辑距离或 embedding 余弦相似度实现
            if calculate_similarity(name_i, name_j) > threshold:
                # 合并节点 j 到节点 i
                graph = nx.contracted_nodes(graph, nodes[i][0], nodes[j][0])
    return graph
```
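上面的 `calculate_similarity` 未给出实现。作为一个不依赖 embedding 的最小可运行版本,可以用 difflib 的字符级相似度近似代替(假设性实现;合并节点后需重新扫描,因为节点列表已失效):

```python
# 可运行的实体对齐最小版本:以 difflib 字符相似度近似替代 embedding 相似度
from difflib import SequenceMatcher
import networkx as nx

def calculate_similarity(a: str, b: str) -> float:
    """0~1 之间的字符级相似度"""
    return SequenceMatcher(None, a.lower(), b.lower()).ratio()

def resolve_entities_simple(graph: nx.Graph, threshold: float = 0.7) -> nx.Graph:
    merged = True
    while merged:                      # 每次合并后重新扫描节点列表
        merged = False
        nodes = list(graph.nodes(data=True))
        for i in range(len(nodes)):
            for j in range(i + 1, len(nodes)):
                sim = calculate_similarity(nodes[i][1]["name"], nodes[j][1]["name"])
                if sim > threshold:
                    # 把节点 j 合并进节点 i
                    graph = nx.contracted_nodes(
                        graph, nodes[i][0], nodes[j][0], self_loops=False
                    )
                    merged = True
                    break
            if merged:
                break
    return graph

g = nx.Graph()
g.add_node("e1", name="OpenAI")
g.add_node("e2", name="OpenAI Inc")
g.add_node("e3", name="Anthropic")
g = resolve_entities_simple(g, threshold=0.7)
print(g.number_of_nodes())  # 2
```

字符相似度对 "OpenAI" / "OpenAI Inc" 这类前缀变体有效,但无法识别 "OpenAI" / "开放人工智能" 这类语义同名,后者仍需 embedding 或 LLM 判别。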
批量构建知识图谱
```python
import json

class KnowledgeGraphBuilder:
    """知识图谱构建器"""

    def __init__(self, kg: KnowledgeGraph):
        self.kg = kg
        self.extractor = KnowledgeExtractor()

    def build_from_documents(self, documents: List[str]) -> dict:
        """从文档列表构建知识图谱"""
        total_entities = 0
        total_relations = 0
        for doc in documents:
            result = self.extractor.extract(doc)
            # 注意:抽取结果以 name 作为标识,入库前可按需生成稳定的实体 id
            for entity in result.entities:
                self.kg.add_entity(entity)
                total_entities += 1
            for relation in result.relations:
                self.kg.add_relation(relation)
                total_relations += 1
        return {
            "entities": total_entities,
            "relations": total_relations
        }

    def merge_entities(self) -> int:
        """合并重复实体"""
        # 简单的名称相似度合并
        merged = 0
        # 实现实体对齐逻辑...
        return merged

    def export_to_json(self, filepath: str):
        """导出为 JSON"""
        data = {
            "entities": [
                {
                    "id": e.id,
                    "name": e.name,
                    "type": e.type,
                    "properties": e.properties
                }
                for e in self.kg.entities.values()
            ],
            "relations": [
                {
                    "source": r.source,
                    "target": r.target,
                    "type": r.type,
                    "properties": r.properties
                }
                for r in self.kg.relations
            ]
        }
        with open(filepath, 'w', encoding='utf-8') as f:
            json.dump(data, f, ensure_ascii=False, indent=2)
```
GraphRAG 核心算法实现
Microsoft GraphRAG 的核心在于分层社区发现和全局摘要检索。
1. 分层社区发现 (Leiden Algorithm)
Leiden 算法比 Louvain 更稳定,能发现更高质量的社区。
```python
import networkx as nx
from graspologic.partition import hierarchical_leiden

def build_hierarchical_communities(graph: nx.Graph):
    """
    构建分层社区结构
    返回: { level: { community_id: [node_ids] } }
    """
    # graspologic 的 hierarchical_leiden 返回一组划分记录,
    # 每条记录带有 node / cluster / level 等字段
    community_mapping = hierarchical_leiden(graph, max_cluster_size=10)

    hierarchical_results = {}
    for partition in community_mapping:
        level_dict = hierarchical_results.setdefault(partition.level, {})
        level_dict.setdefault(partition.cluster, []).append(partition.node)
    return hierarchical_results
```
2. 全局搜索 (Global Search) 的 Map-Reduce 模式
全局搜索不依赖向量相似度,而是遍历所有社区摘要,通过 LLM 进行分布式评分和汇总。
```python
from typing import Dict
from langchain_openai import ChatOpenAI

class GlobalSearcher:
    def __init__(self, community_summaries: Dict[str, str]):
        self.summaries = community_summaries
        self.llm = ChatOpenAI(model="gpt-4o")

    async def search(self, query: str):
        # Step 1: Map - 并行对每个社区摘要进行评分和提取相关点
        map_prompts = [
            f"问题: {query}\n社区摘要: {summary}\n请提取相关信息并给出 0-10 的相关性评分。"
            for summary in self.summaries.values()
        ]
        map_results = await self.llm.abatch(map_prompts)

        # Step 2: Filter - 过滤低分结果
        filtered_results = [
            res.content for res in map_results if self._get_score(res) > 5
        ]

        # Step 3: Reduce - 汇总所有高相关性信息生成最终答案
        reduce_prompt = (
            f"基于以下多个社区的信息,回答问题:{query}\n\n信息:\n"
            + "\n".join(filtered_results)
        )
        return self.llm.invoke(reduce_prompt)

    def _get_score(self, response) -> int:
        # 从 LLM 回复中解析分数的逻辑(此处为示例占位)
        return 7
```
图向量混合检索 (Hybrid Search)
在实际应用中,我们将向量检索 (Vector)、图局部检索 (Local Graph) 和图全局检索 (Global Graph) 结合。
```python
def hybrid_retrieval_pipeline(query: str):
    """混合检索伪代码:向量 + 图局部 + 图全局"""
    # 1. 向量检索:获取语义相似的原始文档块
    vector_docs = vector_db.similarity_search(query)

    # 2. 图局部检索:获取查询中实体的 2-hop 邻居关系
    entities = extract_entities(query)
    graph_context = graph_db.query_subgraph(entities)

    # 3. 图全局检索:获取高层级社区的摘要
    global_context = global_searcher.search(query)

    # 4. Rerank & Synthesis
    final_context = combine_and_rerank(vector_docs, graph_context, global_context)
    return llm.generate_answer(query, final_context)
```
使用 LangChain 集成
```python
from langchain_community.graphs import Neo4jGraph
from langchain.chains import GraphCypherQAChain
from langchain_openai import ChatOpenAI

class LangChainGraphRAG:
    """LangChain Graph RAG"""

    def __init__(self, neo4j_url: str, username: str, password: str):
        self.graph = Neo4jGraph(
            url=neo4j_url,
            username=username,
            password=password
        )
        self.llm = ChatOpenAI(model="gpt-4o")

        # 创建 Cypher QA Chain
        self.qa_chain = GraphCypherQAChain.from_llm(
            llm=self.llm,
            graph=self.graph,
            verbose=True
        )

    def query(self, question: str) -> str:
        """自然语言查询"""
        result = self.qa_chain.invoke({"query": question})
        return result["result"]

    def get_schema(self) -> str:
        """获取图谱 Schema"""
        return self.graph.schema

# 使用
langchain_graph = LangChainGraphRAG(
    "bolt://localhost:7687",
    "neo4j",
    "password"
)
answer = langchain_graph.query("谁是 OpenAI 的 CEO?")
```
图向量混合检索
```python
class HybridGraphVectorRetriever:
    """图 + 向量混合检索"""

    def __init__(self, graph_rag: GraphRAG, vectorstore):
        self.graph_rag = graph_rag
        self.vectorstore = vectorstore

    def retrieve(
        self,
        query: str,
        graph_weight: float = 0.4,
        vector_weight: float = 0.6,
        k: int = 5
    ) -> List[dict]:
        """混合检索"""
        # 图检索
        graph_results = self.graph_rag.local_search(query, k=k)
        graph_context = [
            {
                "content": f"{r['entity']}: {r['relations']}",
                "source": "graph",
                "score": 1.0 - i * 0.1
            }
            for i, r in enumerate(graph_results)
        ]

        # 向量检索(返回的 score 是距离,越小越相似,这里转换为相似度)
        vector_docs = self.vectorstore.similarity_search_with_score(query, k=k)
        vector_context = [
            {
                "content": doc.page_content,
                "source": "vector",
                "score": 1 / (1 + score)
            }
            for doc, score in vector_docs
        ]

        # 合并并加权
        all_results = []
        for r in graph_context:
            r["weighted_score"] = r["score"] * graph_weight
            all_results.append(r)
        for r in vector_context:
            r["weighted_score"] = r["score"] * vector_weight
            all_results.append(r)

        # 排序
        all_results.sort(key=lambda x: x["weighted_score"], reverse=True)
        return all_results[:k]

    def query(self, question: str, k: int = 5) -> str:
        """查询"""
        results = self.retrieve(question, k=k)
        context = "\n".join([r["content"] for r in results])

        prompt = f"""基于以下信息回答问题。

信息来源:
{context}

问题:{question}

答案:"""
        llm = ChatOpenAI(model="gpt-4o")
        response = llm.invoke(prompt)
        return response.content
```
知识图谱问答
Cypher 生成
```python
class Text2Cypher:
    """自然语言转 Cypher"""

    def __init__(self, schema: str):
        self.schema = schema
        self.llm = ChatOpenAI(model="gpt-4o")

    def generate_cypher(self, question: str) -> str:
        """生成 Cypher 查询"""
        prompt = f"""将自然语言问题转换为 Neo4j Cypher 查询。

图谱 Schema:
{self.schema}

问题:{question}

Cypher 查询:"""
        response = self.llm.invoke(prompt)
        return response.content.strip()

    def validate_cypher(self, cypher: str) -> bool:
        """简单校验:只放行只读查询"""
        required = ["MATCH", "RETURN"]
        dangerous = ["DELETE", "REMOVE", "DROP", "CREATE", "SET"]

        cypher_upper = cypher.upper()
        if not any(kw in cypher_upper for kw in required):
            return False
        if any(kw in cypher_upper for kw in dangerous):
            return False
        return True

# 示例 Schema
schema = """
Nodes:
- Entity (id, name, type)
- Person (id, name, role)
- Company (id, name, founded)

Relationships:
- DEVELOPED_BY: Entity -> Company
- WORKS_AT: Person -> Company
- BASED_ON: Entity -> Entity
"""

text2cypher = Text2Cypher(schema)
cypher = text2cypher.generate_cypher("OpenAI 开发了哪些产品?")
# 可能的输出: MATCH (c:Company {name: 'OpenAI'})<-[:DEVELOPED_BY]-(e:Entity) RETURN e.name
```
实战案例:企业知识图谱
```python
import numpy as np
from langchain_openai import ChatOpenAI, OpenAIEmbeddings

class EnterpriseKnowledgeGraph:
    """企业知识图谱系统"""

    def __init__(self, neo4j_config: dict):
        self.kg = Neo4jKnowledgeGraph(**neo4j_config)
        self.extractor = KnowledgeExtractor()
        self.embeddings = OpenAIEmbeddings()

    def ingest_document(self, document: str, source: str):
        """导入文档(假设抽取结果是带 id/properties 的实体与关系对象)"""
        # 1. 抽取知识
        result = self.extractor.extract(document)

        # 2. 添加到图谱,并记录来源
        for entity in result.entities:
            entity.properties["source"] = source
            self.kg.create_entity(entity)
        for relation in result.relations:
            self.kg.create_relation(relation)

        return {
            "entities": len(result.entities),
            "relations": len(result.relations)
        }

    def semantic_search(self, query: str, k: int = 5) -> list:
        """语义搜索实体"""
        # 获取所有实体(小规模图谱可行;大规模应改用向量索引)
        with self.kg.driver.session() as session:
            result = session.run("MATCH (e:Entity) RETURN e.name, e.id")
            entities = [(r["e.name"], r["e.id"]) for r in result]

        # 嵌入并按余弦相似度排序
        query_emb = self.embeddings.embed_query(query)
        entity_embs = self.embeddings.embed_documents([e[0] for e in entities])

        similarities = []
        for i, emb in enumerate(entity_embs):
            sim = np.dot(query_emb, emb) / (
                np.linalg.norm(query_emb) * np.linalg.norm(emb)
            )
            similarities.append((entities[i], sim))

        similarities.sort(key=lambda x: x[1], reverse=True)
        return [
            {"name": e[0], "id": e[1], "score": s}
            for (e, s) in similarities[:k]
        ]

    def answer_question(self, question: str) -> dict:
        """回答问题"""
        # 1. 语义搜索相关实体
        relevant_entities = self.semantic_search(question, k=3)

        # 2. 获取子图
        subgraph_info = []
        for entity in relevant_entities:
            subgraph = self.kg.query_subgraph(entity["id"], hops=2)
            subgraph_info.append(subgraph)

        # 3. 构建上下文
        context = self._format_subgraph(subgraph_info)

        # 4. 生成答案
        llm = ChatOpenAI(model="gpt-4o")
        prompt = f"""基于知识图谱信息回答问题。

知识图谱:
{context}

问题:{question}

答案:"""
        response = llm.invoke(prompt)
        return {
            "answer": response.content,
            "entities": relevant_entities,
            "subgraph": subgraph_info
        }

    def _format_subgraph(self, subgraphs: list) -> str:
        """格式化子图"""
        lines = []
        for sg in subgraphs:
            for node in sg.get("nodes", []):
                lines.append(f"实体: {node[1]} (类型: {node[2]})")
            for edge in sg.get("edges", []):
                lines.append(f"关系: {edge[0]} --{edge[1]}--> {edge[2]}")
        return "\n".join(lines)
```
最佳实践
| 场景 | 推荐方案 |
|---|---|
| 简单问答 | 向量 RAG |
| 实体关系查询 | GraphRAG Local |
| 全局主题理解 | GraphRAG Global |
| 多跳推理 | 图遍历 + LLM |
| 混合需求 | Hybrid 检索 |
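上表的选型逻辑可以落成一个简单的查询路由器。下面是一个假设性的关键词启发式草图(实际系统中通常用 LLM 分类器代替关键词规则,关键词列表为示例):

```python
# 按最佳实践表把查询路由到不同检索策略的启发式草图
def route_query(query: str) -> str:
    global_keywords = ["总体", "主题", "趋势", "概况", "总结"]
    relation_keywords = ["关系", "之间", "谁", "属于", "导致"]

    if any(kw in query for kw in global_keywords):
        return "graphrag_global"   # 全局主题理解 -> 社区摘要
    if any(kw in query for kw in relation_keywords):
        return "graphrag_local"    # 实体关系查询 -> 局部图遍历
    return "vector_rag"            # 默认走向量 RAG

print(route_query("这批文档的总体主题是什么?"))        # graphrag_global
print(route_query("OpenAI 和 GPT-4 之间是什么关系?"))  # graphrag_local
print(route_query("GPT-4 的上下文窗口多大?"))          # vector_rag
```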
参考资源
版权声明: 如无特别声明,本文版权归 sshipanoo 所有,转载请注明本文链接。
(采用 CC BY-NC-SA 4.0 许可协议进行授权)
本文标题:《 LLM应用开发——GraphRAG与知识图谱 》