降低Token消耗,控制LLM应用运营成本
前言
LLM API 成本是制约大规模应用的重要因素。了解 Token 计费模型、优化 Prompt 设计、合理选择模型,可以在保证质量的前提下大幅降低成本。本文将系统介绍 LLM 应用的成本优化策略。
Token 计费模型
主流模型定价
| 模型 | 输入价格 | 输出价格 | 上下文窗口 |
|---|---|---|---|
| GPT-4 Turbo | $10/1M | $30/1M | 128K |
| GPT-4o | $5/1M | $15/1M | 128K |
| GPT-4o-mini | $0.15/1M | $0.60/1M | 128K |
| Claude 3.5 Sonnet | $3/1M | $15/1M | 200K |
| Claude 3 Haiku | $0.25/1M | $1.25/1M | 200K |
Token 计算
import tiktoken
class TokenCounter:
    """Count tokens and estimate USD cost for chat-model API calls."""

    def __init__(self, model: str = "gpt-4"):
        self.model = model
        # tiktoken raises KeyError for model ids it does not know (e.g. brand-new
        # models); fall back to cl100k_base, the encoding used by GPT-4-family models.
        try:
            self.encoder = tiktoken.encoding_for_model(model)
        except KeyError:
            self.encoder = tiktoken.get_encoding("cl100k_base")

    def count_tokens(self, text: str) -> int:
        """Return the number of tokens in `text`."""
        return len(self.encoder.encode(text))

    def count_messages(self, messages: list) -> int:
        """Approximate the token count of a chat message list.

        Uses the commonly cited overheads (4 tokens per message plus 2 for
        reply priming); exact values vary slightly by model.
        """
        tokens = 0
        for message in messages:
            tokens += 4  # per-message format overhead
            for value in message.values():
                tokens += self.count_tokens(str(value))
        tokens += 2  # assistant reply priming
        return tokens

    def estimate_cost(
        self,
        input_tokens: int,
        output_tokens: int,
        model: str = None
    ) -> float:
        """Estimate USD cost for a call.

        Args:
            input_tokens: prompt token count.
            output_tokens: completion token count.
            model: pricing key; defaults to the instance's model.

        Returns:
            Estimated cost in USD; 0.0 when no pricing data exists for `model`.
        """
        model = model or self.model
        # USD per 1M tokens.
        pricing = {
            "gpt-4-turbo": {"input": 10, "output": 30},
            "gpt-4o": {"input": 5, "output": 15},
            "gpt-4o-mini": {"input": 0.15, "output": 0.60},
            "gpt-3.5-turbo": {"input": 0.50, "output": 1.50}
        }
        if model not in pricing:
            return 0.0
        price = pricing[model]
        return (input_tokens * price["input"] +
                output_tokens * price["output"]) / 1_000_000
# Quick demo: count tokens for a string and estimate the cost of a call.
demo_counter = TokenCounter("gpt-4o")
print(demo_counter.count_tokens("Hello, world!"))  # -> 4
print(demo_counter.estimate_cost(1000, 500))  # -> about $0.0125
进阶:非实时任务的“半价”方案 (Batch API)
如果你的任务不需要立即返回结果(如批量翻译、离线数据提取、情感分析),使用 Batch API 是最有效的省钱手段。
核心优势
- 50% 折扣:所有 Token 费用直接减半。
- 更高限额:Batch 任务通常有独立的、更高的速率限制(Rate Limits)。
- 24 小时交付:通常在几分钟到几小时内完成,最长不超过 24 小时。
实现流程
- 准备文件:将所有请求写入一个 .jsonl 文件。
- 上传文件:调用 OpenAI 的 Files API 上传。
- 创建任务:调用 Batches API 启动任务。
- 下载结果:任务完成后下载输出文件。
# Example: create a Batch job
from openai import OpenAI

client = OpenAI()

# 1. Upload the request file.
# Use a context manager so the file handle is closed even if the upload
# fails — the original bare open() call leaked the handle.
with open("requests.jsonl", "rb") as f:
    batch_file = client.files.create(
        file=f,
        purpose="batch"
    )

