LLM Application Development — Streaming Output
Building a Real-Time, Responsive User Experience
Introduction
LLM text generation typically takes several seconds, and with the traditional request-response model the user waits that entire time before seeing anything. Streaming lets the application return generated content incrementally, dramatically improving the perceived responsiveness.
Streaming Overview
Why Streaming?
| Traditional mode | Streaming mode |
|---|---|
| Wait for the full generation to finish | See output immediately |
| High time-to-first-token (possibly 5-10 s) | Low time-to-first-token (~200 ms) |
| Poor user experience | Feels like human typing |
| No way to interrupt early | Generation can be stopped at any time |
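To make the time-to-first-token rows concrete, here is a minimal measurement sketch. The helper name and prompt are illustrative, and actual numbers vary with model, load, and network:

import time
from openai import OpenAI

client = OpenAI()

def measure_ttft(prompt: str) -> float:
    """Return the seconds until the first visible token arrives (illustrative)."""
    start = time.time()
    stream = client.chat.completions.create(
        model="gpt-4",
        messages=[{"role": "user", "content": prompt}],
        stream=True,
    )
    for chunk in stream:
        if chunk.choices[0].delta.content:
            break  # first token observed
    return time.time() - start

print(f"TTFT: {measure_ttft('Say hello'):.2f}s")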
Technology Comparison
| Technology | Characteristics | Typical use case |
|---|---|---|
| SSE | One-way, simple | Server pushing to clients |
| WebSocket | Two-way, real-time | Bidirectional communication |
| HTTP/2 Streaming | Multiplexed | High-concurrency scenarios |
| gRPC Streaming | Efficient, type-safe | Microservice architectures |
OpenAI Streaming API
Basic Usage
from openai import OpenAI

client = OpenAI()

# Streaming call
stream = client.chat.completions.create(
    model="gpt-4",
    messages=[{"role": "user", "content": "Write a poem about spring"}],
    stream=True  # enable streaming
)

# Process chunk by chunk
for chunk in stream:
    if chunk.choices[0].delta.content is not None:
        print(chunk.choices[0].delta.content, end="", flush=True)
Streaming Response Structure
# Structure of each chunk
{
    "id": "chatcmpl-xxx",
    "object": "chat.completion.chunk",
    "created": 1234567890,
    "model": "gpt-4",
    "choices": [{
        "index": 0,
        "delta": {
            "content": "The"  # incremental content
        },
        "finish_reason": None  # "stop" on the final chunk
    }]
}
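Because the final chunk carries finish_reason, a consumer can watch it to learn why generation ended. A minimal sketch, reusing the stream from the basic example above (the printed label is illustrative):

for chunk in stream:
    choice = chunk.choices[0]
    if choice.delta.content:
        print(choice.delta.content, end="", flush=True)
    if choice.finish_reason is not None:
        # "stop" = natural end, "length" = max_tokens reached, etc.
        print(f"\n[finished: {choice.finish_reason}]")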
Collecting the Full Text
from typing import Generator
from openai import OpenAI

def stream_with_collection(messages: list) -> Generator[str, None, str]:
    """Stream the output while collecting the complete response."""
    client = OpenAI()
    full_response = []
    stream = client.chat.completions.create(
        model="gpt-4",
        messages=messages,
        stream=True
    )
    for chunk in stream:
        delta = chunk.choices[0].delta
        if delta.content:
            content = delta.content
            full_response.append(content)
            yield content  # stream out incrementally
    # The joined text becomes the generator's return value (StopIteration.value)
    return "".join(full_response)

# Using the generator
messages = [{"role": "user", "content": "Explain quantum computing"}]
full_text = ""
for text in stream_with_collection(messages):
    print(text, end="", flush=True)
    full_text += text
print(f"\n\nFull response length: {len(full_text)}")
SSE (Server-Sent Events)
How It Works
SSE is a technique in which the server pushes data to the client over a one-way channel:
┌────────────────┐ SSE Connection ┌────────────────┐
│ │ ─────────────────────────────> │ │
│ Server │ data: chunk1 │ Client │
│ │ data: chunk2 │ │
│ │ data: chunk3 │ │
│ │ data: [DONE] │ │
└────────────────┘ └────────────────┘
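On the wire, each event is plain UTF-8 text: one or more data: lines terminated by a blank line, optionally preceded by event:, id:, or retry: fields. Lines beginning with a colon are comments that clients ignore — useful for keep-alives, as shown in the best-practices section below. For example:

: keep-alive comment
event: message
id: 42
data: {"content": "Hel"}

data: {"content": "lo"}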
FastAPI + SSE Implementation
from fastapi import FastAPI
from fastapi.responses import StreamingResponse
from openai import OpenAI
import json

app = FastAPI()
client = OpenAI()

async def generate_sse_response(prompt: str):
    """Generate an SSE-formatted response."""
    # Note: the sync client blocks the event loop while waiting for chunks;
    # the AsyncOpenAI client shown later avoids this.
    stream = client.chat.completions.create(
        model="gpt-4",
        messages=[{"role": "user", "content": prompt}],
        stream=True
    )
    for chunk in stream:
        if chunk.choices[0].delta.content:
            content = chunk.choices[0].delta.content
            # SSE format: data: {json}\n\n
            yield f"data: {json.dumps({'content': content})}\n\n"
    # Send the end-of-stream marker
    yield f"data: {json.dumps({'done': True})}\n\n"

@app.get("/chat/stream")
async def chat_stream(prompt: str):
    return StreamingResponse(
        generate_sse_response(prompt),
        media_type="text/event-stream",
        headers={
            "Cache-Control": "no-cache",
            "Connection": "keep-alive",
            "X-Accel-Buffering": "no"  # disable Nginx buffering
        }
    )
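Assuming the app runs on localhost:8000 (hypothetical host and port), the endpoint can be smoke-tested with curl -N "http://localhost:8000/chat/stream?prompt=hello"; the -N flag disables curl's output buffering so events print as they arrive.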
Consuming SSE with Frontend JavaScript
// Using the EventSource API
const eventSource = new EventSource(`/chat/stream?prompt=${encodeURIComponent('Hello')}`);

eventSource.onmessage = (event) => {
    const data = JSON.parse(event.data);
    if (data.done) {
        eventSource.close();
        console.log('Stream completed');
        return;
    }
    // Append the content to the page
    document.getElementById('output').textContent += data.content;
};

eventSource.onerror = (error) => {
    console.error('SSE Error:', error);
    eventSource.close();
};
Processing SSE with the fetch API
async function streamChat(prompt) {
    const response = await fetch(`/chat/stream?prompt=${encodeURIComponent(prompt)}`);
    const reader = response.body.getReader();
    const decoder = new TextDecoder();
    const output = document.getElementById('output');
    output.textContent = '';
    let buffer = '';  // events may be split across reads, so carry partial lines over
    while (true) {
        const { done, value } = await reader.read();
        if (done) break;
        // stream: true keeps multi-byte characters intact across chunk boundaries
        buffer += decoder.decode(value, { stream: true });
        const lines = buffer.split('\n');
        buffer = lines.pop();  // keep the (possibly incomplete) last line
        for (const line of lines) {
            if (line.startsWith('data: ')) {
                const data = JSON.parse(line.slice(6));
                if (data.content) {
                    output.textContent += data.content;
                }
            }
        }
    }
}
WebSocket Implementation
FastAPI WebSocket Service
from fastapi import FastAPI, WebSocket, WebSocketDisconnect
from openai import OpenAI
import json

app = FastAPI()
client = OpenAI()

@app.websocket("/ws/chat")
async def websocket_chat(websocket: WebSocket):
    await websocket.accept()
    try:
        while True:
            # Receive the user's message
            data = await websocket.receive_text()
            message = json.loads(data)
            # Streaming call to OpenAI (the sync client is shown for brevity;
            # the AsyncOpenAI client below avoids blocking the event loop)
            stream = client.chat.completions.create(
                model="gpt-4",
                messages=[{"role": "user", "content": message["content"]}],
                stream=True
            )
            # Send chunk by chunk
            for chunk in stream:
                if chunk.choices[0].delta.content:
                    await websocket.send_json({
                        "type": "chunk",
                        "content": chunk.choices[0].delta.content
                    })
            # Send the completion marker
            await websocket.send_json({"type": "done"})
    except WebSocketDisconnect:
        print("Client disconnected")
Frontend WebSocket Client
class ChatWebSocket {
    constructor(url) {
        this.url = url;
        this.ws = null;
        this.onMessage = null;
        this.onComplete = null;
    }

    connect() {
        return new Promise((resolve, reject) => {
            this.ws = new WebSocket(this.url);
            this.ws.onopen = () => resolve();
            this.ws.onerror = (error) => reject(error);
            this.ws.onmessage = (event) => {
                const data = JSON.parse(event.data);
                if (data.type === 'chunk' && this.onMessage) {
                    this.onMessage(data.content);
                } else if (data.type === 'done' && this.onComplete) {
                    this.onComplete();
                }
            };
        });
    }

    send(content) {
        if (this.ws && this.ws.readyState === WebSocket.OPEN) {
            this.ws.send(JSON.stringify({ content }));
        }
    }

    close() {
        if (this.ws) {
            this.ws.close();
        }
    }
}

// Usage example (top-level await requires an ES module or async context)
const chat = new ChatWebSocket('ws://localhost:8000/ws/chat');
await chat.connect();
chat.onMessage = (content) => {
    document.getElementById('output').textContent += content;
};
chat.onComplete = () => {
    console.log('Response complete');
};
chat.send('Hello, please introduce yourself');
Asynchronous Stream Processing
The AsyncOpenAI Client
import asyncio
from openai import AsyncOpenAI

client = AsyncOpenAI()

async def async_stream_chat(prompt: str):
    """Asynchronous streaming chat."""
    stream = await client.chat.completions.create(
        model="gpt-4",
        messages=[{"role": "user", "content": prompt}],
        stream=True
    )
    async for chunk in stream:
        if chunk.choices[0].delta.content:
            yield chunk.choices[0].delta.content

# Usage
async def main():
    async for content in async_stream_chat("Explain machine learning"):
        print(content, end="", flush=True)

asyncio.run(main())
Concurrent Processing of Multiple Streams
import asyncio
from openai import AsyncOpenAI
from typing import AsyncGenerator

client = AsyncOpenAI()

async def stream_single(prompt: str, stream_id: int) -> AsyncGenerator[tuple, None]:
    """A single streaming request, with each chunk tagged by stream id."""
    stream = await client.chat.completions.create(
        model="gpt-4",
        messages=[{"role": "user", "content": prompt}],
        stream=True
    )
    async for chunk in stream:
        if chunk.choices[0].delta.content:
            yield (stream_id, chunk.choices[0].delta.content)

async def merge_streams(*prompts: str):
    """Merge several streams into one, using a shared queue as the meeting point."""
    queue: asyncio.Queue = asyncio.Queue()
    DONE = object()  # sentinel marking the end of one stream

    async def pump(stream_id: int, prompt: str):
        # Forward every chunk of one stream into the shared queue
        async for sid, content in stream_single(prompt, stream_id):
            await queue.put((sid, content))
        await queue.put((stream_id, DONE))

    tasks = [
        asyncio.create_task(pump(i, prompt))
        for i, prompt in enumerate(prompts)
    ]
    # Consume chunks from all streams as they arrive, interleaved
    remaining = len(tasks)
    while remaining:
        stream_id, content = await queue.get()
        if content is DONE:
            remaining -= 1
            print(f"\nStream {stream_id} completed")
        else:
            print(f"[{stream_id}] {content}", end="", flush=True)
    await asyncio.gather(*tasks)

# Usage: asyncio.run(merge_streams("Explain CPUs", "Explain GPUs"))
Error Handling and Reconnection
Robust Stream Processing
from openai import OpenAI, APIError, RateLimitError
import time
from typing import Generator

def robust_stream(
    prompt: str,
    max_retries: int = 3,
    retry_delay: float = 1.0
) -> Generator[str, None, None]:
    """Robust streaming with retries."""
    client = OpenAI()
    retries = 0
    while retries < max_retries:
        try:
            stream = client.chat.completions.create(
                model="gpt-4",
                messages=[{"role": "user", "content": prompt}],
                stream=True,
                timeout=30  # request timeout
            )
            for chunk in stream:
                if chunk.choices[0].delta.content:
                    yield chunk.choices[0].delta.content
            return  # finished successfully
        except RateLimitError:
            retries += 1
            wait_time = retry_delay * (2 ** retries)  # exponential backoff
            print(f"Rate limited, waiting {wait_time}s...")
            time.sleep(wait_time)
        except APIError as e:
            retries += 1
            print(f"API Error: {e}, retry {retries}/{max_retries}")
            time.sleep(retry_delay)
        except Exception as e:
            print(f"Unexpected error: {e}")
            raise
    raise Exception(f"Failed after {max_retries} retries")
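One caveat: if the stream dies after some chunks have already been yielded, the retry restarts generation from scratch, so the caller can receive duplicated (and, with sampling, different) text. Production code should track how much has been emitted and reconcile on retry; the frontend client below sketches this idea with its resume_from field, which the server would need to honor.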
Frontend Reconnection
class RobustStreamClient {
    constructor(endpoint) {
        this.endpoint = endpoint;
        this.maxRetries = 3;
        this.retryDelay = 1000;
    }

    async *stream(prompt, options = {}) {
        let retries = 0;
        let lastContent = '';
        while (retries < this.maxRetries) {
            try {
                const response = await fetch(this.endpoint, {
                    method: 'POST',
                    headers: { 'Content-Type': 'application/json' },
                    // resume_from tells the server how much we already received
                    // (the server must implement support for it)
                    body: JSON.stringify({ prompt, resume_from: lastContent }),
                    signal: options.signal
                });
                if (!response.ok) {
                    throw new Error(`HTTP ${response.status}`);
                }
                const reader = response.body.getReader();
                const decoder = new TextDecoder();
                while (true) {
                    const { done, value } = await reader.read();
                    if (done) break;
                    const text = decoder.decode(value, { stream: true });
                    lastContent += text;
                    yield text;
                }
                return; // finished successfully
            } catch (error) {
                if (error.name === 'AbortError') {
                    throw error; // cancelled by the user
                }
                retries++;
                console.warn(`Stream error, retry ${retries}/${this.maxRetries}:`, error);
                if (retries < this.maxRetries) {
                    await this.sleep(this.retryDelay * Math.pow(2, retries));
                }
            }
        }
        throw new Error('Max retries exceeded');
    }

    sleep(ms) {
        return new Promise(resolve => setTimeout(resolve, ms));
    }
}
Streaming Output Optimization
Token-Level Rate Limiting
import time
from typing import Generator
from openai import OpenAI

def throttled_stream(
    prompt: str,
    tokens_per_second: float = 50
) -> Generator[str, None, None]:
    """Rate-limited streaming that simulates a typing effect.

    Note: pacing is applied per chunk, and a chunk may carry more than
    one token, so the effective rate is approximate.
    """
    client = OpenAI()
    stream = client.chat.completions.create(
        model="gpt-4",
        messages=[{"role": "user", "content": prompt}],
        stream=True
    )
    interval = 1.0 / tokens_per_second
    last_time = time.time()
    for chunk in stream:
        if chunk.choices[0].delta.content:
            content = chunk.choices[0].delta.content
            # Wait long enough to respect the target rate
            elapsed = time.time() - last_time
            if elapsed < interval:
                time.sleep(interval - elapsed)
            yield content
            last_time = time.time()
Buffer Optimization
import asyncio
from openai import AsyncOpenAI

_DONE = object()  # sentinel pushed by the producer when the stream ends

class BufferedStream:
    """Streaming processor with a bounded buffer between producer and consumer."""

    def __init__(self, buffer_size: int = 10):
        # An asyncio.Queue provides backpressure; a deque(maxlen=...) would
        # silently drop the oldest chunks once full
        self.buffer: asyncio.Queue = asyncio.Queue(maxsize=buffer_size)

    async def producer(self, prompt: str):
        """Producer: fetch data from the API into the buffer."""
        client = AsyncOpenAI()
        stream = await client.chat.completions.create(
            model="gpt-4",
            messages=[{"role": "user", "content": prompt}],
            stream=True
        )
        async for chunk in stream:
            if chunk.choices[0].delta.content:
                # Blocks when the queue is full, i.e. the consumer is behind
                await self.buffer.put(chunk.choices[0].delta.content)
        await self.buffer.put(_DONE)

    async def consumer(self):
        """Consumer: drain the buffer until the sentinel arrives."""
        while True:
            content = await self.buffer.get()
            if content is _DONE:
                break
            yield content

    async def stream(self, prompt: str):
        """Start the producer and yield from the consumer."""
        producer_task = asyncio.create_task(self.producer(prompt))
        async for content in self.consumer():
            yield content
        await producer_task  # surface any producer exception
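A quick usage sketch for the class above (the prompt is illustrative):

async def demo():
    buffered = BufferedStream(buffer_size=10)
    async for content in buffered.stream("Explain backpressure"):
        print(content, end="", flush=True)

asyncio.run(demo())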
A Complete Working Example
A Streaming Chat Service with Memory
from fastapi import FastAPI, HTTPException
from fastapi.responses import StreamingResponse
from pydantic import BaseModel
from typing import Optional, Dict
from openai import OpenAI
import json
import uuid

app = FastAPI()
client = OpenAI()

# Session store
sessions: Dict[str, list] = {}

class ChatRequest(BaseModel):
    message: str
    session_id: Optional[str] = None

def get_or_create_session(session_id: Optional[str]) -> tuple[str, list]:
    """Fetch an existing session or create a new one."""
    if session_id and session_id in sessions:
        return session_id, sessions[session_id]
    new_id = str(uuid.uuid4())
    sessions[new_id] = []
    return new_id, sessions[new_id]

async def stream_response(messages: list, session_id: str):
    """Streaming response generator."""
    full_response = []
    try:
        stream = client.chat.completions.create(
            model="gpt-4",
            messages=[
                {"role": "system", "content": "You are a helpful assistant"},
                *messages
            ],
            stream=True
        )
        # Send the session_id first
        yield f"data: {json.dumps({'session_id': session_id})}\n\n"
        for chunk in stream:
            if chunk.choices[0].delta.content:
                content = chunk.choices[0].delta.content
                full_response.append(content)
                yield f"data: {json.dumps({'content': content})}\n\n"
        # Save the complete response into the session
        sessions[session_id].append({
            "role": "assistant",
            "content": "".join(full_response)
        })
        yield f"data: {json.dumps({'done': True})}\n\n"
    except Exception as e:
        yield f"data: {json.dumps({'error': str(e)})}\n\n"

@app.post("/chat/stream")
async def chat_stream(request: ChatRequest):
    # Fetch or create the session
    session_id, messages = get_or_create_session(request.session_id)
    # Append the user message
    messages.append({
        "role": "user",
        "content": request.message
    })
    return StreamingResponse(
        stream_response(messages, session_id),
        media_type="text/event-stream"
    )

@app.get("/session/{session_id}")
async def get_session(session_id: str):
    if session_id not in sessions:
        raise HTTPException(status_code=404, detail="Session not found")
    return {"messages": sessions[session_id]}

@app.delete("/session/{session_id}")
async def delete_session(session_id: str):
    if session_id in sessions:
        del sessions[session_id]
    return {"status": "deleted"}
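Note that the sessions dict lives in process memory: it is lost on restart, invisible to other workers, and grows without bound. For anything beyond a demo, a shared store with expiry (Redis is a common choice) is the usual replacement.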
The Matching Frontend Component
<!DOCTYPE html>
<html>
<head>
    <title>Streaming Chat</title>
    <style>
        .chat-container {
            max-width: 800px;
            margin: 0 auto;
            padding: 20px;
        }
        .message {
            padding: 10px 15px;
            margin: 10px 0;
            border-radius: 10px;
        }
        .user-message {
            background: #007bff;
            color: white;
            margin-left: 20%;
        }
        .assistant-message {
            background: #f1f1f1;
            margin-right: 20%;
        }
        .typing-indicator {
            opacity: 0.7;
        }
        #input-area {
            display: flex;
            gap: 10px;
            margin-top: 20px;
        }
        #message-input {
            flex: 1;
            padding: 10px;
            border: 1px solid #ddd;
            border-radius: 5px;
        }
        button {
            padding: 10px 20px;
            background: #007bff;
            color: white;
            border: none;
            border-radius: 5px;
            cursor: pointer;
        }
        button:disabled {
            background: #ccc;
        }
    </style>
</head>
<body>
    <div class="chat-container">
        <div id="chat-messages"></div>
        <div id="input-area">
            <input type="text" id="message-input" placeholder="Type a message...">
            <button id="send-btn" onclick="sendMessage()">Send</button>
            <button id="stop-btn" onclick="stopStream()" style="display:none;">Stop</button>
        </div>
    </div>

    <script>
        let sessionId = null;
        let currentController = null;

        async function sendMessage() {
            const input = document.getElementById('message-input');
            const message = input.value.trim();
            if (!message) return;

            // Show the user's message
            appendMessage(message, 'user');
            input.value = '';

            // Disable Send, show the Stop button
            document.getElementById('send-btn').disabled = true;
            document.getElementById('stop-btn').style.display = 'inline';

            // Create the assistant message container
            const assistantDiv = appendMessage('', 'assistant');
            assistantDiv.classList.add('typing-indicator');

            // Create an AbortController so the stream can be cancelled
            currentController = new AbortController();

            try {
                const response = await fetch('/chat/stream', {
                    method: 'POST',
                    headers: { 'Content-Type': 'application/json' },
                    body: JSON.stringify({ message, session_id: sessionId }),
                    signal: currentController.signal
                });
                const reader = response.body.getReader();
                const decoder = new TextDecoder();
                let buffer = '';  // carry partial lines across reads
                while (true) {
                    const { done, value } = await reader.read();
                    if (done) break;
                    buffer += decoder.decode(value, { stream: true });
                    const lines = buffer.split('\n');
                    buffer = lines.pop();  // keep the incomplete last line
                    for (const line of lines) {
                        if (line.startsWith('data: ')) {
                            const data = JSON.parse(line.slice(6));
                            if (data.session_id) {
                                sessionId = data.session_id;
                            } else if (data.content) {
                                assistantDiv.textContent += data.content;
                            } else if (data.error) {
                                assistantDiv.textContent = `Error: ${data.error}`;
                            }
                        }
                    }
                }
            } catch (error) {
                if (error.name !== 'AbortError') {
                    console.error('Stream error:', error);
                    assistantDiv.textContent = 'Something went wrong, please retry';
                }
            } finally {
                assistantDiv.classList.remove('typing-indicator');
                document.getElementById('send-btn').disabled = false;
                document.getElementById('stop-btn').style.display = 'none';
                currentController = null;
            }
        }

        function stopStream() {
            if (currentController) {
                currentController.abort();
            }
        }

        function appendMessage(content, role) {
            const container = document.getElementById('chat-messages');
            const div = document.createElement('div');
            div.className = `message ${role}-message`;
            div.textContent = content;
            container.appendChild(div);
            container.scrollTop = container.scrollHeight;
            return div;
        }

        // Send on Enter
        document.getElementById('message-input').addEventListener('keypress', (e) => {
            if (e.key === 'Enter') sendMessage();
        });
    </script>
</body>
</html>
Best Practices
Implementation Checklist
| Item | Notes |
|---|---|
| ✅ Set the correct response headers | Content-Type: text/event-stream |
| ✅ Disable buffering | Nginx: X-Accel-Buffering: no |
| ✅ Heartbeat mechanism | Send periodic keep-alive messages on long-lived connections (see the sketch below) |
| ✅ Timeout handling | Choose sensible timeouts |
| ✅ Error handling | Handle failure modes gracefully |
| ✅ Cancellation support | Let users interrupt generation |
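The SSE spec treats lines beginning with a colon as comments: clients discard them, but they keep intermediaries from timing out an idle connection, which makes them a convenient heartbeat. A minimal sketch of the idea, assuming an async chunk source such as async_stream_chat above (the wrapper name and 15-second interval are illustrative):

import asyncio
import json

async def sse_with_heartbeat(chunks, heartbeat_interval: float = 15.0):
    """Wrap an async chunk source in SSE framing, emitting comments when idle."""
    queue: asyncio.Queue = asyncio.Queue()
    DONE = object()

    async def pump():
        async for chunk in chunks:
            await queue.put(chunk)
        await queue.put(DONE)

    task = asyncio.create_task(pump())
    while True:
        try:
            # Cancelling queue.get() on timeout is safe, unlike cancelling
            # an async generator's __anext__() directly
            item = await asyncio.wait_for(queue.get(), timeout=heartbeat_interval)
        except asyncio.TimeoutError:
            yield ": keep-alive\n\n"  # SSE comment line, ignored by clients
            continue
        if item is DONE:
            break
        yield f"data: {json.dumps({'content': item})}\n\n"
    await task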
Performance Tuning Tips
# 1. Use a connection pool
from openai import OpenAI
import httpx

client = OpenAI(
    http_client=httpx.Client(
        limits=httpx.Limits(max_connections=100)
    )
)

# 2. Set sensible timeouts
stream = client.chat.completions.create(
    model="gpt-4",
    messages=messages,
    stream=True,
    timeout=httpx.Timeout(60.0, connect=5.0)
)

# 3. Enable gzip compression (for longer content)
from fastapi.middleware.gzip import GZipMiddleware
app.add_middleware(GZipMiddleware, minimum_size=1000)
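A caveat on item 3: compression layers (middleware or reverse proxies) may buffer output while building compression frames, which can delay delivery of small chunks and blunt the benefit of streaming. If time-to-first-token matters, consider exempting the event-stream endpoints from compression and reserving gzip for ordinary JSON responses.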
Summary
Streaming output is a key technique for improving the user experience of LLM applications:
| Technology | Pros | Cons | Recommended for |
|---|---|---|---|
| SSE | Simple to use | One-way only | Web chat applications |
| WebSocket | Two-way, real-time | Higher complexity | Bidirectional interaction |
| HTTP/2 | Multiplexing | Limited browser support | High-concurrency services |
Key implementation points:
- Handle response headers and buffering correctly
- Implement robust error handling and reconnection
- Provide cancel/stop functionality
- Plan for performance tuning and connection management