五大框架 WebSocket 流式通讯支持性分析
Related topics: [[streaming-comparison]], [[llm-abstraction-comparison]]
概述
WebSocket 是实时 AI 应用的核心通讯方式。本文分析五个框架在 WebSocket 场景下的支持性,包括:
- 消息序列化:框架事件如何映射到 WebSocket 消息
- 双向通讯:如何处理客户端中断、打字指示等
- 连接管理:心跳、重连、超时处理
- 性能考量:内存占用、延迟、并发能力
WebSocket 场景的核心需求
┌──────── ─────────────────────────────────────────────────────────┐
│ WebSocket 流式交互模型 │
├─────────────────────────────────────────────────────────────────┤
│ │
│ Client ────────> WS Server ────────> LLM Provider │
│ │ │ │ │
│ │ 1. send msg │ 2. generate │ │
│ │ │ │ │
│ │ 4. recv chunk │ 3. stream chunks │ │
│ │ 4. recv chunk │ │ │
│ │ 4. recv tool │ │ │
│ │ 4. recv result │ │ │
│ │ 4. recv end │ │ │
│ ▼ ▼ ▼ │
│ │
│ 需要支持: │
│ - 实时文本流式 (text delta) │
│ - 工具调用流式 (tool_call delta -> execute -> result) │
│ - 结构化输出流式 (JSON delta) │
│ - 取消/中断 (client disconnect / stop signal) │
│ - 心跳保活 (ping/pong) │
└─────────────────────────────────────────────────────────────────┘
1. kosong - Callback 驱动的 WebSocket 完美适配
架构适配性
kosong 的 Callback 机制 天生适合 WebSocket 的推模式:
# kosong WebSocket 集成示例
async def websocket_endpoint(websocket: WebSocket):
await websocket.accept()
kimi = Kimi(model="kimi-k2-turbo-preview", api_key="...")
history: list[Message] = []
async def on_message_part(part: StreamedMessagePart):
"""每个片段到达立即推送到客户端"""
if isinstance(part, TextPart):
await websocket.send_json({
"type": "text_delta",
"content": part.text
})
elif isinstance(part, ThinkPart):
await websocket.send_json({
"type": "thinking_delta",
"content": part.think
})
elif isinstance(part, ToolCall):
await websocket.send_json({
"type": "tool_call",
"tool": part.function.name,
"args": part.function.arguments
})
async def on_tool_result(result: ToolResult):
"""工具执行完成推送结果"""
await websocket.send_json({
"type": "tool_result",
"tool_call_id": result.tool_call_id,
"output": result.output
})
# 接收用户消息
data = await websocket.receive_json()
history.append(Message(role="user", content=data["message"]))
# 执行生成,流式推送
result = await kosong.step(
chat_provider=kimi,
system_prompt="You are a helpful assistant.",
toolset=toolset,
history=history,
on_message_part=on_message_part, # 实时推送
on_tool_result=on_tool_result, # 工具结果推送
)
# 发送完成标记
await websocket.send_json({
"type": "done",
"usage": result.usage.model_dump() if result.usage else None
})
WebSocket 消息协议映射
| kosong 事件 | WebSocket 消息类型 | 适用场景 |
|---|---|---|
TextPart | text_delta | 实时打字机效果 |
ThinkPart | thinking_delta | 展示推理过程 |
ToolCall | tool_call | 显示正在调用工具 |
ToolResult | tool_result | 显示工具返回 |
GenerateResult | done | 生成完成 |
取消/中断支持
# 优雅取消支持
async def websocket_endpoint(websocket: WebSocket):
task: asyncio.Task | None = None
async def generate_task():
try:
result = await kosong.step(
chat_provider=kimi,
...,
on_message_part=on_message_part,
)
except asyncio.CancelledError:
# 清理资源,通知客户端
await websocket.send_json({"type": "cancelled"})
raise
# 启动生成任务
task = asyncio.create_task(generate_task())
# 监听客户端消息(支持中断)
try:
while True:
data = await websocket.receive_json()
if data.get("action") == "stop":
task.cancel() # 取消生成
break
except WebSocketDisconnect:
task.cancel()
优势与局限
| 维度 | 评分 | 说明 |
|---|---|---|
| 延迟 | ⭐⭐⭐⭐⭐ | Callback 机制,零缓冲延迟 |
| 内存 | ⭐⭐⭐⭐⭐ | 不缓存所有片段,即来即推 |
| 复杂度 | ⭐⭐⭐⭐ | 简单直接,无需额外适配层 |
| 双向通讯 | ⭐⭐⭐ | 需自己实现中断/心跳 |
| 结构化输出 | ⭐⭐⭐ | 需自行解析和验证 |
2. republic - 事件流的 WebSocket 映射
架构适配性
republic 提供 StreamEvents 包装器,适合结构化事件推送:
# republic WebSocket 集成示例
async def websocket_endpoint(websocket: WebSocket):
await websocket.accept()
llm = LLM(model="openai:gpt-5", api_key="...")
# 使用 stream_events 获取结构化事件
events = llm.stream_events(
prompt="What is the weather in Tokyo?",
tools=[get_weather],
auto_call_tools=True,
)
for event in events:
# 直接映射到 WebSocket 消息
await websocket.send_json({
"type": event.kind, # text, tool_call, tool_result, usage, error, final
"data": event.data
})
# 处理中断信号
if await check_client_disconnect(websocket):
break