构建LLM应用的完整工具链

前言

LangChain是一个用于构建LLM应用的强大框架,提供了模块化的组件和链式调用能力。本文从基础概念到高级用法,全面介绍LangChain的核心功能。


LangChain概述

什么是LangChain

LangChain是一个开源框架,帮助开发者构建基于大语言模型的应用程序。

特性 说明
模块化 提供可组合的组件
链式调用 将多个组件串联成工作流
多模型支持 OpenAI、Anthropic、本地模型等
生态丰富 大量集成和社区贡献

核心概念

┌─────────────────────────────────────────────────────┐
│                    LangChain                        │
├─────────────────────────────────────────────────────┤
│  Model I/O     │  Retrieval     │  Agents          │
│  ├─ Prompts    │  ├─ Loaders    │  ├─ Tools        │
│  ├─ LLMs       │  ├─ Splitters  │  ├─ Toolkits     │
│  └─ Output     │  ├─ Embeddings │  └─ Executors    │
│     Parsers    │  └─ Vectorstore│                  │
├─────────────────────────────────────────────────────┤
│  Chains        │  Memory        │  Callbacks       │
└─────────────────────────────────────────────────────┘

安装

# 基础安装
pip install langchain langchain-core

# 安装OpenAI集成
pip install langchain-openai

# 安装社区集成包
pip install langchain-community

# 安装常用向量数据库支持
pip install chromadb faiss-cpu

Model I/O

LLM与ChatModel

from langchain_openai import ChatOpenAI, OpenAI
from langchain_core.messages import HumanMessage, SystemMessage

# Chat Model(推荐)
chat = ChatOpenAI(
    model="gpt-4o-mini",
    temperature=0.7,
    max_tokens=1000
)

# 使用消息列表调用
messages = [
    SystemMessage(content="你是一个专业的Python开发者"),
    HumanMessage(content="如何在Python中实现单例模式?")
]

response = chat.invoke(messages)
print(response.content)

# 简化调用
response = chat.invoke("解释什么是装饰器")
print(response.content)

流式输出

from langchain_openai import ChatOpenAI

chat = ChatOpenAI(model="gpt-4o-mini", streaming=True)

# 同步流式
for chunk in chat.stream("写一首关于编程的诗"):
    print(chunk.content, end="", flush=True)

# 异步流式
async def stream_response():
    async for chunk in chat.astream("解释递归"):
        print(chunk.content, end="", flush=True)

Prompt Templates

from langchain_core.prompts import (
    ChatPromptTemplate,
    PromptTemplate,
    MessagesPlaceholder
)

# 简单模板
simple_template = PromptTemplate.from_template(
    "将以下文本翻译成{language}:\n{text}"
)
prompt = simple_template.format(language="日语", text="你好世界")

# Chat Prompt模板
chat_template = ChatPromptTemplate.from_messages([
    ("system", "你是一个{role},专注于{domain}领域。"),
    ("human", "{question}")
])

messages = chat_template.format_messages(
    role="资深架构师",
    domain="分布式系统",
    question="如何设计一个高可用的消息队列?"
)

# 带历史记录的模板
history_template = ChatPromptTemplate.from_messages([
    ("system", "你是一个有帮助的助手。"),
    MessagesPlaceholder(variable_name="history"),
    ("human", "{input}")
])

Output Parsers

from langchain_core.output_parsers import (
    StrOutputParser,
    JsonOutputParser,
    PydanticOutputParser
)
from langchain_core.pydantic_v1 import BaseModel, Field
from typing import List

# 字符串解析器
str_parser = StrOutputParser()

# JSON解析器
json_parser = JsonOutputParser()

# Pydantic解析器
class CodeReview(BaseModel):
    issues: List[str] = Field(description="代码问题列表")
    score: int = Field(description="代码质量评分(1-10)")
    suggestions: List[str] = Field(description="改进建议")

pydantic_parser = PydanticOutputParser(pydantic_object=CodeReview)

# 获取格式化指令
format_instructions = pydantic_parser.get_format_instructions()

