工具调用流式组装设计
Related topics: [[streaming-comparison]], [[llm-abstraction-comparison]], [[typed-message-parts-pydantic-ai]]
Overview
工具调用流式组装是 LLM 抽象层中的核心问题:当模型以流式方式返回工具调用时,响应可能被分割成多个片段(delta),框架需要能够将这些片段正确地组装成完整的工具调用。本文分析 pydantic-ai 和 kimi-cli (kosong) 两个框架的实现方案。
核心问题
1. 流式响应的挑战
LLM 的流式响应中,工具调用可能以以下方式到达:
片段1: {"tool_name": "search", "arguments": "{\"query\":"}
片段2: "\"hello world\"}"
或者:
片段1: {"tool_name": "search"}
片段2: {"arguments": {"query": "hello world"}}
框架需要处理:
- 增量组装:将多个片段合并为完整的工具调用
- 类型一致性:处理字符串 JSON 和字典类型的参数
- 完整性检测:判断工具调用是否已接收完毕
- 多工具并发:同时处理多个工具调用的流式响应
pydantic-ai 的实现
核心类型定义
文件: pydantic-ai/pydantic_ai_slim/pydantic_ai/messages.py
@dataclass(repr=False)
class BaseToolCallPart:
"""A tool call from a model."""
tool_name: str
args: str | dict[str, Any] | None = None
tool_call_id: str = field(default_factory=_generate_tool_call_id)
# ... provider metadata ...
@dataclass(repr=False)
class ToolCallPart(BaseToolCallPart):
"""A tool call from a model."""
part_kind: Literal['tool-call'] = 'tool-call'
@dataclass(repr=False, kw_only=True)
class ToolCallPartDelta:
"""A partial update (delta) for a `ToolCallPart`."""
tool_name_delta: str | None = None
args_delta: str | dict[str, Any] | None = None
tool_call_id: str | None = None
part_delta_kind: Literal['tool_call'] = 'tool_call'
Delta 应用机制
文件: pydantic-ai/pydantic_ai_slim/pydantic_ai/messages.py:1850-1950
def apply(self, part: ModelResponsePart | ToolCallPartDelta) -> ToolCallPart | BuiltinToolCallPart | ToolCallPartDelta:
"""Apply this delta to a part or delta, returning a new part or delta with the changes applied."""
if isinstance(part, ToolCallPart | BuiltinToolCallPart):
return self._apply_to_part(part)
if isinstance(part, ToolCallPartDelta):
return self._apply_to_delta(part)
raise ValueError(...)
def _apply_to_part(self, part: ToolCallPart | BuiltinToolCallPart) -> ToolCallPart | BuiltinToolCallPart:
"""Apply delta directly to a ToolCallPart."""
if self.tool_name_delta:
tool_name = part.tool_name + self.tool_name_delta
part = replace(part, tool_name=tool_name)
if isinstance(self.args_delta, str):
# 字符串参数:追加
updated_json = (part.args or '') + self.args_delta
part = replace(part, args=updated_json)
elif isinstance(self.args_delta, dict):
# 字典参数:合并
updated_dict = {**(part.args or {}), **self.args_delta}
part = replace(part, args=updated_dict)
return part
PartsManager:流式组装管理器
文件: pydantic-ai/pydantic_ai_slim/pydantic_ai/_parts_manager.py
ModelResponsePartsManager 是 pydantic-ai 流式组装的核心:
@dataclass
class ModelResponsePartsManager:
"""Manages a sequence of parts that make up a model's streamed response."""
_parts: list[ManagedPart] = field(default_factory=list[ManagedPart], init=False)
_vendor_id_to_part_index: dict[VendorId, int] = field(default_factory=dict[VendorId, int], init=False)