# 2. Create the Batch job (results delivered within the 24h window).
client.batches.create(
    input_file_id=batch_file.id,
    endpoint="/v1/chat/completions",
    completion_window="24h",
    metadata={"description": "nightly_data_extraction"}
)
提示词压缩 (Prompt Compression)
长 Prompt 是成本的杀手。LLMLingua 等技术可以帮助我们在不损失关键信息的前提下,压缩 Prompt 长度。
原理
通过一个小模型计算 Token 的信息熵,移除那些对语义贡献较小的 Token(如冗余的修饰词、连接词)。
效果
- 压缩率:可达 2x - 20x。
- 成本:直接降低 50% - 90%。
上下文管理:精简与剪枝
在长对话中,历史记录会不断累积。
- 滑动窗口 (Sliding Window):只保留最近的 N 轮对话。
- 摘要压缩 (Summarization):将较早的对话总结成一段简短的摘要,替换掉原始对话。
- 语义剪枝:使用 Embedding 检索与当前问题最相关的历史片段,而不是发送全部历史。
def prune_context(messages, max_tokens=2000, count_fn=None):
    """Trim the oldest dialogue turns until the context fits a token budget.

    messages[0] (assumed to be the system prompt) is always preserved; the
    oldest non-system message is dropped first. The input list is mutated
    in place and also returned.

    Args:
        messages: chat message list; index 0 is the system prompt.
        max_tokens: token budget for the whole list.
        count_fn: callable taking the message list and returning its token
            count. Defaults to the module-level ``count_tokens`` for
            backward compatibility with the original code.

    Returns:
        The pruned (possibly shorter) message list.
    """
    if count_fn is None:
        count_fn = count_tokens  # module-level helper, as in the original
    # Re-count after every removal; stop when within budget or only the
    # system prompt remains.
    while len(messages) > 1 and count_fn(messages) > max_tokens:
        messages.pop(1)  # drop the oldest turn after the system prompt
    return messages
总结:成本优化清单
- 模型降级:能用 gpt-4o-mini 的绝不用 gpt-4o。
- 使用缓存:应用层语义缓存 + 模型层 Prompt Caching。
- 批量处理:非实时任务强制走 Batch API。
- 精简 Prompt:移除冗余描述,使用压缩技术。
- 监控报警:设置每日预算上限,防止代码 Bug 导致 Token 狂飙。
成本优化不是一次性的工作,而是一个持续监控和迭代的过程。通过上述手段,通常可以将 LLM 应用的运营成本降低 60% - 80%。
模型选择策略
任务分级模型选择
┌─────────────────────────────────────────────────────────────────┐
│ 模型选择金字塔 │
├─────────────────────────────────────────────────────────────────┤
│ │
│ ┌─────────┐ │
│ │ GPT-4 │ 复杂推理 │
│ │ Claude │ 专业写作 │
│ └────┬────┘ │
│ │ │
│ ┌─────────┴─────────┐ │
│ │ GPT-4o-mini │ 一般对话 │
│ │ Claude Haiku │ 内容生成 │
│ └─────────┬─────────┘ │
│ │ │
│ ┌──────────────┴──────────────┐ │
│ │ GPT-3.5 / 本地模型 │ 简单任务 │
│ │ 意图识别 / 分类 │ 批量处理 │
│ └─────────────────────────────┘ │
│ │
└─────────────────────────────────────────────────────────────────┘
智能模型路由
from enum import Enum
from typing import Callable
class TaskComplexity(Enum):
    """Coarse task-difficulty buckets used for model routing."""
    SIMPLE = "simple"    # cheap lookups: definitions, translation, summaries
    MEDIUM = "medium"    # anything not obviously simple or complex
    COMPLEX = "complex"  # reasoning, analysis, code


