把几小时的批处理压缩到几分钟

为什么 LLM 应用要异步

LLM 调用的每一次请求耗时通常在几百毫秒到几秒之间——远慢于普通 Web 请求。同步代码处理批量任务时整个进程在 network wait 上空转,CPU 几乎不工作。一万条文档要打标签,串行跑完按 2 秒一条算需要 5.5 小时;改成 50 个并发,几分钟就完。

异步对 LLM 应用不是性能优化的"锦上添花",而是从能不能交付的根本问题。这一篇把我自己反复用到的几个并发模式整理出来,包含:基础并发、限流、超时与重试、worker pool、流式与并发的兼容。

asyncio 极速回顾

如果你不熟 asyncio,三件事就够用本篇:

import asyncio

# 1. async 定义协程,await 等待结果
async def fetch(url: str) -> str:
    await asyncio.sleep(1)  # 模拟 IO 等待
    return f"data from {url}"

# 2. asyncio.gather 并发执行多个协程
async def main():
    results = await asyncio.gather(
        fetch("a"), fetch("b"), fetch("c")
    )
    print(results)

# 3. asyncio.run 是入口
asyncio.run(main())

三个 fetch 各自 sleep 1 秒,并发执行总耗时约 1 秒(不是 3 秒)。这就是 IO 密集型场景下异步的核心收益。

AsyncOpenAI:异步版本的 SDK

openai SDK 原生提供 AsyncOpenAI,API 完全镜像同步版本,只是所有方法变成协程:

import asyncio
import os
from openai import AsyncOpenAI
from dotenv import load_dotenv

load_dotenv()

client = AsyncOpenAI(
    api_key=os.getenv("DEEPSEEK_API_KEY"),
    base_url="https://api.deepseek.com/v1",
)


async def classify(text: str) -> str:
    resp = await client.chat.completions.create(
        model="deepseek-chat",
        messages=[
            {"role": "system", "content": "判断文本情感,只输出 positive/negative/neutral"},
            {"role": "user", "content": text},
        ],
        temperature=0,
    )
    return resp.choices[0].message.content.strip()


async def main():
    texts = [
        "这家餐厅服务态度真好,菜也很赞",
        "等了半小时还没人理我,差评",
        "今天天气不错",
    ]
    results = await asyncio.gather(*[classify(t) for t in texts])
    for t, r in zip(texts, results):
        print(f"{r}: {t}")


asyncio.run(main())

3 条文本同时送出,几乎和单条耗时相同。把列表换成 1000 条文本,性能差距会放大到几百倍。

限流:别把 API 打挂

并发不是越高越好。每个 API 都有 RPM(requests per minute)和 TPM(tokens per minute)限制,超过会被服务端 429。限流的核心工具是 asyncio.Semaphore——一个信号量,限制同时进行的协程数:

import asyncio

# 全局并发限制:同时最多 20 个请求
SEMAPHORE = asyncio.Semaphore(20)


async def classify_limited(text: str) -> str:
    async with SEMAPHORE:
        return await classify(text)


async def main():
    texts = load_a_lot_of_texts()  # 假设 1 万条
    results = await asyncio.gather(*[classify_limited(t) for t in texts])
    return results

async with SEMAPHORE 在进入时占一个名额,退出时释放。1 万个协程同时被创建,但任何时刻活跃执行的不超过 20 个。

并发数怎么定:先看 API 文档的 RPM 上限,留 70~80% 安全余量。DeepSeek 默认 RPM 是 10000+,并发 20~50 都安全。OpenAI 的 Tier 1 账户 RPM 较低,要保守一些。用 token 算更准,因为长 prompt 也会触发 TPM 限流。

重试与超时

并发场景下偶发网络抖动、服务端瞬时故障是常态,必须有重试。tenacity 配合 asyncio 用法:

from tenacity import (
    retry,
    stop_after_attempt,
    wait_exponential,
    retry_if_exception_type,
)
from openai import RateLimitError, APITimeoutError, APIConnectionError


@retry(
    stop=stop_after_attempt(5),
    wait=wait_exponential(multiplier=1, min=2, max=30),
    retry=retry_if_exception_type((RateLimitError, APITimeoutError, APIConnectionError)),
)
async def robust_classify(text: str) -> str:
    async with SEMAPHORE:
        return await classify(text)

