五大框架流式消息处理深度对比
Related topics: [[llm-abstraction-comparison]], [[kosong]], [[republic]], [[litai]], [[pydantic-ai]], [[langchain]]
概述
本文深入对比五个框架对流式(Streaming)消息的处理机制,评估它们是否将流式支持作为一等公民(First-class Citizen)。
一等公民的评判标准
- API 设计:流式是否是核心抽象,还是事后添加的
- 数据流模式:推模式(Push)vs 拉模式(Pull),是否支持背压
- 片段管理:如何处理流式片段的组装和合并
- 工具调用:流式响应中工具调用的处理能力
- 错误处理:流式过程中的错误传播和恢复
- 取消机制:是否支持优雅的流式取消
1. kosong - 流式原生设计
核心架构:Protocol-based 流式抽象
@runtime_checkable
class StreamedMessage(Protocol):
def __aiter__(self) -> AsyncIterator[StreamedMessagePart]: ...
@property
def id(self) -> str | None: ...
@property
def usage(self) -> "TokenUsage | None": ...
流式处理流水线
ChatProvider.generate()
↓
StreamedMessage (Async Iterator)
↓
_generate() 合并片段
↓
GenerateResult (完整消息)
独特设计:merge_in_place 就地合并
async def generate(...):
message = Message(role="assistant", content=[])
pending_part: StreamedMessagePart | None = None
async for part in stream:
if on_message_part:
await callback(on_message_part, part.model_copy(deep=True))
if pending_part is None:
pending_part = part
elif not pending_part.merge_in_place(part): # 尝试合并
# 无法合并,保存 pending,开始新 part
_message_append(message, pending_part)
pending_part = part
优势:
- 延迟极低:片段到达立即通过 callback 通知上层
- 自动合并:文本片段自动拼接,工具调用参数增量组装
- 内存友好:不会缓存所有原始片段
工具调用的流式处理
# ToolCall 和 ToolCallPart 的协作
class ToolCall(BaseModel, MergeableMixin):
@override
def merge_in_place(self, other: Any) -> bool:
if not isinstance(other, ToolCallPart):
return False
if self.function.arguments is None:
self.function.arguments = other.arguments_part
else:
self.function.arguments += other.arguments_part or ""
return True
关键洞察:
ToolCallPart是流式片段(arguments_part)ToolCall是完整工具调用- 通过
merge_in_place实现增量组装