class CostAwareRouter:
    """Route each query to the cheapest model that can plausibly handle it."""

    def __init__(self):
        self.client = OpenAI()
        self.model_config = {
            TaskComplexity.SIMPLE: {"model": "gpt-4o-mini", "max_tokens": 256},
            TaskComplexity.MEDIUM: {"model": "gpt-4o-mini", "max_tokens": 1024},
            TaskComplexity.COMPLEX: {"model": "gpt-4o", "max_tokens": 4096},
        }

    def classify_task(self, query: str) -> TaskComplexity:
        """Classify query difficulty with keyword heuristics.

        Complex keywords take precedence over simple ones; queries matching
        neither list fall back to MEDIUM.
        """
        simple_keywords = ["是什么", "定义", "翻译", "总结"]
        complex_keywords = ["分析", "比较", "推理", "代码", "优化"]
        if any(kw in query for kw in complex_keywords):
            return TaskComplexity.COMPLEX
        if any(kw in query for kw in simple_keywords):
            return TaskComplexity.SIMPLE
        return TaskComplexity.MEDIUM

    def route(self, query: str) -> str:
        """Send the query to the model configured for its complexity tier."""
        cfg = self.model_config[self.classify_task(query)]
        completion = self.client.chat.completions.create(
            model=cfg["model"],
            messages=[{"role": "user", "content": query}],
            max_tokens=cfg["max_tokens"]
        )
        return completion.choices[0].message.content
# Usage example (makes real API calls)
router = CostAwareRouter()
print(router.route("什么是机器学习?"))  # routed to the cheap model
print(router.route("分析这段代码的性能问题并优化"))  # routed to the premium model
模型降级策略
class ModelFallback:
    """Try models from best to cheapest until one returns an acceptable answer."""

    def __init__(self):
        self.client = OpenAI()
        # (model, max_tokens) pairs, tried in order.
        self.model_chain = [
            ("gpt-4o", 4096),
            ("gpt-4o-mini", 2048),
            ("gpt-3.5-turbo", 1024)
        ]

    def call_with_fallback(
        self,
        messages: list,
        required_quality: float = 0.8
    ) -> dict:
        """Call each model in the chain until one passes the quality check.

        Args:
            messages: chat messages to send.
            required_quality: threshold forwarded to the quality checker.

        Returns:
            dict with the winning model name, its response text and usage.

        Raises:
            RuntimeError: if every model errors out or is rejected by the
                quality check. (RuntimeError subclasses Exception, so
                callers that caught the old bare Exception still work.)
        """
        for model, max_tokens in self.model_chain:
            try:
                response = self.client.chat.completions.create(
                    model=model,
                    messages=messages,
                    max_tokens=max_tokens
                )
                result = response.choices[0].message.content
                if self._check_quality(result, required_quality):
                    return {
                        "model": model,
                        "response": result,
                        "usage": response.usage
                    }
            except Exception as e:
                # Best-effort fallback: report and move on to the next model.
                print(f"{model} 失败: {e}")
                continue
        raise RuntimeError("所有模型都失败了")

    def _check_quality(self, response: str, threshold: float) -> bool:
        """Heuristic sanity check on a model response.

        Rejects empty/None content (the API can return None), near-empty
        answers, and short answers that look truncated.

        NOTE(review): `threshold` is currently unused by the heuristics —
        confirm whether a scored check was intended.
        """
        if not response or len(response) < 10:
            return False
        if response.endswith("...") and len(response) < 100:
            return False
        return True