# 在Prompt中使用
review_prompt = ChatPromptTemplate.from_messages([
    ("system", "你是一个代码审查专家。"),
    ("human", """请审查以下代码并按指定格式输出:

{format_instructions}

代码:
```python
{code}

”””) ])


---

#### 链(Chains)

#### LCEL (LangChain Expression Language)

```python
from langchain_openai import ChatOpenAI
from langchain_core.prompts import ChatPromptTemplate
from langchain_core.output_parsers import StrOutputParser

# 创建组件
prompt = ChatPromptTemplate.from_template(
    "用简洁的语言解释{concept},适合初学者理解。"
)
model = ChatOpenAI(model="gpt-4o-mini")
parser = StrOutputParser()

# 使用 | 操作符组合成链
chain = prompt | model | parser

# 调用链
result = chain.invoke({"concept": "微服务架构"})
print(result)

# 批量处理
concepts = [
    {"concept": "容器化"},
    {"concept": "CI/CD"},
    {"concept": "服务网格"}
]
results = chain.batch(concepts)

# 流式处理
for chunk in chain.stream({"concept": "Kubernetes"}):
    print(chunk, end="", flush=True)

复杂链组合

from langchain_core.runnables import RunnablePassthrough, RunnableLambda

# 并行执行
from langchain_core.runnables import RunnableParallel

analysis_chain = RunnableParallel(
    summary=summary_prompt | model | parser,
    sentiment=sentiment_prompt | model | parser,
    keywords=keywords_prompt | model | parser
)

result = analysis_chain.invoke({"text": "这是一段需要分析的文本..."})
# result = {"summary": "...", "sentiment": "...", "keywords": "..."}

# 条件分支
from langchain_core.runnables import RunnableBranch

branch_chain = RunnableBranch(
    (lambda x: x["type"] == "code", code_chain),
    (lambda x: x["type"] == "doc", doc_chain),
    default_chain  # 默认分支
)

# 数据转换
def format_docs(docs):
    return "\n\n".join(doc.page_content for doc in docs)

rag_chain = (
    {"context": retriever | format_docs, "question": RunnablePassthrough()}
    | prompt
    | model
    | parser
)

链的调试与追踪

from langchain_core.globals import set_debug, set_verbose

# 开启调试模式
set_debug(True)
set_verbose(True)

# 使用回调
from langchain_core.callbacks import StdOutCallbackHandler

chain.invoke(
    {"concept": "Docker"},
    config={"callbacks": [StdOutCallbackHandler()]}
)

# 添加中间步骤日志
chain_with_logging = (
    prompt
    | RunnableLambda(lambda x: (print(f"Prompt: {x}"), x)[1])
    | model
    | RunnableLambda(lambda x: (print(f"Response: {x}"), x)[1])
    | parser
)

文档加载与处理

Document Loaders

from langchain_community.document_loaders import (
    TextLoader,
    PyPDFLoader,
    UnstructuredMarkdownLoader,
    WebBaseLoader,
    DirectoryLoader
)

# 加载文本文件
text_loader = TextLoader("document.txt", encoding="utf-8")
docs = text_loader.load()

# 加载PDF
pdf_loader = PyPDFLoader("document.pdf")
pdf_docs = pdf_loader.load()  # 每页一个Document

# 加载Markdown
md_loader = UnstructuredMarkdownLoader("README.md")
md_docs = md_loader.load()

# 加载网页
web_loader = WebBaseLoader("https://example.com/article")
web_docs = web_loader.load()

# 加载目录下所有文件
dir_loader = DirectoryLoader(
    "docs/",
    glob="**/*.md",
    loader_cls=UnstructuredMarkdownLoader
)
all_docs = dir_loader.load()

print(f"加载了 {len(all_docs)} 个文档")

Text Splitters

from langchain_text_splitters import (
    RecursiveCharacterTextSplitter,
    CharacterTextSplitter,
    TokenTextSplitter,
    MarkdownHeaderTextSplitter
)

# 递归字符分割器(推荐)
text_splitter = RecursiveCharacterTextSplitter(
    chunk_size=1000,
    chunk_overlap=200,
    length_function=len,
    separators=["\n\n", "\n", "", "", "", ".", " ", ""]
)

chunks = text_splitter.split_documents(docs)

# Token分割器
token_splitter = TokenTextSplitter(
    chunk_size=500,
    chunk_overlap=50
)

# Markdown标题分割器
md_splitter = MarkdownHeaderTextSplitter(
    headers_to_split_on=[
        ("#", "h1"),
        ("##", "h2"),
        ("###", "h3"),
    ]
)
md_chunks = md_splitter.split_text(markdown_text)

文档处理管道

from langchain_core.documents import Document

class DocumentProcessor:
    def __init__(self):
        self.splitter = RecursiveCharacterTextSplitter(
            chunk_size=1000,
            chunk_overlap=200
        )
    
    def process(self, docs: list[Document]) -> list[Document]:
        # 1. 清洗文档
        cleaned_docs = [self._clean_doc(doc) for doc in docs]
        
        # 2. 分割文档
        chunks = self.splitter.split_documents(cleaned_docs)
        
        # 3. 添加元数据
        for i, chunk in enumerate(chunks):
            chunk.metadata["chunk_id"] = i
            chunk.metadata["char_count"] = len(chunk.page_content)
        
        return chunks
    
    def _clean_doc(self, doc: Document) -> Document:
        # 清除多余空白
        content = " ".join(doc.page_content.split())
        # 移除特殊字符
        content = re.sub(r'[\x00-\x08\x0b\x0c\x0e-\x1f]', '', content)
        return Document(
            page_content=content,
            metadata=doc.metadata
        )

向量存储与检索

VectorStore

from langchain_openai import OpenAIEmbeddings
from langchain_community.vectorstores import Chroma, FAISS

# 创建Embeddings
embeddings = OpenAIEmbeddings(model="text-embedding-3-small")

# 使用Chroma
vectorstore = Chroma.from_documents(
    documents=chunks,
    embedding=embeddings,
    persist_directory="./chroma_db"
)

# 使用FAISS
faiss_store = FAISS.from_documents(chunks, embeddings)

# 保存和加载
faiss_store.save_local("faiss_index")
loaded_store = FAISS.load_local(
    "faiss_index", 
    embeddings,
    allow_dangerous_deserialization=True
)

Retriever

# 基础检索器
retriever = vectorstore.as_retriever(
    search_type="similarity",  # 或 "mmr"
    search_kwargs={"k": 5}
)

docs = retriever.invoke("什么是微服务?")

# MMR检索(最大边际相关性)
mmr_retriever = vectorstore.as_retriever(
    search_type="mmr",
    search_kwargs={
        "k": 5,
        "fetch_k": 20,
        "lambda_mult": 0.5
    }
)

# 带分数的检索
docs_with_scores = vectorstore.similarity_search_with_score(
    "查询文本",
    k=5
)

# 多查询检索器
from langchain.retrievers import MultiQueryRetriever

multi_retriever = MultiQueryRetriever.from_llm(
    retriever=retriever,
    llm=ChatOpenAI()
)

高级检索策略

from langchain.retrievers import (
    ContextualCompressionRetriever,
    EnsembleRetriever
)
from langchain.retrievers.document_compressors import (
    LLMChainExtractor
)
from langchain_community.retrievers import BM25Retriever

# 上下文压缩检索器
compressor = LLMChainExtractor.from_llm(ChatOpenAI())
compression_retriever = ContextualCompressionRetriever(
    base_compressor=compressor,
    base_retriever=retriever
)

# 混合检索(BM25 + 向量)
bm25_retriever = BM25Retriever.from_documents(chunks)
bm25_retriever.k = 5

ensemble_retriever = EnsembleRetriever(
    retrievers=[bm25_retriever, retriever],
    weights=[0.4, 0.6]
)

# 父文档检索器
from langchain.retrievers import ParentDocumentRetriever
from langchain.storage import InMemoryStore

parent_retriever = ParentDocumentRetriever(
    vectorstore=vectorstore,
    docstore=InMemoryStore(),
    child_splitter=RecursiveCharacterTextSplitter(chunk_size=200),
    parent_splitter=RecursiveCharacterTextSplitter(chunk_size=2000),
)

记忆(Memory)

对话记忆

from langchain.memory import (
    ConversationBufferMemory,
    ConversationBufferWindowMemory,
    ConversationSummaryMemory
)

# 完整缓冲记忆
buffer_memory = ConversationBufferMemory(
    return_messages=True,
    memory_key="history"
)

# 窗口记忆(只保留最近k轮)
window_memory = ConversationBufferWindowMemory(
    k=5,
    return_messages=True
)

# 摘要记忆
summary_memory = ConversationSummaryMemory(
    llm=ChatOpenAI(),
    return_messages=True
)

# 在链中使用记忆
from langchain.chains import ConversationChain

conversation = ConversationChain(
    llm=ChatOpenAI(),
    memory=buffer_memory,
    verbose=True
)

# 对话
response1 = conversation.predict(input="我叫张三")
response2 = conversation.predict(input="我的名字是什么?")

使用LCEL管理记忆

from langchain_core.runnables.history import RunnableWithMessageHistory
from langchain_community.chat_message_histories import ChatMessageHistory

# 存储会话历史
store = {}

def get_session_history(session_id: str):
    if session_id not in store:
        store[session_id] = ChatMessageHistory()
    return store[session_id]

# 创建带记忆的链
prompt = ChatPromptTemplate.from_messages([
    ("system", "你是一个有帮助的助手。"),
    MessagesPlaceholder(variable_name="history"),
    ("human", "{input}")
])

chain = prompt | ChatOpenAI() | StrOutputParser()

chain_with_history = RunnableWithMessageHistory(
    chain,
    get_session_history,
    input_messages_key="input",
    history_messages_key="history"
)

# 使用时指定session_id
config = {"configurable": {"session_id": "user_123"}}

response = chain_with_history.invoke(
    {"input": "你好!"},
    config=config
)

工具与Agent

自定义工具

from langchain_core.tools import tool, StructuredTool
from langchain_core.pydantic_v1 import BaseModel, Field

# 使用装饰器定义工具
@tool
def search_web(query: str) -> str:
    """搜索互联网获取信息。使用这个工具来查找最新的信息。"""
    # 实际实现搜索逻辑
    return f"搜索 '{query}' 的结果..."

@tool
def calculate(expression: str) -> str:
    """计算数学表达式。输入应该是有效的数学表达式。"""
    try:
        result = eval(expression)
        return str(result)
    except Exception as e:
        return f"计算错误: {e}"

# 带结构化输入的工具
class SearchInput(BaseModel):
    query: str = Field(description="搜索查询")
    max_results: int = Field(default=5, description="最大结果数")

@tool("advanced_search", args_schema=SearchInput)
def advanced_search(query: str, max_results: int = 5) -> str:
    """高级搜索功能,支持限制结果数量。"""
    return f"搜索 '{query}',返回 {max_results} 条结果"

# 从函数创建工具
def get_weather(city: str) -> str:
    """获取城市天气"""
    return f"{city}今天天气晴朗,温度25°C"

weather_tool = StructuredTool.from_function(
    func=get_weather,
    name="weather",
    description="获取指定城市的天气信息"
)

创建Agent

from langchain.agents import create_tool_calling_agent, AgentExecutor

# 准备工具
tools = [search_web, calculate, weather_tool]

# 创建Agent Prompt
agent_prompt = ChatPromptTemplate.from_messages([
    ("system", """你是一个有帮助的AI助手。
你可以使用以下工具来帮助回答问题:
{tools}

请根据用户的问题选择合适的工具。如果不需要工具,直接回答即可。"""),
    MessagesPlaceholder(variable_name="chat_history", optional=True),
    ("human", "{input}"),
    MessagesPlaceholder(variable_name="agent_scratchpad")
])

# 创建Agent
agent = create_tool_calling_agent(
    llm=ChatOpenAI(model="gpt-4o"),
    tools=tools,
    prompt=agent_prompt
)

# 创建Agent Executor
agent_executor = AgentExecutor(
    agent=agent,
    tools=tools,
    verbose=True,
    max_iterations=5,
    handle_parsing_errors=True
)

# 运行Agent
result = agent_executor.invoke({
    "input": "北京今天的天气怎么样?另外帮我算一下 123 * 456"
})
print(result["output"])

ReAct Agent

from langchain.agents import create_react_agent
from langchain import hub

# 使用LangChain Hub的ReAct prompt
react_prompt = hub.pull("hwchase17/react")

# 创建ReAct Agent
react_agent = create_react_agent(
    llm=ChatOpenAI(model="gpt-4o"),
    tools=tools,
    prompt=react_prompt
)

react_executor = AgentExecutor(
    agent=react_agent,
    tools=tools,
    verbose=True,
    max_iterations=10
)

RAG应用完整示例

from langchain_openai import ChatOpenAI, OpenAIEmbeddings
from langchain_community.vectorstores import Chroma
from langchain_community.document_loaders import DirectoryLoader
from langchain_text_splitters import RecursiveCharacterTextSplitter
from langchain_core.prompts import ChatPromptTemplate
from langchain_core.output_parsers import StrOutputParser
from langchain_core.runnables import RunnablePassthrough

class RAGApplication:
    def __init__(self, docs_path: str):
        self.embeddings = OpenAIEmbeddings(
            model="text-embedding-3-small"
        )
        self.llm = ChatOpenAI(model="gpt-4o-mini")
        self.vectorstore = None
        self.docs_path = docs_path
    
    def index_documents(self):
        """索引文档"""
        # 加载文档
        loader = DirectoryLoader(
            self.docs_path,
            glob="**/*.md",
            show_progress=True
        )
        documents = loader.load()
        
        # 分割文档
        splitter = RecursiveCharacterTextSplitter(
            chunk_size=1000,
            chunk_overlap=200
        )
        chunks = splitter.split_documents(documents)
        
        # 创建向量存储
        self.vectorstore = Chroma.from_documents(
            documents=chunks,
            embedding=self.embeddings,
            persist_directory="./rag_db"
        )
        
        print(f"已索引 {len(chunks)} 个文档块")
    
    def create_chain(self):
        """创建RAG链"""
        # 检索器
        retriever = self.vectorstore.as_retriever(
            search_kwargs={"k": 5}
        )
        
        # Prompt模板
        prompt = ChatPromptTemplate.from_messages([
            ("system", """你是一个知识库助手。根据提供的上下文信息回答问题。
如果上下文中没有相关信息,请诚实地说不知道。

上下文信息:
{context}"""),
            ("human", "{question}")
        ])
        
        # 格式化检索结果
        def format_docs(docs):
            return "\n\n---\n\n".join(
                f"[来源: {doc.metadata.get('source', '未知')}]\n{doc.page_content}"
                for doc in docs
            )
        
        # 构建链
        chain = (
            {
                "context": retriever | format_docs,
                "question": RunnablePassthrough()
            }
            | prompt
            | self.llm
            | StrOutputParser()
        )
        
        return chain
    
    def query(self, question: str) -> str:
        """查询知识库"""
        if self.vectorstore is None:
            raise ValueError("请先索引文档")
        
        chain = self.create_chain()
        return chain.invoke(question)

# 使用示例
if __name__ == "__main__":
    rag = RAGApplication("./documents")
    rag.index_documents()
    
    questions = [
        "什么是微服务架构?",
        "如何实现服务发现?",
        "分布式事务有哪些解决方案?"
    ]
    
    for q in questions:
        print(f"\n问题: {q}")
        print(f"回答: {rag.query(q)}")

工业级进阶:LangGraph 与 LangSmith

在构建复杂的生产级应用时,简单的线性链(Chains)往往难以处理循环逻辑和复杂的状态管理。

1. LangGraph:构建有状态的多 Agent 系统

LangGraph 是 LangChain 的扩展,允许你构建包含循环(Cycles)的图结构,非常适合实现复杂的 Agent 逻辑。

from typing import Annotated, TypedDict
from langgraph.graph import StateGraph, END
from langgraph.graph.message import add_messages

# 1. 定义状态
class State(TypedDict):
    messages: Annotated[list, add_messages]

# 2. 定义节点
def chatbot(state: State):
    return {"messages": [llm.invoke(state["messages"])]}

# 3. 构建图
graph_builder = StateGraph(State)
graph_builder.add_node("chatbot", chatbot)
graph_builder.set_entry_point("chatbot")
graph_builder.add_edge("chatbot", END)

# 4. 编译并运行
graph = graph_builder.compile()
for event in graph.stream({"messages": [("user", "你好")]}):
    print(event)

为什么使用 LangGraph?

  • 循环支持:支持 ReAct 等需要反复迭代的模式。
  • 状态持久化:可以轻松保存和恢复对话状态(Checkpoints)。
  • 细粒度控制:可以精确控制 Agent 的每一步决策。

2. LangSmith:调试、测试与监控

LangSmith 是 LangChain 推出的开发者平台,解决了 LLM 应用“黑盒”难以调试的问题。

  • Tracing (追踪):可视化查看 Prompt 是如何构建的、检索到了哪些文档、LLM 的原始响应是什么。
  • Evaluation (评估):自动化运行测试集,对比不同 Prompt 或模型的表现。
  • Monitoring (监控):监控生产环境中的 Token 消耗、延迟和错误率。
# 启用 LangSmith 追踪
import os
os.environ["LANGCHAIN_TRACING_V2"] = "true"
os.environ["LANGCHAIN_API_KEY"] = "your_api_key"
os.environ["LANGCHAIN_PROJECT"] = "my_project"

总结

LangChain提供了构建LLM应用的完整工具链:

组件 功能 应用场景
Model I/O 模型交互、Prompt、解析 所有LLM应用
Chains 组件编排、工作流 复杂任务处理
Retrieval 文档加载、向量存储 RAG系统
Memory 对话历史管理 聊天机器人
Agents 工具使用、自主决策 智能助手

掌握LangChain,能够快速构建各种LLM应用。

参考资源

版权声明: 如无特别声明,本文版权归 sshipanoo 所有,转载请注明本文链接。

(采用 CC BY-NC-SA 4.0 许可协议进行授权)

本文标题:《 LLM应用开发——LangChain框架 》

本文链接:http://localhost:3015/ai/LangChain%E6%A1%86%E6%9E%B6.html

本文最后一次更新为 天前,文章中的某些内容可能已过时!