超时 SDK 自带(默认 600 秒,可改)。但更好的做法是在外层用 asyncio.timeout

async def classify_with_timeout(text: str, timeout: float = 30.0) -> str | None:
    try:
        async with asyncio.timeout(timeout):
            return await robust_classify(text)
    except asyncio.TimeoutError:
        return None

asyncio.timeout 是 Python 3.11+ 的语法,更早版本用 asyncio.wait_for

进度可见:tqdm 异步版

跑 1 万条任务,一片黑屏什么都看不到很折磨人。tqdm 有 asyncio 支持:

from tqdm.asyncio import tqdm

async def main():
    texts = load_a_lot_of_texts()
    tasks = [classify_with_timeout(t) for t in texts]
    results = []
    for coro in tqdm.as_completed(tasks, total=len(tasks)):
        result = await coro
        results.append(result)
    return results

实时进度条 + 已完成数 + 预估剩余时间,体验比 gather 静默等待好得多。

增量保存:别让 4 小时白跑

跑长任务最痛的事是跑到 80% 时进程崩了,结果全没。基本原则:每 N 条就把已处理的结果落盘

import json
from pathlib import Path

OUTPUT = Path("./results.jsonl")


async def process_with_save(items, batch_size=100):
    # 加载已完成的 ID,跳过
    done_ids = set()
    if OUTPUT.exists():
        for line in OUTPUT.read_text().splitlines():
            done_ids.add(json.loads(line)["id"])

    todo = [it for it in items if it["id"] not in done_ids]
    print(f"已完成 {len(done_ids)},剩余 {len(todo)}")

    with OUTPUT.open("a", encoding="utf-8") as f:
        sem = asyncio.Semaphore(20)

        async def worker(item):
            async with sem:
                result = await robust_classify(item["text"])
                # 即时写盘
                f.write(json.dumps({
                    "id": item["id"],
                    "result": result,
                }, ensure_ascii=False) + "\n")
                f.flush()

        await asyncio.gather(*[worker(it) for it in todo])

注意 f.flush()——不加的话 Python 缓冲会让数据要积攒到一定量才真正写入磁盘,崩溃时仍会丢失。生产场景对极致可靠性还要加 os.fsync

Worker Pool 模式:可控的生产消费

对于持续到达的任务流(比如从消息队列取任务),用经典的 asyncio.Queue + worker 模式:

import asyncio


async def worker(name: str, queue: asyncio.Queue, sem: asyncio.Semaphore):
    while True:
        item = await queue.get()
        if item is None:
            queue.task_done()
            break
        try:
            async with sem:
                result = await robust_classify(item["text"])
                print(f"[{name}] 处理 {item['id']}: {result}")
        except Exception as e:
            print(f"[{name}] 失败 {item['id']}: {e}")
        finally:
            queue.task_done()


async def main():
    queue = asyncio.Queue(maxsize=1000)
    sem = asyncio.Semaphore(20)

    # 启动 5 个 worker
    workers = [asyncio.create_task(worker(f"w{i}", queue, sem)) for i in range(5)]

    # 生产任务
    for item in load_items():
        await queue.put(item)

    # 等任务全部处理完
    await queue.join()

    # 通知 worker 退出
    for _ in workers:
        await queue.put(None)
    await asyncio.gather(*workers)


asyncio.run(main())

这种结构的好处:

  • 背压——Queue(maxsize=1000) 自动限制内存中堆积的任务数
  • 可控的并发数——worker 数 × Semaphore 数双重控制
  • 优雅退出——None 哨兵让 worker 自然结束

流式 + 并发可以共存吗

可以,但要想清楚。流式的目的是降低单个请求的首 token 延迟,常用在用户对话场景;并发的目的是提高总吞吐,用在批处理。两者一般不会同时优化——批处理任务用户看不见进度,没必要流式。

如果某个场景确实需要"并发流式"(比如 N 个用户同时发起聊天),各自用流式,每个用户的流不互相干扰。关键的反模式:不要在批处理里 stream=True 然后丢弃中间 chunk,那只是徒增协议开销和延迟。

真实场景:用并发处理一万条客服记录

把以上整合成一个端到端的批处理脚本:

# batch_process.py
import asyncio
import json
from pathlib import Path
import os
from openai import AsyncOpenAI
from tenacity import (
    retry, stop_after_attempt, wait_exponential, retry_if_exception_type
)
from openai import RateLimitError, APITimeoutError, APIConnectionError
from tqdm.asyncio import tqdm
from dotenv import load_dotenv

load_dotenv()

client = AsyncOpenAI(
    api_key=os.getenv("DEEPSEEK_API_KEY"),
    base_url="https://api.deepseek.com/v1",
)

SEM = asyncio.Semaphore(20)
INPUT = Path("conversations.jsonl")
OUTPUT = Path("tagged.jsonl")


@retry(
    stop=stop_after_attempt(5),
    wait=wait_exponential(min=2, max=30),
    retry=retry_if_exception_type((RateLimitError, APITimeoutError, APIConnectionError)),
)
async def tag(text: str) -> dict:
    async with SEM:
        async with asyncio.timeout(30):
            resp = await client.chat.completions.create(
                model="deepseek-chat",
                messages=[
                    {"role": "system", "content": "对客服记录打标签,输出 JSON:{intent, urgency(low/mid/high), summary}"},
                    {"role": "user", "content": text},
                ],
                temperature=0,
                response_format={"type": "json_object"},
            )
            return json.loads(resp.choices[0].message.content)


async def main():
    items = [json.loads(l) for l in INPUT.read_text().splitlines()]
    done = set()
    if OUTPUT.exists():
        done = {json.loads(l)["id"] for l in OUTPUT.read_text().splitlines()}

    todo = [it for it in items if it["id"] not in done]
    print(f"总 {len(items)},已完成 {len(done)},剩 {len(todo)}")

    f = OUTPUT.open("a", encoding="utf-8")
    try:
        async def process(item):
            try:
                result = await tag(item["text"])
                f.write(json.dumps({"id": item["id"], **result}, ensure_ascii=False) + "\n")
                f.flush()
            except Exception as e:
                # 失败不抛出,避免 gather 提前终止
                print(f"失败 {item['id']}: {e}")

        await tqdm.gather(*[process(it) for it in todo])
    finally:
        f.close()


asyncio.run(main())

这个脚本能力:

  • 20 并发限流
  • 5 次自动重试,指数退避
  • 30 秒单请求超时
  • 增量保存,重启不丢
  • 进度条
  • 单条失败不影响整体

200 行不到,已经达到工业级批处理脚本的可靠性。

几个常见踩坑

1. 别忘了 await

client.chat.completions.create(...) 返回的是协程,不 await 拿到的是协程对象本身,运行时不会报错但完全没在跑。这是最常见的 asyncio bug。

2. 不要在异步函数里用同步阻塞 IO

time.sleep()、同步的 requests.get()、文件 IO 都是阻塞的,会卡住整个事件循环。文件 IO 用 aiofiles,HTTP 用 httpx.AsyncClient,sleep 用 asyncio.sleep

3. CPU 密集任务用 run_in_executor

asyncio 解决 IO 密集,CPU 密集(比如本地 embedding、tokenize 大量文本)会卡事件循环。要 loop.run_in_executor 丢到线程池或进程池:

import asyncio
from concurrent.futures import ThreadPoolExecutor

executor = ThreadPoolExecutor(max_workers=4)

async def heavy_cpu_task(data):
    loop = asyncio.get_running_loop()
    return await loop.run_in_executor(executor, cpu_bound_function, data)

4. asyncio.gather 的"全有或全无"陷阱

gather(*tasks) 默认任何一个任务异常就立即取消其它所有任务并抛出。如果你想"个别失败不影响整体",加 return_exceptions=True

results = await asyncio.gather(*tasks, return_exceptions=True)
for r in results:
    if isinstance(r, Exception):
        # 处理失败
        ...

相关阅读

版权声明: 如无特别声明,本文版权归 sshipanoo 所有,转载请注明本文链接。

(采用 CC BY-NC-SA 4.0 许可协议进行授权)

本文标题:番外 6:异步与并发,批量调用 LLM 的工程模式

本文链接:https://www.sshipanoo.com/blog/ai/ai-for-python/番外06-异步与并发/

本文最后一次更新为 天前,文章中的某些内容可能已过时!