Prompt 压缩
系统提示优化
class PromptOptimizer:
    """Lightweight prompt-shrinking utilities."""

    def __init__(self):
        self.counter = TokenCounter()

    def compress_system_prompt(self, prompt: str) -> str:
        """Strip surrounding whitespace from each line and drop blank lines."""
        lines = [line.strip() for line in prompt.split('\n')]
        lines = [line for line in lines if line]
        return '\n'.join(lines)

    def shorten_examples(
        self,
        examples: list,
        max_examples: int = 3
    ) -> list:
        """Reduce few-shot examples to at most `max_examples`.

        Picks evenly spaced examples, always including the first and last,
        as a cheap proxy for "most representative". (The original always
        returned 3 items regardless of `max_examples`, exceeding the cap
        when `max_examples` was 1 or 2.)
        """
        if len(examples) <= max_examples:
            return examples
        if max_examples <= 0:
            return []
        # Evenly spaced indices across the list; dedupe preserves order.
        step = (len(examples) - 1) / (max_examples - 1) if max_examples > 1 else 0
        indices = []
        for i in range(max_examples):
            idx = int(round(i * step))
            if idx not in indices:
                indices.append(idx)
        return [examples[i] for i in indices]

    def optimize_prompt(self, prompt: str) -> dict:
        """Compress a prompt and report the token savings.

        Returns a dict with original/optimized text, token counts and
        savings. `saved_percent` is 0.0 for an empty prompt — guards the
        ZeroDivisionError the naive formula hit when original_tokens == 0.
        """
        original_tokens = self.counter.count_tokens(prompt)
        optimized = self.compress_system_prompt(prompt)
        optimized_tokens = self.counter.count_tokens(optimized)
        if original_tokens > 0:
            saved_percent = (1 - optimized_tokens / original_tokens) * 100
        else:
            saved_percent = 0.0
        return {
            "original": prompt,
            "optimized": optimized,
            "original_tokens": original_tokens,
            "optimized_tokens": optimized_tokens,
            "saved_tokens": original_tokens - optimized_tokens,
            "saved_percent": saved_percent
        }
# Example: compress a verbose system prompt and report the savings.
optimizer = PromptOptimizer()
verbose_prompt = """
You are a helpful assistant that helps users with their questions.
You should always be polite and professional in your responses.
Please make sure to provide accurate and helpful information.
If you don't know the answer, please say so honestly.
Do not make up information or provide false answers.
"""
result = optimizer.optimize_prompt(verbose_prompt)
print(f"原始: {result['original_tokens']} tokens")
print(f"优化后: {result['optimized_tokens']} tokens")
print(f"节省: {result['saved_percent']:.1f}%")
使用压缩技术
class LLMLingua:
    """Simulated LLMLingua-style compression (delegates to an LLM rather than
    implementing the real token-entropy pruning algorithm)."""

    def __init__(self, compression_rate: float = 0.5):
        # Target ratio of compressed length to original length (0-1).
        self.rate = compression_rate
        self.client = OpenAI()

    def compress(self, text: str) -> str:
        """Compress `text` to roughly `rate` of its original length.

        NOTE(review): max_tokens is derived from the *character* count,
        which only loosely approximates tokens (especially for Chinese) —
        confirm the budget is adequate.
        """
        response = self.client.chat.completions.create(
            model="gpt-4o-mini",
            messages=[
                {
                    "role": "system",
                    "content": f"""压缩以下文本到原长度的{int(self.rate * 100)}%,
保留关键信息,删除冗余内容。直接输出压缩后的文本。"""
                },
                {"role": "user", "content": text}
            ],
            max_tokens=int(len(text) * self.rate)
        )
        return response.choices[0].message.content

    def selective_compress(
        self,
        context: str,
        query: str
    ) -> str:
        """Query-aware compression: keep only context relevant to `query`."""
        response = self.client.chat.completions.create(
            model="gpt-4o-mini",
            messages=[
                {
                    "role": "system",
                    "content": """从上下文中提取与用户问题最相关的信息,
删除不相关内容。保持信息完整性。"""
                },
                {
                    "role": "user",
                    "content": f"问题:{query}\n\n上下文:{context}"
                }
            ]
        )
        return response.choices[0].message.content
