突破上下文限制的技术方案

前言

尽管现代 LLM 的上下文窗口越来越大(GPT-4 128K, Claude 200K),但在处理超长文档、多轮对话历史时仍面临挑战。本文介绍长上下文处理的核心策略:分块、压缩、摘要、检索等技术。


上下文限制问题

各模型上下文窗口

模型 上下文窗口 输出限制 特点
GPT-4 Turbo 128K 4K 通用能力强
GPT-4o 128K 16K 多模态
Claude 3.5 200K 8K 长文档理解
Gemini 1.5 1M 8K 超长上下文
Llama 3 128K - 开源

长上下文挑战

┌─────────────────────────────────────────────────────────────────┐
│                    长上下文处理挑战                              │
├─────────────────────────────────────────────────────────────────┤
│                                                                 │
│  Token 限制                                                     │
│  ├── 输入超出窗口 → 截断或报错                                   │
│  ├── 输出被截断 → 答案不完整                                     │
│  └── 费用增加 → 成本失控                                        │
│                                                                 │
│  注意力衰减                                                      │
│  ├── "Lost in the Middle" → 中间内容被忽视                      │
│  ├── 首尾偏好 → 位置敏感                                        │
│  └── 长距离依赖 → 关联性下降                                    │
│                                                                 │
│  性能问题                                                        │
│  ├── 延迟增加 → 用户体验差                                      │
│  ├── 内存占用 → 资源消耗大                                      │
│  └── 吞吐下降 → 并发受限                                        │
│                                                                 │
└─────────────────────────────────────────────────────────────────┘

文档分块策略

基础分块方法

from langchain.text_splitter import (
    RecursiveCharacterTextSplitter,
    TokenTextSplitter,
    MarkdownTextSplitter
)

class ChunkingStrategy:
    """分块策略"""
    
    @staticmethod
    def fixed_size_chunking(text: str, chunk_size: int = 1000, overlap: int = 200):
        """固定大小分块"""
        splitter = RecursiveCharacterTextSplitter(
            chunk_size=chunk_size,
            chunk_overlap=overlap,
            separators=["\n\n", "\n", "", ".", " ", ""]
        )
        return splitter.split_text(text)
    
    @staticmethod
    def token_based_chunking(text: str, chunk_size: int = 500, overlap: int = 50):
        """基于 Token 的分块"""
        splitter = TokenTextSplitter(
            chunk_size=chunk_size,
            chunk_overlap=overlap
        )
        return splitter.split_text(text)
    
    @staticmethod
    def semantic_chunking(text: str, embeddings):
        """语义分块 - 基于语义边界"""
        from langchain_experimental.text_splitter import SemanticChunker
        splitter = SemanticChunker(embeddings)
        return splitter.split_text(text)

---

#### 核心挑战:Lost in the Middle (迷失在中间)

研究表明当上下文非常长时模型对**开头****结尾**的信息记忆最深刻而位于**中间**的信息最容易被忽视

#### 应对策略
1.  **重排序 (Re-ranking)**将检索到的最相关的文档块放在 Prompt 的最前面或最后面
2.  **信息密度优化** Prompt 中明确指出:“请特别注意文档中间关于 XXX 的描述”。
3.  **多轮精炼**不要一次性喂给模型而是分批处理

---

#### 长文档处理的三大模式

当文档长度超过模型窗口时通常采用以下三种模式

#### 1. Stuffing (填充模式)
将所有内容直接塞进 Prompt
- **优点**模型拥有完整上下文理解最透彻
- **缺点**受限于窗口大小成本高

#### 2. Map-Reduce (映射-归约模式)
- **Map**将文档分块对每一块分别进行总结或提取
- **Reduce**将所有块的总结再次汇总生成最终答案
- **适用场景**生成超长文档的摘要

#### 3. Refine (迭代精炼模式)
- **过程**先处理第一块得到初始答案然后将答案和第二块一起发给模型要求模型根据新信息精炼答案以此类推
- **优点**答案会越来越精准
- **缺点**无法并行速度慢

