把几小时的批处理压缩到几分钟
为什么 LLM 应用要异步
LLM 调用的每一次请求耗时通常在几百毫秒到几秒之间——远慢于普通 Web 请求。同步代码处理批量任务时整个进程在 network wait 上空转,CPU 几乎不工作。一万条文档要打标签,串行跑完按 2 秒一条算需要 5.5 小时;改成 50 个并发,几分钟就完。
异步对 LLM 应用不是性能优化的"锦上添花",而是从能不能交付的根本问题。这一篇把我自己反复用到的几个并发模式整理出来,包含:基础并发、限流、超时与重试、worker pool、流式与并发的兼容。
asyncio 极速回顾
如果你不熟 asyncio,三件事就够用本篇:
import asyncio
# 1. async 定义协程,await 等待结果
async def fetch(url: str) -> str:
await asyncio.sleep(1) # 模拟 IO 等待
return f"data from {url}"
# 2. asyncio.gather 并发执行多个协程
async def main():
results = await asyncio.gather(
fetch("a"), fetch("b"), fetch("c")
)
print(results)
# 3. asyncio.run 是入口
asyncio.run(main())
三个 fetch 各自 sleep 1 秒,并发执行总耗时约 1 秒(不是 3 秒)。这就是 IO 密集型场景下异步的核心收益。
AsyncOpenAI:异步版本的 SDK
openai SDK 原生提供 AsyncOpenAI,API 完全镜像同步版本,只是所有方法变成协程:
import asyncio
import os
from openai import AsyncOpenAI
from dotenv import load_dotenv
load_dotenv()
client = AsyncOpenAI(
api_key=os.getenv("DEEPSEEK_API_KEY"),
base_url="https://api.deepseek.com/v1",
)
async def classify(text: str) -> str:
resp = await client.chat.completions.create(
model="deepseek-chat",
messages=[
{"role": "system", "content": "判断文本情感,只输出 positive/negative/neutral"},
{"role": "user", "content": text},
],
temperature=0,
)
return resp.choices[0].message.content.strip()
async def main():
texts = [
"这家餐厅服务态度真好,菜也很赞",
"等了半小时还没人理我,差评",
"今天天气不错",
]
results = await asyncio.gather(*[classify(t) for t in texts])
for t, r in zip(texts, results):
print(f"{r}: {t}")
asyncio.run(main())
3 条文本同时送出,几乎和单条耗时相同。把列表换成 1000 条文本,性能差距会放大到几百倍。
限流:别把 API 打挂
并发不是越高越好。每个 API 都有 RPM(requests per minute)和 TPM(tokens per minute)限制,超过会被服务端 429。限流的核心工具是 asyncio.Semaphore——一个信号量,限制同时进行的协程数:
import asyncio
# 全局并发限制:同时最多 20 个请求
SEMAPHORE = asyncio.Semaphore(20)
async def classify_limited(text: str) -> str:
async with SEMAPHORE:
return await classify(text)
async def main():
texts = load_a_lot_of_texts() # 假设 1 万条
results = await asyncio.gather(*[classify_limited(t) for t in texts])
return results
async with SEMAPHORE 在进入时占一个名额,退出时释放。1 万个协程同时被创建,但任何时刻活跃执行的不超过 20 个。
并发数怎么定:先看 API 文档的 RPM 上限,留 70~80% 安全余量。DeepSeek 默认 RPM 是 10000+,并发 20~50 都安全。OpenAI 的 Tier 1 账户 RPM 较低,要保守一些。用 token 算更准,因为长 prompt 也会触发 TPM 限流。
重试与超时
并发场景下偶发网络抖动、服务端瞬时故障是常态,必须有重试。tenacity 配合 asyncio 用法:
from tenacity import (
retry,
stop_after_attempt,
wait_exponential,
retry_if_exception_type,
)
from openai import RateLimitError, APITimeoutError, APIConnectionError
@retry(
stop=stop_after_attempt(5),
wait=wait_exponential(multiplier=1, min=2, max=30),
retry=retry_if_exception_type((RateLimitError, APITimeoutError, APIConnectionError)),
)
async def robust_classify(text: str) -> str:
async with SEMAPHORE:
return await classify(text)
超时 SDK 自带(默认 600 秒,可改)。但更好的做法是在外层用 asyncio.timeout:
async def classify_with_timeout(text: str, timeout: float = 30.0) -> str | None:
try:
async with asyncio.timeout(timeout):
return await robust_classify(text)
except asyncio.TimeoutError:
return None
asyncio.timeout 是 Python 3.11+ 的语法,更早版本用 asyncio.wait_for。
进度可见:tqdm 异步版
跑 1 万条任务,一片黑屏什么都看不到很折磨人。tqdm 有 asyncio 支持:
from tqdm.asyncio import tqdm
async def main():
texts = load_a_lot_of_texts()
tasks = [classify_with_timeout(t) for t in texts]
results = []
for coro in tqdm.as_completed(tasks, total=len(tasks)):
result = await coro
results.append(result)
return results
实时进度条 + 已完成数 + 预估剩余时间,体验比 gather 静默等待好得多。
增量保存:别让 4 小时白跑
跑长任务最痛的事是跑到 80% 时进程崩了,结果全没。基本原则:每 N 条就把已处理的结果落盘。
import json
from pathlib import Path
OUTPUT = Path("./results.jsonl")
async def process_with_save(items, batch_size=100):
# 加载已完成的 ID,跳过
done_ids = set()
if OUTPUT.exists():
for line in OUTPUT.read_text().splitlines():
done_ids.add(json.loads(line)["id"])
todo = [it for it in items if it["id"] not in done_ids]
print(f"已完成 {len(done_ids)},剩余 {len(todo)}")
with OUTPUT.open("a", encoding="utf-8") as f:
sem = asyncio.Semaphore(20)
async def worker(item):
async with sem:
result = await robust_classify(item["text"])
# 即时写盘
f.write(json.dumps({
"id": item["id"],
"result": result,
}, ensure_ascii=False) + "\n")
f.flush()
await asyncio.gather(*[worker(it) for it in todo])
注意 f.flush()——不加的话 Python 缓冲会让数据要积攒到一定量才真正写入磁盘,崩溃时仍会丢失。生产场景对极致可靠性还要加 os.fsync。
Worker Pool 模式:可控的生产消费
对于持续到达的任务流(比如从消息队列取任务),用经典的 asyncio.Queue + worker 模式:
import asyncio
async def worker(name: str, queue: asyncio.Queue, sem: asyncio.Semaphore):
while True:
item = await queue.get()
if item is None:
queue.task_done()
break
try:
async with sem:
result = await robust_classify(item["text"])
print(f"[{name}] 处理 {item['id']}: {result}")
except Exception as e:
print(f"[{name}] 失败 {item['id']}: {e}")
finally:
queue.task_done()
async def main():
queue = asyncio.Queue(maxsize=1000)
sem = asyncio.Semaphore(20)
# 启动 5 个 worker
workers = [asyncio.create_task(worker(f"w{i}", queue, sem)) for i in range(5)]
# 生产任务
for item in load_items():
await queue.put(item)
# 等任务全部处理完
await queue.join()
# 通知 worker 退出
for _ in workers:
await queue.put(None)
await asyncio.gather(*workers)
asyncio.run(main())
这种结构的好处:
- 背压——
Queue(maxsize=1000)自动限制内存中堆积的任务数 - 可控的并发数——worker 数 × Semaphore 数双重控制
- 优雅退出——
None哨兵让 worker 自然结束
流式 + 并发可以共存吗
可以,但要想清楚。流式的目的是降低单个请求的首 token 延迟,常用在用户对话场景;并发的目的是提高总吞吐,用在批处理。两者一般不会同时优化——批处理任务用户看不见进度,没必要流式。
如果某个场景确实需要"并发流式"(比如 N 个用户同时发起聊天),各自用流式,每个用户的流不互相干扰。关键的反模式:不要在批处理里 stream=True 然后丢弃中间 chunk,那只是徒增协议开销和延迟。
真实场景:用并发处理一万条客服记录
把以上整合成一个端到端的批处理脚本:
# batch_process.py
import asyncio
import json
from pathlib import Path
import os
from openai import AsyncOpenAI
from tenacity import (
retry, stop_after_attempt, wait_exponential, retry_if_exception_type
)
from openai import RateLimitError, APITimeoutError, APIConnectionError
from tqdm.asyncio import tqdm
from dotenv import load_dotenv
load_dotenv()
client = AsyncOpenAI(
api_key=os.getenv("DEEPSEEK_API_KEY"),
base_url="https://api.deepseek.com/v1",
)
SEM = asyncio.Semaphore(20)
INPUT = Path("conversations.jsonl")
OUTPUT = Path("tagged.jsonl")
@retry(
stop=stop_after_attempt(5),
wait=wait_exponential(min=2, max=30),
retry=retry_if_exception_type((RateLimitError, APITimeoutError, APIConnectionError)),
)
async def tag(text: str) -> dict:
async with SEM:
async with asyncio.timeout(30):
resp = await client.chat.completions.create(
model="deepseek-chat",
messages=[
{"role": "system", "content": "对客服记录打标签,输出 JSON:{intent, urgency(low/mid/high), summary}"},
{"role": "user", "content": text},
],
temperature=0,
response_format={"type": "json_object"},
)
return json.loads(resp.choices[0].message.content)
async def main():
items = [json.loads(l) for l in INPUT.read_text().splitlines()]
done = set()
if OUTPUT.exists():
done = {json.loads(l)["id"] for l in OUTPUT.read_text().splitlines()}
todo = [it for it in items if it["id"] not in done]
print(f"总 {len(items)},已完成 {len(done)},剩 {len(todo)}")
f = OUTPUT.open("a", encoding="utf-8")
try:
async def process(item):
try:
result = await tag(item["text"])
f.write(json.dumps({"id": item["id"], **result}, ensure_ascii=False) + "\n")
f.flush()
except Exception as e:
# 失败不抛出,避免 gather 提前终止
print(f"失败 {item['id']}: {e}")
await tqdm.gather(*[process(it) for it in todo])
finally:
f.close()
asyncio.run(main())
这个脚本能力:
- 20 并发限流
- 5 次自动重试,指数退避
- 30 秒单请求超时
- 增量保存,重启不丢
- 进度条
- 单条失败不影响整体
200 行不到,已经达到工业级批处理脚本的可靠性。
几个常见踩坑
1. 别忘了 await
client.chat.completions.create(...) 返回的是协程,不 await 拿到的是协程对象本身,运行时不会报错但完全没在跑。这是最常见的 asyncio bug。
2. 不要在异步函数里用同步阻塞 IO
time.sleep()、同步的 requests.get()、文件 IO 都是阻塞的,会卡住整个事件循环。文件 IO 用 aiofiles,HTTP 用 httpx.AsyncClient,sleep 用 asyncio.sleep。
3. CPU 密集任务用 run_in_executor
asyncio 解决 IO 密集,CPU 密集(比如本地 embedding、tokenize 大量文本)会卡事件循环。要 loop.run_in_executor 丢到线程池或进程池:
import asyncio
from concurrent.futures import ThreadPoolExecutor
executor = ThreadPoolExecutor(max_workers=4)
async def heavy_cpu_task(data):
loop = asyncio.get_running_loop()
return await loop.run_in_executor(executor, cpu_bound_function, data)
4. asyncio.gather 的"全有或全无"陷阱
gather(*tasks) 默认任何一个任务异常就立即取消其它所有任务并抛出。如果你想"个别失败不影响整体",加 return_exceptions=True:
results = await asyncio.gather(*tasks, return_exceptions=True)
for r in results:
if isinstance(r, Exception):
# 处理失败
...相关阅读
- Python asyncio 官方文档
- tenacity 重试库
- tqdm asyncio 集成
- httpx 文档 — 异步原生的 HTTP 客户端
- aiofiles — 异步文件 IO
- Real Python: asyncio 教程
版权声明: 如无特别声明,本文版权归 sshipanoo 所有,转载请注明本文链接。
(采用 CC BY-NC-SA 4.0 许可协议进行授权)
本文标题:番外 6:异步与并发,批量调用 LLM 的工程模式
本文链接:https://www.sshipanoo.com/blog/ai/ai-for-python/番外06-异步与并发/
本文最后一次更新为 天前,文章中的某些内容可能已过时!