批处理优化
批量请求
import asyncio
from typing import List
import aiohttp
class BatchProcessor:
    """Client-side batching: run prompts concurrently, or merge several small
    items into a single API call."""

    def __init__(self, batch_size: int = 10):
        # Number of prompts processed concurrently per batch.
        self.batch_size = batch_size
        self.client = OpenAI()

    async def process_batch_async(
        self,
        prompts: List[str],
        model: str = "gpt-4o-mini"
    ) -> List[str]:
        """Process prompts concurrently, `batch_size` at a time.

        The synchronous OpenAI client is pushed onto worker threads via
        asyncio.to_thread so the event loop is not blocked. Results are
        returned in the same order as `prompts`.
        """
        async def process_one(prompt: str) -> str:
            response = await asyncio.to_thread(
                self.client.chat.completions.create,
                model=model,
                messages=[{"role": "user", "content": prompt}]
            )
            return response.choices[0].message.content

        # Process in chunks to cap concurrency at batch_size.
        results = []
        for i in range(0, len(prompts), self.batch_size):
            batch = prompts[i:i + self.batch_size]
            batch_results = await asyncio.gather(*[
                process_one(p) for p in batch
            ])
            results.extend(batch_results)
        return results

    def combine_requests(
        self,
        items: List[str],
        task_template: str
    ) -> str:
        """Merge several small items into one prompt / one API call.

        NOTE(review): the prompt asks for a JSON *array*, but
        response_format json_object forces a JSON *object*, so the model
        will typically wrap results under a key — confirm the caller
        parses accordingly.
        """
        combined_prompt = f"""{task_template}
请对以下每个项目执行任务,用 JSON 数组格式返回结果:
项目列表:
{chr(10).join([f'{i+1}. {item}' for i, item in enumerate(items)])}
返回格式:["结果1", "结果2", ...]"""
        response = self.client.chat.completions.create(
            model="gpt-4o-mini",
            messages=[{"role": "user", "content": combined_prompt}],
            response_format={"type": "json_object"}
        )
        return response.choices[0].message.content
# Usage
processor = BatchProcessor()
# Merged request: one API call handles several items at once.
items = ["苹果", "香蕉", "橙子"]
result = processor.combine_requests(items, "翻译成英文")
print(result)  # e.g. {"results": ["apple", "banana", "orange"]}
OpenAI Batch API
import json
class OpenAIBatch:
    """Thin wrapper around the OpenAI Batch API (50% discount vs. sync calls)."""

    def __init__(self):
        self.client = OpenAI()

    def create_batch_file(
        self,
        requests: List[dict],
        output_file: str
    ) -> str:
        """Write requests to a JSONL batch input file.

        Each request dict must contain "messages"; "model" and "max_tokens"
        are optional and default to gpt-4o-mini / 1000.

        Returns:
            The path that was written.
        """
        # utf-8 + explicit newline keep the JSONL byte-stable across platforms.
        with open(output_file, 'w', encoding='utf-8', newline='\n') as f:
            for i, req in enumerate(requests):
                batch_request = {
                    "custom_id": f"request-{i}",
                    "method": "POST",
                    "url": "/v1/chat/completions",
                    "body": {
                        "model": req.get("model", "gpt-4o-mini"),
                        "messages": req["messages"],
                        "max_tokens": req.get("max_tokens", 1000)
                    }
                }
                f.write(json.dumps(batch_request) + '\n')
        return output_file

    def submit_batch(self, file_path: str) -> str:
        """Upload the JSONL file and start a batch job; returns the batch id."""
        # Context manager closes the handle even if the upload fails.
        with open(file_path, 'rb') as f:
            batch_file = self.client.files.create(
                file=f,
                purpose="batch"
            )
        batch = self.client.batches.create(
            input_file_id=batch_file.id,
            endpoint="/v1/chat/completions",
            completion_window="24h"
        )
        return batch.id

    def check_batch_status(self, batch_id: str) -> dict:
        """Return a progress summary (status + request counts) for a batch."""
        batch = self.client.batches.retrieve(batch_id)
        return {
            "status": batch.status,
            "completed": batch.request_counts.completed,
            "failed": batch.request_counts.failed,
            "total": batch.request_counts.total
        }

    def get_batch_results(self, batch_id: str) -> List[dict]:
        """Download and parse the JSONL output of a completed batch.

        Raises:
            ValueError: if the batch has not reached "completed" status.
        """
        batch = self.client.batches.retrieve(batch_id)
        if batch.status != "completed":
            raise ValueError(f"批处理未完成:{batch.status}")
        result_file = self.client.files.content(batch.output_file_id)
        # splitlines() is robust to trailing newlines and CRLF endings.
        return [json.loads(line)
                for line in result_file.text.splitlines()
                if line.strip()]