```python
# LangChain 中的实现示例
from langchain.chains.summarize import load_summarize_chain

chain = load_summarize_chain(llm, chain_type="map_reduce")
summary = chain.run(docs)

进阶:Contextual Retrieval (上下文检索)

这是 Anthropic 提出的一种提升长文档检索精度的技术。

痛点

在传统 RAG 中,一个分块可能是:“他出生于 1990 年”。如果没有上下文,检索器不知道“他”是谁。

方案

在索引阶段,先让 LLM 为每个分块生成一段简短的上下文描述(如:“这段文字描述了乔布斯的生平”),然后将这段描述拼接到分块前面再进行向量化。

  • 效果:检索召回率显著提升,尤其是在处理长文档中的细碎信息时。

行业新趋势:LongRAG

传统的 RAG 喜欢短分块(300-500 Token),因为这样检索更精准。但 LongRAG 提倡使用长分块(4K - 32K Token)。

  • 理由:现代模型(如 Gemini 1.5 Pro)处理长上下文的能力已经非常强。长分块可以保留更多的语义连贯性,减少信息碎片化。
  • 架构长分块检索 -> 少量长分块注入 -> 长上下文模型生成

总结

处理长上下文不是简单的“堆 Token”,而是一场关于信息密度注意力管理的博弈。

  • 简单任务:直接 Stuffing。
  • 摘要任务:Map-Reduce。
  • 高精度检索:Contextual Retrieval + Re-ranking。
  • 超长文档:LongRAG 架构。

掌握这些策略,你才能在 Token 的海洋中游刃有余,构建出真正具备“深度阅读”能力的 AI 应用。 embeddings=embeddings, breakpoint_threshold_type=”percentile”, breakpoint_threshold_amount=95 ) return splitter.split_text(text)

@staticmethod
def markdown_chunking(text: str):
    """Markdown 结构分块"""
    splitter = MarkdownTextSplitter(
        chunk_size=1000,
        chunk_overlap=100
    )
    return splitter.split_text(text)

使用

chunker = ChunkingStrategy()

固定大小

chunks = chunker.fixed_size_chunking(long_document, 1000, 200)

Token 分块(更精确)

chunks = chunker.token_based_chunking(long_document, 500, 50)


#### 层次化分块

```python
class HierarchicalChunker:
    """层次化分块"""
    
    def __init__(self):
        self.parent_splitter = RecursiveCharacterTextSplitter(
            chunk_size=2000,
            chunk_overlap=200
        )
        self.child_splitter = RecursiveCharacterTextSplitter(
            chunk_size=400,
            chunk_overlap=50
        )
    
    def chunk(self, text: str) -> dict:
        """生成层次化分块"""
        # 父块
        parent_chunks = self.parent_splitter.split_text(text)
        
        hierarchy = []
        for i, parent in enumerate(parent_chunks):
            # 子块
            child_chunks = self.child_splitter.split_text(parent)
            
            hierarchy.append({
                "parent_id": i,
                "parent_content": parent,
                "children": [
                    {"child_id": f"{i}_{j}", "content": child}
                    for j, child in enumerate(child_chunks)
                ]
            })
        
        return hierarchy
    
    def get_context_window(
        self,
        chunk_id: str,
        hierarchy: list,
        include_siblings: bool = True
    ) -> str:
        """获取上下文窗口"""
        parent_id = int(chunk_id.split("_")[0])
        
        if include_siblings:
            # 返回整个父块
            return hierarchy[parent_id]["parent_content"]
        else:
            # 只返回当前子块
            for parent in hierarchy:
                for child in parent["children"]:
                    if child["child_id"] == chunk_id:
                        return child["content"]
        return ""

# 使用
hierarchical = HierarchicalChunker()
hierarchy = hierarchical.chunk(long_document)

基于章节的分块

import re

