能调通不等于能上线,真正麻烦的是异常分类、成本结构和兼容层
你可能遇到过这种情况:第一次接大模型 API,复制官方示例,填上 api_key,终端里吐出一段话,心想"就这?"。然后功能一上线,问题像约好了一样同时来:高峰期请求挂在那里几十秒不返回,供应商开始回 429,月底财务问这条 API 怎么烧了这么多钱,产品又要求同一个功能能在 OpenAI、DeepSeek 和自建网关之间随时切。
说白了,难的从来不是那一行 chat.completions.create,而是把它包进一个能恢复异常、能控成本、能换供应商的调用层。下面这几件事,basically 就是 demo 代码和生产代码的全部差距。
哪些错误该重试,哪些重试一万次也没用
新手写重试,常常是 try 三次完事。但这里有个前提被跳过了:不是所有错误都值得重试。把它们一视同仁,要么白白浪费时间,要么把本该立刻失败的请求拖成一场慢性事故。
按 HTTP 状态码分一下类。值得重试的是"瞬时性"错误:429(限速,对面只是让你慢一点)、500/502/503/504(上游网关或服务瞬时抖动)、408(请求超时),以及网络层的连接超时、读超时、ConnectionError。这些重试一下大概率就好了。
不该重试的是"确定性"错误,因为它们重试一万次结果都一样:401/403(密钥错了或没权限)、400(参数错误,比如模型名拼错、消息结构非法)、402(余额不足)、404(模型不存在)。还有一类容易忽略——内容审核拒绝,对面明确告诉你这段内容不让过,你重试只是在刷日志。
所以重试逻辑的第一步不是"睡多久",而是"这个异常到底该不该重试":
import httpx
from openai import APIStatusError, APIConnectionError, APITimeoutError
RETRYABLE_STATUS = {408, 409, 429, 500, 502, 503, 504}
def is_retryable(exc: Exception) -> bool:
# 网络层错误:连不上、读超时,几乎都值得重试
if isinstance(exc, (APIConnectionError, APITimeoutError, httpx.TransportError)):
return True
# HTTP 错误:只重试瞬时类状态码
if isinstance(exc, APIStatusError):
return exc.status_code in RETRYABLE_STATUS
# 其它未知异常,保守起见不重试,让它尽快暴露
return False退避为什么一定要加随机抖动
错误分类完,才轮到"睡多久"。几乎所有教程都会写指数退避:delay = base * 2 ** attempt,第一次睡 1 秒,然后 2 秒、4 秒、8 秒。指数部分没问题,问题在它太"整齐"了。
设想高峰期同时有 200 个客户端撞上限速,它们几乎在同一刻收到 429,于是几乎在同一刻开始睡 1 秒,再几乎在同一刻醒来一起重试——你只是把同一波洪峰平移了 1 秒,对面该崩还是崩。这就是重试雪崩(thundering herd),整齐的退避反而把请求"对齐"成了周期性冲击。
解药是给退避加随机抖动。业界常用的有两种。Full jitter:在 [0, base * 2 ** attempt] 区间里随机取一个值,打散得最彻底,AWS 的架构博客推荐的就是它。Equal jitter:保留一半固定退避、一半随机,delay = half + random(0, half),好处是既打散又保证不会"立刻"重试。两者都行,full jitter 在高并发下表现更稳。
还有一个比你算的退避更准的东西,很多人没用:Retry-After 响应头。供应商在返回 429 时常会带上它(值是秒数,也可能是 HTTP 日期),意思是"等这么久我这边就缓过来了"。它是对面给的确定信息,优先级应该高于你本地猜的退避。
把这些拼起来,再加两个上限——单次退避有个 cap(别退到几百秒),整体有个总耗时预算(别让一个请求悄悄卡你五分钟)——重试才算完整:
import asyncio
import random
import time
import email.utils
def parse_retry_after(exc) -> float | None:
headers = getattr(getattr(exc, "response", None), "headers", {}) or {}
raw = headers.get("retry-after")
if not raw:
return None
if raw.isdigit(): # 形如 "12"
return float(raw)
dt = email.utils.parsedate_to_datetime(raw) # 形如 HTTP 日期
return max(0.0, dt.timestamp() - time.time()) if dt else None
async def with_retry(call, *, max_retries=4, base=1.0, cap=30.0, budget=60.0):
started = time.monotonic()
for attempt in range(max_retries):
try:
return await call()
except Exception as exc:
last = attempt == max_retries - 1
if last or not is_retryable(exc) or time.monotonic() - started > budget:
raise
# 优先听对面的 Retry-After,否则用 full jitter 退避
hinted = parse_retry_after(exc)
backoff = hinted if hinted is not None else random.uniform(0, base * 2 ** attempt)
await asyncio.sleep(min(backoff, cap))Token 怎么算,钱到底花在哪
第二个容易翻车的地方是成本。很多团队的 Token 账单是糊的,因为对 Token 本身就有几个误解。
首先,Token 不是字数。英文大约 4 个字符算一个 token,中文一个汉字大致 1 到 2 个 token,而且——关键在这——每个模型的分词器不一样。OpenAI 用 tiktoken(GPT-4 系是 o200k_base),Claude、Qwen、DeepSeek 各有各的 tokenizer。同一段中文 prompt,在不同模型上算出来的 token 数能差出一两成,所以"按 OpenAI 估的成本"套到国产模型上往往不准。
其次,输入和输出不同价,而且输出通常贵 2 到 4 倍。这意味着省钱的重点常被搞反了:与其费劲压缩输入 prompt,不如先管住输出——让模型别长篇大论地复述、别把思考过程全写出来。一句"直接给结论,不要解释"省下的钱,往往比你精简 system prompt 多。
第三,善用 prompt caching。OpenAI、DeepSeek、Anthropic 现在都支持缓存请求的固定前缀。你的 system prompt、few-shot 示例这些每次都一样的部分,第一次之后会命中缓存,输入价大幅打折(DeepSeek 命中缓存的输入价能便宜近一个数量级)。但缓存命中有个前提:前缀必须逐字节一致。所以别手贱把当前时间戳、随机 ID 这种动态内容拼进 system prompt 的开头,那等于每次都把缓存打穿。
最后,把 usage 落到日志里,按请求维度真正算账,而不是月底看总额猜:
PRICING = { # 美元 / 1K tokens,按你的供应商实际价目填
"gpt-4.1-mini": {"in": 0.0004, "out": 0.0016, "cached_in": 0.0001},
"deepseek-chat": {"in": 0.00027, "out": 0.0011, "cached_in": 0.00007},
}
def log_cost(model: str, usage) -> float:
p = PRICING[model]
cached = getattr(getattr(usage, "prompt_tokens_details", None), "cached_tokens", 0) or 0
fresh_in = usage.prompt_tokens - cached
cost = (fresh_in * p["in"] + cached * p["cached_in"] + usage.completion_tokens * p["out"]) / 1000
print(f"model={model} in={usage.prompt_tokens}(cached={cached}) "
f"out={usage.completion_tokens} cost=${cost:.5f}")
return cost并发数不等于 QPS
接着是并发。本地加一道并发闸门是必要的——服务端的限流(RPM、TPM)是被动的红线,你撞上去就吃 429;本地节流是主动不去撞它。但这里有个普遍的误解:用 asyncio.Semaphore(5) 控住并发,不等于把速率控在了 5 QPS。
信号量控的是"同时在飞的请求数",不是"每秒发出的请求数"。如果每个请求 100ms 就返回,5 个并发实际跑出来是 50 QPS;如果每个请求要 2 秒,5 个并发只有 2.5 QPS。两者根本不是一回事。真要卡住 QPS,得用令牌桶或漏桶按时间发放额度。
还有两个坑。一是 TPM(每分钟 token 数)限制——它限的是 token 不是请求数,几个超大 prompt 的请求就能把 TPM 配额吃光,哪怕你的 RPM 还很闲。二是连接池:AsyncOpenAI 底层是 httpx,默认连接数有上限,并发拉高时如果不调 httpx 的 limits,请求会卡在等连接,而不是等模型。
适配层:把供应商的差异关进笼子
最后说多模型适配。很多人理解的"适配"就是存一张 base_url 和模型名的表——但那只是配置,不是适配。真正的差异藏在更深的地方:
消息格式上,多数供应商兼容 OpenAI 的 chat completions 结构,但 Anthropic 原生 API 的 system 是顶层参数而不是一条 message,Gemini 的结构则完全不同。字段上,OpenAI 新接口把 max_tokens 改成了 max_completion_tokens。function calling 的 schema 各家不一致。流式返回的 SSE chunk 结构不一样。错误码也不规范——有的供应商干脆把限速信息塞进 200 的 body 里,你的 is_retryable 根本拦不到。还有 reasoning 模型(o 系列、deepseek-reasoner)会多吐一段 reasoning_content,而且往往不支持 temperature。
务实的做法是分两层。大多数国产模型都提供 OpenAI 兼容 endpoint,这部分用一个统一的 OpenAI 客户端兜掉;差异实在大的(Anthropic 原生、Gemini)单独写各自的 adapter。对上层只暴露一个 chat(),业务代码永远不需要知道现在用的是哪家:
from abc import ABC, abstractmethod
class ChatAdapter(ABC):
@abstractmethod
async def chat(self, messages: list[dict], **kw) -> str: ...
class OpenAICompatAdapter(ChatAdapter):
"""覆盖所有提供 OpenAI 兼容接口的供应商:OpenAI 本体、DeepSeek、本地 vLLM 等"""
def __init__(self, base_url: str, api_key: str, model: str):
from openai import AsyncOpenAI
self.client = AsyncOpenAI(base_url=base_url, api_key=api_key, timeout=30.0)
self.model = model
async def chat(self, messages: list[dict], **kw) -> str:
async def call():
resp = await self.client.chat.completions.create(
model=self.model, messages=messages, **kw)
if resp.usage:
log_cost(self.model, resp.usage)
return resp.choices[0].message.content or ""
return await with_retry(call)
class AnthropicAdapter(ChatAdapter):
"""Anthropic 原生 API:system 要拆成顶层参数,结构与 OpenAI 不同"""
def __init__(self, api_key: str, model: str):
from anthropic import AsyncAnthropic
self.client = AsyncAnthropic(api_key=api_key)
self.model = model
async def chat(self, messages: list[dict], **kw) -> str:
system = next((m["content"] for m in messages if m["role"] == "system"), "")
convo = [m for m in messages if m["role"] != "system"]
async def call():
resp = await self.client.messages.create(
model=self.model, system=system, messages=convo,
max_tokens=kw.get("max_tokens", 1024))
return resp.content[0].text
return await with_retry(call)
业务代码现在只跟 ChatAdapter 打交道。哪天某家接口改字段、或者稳定性变差需要切走,你改的是适配层一个文件,而不是满项目去搜 create(。这套适配层不是为了炫技,它的全部价值就是:永远给你留一条可以切换的退路。
把异常分类、抖动退避、成本核算、并发限流、适配层这五件事做扎实,你的 API 调用层才算真的能上线。但 API 层兜住之后,下一个常见误区马上就来了——很多人开始把 Prompt 当成玄学,靠"感觉"反复试。下一篇我们就把 Prompt 拆成模板、约束、评估三件可以工程化的事,看看它为什么更像在写配置,而不是在抽签。
版权声明: 如无特别声明,本文版权归 sshipanoo 所有,转载请注明本文链接。
(采用 CC BY-NC-SA 4.0 许可协议进行授权)
本文标题:API 调用没你想的那么简单
本文链接:https://www.sshipanoo.com/blog/ai/llm-advanced/01-API调用没你想的那么简单/
本文最后一次更新为 天前,文章中的某些内容可能已过时!