# Batch processing earns a 50% discount on token prices!
成本监控
使用量追踪
from dataclasses import dataclass, field
from datetime import datetime
from typing import Dict, List
@dataclass
class UsageRecord:
    """Token usage and cost for a single LLM API call."""
    timestamp: datetime
    model: str
    input_tokens: int
    output_tokens: int
    cost: float  # USD
    request_id: str = ""  # provider request id, if available


class CostTracker:
    """Record per-request LLM spend and enforce a daily budget."""

    def __init__(self):
        self.records: List[UsageRecord] = []
        self.daily_budget: float = 10.0    # USD per calendar day
        self.alert_threshold: float = 0.8  # warn when 80% of budget is used
        # USD per 1M tokens.
        self.pricing = {
            "gpt-4-turbo": {"input": 10, "output": 30},
            "gpt-4o": {"input": 5, "output": 15},
            "gpt-4o-mini": {"input": 0.15, "output": 0.60}
        }

    def record_usage(
        self,
        model: str,
        input_tokens: int,
        output_tokens: int,
        request_id: str = ""
    ):
        """Record one call's usage, then enforce the daily budget.

        Raises:
            RuntimeError: if today's accumulated cost (including this call)
                has reached the daily budget. The record is kept either way.
        """
        cost = self._calculate_cost(model, input_tokens, output_tokens)
        record = UsageRecord(
            timestamp=datetime.now(),
            model=model,
            input_tokens=input_tokens,
            output_tokens=output_tokens,
            cost=cost,
            request_id=request_id
        )
        self.records.append(record)
        self._check_budget()

    def _calculate_cost(
        self,
        model: str,
        input_tokens: int,
        output_tokens: int
    ) -> float:
        """Return USD cost; models absent from the pricing table cost 0."""
        if model not in self.pricing:
            return 0
        price = self.pricing[model]
        return (input_tokens * price["input"] +
                output_tokens * price["output"]) / 1_000_000

    def _check_budget(self):
        """Raise once the daily budget is exhausted; warn when close to it."""
        today_cost = self.get_today_cost()
        if today_cost >= self.daily_budget:
            # RuntimeError subclasses Exception, so callers that caught the
            # old bare Exception keep working.
            raise RuntimeError(f"日预算已用尽!当前: ${today_cost:.4f}")
        if today_cost >= self.daily_budget * self.alert_threshold:
            print(f"⚠️ 预算告警:已使用 {today_cost/self.daily_budget*100:.1f}%")

    def get_today_cost(self) -> float:
        """Sum the cost of all records stamped with today's date."""
        today = datetime.now().date()
        return sum(
            r.cost for r in self.records
            if r.timestamp.date() == today
        )

    def get_stats(self, days: int = 7) -> dict:
        """Aggregate usage over the trailing `days` days, grouped by model.

        Always returns the full key set (period_days / total_requests /
        total_cost / by_model): the previous version returned only
        {"total_cost": 0} when there were no records, which made
        export_report() crash with a KeyError.
        """
        from datetime import timedelta
        cutoff = datetime.now() - timedelta(days=days)
        recent = [r for r in self.records if r.timestamp > cutoff]
        model_stats: Dict[str, dict] = {}
        for record in recent:
            stats = model_stats.setdefault(record.model, {
                "requests": 0,
                "input_tokens": 0,
                "output_tokens": 0,
                "cost": 0
            })
            stats["requests"] += 1
            stats["input_tokens"] += record.input_tokens
            stats["output_tokens"] += record.output_tokens
            stats["cost"] += record.cost
        return {
            "period_days": days,
            "total_requests": len(recent),
            "total_cost": sum(r.cost for r in recent),
            "by_model": model_stats
        }

    def export_report(self) -> str:
        """Render a human-readable cost report for the default stats window."""
        stats = self.get_stats()
        report = f"""
=== LLM 成本报告 ===
统计周期:{stats['period_days']} 天
总请求数:{stats['total_requests']}
总成本:${stats['total_cost']:.4f}
按模型统计:
"""
        for model, data in stats.get('by_model', {}).items():
            report += f"""
{model}:
  - 请求数:{data['requests']}
  - 输入 Token:{data['input_tokens']:,}
  - 输出 Token:{data['output_tokens']:,}
  - 成本:${data['cost']:.4f}
"""
        return report