class StructuredChunker:
    """基于文档结构的分块"""
    
    def __init__(self, max_chunk_size: int = 2000):
        self.max_size = max_chunk_size
    
    def chunk_by_sections(self, text: str) -> list:
        """按章节分块"""
        # 识别标题模式
        section_patterns = [
            r'^#{1,6}\s+.+$',           # Markdown 标题
            r'^\d+\.\s+.+$',             # 数字编号
            r'^第[一二三四五六七八九十]+[章节部分]\s*.+$',  # 中文章节
            r'^[A-Z][A-Z\s]+:?\s*$',     # 大写标题
        ]
        
        combined_pattern = '|'.join(f'({p})' for p in section_patterns)
        
        sections = []
        current_section = {"title": "Introduction", "content": ""}
        
        for line in text.split('\n'):
            if re.match(combined_pattern, line, re.MULTILINE):
                if current_section["content"].strip():
                    sections.append(current_section)
                current_section = {"title": line.strip(), "content": ""}
            else:
                current_section["content"] += line + "\n"
        
        if current_section["content"].strip():
            sections.append(current_section)
        
        # 处理过大的章节
        final_chunks = []
        for section in sections:
            if len(section["content"]) > self.max_size:
                # 进一步分割
                sub_chunks = self._split_large_section(section)
                final_chunks.extend(sub_chunks)
            else:
                final_chunks.append(section)
        
        return final_chunks
    
    def _split_large_section(self, section: dict) -> list:
        """分割过大的章节"""
        splitter = RecursiveCharacterTextSplitter(
            chunk_size=self.max_size,
            chunk_overlap=100
        )
        
        sub_texts = splitter.split_text(section["content"])
        
        return [
            {
                "title": f"{section['title']} (Part {i+1})",
                "content": sub
            }
            for i, sub in enumerate(sub_texts)
        ]

上下文压缩与缓存优化

1. LLMLingua-2 提示词压缩

传统的 Prompt 压缩依赖于 LLM 自身,而 LLMLingua-2 使用更轻量级的模型(如 XLM-RoBERTa)来识别并移除冗余 Token,压缩率可达 2x-5x 且几乎不损失精度。

from llmlingua import PromptCompressor

# 初始化压缩器
compressor = PromptCompressor(
    model_name="microsoft/llmlingua-2-bert-base-multilingual-cased-v1",
    use_llmlingua2=True
)

def compress_long_context(context: str, instruction: str, question: str):
    """
    使用 LLMLingua-2 压缩长上下文
    """
    results = compressor.compress_prompt(
        context=[context],
        instruction=instruction,
        question=question,
        target_token=2000,
        rank_method="longllmlingua" # 针对长文本优化
    )
    
    print(f"原始 Token: {results['origin_tokens']}")
    print(f"压缩后 Token: {results['compressed_tokens']}")
    print(f"压缩率: {results['ratio']}")
    
    return results['compressed_prompt']

2. 上下文缓存 (Context Caching)

对于频繁访问的长文档(如 API 文档、法律条文),使用 OpenAI 或 Anthropic 的 Context Caching 可以显著降低成本并减少首字延迟 (TTFT)。

# Anthropic Context Caching 示例 (伪代码)
import anthropic

client = anthropic.Anthropic()

response = client.beta.prompt_caching.messages.create(
    model="claude-3-5-sonnet-20240620",
    max_tokens=1024,
    system=[
        {
            "type": "text",
            "text": "你是一个法律专家,请基于以下长文档回答问题...",
            "cache_control": {"type": "ephemeral"} # 标记为可缓存
        },
        {
            "type": "text",
            "text": LONG_LEGAL_DOCUMENT, # 超过 100k tokens 的文档
            "cache_control": {"type": "ephemeral"}
        }
    ],
    messages=[{"role": "user", "content": "这份合同的违约条款是什么?"}]
)

针对 1M+ 上下文的策略:LC-RAG

当模型支持 1M+ 上下文(如 Gemini 1.5 Pro)时,传统的 RAG 仍然有其价值,但策略发生了变化。

维度 传统 RAG (128K) 长上下文 RAG (1M+)
检索粒度 细粒度 (300-500 tokens) 粗粒度 (5000+ tokens)
召回数量 Top 5-10 Top 50-100
排序重要性 极高 (位置敏感) 中等 (模型注意力更强)
成本 低 (按需加载) 高 (全量加载)

LC-RAG 最佳实践:

  1. 粗筛 + 全量注入:先用向量检索筛选出最相关的 50 个片段,然后将这 50 个片段全部塞入 1M 窗口。
  2. 多跳推理:利用长上下文能力,让模型在窗口内进行跨片段的复杂推理。

