Achieving a Real-Time, Responsive User Experience

Introduction

Generating text with an LLM usually takes several seconds, and under the traditional request-response model the user waits all that time before seeing anything. Streaming lets us return the generated content incrementally, dramatically improving perceived responsiveness.


Overview of Streaming Output

Why Streaming Is Needed

Traditional mode                            Streaming mode
Wait for generation to finish               See output immediately
High first-token latency (maybe 5-10 s)     Low first-token latency (~200 ms)
Poor user experience                        Human-typing-like effect
Cannot interrupt early                      Can stop generation at any time
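
To make the latency numbers concrete (they vary by model and load), here is a minimal sketch that measures time-to-first-token with the OpenAI SDK, assuming an API key is configured:

import time
from openai import OpenAI

client = OpenAI()

start = time.time()
stream = client.chat.completions.create(
    model="gpt-4",
    messages=[{"role": "user", "content": "Explain streaming in one paragraph"}],
    stream=True,
)

for chunk in stream:
    if chunk.choices[0].delta.content:
        # Time until the first visible token arrives (TTFT)
        print(f"Time to first token: {time.time() - start:.2f}s")
        break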

Technology Comparison

┌──────────────────────────────────────────────────────────────────────────┐
│                      Streaming Transport Comparison                      │
├───────────────────┬───────────────────────┬──────────────────────────────┤
│  Technology       │  Characteristics      │  Typical scenario            │
├───────────────────┼───────────────────────┼──────────────────────────────┤
│  SSE              │  One-way, simple      │  Server-to-client push       │
│  WebSocket        │  Two-way, real-time   │  Bidirectional communication │
│  HTTP/2 Streaming │  Multiplexed          │  High-concurrency scenarios  │
│  gRPC Streaming   │  Efficient, type-safe │  Microservice architectures  │
└───────────────────┴───────────────────────┴──────────────────────────────┘

OpenAI Streaming API

Basic Usage

from openai import OpenAI

client = OpenAI()

# Streaming call
stream = client.chat.completions.create(
    model="gpt-4",
    messages=[{"role": "user", "content": "Write a poem about spring"}],
    stream=True  # enable streaming output
)

# Process the response chunk by chunk
for chunk in stream:
    if chunk.choices[0].delta.content is not None:
        print(chunk.choices[0].delta.content, end="", flush=True)

Streaming Response Structure

# Structure of each chunk
{
    "id": "chatcmpl-xxx",
    "object": "chat.completion.chunk",
    "created": 1234567890,
    "model": "gpt-4",
    "choices": [{
        "index": 0,
        "delta": {
            "content": ""  # incremental content
        },
        "finish_reason": None  # "stop" on the final chunk
    }]
}
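
The finish_reason field is only set on the final chunk and is the reliable way to tell why generation ended; a small sketch extending the loop above:

for chunk in stream:
    choice = chunk.choices[0]
    if choice.delta.content:
        print(choice.delta.content, end="", flush=True)
    # Set only on the last chunk: "stop" (natural end), "length" (hit
    # max_tokens), "content_filter", etc.
    if choice.finish_reason is not None:
        print(f"\n[finish_reason: {choice.finish_reason}]")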

Collecting the Full Text

from typing import Generator

def stream_with_collection(messages: list) -> Generator[str, None, str]:
    """Stream the output while collecting the full response."""
    client = OpenAI()
    
    full_response = []
    
    stream = client.chat.completions.create(
        model="gpt-4",
        messages=messages,
        stream=True
    )
    
    for chunk in stream:
        delta = chunk.choices[0].delta
        
        if delta.content:
            content = delta.content
            full_response.append(content)
            yield content  # stream the chunk out
    
    # The joined text becomes the generator's return value
    # (carried on StopIteration; a plain for loop discards it)
    return "".join(full_response)

# Drive the generator
messages = [{"role": "user", "content": "Explain quantum computing"}]
full_text = ""

for text in stream_with_collection(messages):
    print(text, end="", flush=True)
    full_text += text

print(f"\n\nFull response length: {len(full_text)}")
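
Since a plain for loop discards the generator's return value, it has to be captured explicitly; a small sketch using yield from (the helper name print_and_capture is ours):

def print_and_capture(messages: list):
    # `yield from` re-yields every chunk and binds the generator's return value
    full = yield from stream_with_collection(messages)
    print(f"\n\nCollected via return value: {len(full)} chars")

for text in print_and_capture(messages):
    print(text, end="", flush=True)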

SSE (Server-Sent Events)

How It Works

SSE is a mechanism for one-way, server-to-client data push:

┌────────────────┐         SSE Connection         ┌────────────────┐
│                │  ─────────────────────────────> │                │
│    Server      │  data: chunk1                  │    Client      │
│                │  data: chunk2                  │                │
│                │  data: chunk3                  │                │
│                │  data: [DONE]                  │                │
└────────────────┘                                └────────────────┘

FastAPI + SSE Implementation

from fastapi import FastAPI
from fastapi.responses import StreamingResponse
from openai import AsyncOpenAI
import json

app = FastAPI()
client = AsyncOpenAI()  # async client, so the event loop is never blocked

async def generate_sse_response(prompt: str):
    """Generate an SSE-formatted response."""
    stream = await client.chat.completions.create(
        model="gpt-4",
        messages=[{"role": "user", "content": prompt}],
        stream=True
    )
    
    async for chunk in stream:
        if chunk.choices[0].delta.content:
            content = chunk.choices[0].delta.content
            # SSE format: data: {json}\n\n
            yield f"data: {json.dumps({'content': content})}\n\n"
    
    # Send the end-of-stream marker
    yield f"data: {json.dumps({'done': True})}\n\n"

@app.get("/chat/stream")
async def chat_stream(prompt: str):
    return StreamingResponse(
        generate_sse_response(prompt),
        media_type="text/event-stream",
        headers={
            "Cache-Control": "no-cache",
            "Connection": "keep-alive",
            "X-Accel-Buffering": "no"  # disable Nginx buffering
        }
    )
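
The endpoint can also be smoke-tested from Python; a sketch using httpx, assuming the app is running on localhost:8000:

import httpx

# Stream the response and print each SSE data payload as it arrives
with httpx.stream("GET", "http://localhost:8000/chat/stream",
                  params={"prompt": "hello"}, timeout=60.0) as response:
    for line in response.iter_lines():
        if line.startswith("data: "):
            print(line[6:])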

Consuming SSE from Frontend JavaScript

// Using the EventSource API
const eventSource = new EventSource('/chat/stream?prompt=hello');

eventSource.onmessage = (event) => {
    const data = JSON.parse(event.data);
    
    if (data.done) {
        eventSource.close();
        console.log('Stream completed');
        return;
    }
    
    // Append the content to the page
    document.getElementById('output').textContent += data.content;
};

eventSource.onerror = (error) => {
    console.error('SSE Error:', error);
    eventSource.close();
};

Handling SSE with the fetch API

async function streamChat(prompt) {
    const response = await fetch(`/chat/stream?prompt=${encodeURIComponent(prompt)}`);
    const reader = response.body.getReader();
    const decoder = new TextDecoder();
    
    let output = document.getElementById('output');
    output.textContent = '';
    
    let buffer = '';  // holds any partial SSE line split across network chunks
    
    while (true) {
        const { done, value } = await reader.read();
        
        if (done) break;
        
        buffer += decoder.decode(value, { stream: true });
        const lines = buffer.split('\n');
        buffer = lines.pop();  // keep the trailing incomplete line for the next read
        
        for (const line of lines) {
            if (line.startsWith('data: ')) {
                const data = JSON.parse(line.slice(6));
                if (data.content) {
                    output.textContent += data.content;
                }
            }
        }
    }
}

WebSocket Implementation

FastAPI WebSocket Service

from fastapi import FastAPI, WebSocket, WebSocketDisconnect
from openai import AsyncOpenAI
import json

app = FastAPI()
client = AsyncOpenAI()  # async client keeps the event loop responsive

@app.websocket("/ws/chat")
async def websocket_chat(websocket: WebSocket):
    await websocket.accept()
    
    try:
        while True:
            # Receive the user's message
            data = await websocket.receive_text()
            message = json.loads(data)
            
            # Stream from OpenAI
            stream = await client.chat.completions.create(
                model="gpt-4",
                messages=[{"role": "user", "content": message["content"]}],
                stream=True
            )
            
            # Forward chunk by chunk
            async for chunk in stream:
                if chunk.choices[0].delta.content:
                    await websocket.send_json({
                        "type": "chunk",
                        "content": chunk.choices[0].delta.content
                    })
            
            # Send the completion marker
            await websocket.send_json({"type": "done"})
            
    except WebSocketDisconnect:
        print("Client disconnected")

Frontend WebSocket Client

class ChatWebSocket {
    constructor(url) {
        this.url = url;
        this.ws = null;
        this.onMessage = null;
        this.onComplete = null;
    }
    
    connect() {
        return new Promise((resolve, reject) => {
            this.ws = new WebSocket(this.url);
            
            this.ws.onopen = () => resolve();
            this.ws.onerror = (error) => reject(error);
            
            this.ws.onmessage = (event) => {
                const data = JSON.parse(event.data);
                
                if (data.type === 'chunk' && this.onMessage) {
                    this.onMessage(data.content);
                } else if (data.type === 'done' && this.onComplete) {
                    this.onComplete();
                }
            };
        });
    }
    
    send(content) {
        if (this.ws && this.ws.readyState === WebSocket.OPEN) {
            this.ws.send(JSON.stringify({ content }));
        }
    }
    
    close() {
        if (this.ws) {
            this.ws.close();
        }
    }
}

// Usage example (top-level await requires an ES module or an enclosing async function)
const chat = new ChatWebSocket('ws://localhost:8000/ws/chat');

await chat.connect();

chat.onMessage = (content) => {
    document.getElementById('output').textContent += content;
};

chat.onComplete = () => {
    console.log('Response complete');
};

chat.send('Hello, please introduce yourself');

Asynchronous Streaming

The AsyncOpenAI Client

import asyncio
from openai import AsyncOpenAI

client = AsyncOpenAI()

async def async_stream_chat(prompt: str):
    """Asynchronous streaming chat."""
    stream = await client.chat.completions.create(
        model="gpt-4",
        messages=[{"role": "user", "content": prompt}],
        stream=True
    )
    
    async for chunk in stream:
        if chunk.choices[0].delta.content:
            yield chunk.choices[0].delta.content

# Usage
async def main():
    async for content in async_stream_chat("Explain machine learning"):
        print(content, end="", flush=True)

asyncio.run(main())

Concurrent Processing of Multiple Streams

import asyncio
from openai import AsyncOpenAI
from typing import AsyncGenerator

client = AsyncOpenAI()

async def stream_single(prompt: str, stream_id: int) -> AsyncGenerator[tuple, None]:
    """A single streaming request, tagging each chunk with its stream id."""
    stream = await client.chat.completions.create(
        model="gpt-4",
        messages=[{"role": "user", "content": prompt}],
        stream=True
    )
    
    async for chunk in stream:
        if chunk.choices[0].delta.content:
            yield (stream_id, chunk.choices[0].delta.content)

async def merge_streams(*prompts: str) -> AsyncGenerator[tuple, None]:
    """Merge several streams, yielding (stream_id, content) as chunks arrive.
    
    Note: async generators cannot `return` a value and cannot be passed to
    asyncio.as_completed, so the merge is done with tasks and a shared queue.
    """
    queue: asyncio.Queue = asyncio.Queue()
    DONE = object()  # sentinel marking the end of one stream
    
    async def pump(generator):
        # Forward one stream's chunks into the shared queue
        try:
            async for item in generator:
                await queue.put(item)
        finally:
            await queue.put(DONE)
    
    tasks = [
        asyncio.create_task(pump(stream_single(prompt, i)))
        for i, prompt in enumerate(prompts)
    ]
    
    finished = 0
    while finished < len(tasks):
        item = await queue.get()
        if item is DONE:
            finished += 1
        else:
            yield item

Error Handling and Reconnection

Robust Streaming

from openai import OpenAI, APIError, RateLimitError
import time
from typing import Generator

def robust_stream(
    prompt: str,
    max_retries: int = 3,
    retry_delay: float = 1.0
) -> Generator[str, None, None]:
    """Robust streaming with retries.
    
    Caveat: if the stream fails after some chunks were already yielded,
    a retry restarts generation from scratch and may duplicate text.
    """
    client = OpenAI()
    retries = 0
    
    while retries < max_retries:
        try:
            stream = client.chat.completions.create(
                model="gpt-4",
                messages=[{"role": "user", "content": prompt}],
                stream=True,
                timeout=30  # request timeout
            )
            
            for chunk in stream:
                if chunk.choices[0].delta.content:
                    yield chunk.choices[0].delta.content
            
            return  # completed successfully
            
        except RateLimitError:
            retries += 1
            wait_time = retry_delay * (2 ** retries)  # exponential backoff
            print(f"Rate limited, waiting {wait_time}s...")
            time.sleep(wait_time)
            
        except APIError as e:
            retries += 1
            print(f"API Error: {e}, retry {retries}/{max_retries}")
            time.sleep(retry_delay)
            
        except Exception as e:
            print(f"Unexpected error: {e}")
            raise
    
    raise Exception(f"Failed after {max_retries} retries")

Frontend Reconnection

class RobustStreamClient {
    constructor(endpoint) {
        this.endpoint = endpoint;
        this.maxRetries = 3;
        this.retryDelay = 1000;
    }
    
    async *stream(prompt, options = {}) {
        let retries = 0;
        let lastContent = '';
        
        while (retries < this.maxRetries) {
            try {
                const response = await fetch(this.endpoint, {
                    method: 'POST',
                    headers: { 'Content-Type': 'application/json' },
                    body: JSON.stringify({ prompt, resume_from: lastContent }),
                    signal: options.signal
                });
                
                if (!response.ok) {
                    throw new Error(`HTTP ${response.status}`);
                }
                
                const reader = response.body.getReader();
                const decoder = new TextDecoder();
                
                while (true) {
                    const { done, value } = await reader.read();
                    if (done) break;
                    
                    const text = decoder.decode(value, { stream: true });
                    lastContent += text;
                    yield text;
                }
                
                return; // completed successfully
                
            } catch (error) {
                if (error.name === 'AbortError') {
                    throw error; // user-initiated cancel, do not retry
                }
                
                retries++;
                console.warn(`Stream error, retry ${retries}/${this.maxRetries}:`, error);
                
                if (retries < this.maxRetries) {
                    await this.sleep(this.retryDelay * Math.pow(2, retries));
                }
            }
            }
        }
        
        throw new Error('Max retries exceeded');
    }
    
    sleep(ms) {
        return new Promise(resolve => setTimeout(resolve, ms));
    }
}
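
Note that resume_from is only useful if the backend honors it, which none of the endpoints in this article do. One hypothetical way to support it is to replay the partial answer as an assistant message and ask the model to continue; a sketch (the helper build_resume_messages is illustrative, and continuation quality depends on the model):

def build_resume_messages(prompt: str, resume_from: str) -> list:
    """Hypothetical helper: rebuild the message list so the model continues
    from the text the client already received, instead of starting over."""
    messages = [{"role": "user", "content": prompt}]
    if resume_from:
        messages.append({"role": "assistant", "content": resume_from})
        messages.append({
            "role": "user",
            "content": "Continue exactly where you left off, without repeating anything.",
        })
    return messages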

Streaming Output Optimization

Token-Level Rate Limiting

import time
from typing import Generator

from openai import OpenAI

def throttled_stream(
    prompt: str,
    tokens_per_second: float = 50
) -> Generator[str, None, None]:
    """Rate-limited streaming to simulate a typing effect.
    
    Note: the limit is applied per API chunk, and a chunk may carry
    more than one token.
    """
    client = OpenAI()
    
    stream = client.chat.completions.create(
        model="gpt-4",
        messages=[{"role": "user", "content": prompt}],
        stream=True
    )
    
    interval = 1.0 / tokens_per_second
    last_time = time.time()
    
    for chunk in stream:
        if chunk.choices[0].delta.content:
            content = chunk.choices[0].delta.content
            
            # Wait out the remainder of the interval, if any
            elapsed = time.time() - last_time
            if elapsed < interval:
                time.sleep(interval - elapsed)
            
            yield content
            last_time = time.time()
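
Because an API chunk often carries several characters, chunk-level throttling can still look jumpy. A sketch that re-chunks any of this article's generators into single characters for an even typing cadence (char_smoothed is our own helper):

import time
from typing import Generator, Iterable

def char_smoothed(chunks: Iterable[str], chars_per_second: float = 80) -> Generator[str, None, None]:
    """Re-chunk arbitrary text chunks into single characters at a fixed cadence."""
    interval = 1.0 / chars_per_second
    for chunk in chunks:
        for char in chunk:
            yield char
            time.sleep(interval)

# Usage: wrap any chunk generator from this article
for char in char_smoothed(robust_stream("Explain SSE"), chars_per_second=60):
    print(char, end="", flush=True)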

Buffer Optimization

from collections import deque
import asyncio

from openai import AsyncOpenAI

class BufferedStream:
    """Streaming processor with a bounded buffer."""
    
    def __init__(self, buffer_size: int = 10):
        # An unbounded deque; the producer enforces the bound itself.
        # (deque(maxlen=...) would silently drop the oldest chunks when full.)
        self.buffer = deque()
        self.buffer_size = buffer_size
        self.is_complete = False
        
    async def producer(self, prompt: str):
        """Producer: fetch data from the API."""
        client = AsyncOpenAI()
        
        try:
            stream = await client.chat.completions.create(
                model="gpt-4",
                messages=[{"role": "user", "content": prompt}],
                stream=True
            )
            
            async for chunk in stream:
                if chunk.choices[0].delta.content:
                    # Backpressure: wait while the buffer is full so nothing is lost
                    while len(self.buffer) >= self.buffer_size:
                        await asyncio.sleep(0.01)
                    self.buffer.append(chunk.choices[0].delta.content)
                    await asyncio.sleep(0)  # yield control
        finally:
            # Always mark completion so the consumer cannot spin forever
            self.is_complete = True
    
    async def consumer(self):
        """Consumer: drain the buffer."""
        while not self.is_complete or self.buffer:
            if self.buffer:
                yield self.buffer.popleft()
            else:
                await asyncio.sleep(0.01)  # wait for new data
    
    async def stream(self, prompt: str):
        """Start the producer and expose the consumer."""
        producer_task = asyncio.create_task(self.producer(prompt))
        
        async for content in self.consumer():
            yield content
        
        # Surface any exception the producer raised
        await producer_task
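
Usage follows the same pattern as a plain async generator:

async def main():
    buffered = BufferedStream(buffer_size=10)
    async for content in buffered.stream("Explain the producer-consumer pattern"):
        print(content, end="", flush=True)

asyncio.run(main())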

Complete Working Example

A Streaming Chat Service with Memory

from fastapi import FastAPI, HTTPException
from fastapi.responses import StreamingResponse
from pydantic import BaseModel
from typing import Optional, Dict
from openai import AsyncOpenAI
import json
import uuid

app = FastAPI()
client = AsyncOpenAI()  # async client so streaming never blocks the event loop

# Session store (in-memory, process-local)
sessions: Dict[str, list] = {}

class ChatRequest(BaseModel):
    message: str
    session_id: Optional[str] = None

def get_or_create_session(session_id: Optional[str]) -> tuple[str, list]:
    """Get an existing session or create a new one."""
    if session_id and session_id in sessions:
        return session_id, sessions[session_id]
    
    new_id = str(uuid.uuid4())
    sessions[new_id] = []
    return new_id, sessions[new_id]

async def stream_response(messages: list, session_id: str):
    """Streaming response generator."""
    full_response = []
    
    try:
        stream = await client.chat.completions.create(
            model="gpt-4",
            messages=[
                {"role": "system", "content": "You are a helpful assistant"},
                *messages
            ],
            stream=True
        )
        
        # Send the session_id first
        yield f"data: {json.dumps({'session_id': session_id})}\n\n"
        
        async for chunk in stream:
            if chunk.choices[0].delta.content:
                content = chunk.choices[0].delta.content
                full_response.append(content)
                yield f"data: {json.dumps({'content': content})}\n\n"
        
        # Save the full response into the session
        sessions[session_id].append({
            "role": "assistant",
            "content": "".join(full_response)
        })
        
        yield f"data: {json.dumps({'done': True})}\n\n"
        
    except Exception as e:
        yield f"data: {json.dumps({'error': str(e)})}\n\n"

@app.post("/chat/stream")
async def chat_stream(request: ChatRequest):
    # Get the existing session or create a new one
    session_id, messages = get_or_create_session(request.session_id)
    
    # Append the user message to the history
    messages.append({
        "role": "user",
        "content": request.message
    })
    
    return StreamingResponse(
        stream_response(messages, session_id),
        media_type="text/event-stream"
    )

@app.get("/session/{session_id}")
async def get_session(session_id: str):
    if session_id not in sessions:
        raise HTTPException(status_code=404, detail="Session not found")
    return {"messages": sessions[session_id]}

@app.delete("/session/{session_id}")
async def delete_session(session_id: str):
    if session_id in sessions:
        del sessions[session_id]
    return {"status": "deleted"}

Companion Frontend Component

<!DOCTYPE html>
<html>
<head>
    <title>Streaming Chat</title>
    <style>
        .chat-container {
            max-width: 800px;
            margin: 0 auto;
            padding: 20px;
        }
        .message {
            padding: 10px 15px;
            margin: 10px 0;
            border-radius: 10px;
        }
        .user-message {
            background: #007bff;
            color: white;
            margin-left: 20%;
        }
        .assistant-message {
            background: #f1f1f1;
            margin-right: 20%;
        }
        .typing-indicator {
            opacity: 0.7;
        }
        #input-area {
            display: flex;
            gap: 10px;
            margin-top: 20px;
        }
        #message-input {
            flex: 1;
            padding: 10px;
            border: 1px solid #ddd;
            border-radius: 5px;
        }
        button {
            padding: 10px 20px;
            background: #007bff;
            color: white;
            border: none;
            border-radius: 5px;
            cursor: pointer;
        }
        button:disabled {
            background: #ccc;
        }
    </style>
</head>
<body>
    <div class="chat-container">
        <div id="chat-messages"></div>
        <div id="input-area">
            <input type="text" id="message-input" placeholder="Type a message...">
            <button id="send-btn" onclick="sendMessage()">Send</button>
            <button id="stop-btn" onclick="stopStream()" style="display:none;">Stop</button>
        </div>
    </div>

    <script>
        let sessionId = null;
        let currentController = null;
        
        async function sendMessage() {
            const input = document.getElementById('message-input');
            const message = input.value.trim();
            if (!message) return;
            
            // Render the user message
            appendMessage(message, 'user');
            input.value = '';
            
            // Disable Send and show the Stop button
            document.getElementById('send-btn').disabled = true;
            document.getElementById('stop-btn').style.display = 'inline';
            
            // Create a container for the assistant message
            const assistantDiv = appendMessage('', 'assistant');
            assistantDiv.classList.add('typing-indicator');
            
            // An AbortController lets the user cancel the stream
            currentController = new AbortController();
            
            try {
                const response = await fetch('/chat/stream', {
                    method: 'POST',
                    headers: { 'Content-Type': 'application/json' },
                    body: JSON.stringify({ message, session_id: sessionId }),
                    signal: currentController.signal
                });
                
                const reader = response.body.getReader();
                const decoder = new TextDecoder();
                let buffer = '';  // holds any partial SSE line between reads
                
                while (true) {
                    const { done, value } = await reader.read();
                    if (done) break;
                    
                    buffer += decoder.decode(value, { stream: true });
                    const lines = buffer.split('\n');
                    buffer = lines.pop();  // keep the trailing incomplete line
                    
                    for (const line of lines) {
                        if (line.startsWith('data: ')) {
                            const data = JSON.parse(line.slice(6));
                            
                            if (data.session_id) {
                                sessionId = data.session_id;
                            } else if (data.content) {
                                assistantDiv.textContent += data.content;
                            } else if (data.error) {
                                assistantDiv.textContent = `Error: ${data.error}`;
                            }
                        }
                    }
                }
                
            } catch (error) {
                if (error.name !== 'AbortError') {
                    console.error('Stream error:', error);
                    assistantDiv.textContent = 'Something went wrong, please retry';
                }
            } finally {
                assistantDiv.classList.remove('typing-indicator');
                document.getElementById('send-btn').disabled = false;
                document.getElementById('stop-btn').style.display = 'none';
                currentController = null;
            }
        }
        
        function stopStream() {
            if (currentController) {
                currentController.abort();
            }
        }
        
        function appendMessage(content, role) {
            const container = document.getElementById('chat-messages');
            const div = document.createElement('div');
            div.className = `message ${role}-message`;
            div.textContent = content;
            container.appendChild(div);
            container.scrollTop = container.scrollHeight;
            return div;
        }
        
        // Send on Enter
        document.getElementById('message-input').addEventListener('keypress', (e) => {
            if (e.key === 'Enter') sendMessage();
        });
    </script>
</body>
</html>

Best Practices

Implementation Checklist

Item                              Notes
✅ Correct response headers        Content-Type: text/event-stream
✅ Buffering disabled              Nginx: X-Accel-Buffering: no
✅ Heartbeats                      Send periodic keep-alive messages on long-lived connections (see the sketch below)
✅ Timeout handling                Set sensible timeouts
✅ Error handling                  Handle failure modes gracefully
✅ Cancellation support            Let users interrupt generation
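
The heartbeat item deserves a sketch: SSE comment lines (starting with ":") are ignored by EventSource but keep idle connections alive through proxies. A minimal wrapper, assuming an asyncio stack; wrap any SSE generator from this article with it before handing it to StreamingResponse:

import asyncio

async def sse_with_heartbeat(source, interval: float = 15.0):
    """Wrap an SSE chunk generator, emitting an SSE comment whenever the
    source has been idle for `interval` seconds."""
    queue: asyncio.Queue = asyncio.Queue()
    DONE = object()
    
    async def pump():
        # Move chunks from the wrapped generator into the queue
        async for item in source:
            await queue.put(item)
        await queue.put(DONE)
    
    task = asyncio.create_task(pump())
    while True:
        try:
            item = await asyncio.wait_for(queue.get(), timeout=interval)
        except asyncio.TimeoutError:
            yield ": keep-alive\n\n"  # comment line, ignored by EventSource
            continue
        if item is DONE:
            break
        yield item
    await task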

Performance Tuning Suggestions

# 1. Use a connection pool
from openai import OpenAI
import httpx

client = OpenAI(
    http_client=httpx.Client(
        limits=httpx.Limits(max_connections=100)
    )
)

# 2. Set sensible timeouts
stream = client.chat.completions.create(
    model="gpt-4",
    messages=messages,
    stream=True,
    timeout=httpx.Timeout(60.0, connect=5.0)
)

# 3. gzip compression (for longer payloads)
# Caveat: compression middleware can buffer SSE responses; verify chunks still flush promptly
from fastapi.middleware.gzip import GZipMiddleware
app.add_middleware(GZipMiddleware, minimum_size=1000)

Summary

Streaming output is a key technique for improving the user experience of LLM applications:

Technology   Pros                       Cons                       Recommended for
SSE          Simple to use              One-way only               Web chat apps
WebSocket    Two-way, real-time         Higher complexity          Bidirectional interaction
HTTP/2       Multiplexing               Limited browser support    High-concurrency services

Key implementation points:

  1. Set response headers and buffering behavior correctly
  2. Implement robust error handling and reconnection
  3. Provide cancel/stop functionality
  4. Mind performance optimization and connection management