# Usage
tracker = CostTracker()
tracker.daily_budget = 5.0

# Record usage after every call.
def tracked_chat(messages: list, model: str = "gpt-4o-mini") -> str:
    """Call the chat API and record token usage/cost in the global tracker.

    Propagates whatever the tracker raises when the daily budget is
    exhausted (the record is still kept in that case).
    """
    client = OpenAI()
    response = client.chat.completions.create(
        model=model,
        messages=messages
    )
    # Log the actual token counts reported by the API, not estimates.
    tracker.record_usage(
        model=model,
        input_tokens=response.usage.prompt_tokens,
        output_tokens=response.usage.completion_tokens,
        request_id=response.id
    )
    return response.choices[0].message.content
完整优化方案
class CostOptimizedLLM:
    """Chat client combining exact-match caching, model routing and cost tracking."""

    def __init__(self):
        self.client = OpenAI()
        self.tracker = CostTracker()
        self.token_counter = TokenCounter()
        self.cache = {}  # exact-match cache: sha256(messages) -> response text
        # Bound the cache so a long-running process cannot grow it forever
        # (the original dict was unbounded — a slow memory leak).
        self.max_cache_size = 1024

    def chat(
        self,
        messages: list,
        model: str = None,
        use_cache: bool = True,
        max_tokens: int = None
    ) -> dict:
        """Cost-optimized chat call.

        Pipeline: cache lookup -> model selection -> output budget estimate ->
        API call -> usage tracking -> cache update.

        Returns:
            dict with the response text, cache-hit flag, chosen model,
            cost (USD) and input/output token counts.
        """
        # 1. Serve from cache when possible (zero cost).
        cache_key = None
        if use_cache:
            cache_key = self._cache_key(messages)
            if cache_key in self.cache:
                return {
                    "response": self.cache[cache_key],
                    "cached": True,
                    "cost": 0
                }
        # 2. Estimate input size and pick a model if none was forced.
        input_tokens = self.token_counter.count_messages(messages)
        if model is None:
            model = self._select_model(messages, input_tokens)
        # 3. Cap the output size.
        if max_tokens is None:
            max_tokens = self._estimate_output_tokens(messages)
        # 4. Call the API.
        response = self.client.chat.completions.create(
            model=model,
            messages=messages,
            max_tokens=max_tokens
        )
        result = response.choices[0].message.content
        # 5. Track the actual usage reported by the API.
        self.tracker.record_usage(
            model=model,
            input_tokens=response.usage.prompt_tokens,
            output_tokens=response.usage.completion_tokens
        )
        # 6. Cache the answer, evicting the oldest entry when full
        # (dicts preserve insertion order, so this is FIFO).
        if use_cache:
            if len(self.cache) >= self.max_cache_size:
                self.cache.pop(next(iter(self.cache)))
            self.cache[cache_key] = result
        # 7. Report the cost of this call.
        cost = self.tracker._calculate_cost(
            model,
            response.usage.prompt_tokens,
            response.usage.completion_tokens
        )
        return {
            "response": result,
            "cached": False,
            "model": model,
            "cost": cost,
            "tokens": {
                "input": response.usage.prompt_tokens,
                "output": response.usage.completion_tokens
            }
        }

    def _cache_key(self, messages: list) -> str:
        """Deterministic SHA-256 hex key over the canonical JSON of `messages`."""
        import hashlib
        import json  # local import keeps this class self-contained
        content = json.dumps(messages, ensure_ascii=False, sort_keys=True)
        return hashlib.sha256(content.encode()).hexdigest()

    def _select_model(self, messages: list, input_tokens: int) -> str:
        """Keyword routing: premium model only for complex-looking queries."""
        query = messages[-1]["content"] if messages else ""
        # Complexity check takes precedence over context length.
        complex_indicators = ["分析", "推理", "代码", "优化", "比较"]
        if any(ind in query for ind in complex_indicators):
            return "gpt-4o"
        # Long contexts default to the cheap model to contain cost.
        if input_tokens > 4000:
            return "gpt-4o-mini"
        return "gpt-4o-mini"

    def _estimate_output_tokens(self, messages: list) -> int:
        """Heuristic output budget based on phrasing of the last message."""
        query = messages[-1]["content"] if messages else ""
        if "简短" in query or "一句话" in query:
            return 100
        if "详细" in query or "解释" in query:
            return 1000
        return 500