信息抽取式压缩

class ExtractionCompressor:
    """基于信息抽取的压缩"""
    
    def __init__(self):
        self.llm = ChatOpenAI(model="gpt-4o-mini")
    
    def extract_key_info(self, text: str, query: str) -> dict:
        """抽取关键信息"""
        prompt = f"""从文本中抽取与问题相关的关键信息。

问题:{query}

文本:{text}

请返回 JSON 格式:
key_facts"""
        
        response = self.llm.invoke(prompt)
        
        import json
        try:
            return json.loads(response.content)
        except:
            return {"summary": response.content}
    
    def compress_documents(
        self,
        documents: list,
        query: str
    ) -> str:
        """压缩文档列表"""
        all_facts = []
        all_quotes = []
        all_entities = set()
        
        for doc in documents:
            info = self.extract_key_info(doc.page_content, query)
            all_facts.extend(info.get("key_facts", []))
            all_quotes.extend(info.get("relevant_quotes", []))
            all_entities.update(info.get("entities", []))
        
        # 构建压缩上下文
        compressed = f"""关键事实:
{chr(10).join(['- ' + f for f in all_facts[:10]])}

相关实体:{', '.join(list(all_entities)[:20])}

关键引用:
{chr(10).join(['> ' + q for q in all_quotes[:5]])}"""
        
        return compressed

递进式摘要

Map-Reduce 摘要

class MapReduceSummarizer:
    """Map-Reduce 摘要"""
    
    def __init__(self):
        self.llm = ChatOpenAI(model="gpt-4o-mini")
    
    def map_phase(self, chunks: list) -> list:
        """Map 阶段:并行摘要"""
        summaries = []
        
        for chunk in chunks:
            prompt = f"""请为以下文本生成简洁摘要(100字以内):

{chunk}

摘要:"""
            
            response = self.llm.invoke(prompt)
            summaries.append(response.content)
        
        return summaries
    
    def reduce_phase(self, summaries: list) -> str:
        """Reduce 阶段:合并摘要"""
        combined = "\n".join([f"- {s}" for s in summaries])
        
        prompt = f"""将以下多个摘要合并为一个连贯的总结:

{combined}

合并摘要:"""
        
        response = self.llm.invoke(prompt)
        return response.content
    
    def summarize(self, document: str, chunk_size: int = 2000) -> str:
        """完整摘要流程"""
        # 分块
        splitter = RecursiveCharacterTextSplitter(
            chunk_size=chunk_size,
            chunk_overlap=100
        )
        chunks = splitter.split_text(document)
        
        # Map
        summaries = self.map_phase(chunks)
        
        # Reduce(可能需要多轮)
        while len(summaries) > 5:
            # 分组 reduce
            groups = [summaries[i:i+5] for i in range(0, len(summaries), 5)]
            summaries = [self.reduce_phase(g) for g in groups]
        
        # 最终 reduce
        return self.reduce_phase(summaries)

# 使用
summarizer = MapReduceSummarizer()
summary = summarizer.summarize(very_long_document)

Refine 迭代摘要

class RefineSummarizer:
    """迭代精炼摘要"""
    
    def __init__(self):
        self.llm = ChatOpenAI(model="gpt-4o")
    
    def summarize(self, document: str, chunk_size: int = 3000) -> str:
        """迭代摘要"""
        splitter = RecursiveCharacterTextSplitter(
            chunk_size=chunk_size,
            chunk_overlap=200
        )
        chunks = splitter.split_text(document)
        
        # 初始摘要
        current_summary = self._initial_summary(chunks[0])
        
        # 迭代精炼
        for chunk in chunks[1:]:
            current_summary = self._refine_summary(current_summary, chunk)
        
        return current_summary
    
    def _initial_summary(self, text: str) -> str:
        """初始摘要"""
        prompt = f"""请为以下文本生成详细摘要:

{text}

摘要:"""
        
        response = self.llm.invoke(prompt)
        return response.content
    
    def _refine_summary(self, existing_summary: str, new_text: str) -> str:
        """精炼摘要"""
        prompt = f"""已有摘要:
{existing_summary}

新增内容:
{new_text}

请更新摘要,整合新内容中的重要信息:"""
        
        response = self.llm.invoke(prompt)
        return response.content

