构建LLM应用的完整工具链
前言
LangChain是一个用于构建LLM应用的强大框架,提供了模块化的组件和链式调用能力。本文从基础概念到高级用法,全面介绍LangChain的核心功能。
LangChain概述
什么是LangChain
LangChain是一个开源框架,帮助开发者构建基于大语言模型的应用程序。
| 特性 | 说明 |
|---|---|
| 模块化 | 提供可组合的组件 |
| 链式调用 | 将多个组件串联成工作流 |
| 多模型支持 | OpenAI、Anthropic、本地模型等 |
| 生态丰富 | 大量集成和社区贡献 |
核心概念
┌─────────────────────────────────────────────────────┐
│ LangChain │
├─────────────────────────────────────────────────────┤
│ Model I/O │ Retrieval │ Agents │
│ ├─ Prompts │ ├─ Loaders │ ├─ Tools │
│ ├─ LLMs │ ├─ Splitters │ ├─ Toolkits │
│ └─ Output │ ├─ Embeddings │ └─ Executors │
│ Parsers │ └─ Vectorstore│ │
├─────────────────────────────────────────────────────┤
│ Chains │ Memory │ Callbacks │
└─────────────────────────────────────────────────────┘
安装
# 基础安装
pip install langchain langchain-core
# 安装OpenAI集成
pip install langchain-openai
# 安装社区集成包
pip install langchain-community
# 安装常用向量数据库支持
pip install chromadb faiss-cpu
Model I/O
LLM与ChatModel
from langchain_openai import ChatOpenAI, OpenAI
from langchain_core.messages import HumanMessage, SystemMessage
# Chat Model(推荐)
chat = ChatOpenAI(
model="gpt-4o-mini",
temperature=0.7,
max_tokens=1000
)
# 使用消息列表调用
messages = [
SystemMessage(content="你是一个专业的Python开发者"),
HumanMessage(content="如何在Python中实现单例模式?")
]
response = chat.invoke(messages)
print(response.content)
# 简化调用
response = chat.invoke("解释什么是装饰器")
print(response.content)
流式输出
from langchain_openai import ChatOpenAI
chat = ChatOpenAI(model="gpt-4o-mini", streaming=True)
# 同步流式
for chunk in chat.stream("写一首关于编程的诗"):
print(chunk.content, end="", flush=True)
# 异步流式
async def stream_response():
async for chunk in chat.astream("解释递归"):
print(chunk.content, end="", flush=True)
Prompt Templates
from langchain_core.prompts import (
ChatPromptTemplate,
PromptTemplate,
MessagesPlaceholder
)
# 简单模板
simple_template = PromptTemplate.from_template(
"将以下文本翻译成{language}:\n{text}"
)
prompt = simple_template.format(language="日语", text="你好世界")
# Chat Prompt模板
chat_template = ChatPromptTemplate.from_messages([
("system", "你是一个{role},专注于{domain}领域。"),
("human", "{question}")
])
messages = chat_template.format_messages(
role="资深架构师",
domain="分布式系统",
question="如何设计一个高可用的消息队列?"
)
# 带历史记录的模板
history_template = ChatPromptTemplate.from_messages([
("system", "你是一个有帮助的助手。"),
MessagesPlaceholder(variable_name="history"),
("human", "{input}")
])
Output Parsers
from langchain_core.output_parsers import (
StrOutputParser,
JsonOutputParser,
PydanticOutputParser
)
from langchain_core.pydantic_v1 import BaseModel, Field
from typing import List
# 字符串解析器
str_parser = StrOutputParser()
# JSON解析器
json_parser = JsonOutputParser()
# Pydantic解析器
class CodeReview(BaseModel):
issues: List[str] = Field(description="代码问题列表")
score: int = Field(description="代码质量评分(1-10)")
suggestions: List[str] = Field(description="改进建议")
pydantic_parser = PydanticOutputParser(pydantic_object=CodeReview)
# 获取格式化指令
format_instructions = pydantic_parser.get_format_instructions()
# 在Prompt中使用
review_prompt = ChatPromptTemplate.from_messages([
("system", "你是一个代码审查专家。"),
("human", """请审查以下代码并按指定格式输出:
{format_instructions}
代码:
```python
{code}
”””) ])
---
#### 链(Chains)
#### LCEL (LangChain Expression Language)
```python
from langchain_openai import ChatOpenAI
from langchain_core.prompts import ChatPromptTemplate
from langchain_core.output_parsers import StrOutputParser
# 创建组件
prompt = ChatPromptTemplate.from_template(
"用简洁的语言解释{concept},适合初学者理解。"
)
model = ChatOpenAI(model="gpt-4o-mini")
parser = StrOutputParser()
# 使用 | 操作符组合成链
chain = prompt | model | parser
# 调用链
result = chain.invoke({"concept": "微服务架构"})
print(result)
# 批量处理
concepts = [
{"concept": "容器化"},
{"concept": "CI/CD"},
{"concept": "服务网格"}
]
results = chain.batch(concepts)
# 流式处理
for chunk in chain.stream({"concept": "Kubernetes"}):
print(chunk, end="", flush=True)
复杂链组合
from langchain_core.runnables import RunnablePassthrough, RunnableLambda
# 并行执行
from langchain_core.runnables import RunnableParallel
analysis_chain = RunnableParallel(
summary=summary_prompt | model | parser,
sentiment=sentiment_prompt | model | parser,
keywords=keywords_prompt | model | parser
)
result = analysis_chain.invoke({"text": "这是一段需要分析的文本..."})
# result = {"summary": "...", "sentiment": "...", "keywords": "..."}
# 条件分支
from langchain_core.runnables import RunnableBranch
branch_chain = RunnableBranch(
(lambda x: x["type"] == "code", code_chain),
(lambda x: x["type"] == "doc", doc_chain),
default_chain # 默认分支
)
# 数据转换
def format_docs(docs):
return "\n\n".join(doc.page_content for doc in docs)
rag_chain = (
{"context": retriever | format_docs, "question": RunnablePassthrough()}
| prompt
| model
| parser
)
链的调试与追踪
from langchain_core.globals import set_debug, set_verbose
# 开启调试模式
set_debug(True)
set_verbose(True)
# 使用回调
from langchain_core.callbacks import StdOutCallbackHandler
chain.invoke(
{"concept": "Docker"},
config={"callbacks": [StdOutCallbackHandler()]}
)
# 添加中间步骤日志
chain_with_logging = (
prompt
| RunnableLambda(lambda x: (print(f"Prompt: {x}"), x)[1])
| model
| RunnableLambda(lambda x: (print(f"Response: {x}"), x)[1])
| parser
)
文档加载与处理
Document Loaders
from langchain_community.document_loaders import (
TextLoader,
PyPDFLoader,
UnstructuredMarkdownLoader,
WebBaseLoader,
DirectoryLoader
)
# 加载文本文件
text_loader = TextLoader("document.txt", encoding="utf-8")
docs = text_loader.load()
# 加载PDF
pdf_loader = PyPDFLoader("document.pdf")
pdf_docs = pdf_loader.load() # 每页一个Document
# 加载Markdown
md_loader = UnstructuredMarkdownLoader("README.md")
md_docs = md_loader.load()
# 加载网页
web_loader = WebBaseLoader("https://example.com/article")
web_docs = web_loader.load()
# 加载目录下所有文件
dir_loader = DirectoryLoader(
"docs/",
glob="**/*.md",
loader_cls=UnstructuredMarkdownLoader
)
all_docs = dir_loader.load()
print(f"加载了 {len(all_docs)} 个文档")
Text Splitters
from langchain_text_splitters import (
RecursiveCharacterTextSplitter,
CharacterTextSplitter,
TokenTextSplitter,
MarkdownHeaderTextSplitter
)
# 递归字符分割器(推荐)
text_splitter = RecursiveCharacterTextSplitter(
chunk_size=1000,
chunk_overlap=200,
length_function=len,
separators=["\n\n", "\n", "。", "!", "?", ".", " ", ""]
)
chunks = text_splitter.split_documents(docs)
# Token分割器
token_splitter = TokenTextSplitter(
chunk_size=500,
chunk_overlap=50
)
# Markdown标题分割器
md_splitter = MarkdownHeaderTextSplitter(
headers_to_split_on=[
("#", "h1"),
("##", "h2"),
("###", "h3"),
]
)
md_chunks = md_splitter.split_text(markdown_text)
文档处理管道
from langchain_core.documents import Document
class DocumentProcessor:
def __init__(self):
self.splitter = RecursiveCharacterTextSplitter(
chunk_size=1000,
chunk_overlap=200
)
def process(self, docs: list[Document]) -> list[Document]:
# 1. 清洗文档
cleaned_docs = [self._clean_doc(doc) for doc in docs]
# 2. 分割文档
chunks = self.splitter.split_documents(cleaned_docs)
# 3. 添加元数据
for i, chunk in enumerate(chunks):
chunk.metadata["chunk_id"] = i
chunk.metadata["char_count"] = len(chunk.page_content)
return chunks
def _clean_doc(self, doc: Document) -> Document:
# 清除多余空白
content = " ".join(doc.page_content.split())
# 移除特殊字符
content = re.sub(r'[\x00-\x08\x0b\x0c\x0e-\x1f]', '', content)
return Document(
page_content=content,
metadata=doc.metadata
)
向量存储与检索
VectorStore
from langchain_openai import OpenAIEmbeddings
from langchain_community.vectorstores import Chroma, FAISS
# 创建Embeddings
embeddings = OpenAIEmbeddings(model="text-embedding-3-small")
# 使用Chroma
vectorstore = Chroma.from_documents(
documents=chunks,
embedding=embeddings,
persist_directory="./chroma_db"
)
# 使用FAISS
faiss_store = FAISS.from_documents(chunks, embeddings)
# 保存和加载
faiss_store.save_local("faiss_index")
loaded_store = FAISS.load_local(
"faiss_index",
embeddings,
allow_dangerous_deserialization=True
)
Retriever
# 基础检索器
retriever = vectorstore.as_retriever(
search_type="similarity", # 或 "mmr"
search_kwargs={"k": 5}
)
docs = retriever.invoke("什么是微服务?")
# MMR检索(最大边际相关性)
mmr_retriever = vectorstore.as_retriever(
search_type="mmr",
search_kwargs={
"k": 5,
"fetch_k": 20,
"lambda_mult": 0.5
}
)
# 带分数的检索
docs_with_scores = vectorstore.similarity_search_with_score(
"查询文本",
k=5
)
# 多查询检索器
from langchain.retrievers import MultiQueryRetriever
multi_retriever = MultiQueryRetriever.from_llm(
retriever=retriever,
llm=ChatOpenAI()
)
高级检索策略
from langchain.retrievers import (
ContextualCompressionRetriever,
EnsembleRetriever
)
from langchain.retrievers.document_compressors import (
LLMChainExtractor
)
from langchain_community.retrievers import BM25Retriever
# 上下文压缩检索器
compressor = LLMChainExtractor.from_llm(ChatOpenAI())
compression_retriever = ContextualCompressionRetriever(
base_compressor=compressor,
base_retriever=retriever
)
# 混合检索(BM25 + 向量)
bm25_retriever = BM25Retriever.from_documents(chunks)
bm25_retriever.k = 5
ensemble_retriever = EnsembleRetriever(
retrievers=[bm25_retriever, retriever],
weights=[0.4, 0.6]
)
# 父文档检索器
from langchain.retrievers import ParentDocumentRetriever
from langchain.storage import InMemoryStore
parent_retriever = ParentDocumentRetriever(
vectorstore=vectorstore,
docstore=InMemoryStore(),
child_splitter=RecursiveCharacterTextSplitter(chunk_size=200),
parent_splitter=RecursiveCharacterTextSplitter(chunk_size=2000),
)
记忆(Memory)
对话记忆
from langchain.memory import (
ConversationBufferMemory,
ConversationBufferWindowMemory,
ConversationSummaryMemory
)
# 完整缓冲记忆
buffer_memory = ConversationBufferMemory(
return_messages=True,
memory_key="history"
)
# 窗口记忆(只保留最近k轮)
window_memory = ConversationBufferWindowMemory(
k=5,
return_messages=True
)
# 摘要记忆
summary_memory = ConversationSummaryMemory(
llm=ChatOpenAI(),
return_messages=True
)
# 在链中使用记忆
from langchain.chains import ConversationChain
conversation = ConversationChain(
llm=ChatOpenAI(),
memory=buffer_memory,
verbose=True
)
# 对话
response1 = conversation.predict(input="我叫张三")
response2 = conversation.predict(input="我的名字是什么?")
使用LCEL管理记忆
from langchain_core.runnables.history import RunnableWithMessageHistory
from langchain_community.chat_message_histories import ChatMessageHistory
# 存储会话历史
store = {}
def get_session_history(session_id: str):
if session_id not in store:
store[session_id] = ChatMessageHistory()
return store[session_id]
# 创建带记忆的链
prompt = ChatPromptTemplate.from_messages([
("system", "你是一个有帮助的助手。"),
MessagesPlaceholder(variable_name="history"),
("human", "{input}")
])
chain = prompt | ChatOpenAI() | StrOutputParser()
chain_with_history = RunnableWithMessageHistory(
chain,
get_session_history,
input_messages_key="input",
history_messages_key="history"
)
# 使用时指定session_id
config = {"configurable": {"session_id": "user_123"}}
response = chain_with_history.invoke(
{"input": "你好!"},
config=config
)
工具与Agent
自定义工具
from langchain_core.tools import tool, StructuredTool
from langchain_core.pydantic_v1 import BaseModel, Field
# 使用装饰器定义工具
@tool
def search_web(query: str) -> str:
"""搜索互联网获取信息。使用这个工具来查找最新的信息。"""
# 实际实现搜索逻辑
return f"搜索 '{query}' 的结果..."
@tool
def calculate(expression: str) -> str:
"""计算数学表达式。输入应该是有效的数学表达式。"""
try:
result = eval(expression)
return str(result)
except Exception as e:
return f"计算错误: {e}"
# 带结构化输入的工具
class SearchInput(BaseModel):
query: str = Field(description="搜索查询")
max_results: int = Field(default=5, description="最大结果数")
@tool("advanced_search", args_schema=SearchInput)
def advanced_search(query: str, max_results: int = 5) -> str:
"""高级搜索功能,支持限制结果数量。"""
return f"搜索 '{query}',返回 {max_results} 条结果"
# 从函数创建工具
def get_weather(city: str) -> str:
"""获取城市天气"""
return f"{city}今天天气晴朗,温度25°C"
weather_tool = StructuredTool.from_function(
func=get_weather,
name="weather",
description="获取指定城市的天气信息"
)
创建Agent
from langchain.agents import create_tool_calling_agent, AgentExecutor
# 准备工具
tools = [search_web, calculate, weather_tool]
# 创建Agent Prompt
agent_prompt = ChatPromptTemplate.from_messages([
("system", """你是一个有帮助的AI助手。
你可以使用以下工具来帮助回答问题:
{tools}
请根据用户的问题选择合适的工具。如果不需要工具,直接回答即可。"""),
MessagesPlaceholder(variable_name="chat_history", optional=True),
("human", "{input}"),
MessagesPlaceholder(variable_name="agent_scratchpad")
])
# 创建Agent
agent = create_tool_calling_agent(
llm=ChatOpenAI(model="gpt-4o"),
tools=tools,
prompt=agent_prompt
)
# 创建Agent Executor
agent_executor = AgentExecutor(
agent=agent,
tools=tools,
verbose=True,
max_iterations=5,
handle_parsing_errors=True
)
# 运行Agent
result = agent_executor.invoke({
"input": "北京今天的天气怎么样?另外帮我算一下 123 * 456"
})
print(result["output"])
ReAct Agent
from langchain.agents import create_react_agent
from langchain import hub
# 使用LangChain Hub的ReAct prompt
react_prompt = hub.pull("hwchase17/react")
# 创建ReAct Agent
react_agent = create_react_agent(
llm=ChatOpenAI(model="gpt-4o"),
tools=tools,
prompt=react_prompt
)
react_executor = AgentExecutor(
agent=react_agent,
tools=tools,
verbose=True,
max_iterations=10
)
RAG应用完整示例
from langchain_openai import ChatOpenAI, OpenAIEmbeddings
from langchain_community.vectorstores import Chroma
from langchain_community.document_loaders import DirectoryLoader
from langchain_text_splitters import RecursiveCharacterTextSplitter
from langchain_core.prompts import ChatPromptTemplate
from langchain_core.output_parsers import StrOutputParser
from langchain_core.runnables import RunnablePassthrough
class RAGApplication:
def __init__(self, docs_path: str):
self.embeddings = OpenAIEmbeddings(
model="text-embedding-3-small"
)
self.llm = ChatOpenAI(model="gpt-4o-mini")
self.vectorstore = None
self.docs_path = docs_path
def index_documents(self):
"""索引文档"""
# 加载文档
loader = DirectoryLoader(
self.docs_path,
glob="**/*.md",
show_progress=True
)
documents = loader.load()
# 分割文档
splitter = RecursiveCharacterTextSplitter(
chunk_size=1000,
chunk_overlap=200
)
chunks = splitter.split_documents(documents)
# 创建向量存储
self.vectorstore = Chroma.from_documents(
documents=chunks,
embedding=self.embeddings,
persist_directory="./rag_db"
)
print(f"已索引 {len(chunks)} 个文档块")
def create_chain(self):
"""创建RAG链"""
# 检索器
retriever = self.vectorstore.as_retriever(
search_kwargs={"k": 5}
)
# Prompt模板
prompt = ChatPromptTemplate.from_messages([
("system", """你是一个知识库助手。根据提供的上下文信息回答问题。
如果上下文中没有相关信息,请诚实地说不知道。
上下文信息:
{context}"""),
("human", "{question}")
])
# 格式化检索结果
def format_docs(docs):
return "\n\n---\n\n".join(
f"[来源: {doc.metadata.get('source', '未知')}]\n{doc.page_content}"
for doc in docs
)
# 构建链
chain = (
{
"context": retriever | format_docs,
"question": RunnablePassthrough()
}
| prompt
| self.llm
| StrOutputParser()
)
return chain
def query(self, question: str) -> str:
"""查询知识库"""
if self.vectorstore is None:
raise ValueError("请先索引文档")
chain = self.create_chain()
return chain.invoke(question)
# 使用示例
if __name__ == "__main__":
rag = RAGApplication("./documents")
rag.index_documents()
questions = [
"什么是微服务架构?",
"如何实现服务发现?",
"分布式事务有哪些解决方案?"
]
for q in questions:
print(f"\n问题: {q}")
print(f"回答: {rag.query(q)}")
工业级进阶:LangGraph 与 LangSmith
在构建复杂的生产级应用时,简单的线性链(Chains)往往难以处理循环逻辑和复杂的状态管理。
1. LangGraph:构建有状态的多 Agent 系统
LangGraph 是 LangChain 的扩展,允许你构建包含循环(Cycles)的图结构,非常适合实现复杂的 Agent 逻辑。
from typing import Annotated, TypedDict
from langgraph.graph import StateGraph, END
from langgraph.graph.message import add_messages
# 1. 定义状态
class State(TypedDict):
messages: Annotated[list, add_messages]
# 2. 定义节点
def chatbot(state: State):
return {"messages": [llm.invoke(state["messages"])]}
# 3. 构建图
graph_builder = StateGraph(State)
graph_builder.add_node("chatbot", chatbot)
graph_builder.set_entry_point("chatbot")
graph_builder.add_edge("chatbot", END)
# 4. 编译并运行
graph = graph_builder.compile()
for event in graph.stream({"messages": [("user", "你好")]}):
print(event)
为什么使用 LangGraph?
- 循环支持:支持 ReAct 等需要反复迭代的模式。
- 状态持久化:可以轻松保存和恢复对话状态(Checkpoints)。
- 细粒度控制:可以精确控制 Agent 的每一步决策。
2. LangSmith:调试、测试与监控
LangSmith 是 LangChain 推出的开发者平台,解决了 LLM 应用“黑盒”难以调试的问题。
- Tracing (追踪):可视化查看 Prompt 是如何构建的、检索到了哪些文档、LLM 的原始响应是什么。
- Evaluation (评估):自动化运行测试集,对比不同 Prompt 或模型的表现。
- Monitoring (监控):监控生产环境中的 Token 消耗、延迟和错误率。
# 启用 LangSmith 追踪
import os
os.environ["LANGCHAIN_TRACING_V2"] = "true"
os.environ["LANGCHAIN_API_KEY"] = "your_api_key"
os.environ["LANGCHAIN_PROJECT"] = "my_project"
总结
LangChain提供了构建LLM应用的完整工具链:
| 组件 | 功能 | 应用场景 |
|---|---|---|
| Model I/O | 模型交互、Prompt、解析 | 所有LLM应用 |
| Chains | 组件编排、工作流 | 复杂任务处理 |
| Retrieval | 文档加载、向量存储 | RAG系统 |
| Memory | 对话历史管理 | 聊天机器人 |
| Agents | 工具使用、自主决策 | 智能助手 |
掌握LangChain,能够快速构建各种LLM应用。
参考资源
版权声明: 如无特别声明,本文版权归 sshipanoo 所有,转载请注明本文链接。
(采用 CC BY-NC-SA 4.0 许可协议进行授权)
本文标题:《 LLM应用开发——LangChain框架 》
本文链接:http://localhost:3015/ai/LangChain%E6%A1%86%E6%9E%B6.html
本文最后一次更新为 天前,文章中的某些内容可能已过时!
目录
- 前言
- LangChain概述
- 什么是LangChain
- 核心概念
- 安装
- Model I/O
- LLM与ChatModel
- 流式输出
- Prompt Templates
- Output Parsers
- 复杂链组合
- 链的调试与追踪
- 文档加载与处理
- Document Loaders
- Text Splitters
- 文档处理管道
- 向量存储与检索
- VectorStore
- Retriever
- 高级检索策略
- 记忆(Memory)
- 对话记忆
- 使用LCEL管理记忆
- 工具与Agent
- 自定义工具
- 创建Agent
- ReAct Agent
- RAG应用完整示例
- 工业级进阶:LangGraph 与 LangSmith
- 1. LangGraph:构建有状态的多 Agent 系统
- 2. LangSmith:调试、测试与监控
- 总结
- 参考资源