# Usage (makes a real API call unless served from cache)
llm = CostOptimizedLLM()
result = llm.chat([
    {"role": "user", "content": "什么是机器学习?"}
])
print(f"响应: {result['response'][:100]}...")
print(f"使用模型: {result['model']}")
print(f"成本: ${result['cost']:.6f}")
print(f"缓存命中: {result['cached']}")
最佳实践
成本优化检查清单
| 优化项 | 节省潜力 | 实施难度 |
|---|---|---|
| 使用缓存 | 50-90% | 低 |
| 模型选择 | 30-70% | 低 |
| Prompt 压缩 | 10-30% | 中 |
| 批处理 | 50% | 中 |
| 输出限制 | 10-50% | 低 |
成本预算规划
def estimate_monthly_cost(
    daily_requests: int,
    avg_input_tokens: int,
    avg_output_tokens: int,
    model: str,
    days_per_month: int = 30
) -> dict:
    """Project LLM spend from average daily traffic.

    Args:
        daily_requests: requests per day.
        avg_input_tokens: average prompt tokens per request.
        avg_output_tokens: average completion tokens per request.
        model: pricing key; unknown models silently fall back to
            gpt-4o-mini pricing (documenting pre-existing behavior).
        days_per_month: billing days per month (default 30, as before).

    Returns:
        dict with daily/monthly/annual cost (USD) and a 50%-cache-hit
        scenario for the monthly figure.
    """
    # USD per 1M tokens.
    pricing = {
        "gpt-4o": {"input": 5, "output": 15},
        "gpt-4o-mini": {"input": 0.15, "output": 0.60}
    }
    price = pricing.get(model, pricing["gpt-4o-mini"])
    daily_cost = daily_requests * (
        avg_input_tokens * price["input"] +
        avg_output_tokens * price["output"]
    ) / 1_000_000
    monthly_cost = daily_cost * days_per_month
    return {
        "daily_cost": daily_cost,
        "monthly_cost": monthly_cost,
        "annual_cost": monthly_cost * 12,
        "with_50_percent_cache": monthly_cost * 0.5
    }
# Example: 1,000 requests/day on gpt-4o-mini.
estimate = estimate_monthly_cost(
    daily_requests=1000,
    avg_input_tokens=500,
    avg_output_tokens=300,
    model="gpt-4o-mini"
)
print(f"月度预估成本: ${estimate['monthly_cost']:.2f}")
print(f"启用缓存后: ${estimate['with_50_percent_cache']:.2f}")
总结
LLM 成本优化的核心策略:
- 模型选择:根据任务复杂度选择合适模型
- 缓存利用:精确+语义缓存双管齐下
- Prompt 优化:压缩系统提示,减少冗余
- 批处理:使用 Batch API 获得 50% 折扣
- 监控告警:实时追踪成本,设置预算
通过这些策略,通常可以降低 50-80% 的 LLM 使用成本。
参考资源
版权声明: 如无特别声明,本文版权归 sshipanoo 所有,转载请注明本文链接。
(采用 CC BY-NC-SA 4.0 许可协议进行授权)
本文标题:《 LLM应用开发——成本优化指南 》
本文链接:http://localhost:3015/ai/%E6%88%90%E6%9C%AC%E4%BC%98%E5%8C%96.html
本文发布已有一段时间,文章中的某些内容(如模型价格)可能已过时!