# 使用
refiner = RefineSummarizer()
summary = refiner.summarize(very_long_document)

滑动窗口处理

对话历史管理

class SlidingWindowMemory:
    """滑动窗口记忆"""
    
    def __init__(self, max_tokens: int = 4000):
        self.max_tokens = max_tokens
        self.messages = []
        self.encoder = tiktoken.encoding_for_model("gpt-4")
    
    def add_message(self, role: str, content: str):
        """添加消息"""
        self.messages.append({"role": role, "content": content})
        self._trim_to_limit()
    
    def _count_tokens(self, messages: list) -> int:
        """计算 token 数"""
        total = 0
        for msg in messages:
            total += len(self.encoder.encode(msg["content"])) + 4
        return total
    
    def _trim_to_limit(self):
        """裁剪到限制内"""
        while self._count_tokens(self.messages) > self.max_tokens:
            if len(self.messages) > 2:
                # 保留系统消息和最新消息
                self.messages.pop(1)
            else:
                break
    
    def get_messages(self) -> list:
        """获取消息列表"""
        return self.messages.copy()

class SmartWindowMemory:
    """智能窗口记忆 - 带摘要"""
    
    def __init__(self, max_tokens: int = 4000, summary_threshold: int = 10):
        self.max_tokens = max_tokens
        self.summary_threshold = summary_threshold
        self.messages = []
        self.summary = ""
        self.llm = ChatOpenAI(model="gpt-4o-mini")
    
    def add_message(self, role: str, content: str):
        """添加消息"""
        self.messages.append({"role": role, "content": content})
        
        if len(self.messages) > self.summary_threshold:
            self._create_summary()
    
    def _create_summary(self):
        """创建历史摘要"""
        # 取前半部分消息生成摘要
        to_summarize = self.messages[:len(self.messages)//2]
        
        history = "\n".join([
            f"{m['role']}: {m['content']}"
            for m in to_summarize
        ])
        
        prompt = f"""总结以下对话的要点:

{history}

摘要:"""
        
        response = self.llm.invoke(prompt)
        
        if self.summary:
            self.summary = f"{self.summary}\n{response.content}"
        else:
            self.summary = response.content
        
        # 移除已摘要的消息
        self.messages = self.messages[len(self.messages)//2:]
    
    def get_context(self) -> list:
        """获取上下文"""
        context = []
        
        if self.summary:
            context.append({
                "role": "system",
                "content": f"之前对话摘要:{self.summary}"
            })
        
        context.extend(self.messages)
        
        return context

# 使用
memory = SmartWindowMemory(max_tokens=4000)
memory.add_message("user", "你好")
memory.add_message("assistant", "你好!有什么可以帮助你的?")
# ... 更多消息

context = memory.get_context()

分段处理长文档

class LongDocumentProcessor:
    """长文档处理器"""
    
    def __init__(self, max_context: int = 8000):
        self.max_context = max_context
        self.llm = ChatOpenAI(model="gpt-4o")
    
    def process_document(
        self,
        document: str,
        question: str
    ) -> str:
        """处理长文档回答问题"""
        # 1. 分块
        chunks = self._chunk_document(document)
        
        # 2. 相关性过滤
        relevant_chunks = self._filter_relevant(chunks, question)
        
        # 3. 如果还是太长,进行压缩
        context = self._prepare_context(relevant_chunks, question)
        
        # 4. 生成答案
        return self._generate_answer(context, question)
    
    def _chunk_document(self, document: str) -> list:
        """分块"""
        splitter = RecursiveCharacterTextSplitter(
            chunk_size=1500,
            chunk_overlap=200
        )
        return splitter.split_text(document)
    
    def _filter_relevant(
        self,
        chunks: list,
        question: str,
        top_k: int = 10
    ) -> list:
        """过滤相关块"""
        embeddings = OpenAIEmbeddings()
        
        question_emb = embeddings.embed_query(question)
        chunk_embs = embeddings.embed_documents(chunks)
        
        # 计算相似度
        similarities = []
        for i, emb in enumerate(chunk_embs):
            sim = np.dot(question_emb, emb) / (
                np.linalg.norm(question_emb) * np.linalg.norm(emb)
            )
            similarities.append((i, sim))
        
        # 排序并返回 top_k
        similarities.sort(key=lambda x: x[1], reverse=True)
        
        return [chunks[i] for i, _ in similarities[:top_k]]
    
    def _prepare_context(self, chunks: list, question: str) -> str:
        """准备上下文"""
        context = "\n\n".join(chunks)
        
        encoder = tiktoken.encoding_for_model("gpt-4")
        tokens = len(encoder.encode(context))
        
        if tokens > self.max_context:
            # 压缩
            compressor = ContextCompressor()
            context = compressor.compress_prompt(
                context,
                question,
                target_ratio=self.max_context / tokens
            )
        
        return context
    
    def _generate_answer(self, context: str, question: str) -> str:
        """生成答案"""
        prompt = f"""基于以下文档内容回答问题。

文档内容:
{context}

问题:{question}

答案:"""
        
        response = self.llm.invoke(prompt)
        return response.content

# 使用
processor = LongDocumentProcessor(max_context=8000)
answer = processor.process_document(very_long_doc, "文档的主要结论是什么?")

处理 “Lost in the Middle” 问题

class MiddleAwareRetriever:
    """解决 "Lost in the Middle" 问题"""
    
    def __init__(self):
        self.llm = ChatOpenAI(model="gpt-4o")
    
    def reorder_documents(self, documents: list) -> list:
        """重排文档顺序 - 重要的放两端"""
        if len(documents) <= 2:
            return documents
        
        # 假设已按相关性排序
        # 将最相关的交替放在首尾
        reordered = []
        left = []
        right = []
        
        for i, doc in enumerate(documents):
            if i % 2 == 0:
                left.append(doc)
            else:
                right.append(doc)
        
        # 首部放最相关的
        reordered = left + right[::-1]
        
        return reordered
    
    def query_with_sections(
        self,
        question: str,
        documents: list
    ) -> str:
        """分段查询"""
        # 将文档分组
        group_size = 3
        groups = [
            documents[i:i+group_size]
            for i in range(0, len(documents), group_size)
        ]
        
        partial_answers = []
        
        for i, group in enumerate(groups):
            context = "\n\n".join([d.page_content for d in group])
            
            prompt = f"""基于以下内容(第 {i+1} 部分)回答问题。
如果这部分没有相关信息,回答 "无相关信息"。

内容:
{context}

问题:{question}

回答:"""
            
            response = self.llm.invoke(prompt)
            if "无相关信息" not in response.content:
                partial_answers.append(response.content)
        
        # 合并答案
        if not partial_answers:
            return "未找到相关信息"
        
        combined = "\n".join(partial_answers)
        
        final_prompt = f"""综合以下多个回答,生成最终答案:

{combined}

问题:{question}

最终答案:"""
        
        response = self.llm.invoke(final_prompt)
        return response.content

最佳实践

场景 推荐策略
文档问答 检索 + 相关性过滤
长对话 滑动窗口 + 摘要
超长文档 Map-Reduce 摘要
代码分析 结构化分块
多文档 选择性压缩

Token 预算分配

def allocate_token_budget(
    total_budget: int,
    query_tokens: int,
    num_documents: int
) -> dict:
    """Token 预算分配"""
    # 预留输出空间
    output_reserve = min(2000, total_budget // 4)
    
    # 可用于上下文
    context_budget = total_budget - query_tokens - output_reserve
    
    # 每个文档的预算
    per_doc = context_budget // num_documents
    
    return {
        "total": total_budget,
        "context": context_budget,
        "per_document": per_doc,
        "output_reserve": output_reserve
    }

参考资源

版权声明: 如无特别声明,本文版权归 sshipanoo 所有,转载请注明本文链接。

(采用 CC BY-NC-SA 4.0 许可协议进行授权)

本文标题:《 LLM应用开发——长上下文处理策略 》

本文链接:http://localhost:3015/ai/%E9%95%BF%E4%B8%8A%E4%B8%8B%E6%96%87%E5%A4%84%E7%90%86.html

本文最后一次更新为 天前,文章中的某些内容可能已过时!