LLM Call 和 Return 封装机制对比
Related topics: [[llm-framework-comparison]], [[llm-abstraction-comparison]]
Overview
本文对比分析四个框架如何封装 LLM API 的 Call(请求) 和 Return(响应),以及它们是否使用各 Provider 的官方 SDK。
1. SDK 使用策略对比
| 框架 | SDK 策略 | 使用的 SDK |
|---|---|---|
| LitAI | 私有统一 SDK | lightning_sdk.llm.LLM (Lightning AI 私有) |
| Republic | 第三方统一库 | any-llm (开源统一接口) |
| Pydantic AI | 各 Provider 官方 SDK | openai, anthropic, google-genai, mistralai, ... |
| Kimi CLI (kosong) | 各 Provider 官方 SDK | openai, anthropic, google-genai |
2. LitAI: Lightning SDK 统一封装
SDK 依赖
# litai/llm.py
from lightning_sdk.llm import LLM as SDKLLM
from lightning_sdk.lightning_cloud.openapi import V1ConversationResponseChunk
特点: 完全依赖 Lightning AI 私有 SDK,不直接使用任何 Provider SDK。
Call 封装
# NOTE(review): abridged excerpt quoted from litai/llm.py — the "..." in the
# signatures marks elided parameters, so this snippet is illustrative rather
# than runnable; indentation appears flattened by the note export.
class LLM:
# LitAI facade: every request is delegated to the private Lightning SDK
# (SDKLLM); no per-provider SDK is used anywhere in this class.
_sdkllm_cache: Dict[str, SDKLLM] = {} # class-level cache, shared across all LLM instances
def __init__(self, model, fallback_models, billing, max_retries, ...):
self._llm: Optional[SDKLLM] = None
# Preload the model in a background daemon thread so construction returns
# immediately; self._llm stays None until _load_models finishes.
threading.Thread(target=self._load_models, daemon=True).start()
def _load_models(self):
# Cache SDKLLM instances: the key encodes model, teamspace, and async
# mode, so distinct configurations never share one client.
key = f"{self._model}::{self._teamspace}::{self._enable_async}"
if key not in self._sdkllm_cache:
self._sdkllm_cache[key] = SDKLLM(
name=self._model,
teamspace=self._teamspace,
enable_async=self._enable_async
)
self._llm = self._sdkllm_cache[key]
def _model_call(self, model: SDKLLM, prompt, ...):
# Thin pass-through to SDKLLM.chat(); routing, billing, and provider
# selection happen behind the Lightning API gateway, not here.
response = model.chat(
prompt=prompt,
system_prompt=system_prompt,
max_completion_tokens=max_completion_tokens,
images=images,
conversation=conversation,
metadata=metadata,
stream=stream,
full_response=full_response,
tools=tools,
reasoning_effort=reasoning_effort,
**kwargs,
)
return response
Return 封装
@staticmethod
def _format_tool_response(
response: V1ConversationResponseChunk, # Lightning SDK response type
call_tools: bool = True,
lit_tools: Optional[List[LitTool]] = None
) -> str:
# Flatten the SDK's tool-call objects into a JSON string of
# {"function": {"arguments", "name"}} entries; returns "" when the
# response carries no choices.
if response.choices is None or len(response.choices) == 0:
return ""
tool_calls = response.choices[0].tool_calls
# NOTE(review): if tool_calls were None the loop below would raise;
# presumably the SDK guarantees a list here — confirm upstream.
result = []
for tool_call in tool_calls:
new_tool = {
"function": {
"arguments": tool_call.function.arguments,
"name": tool_call.function.name,
}
}
result.append(new_tool)
return json.dumps(result)
通信流程
LLM.chat()
↓
SDKLLM.chat() [lightning_sdk]
↓
Lightning API Gateway [统一网关]
↓
Provider API (OpenAI/Anthropic/Google/...)
核心特点:
- 所有请求通过 Lightning AI 网关
- 统一计费和管理
- 后台线程预加载模型
- 类级别缓存 SDKLLM 实例
3. Republic: any-llm 统一接口
SDK 依赖
# republic/core/execution.py
from any_llm import AnyLLM
from any_llm.exceptions import (
AuthenticationError,
RateLimitError,
ContextLengthExceededError,
ModelNotFoundError,
ProviderError,
...
)
特点: 使用开源的 any-llm 库,它内部封装了各 Provider SDK。
Call 封装
# NOTE(review): abridged excerpt quoted from republic/core/execution.py —
# "..." marks elided parameters and some names (api_key, api_base, model,
# provider, on_response, stream, ...) come from elided context.
class LLMCore:
RETRY = object() # sentinel: signals "retry with the next attempt/model"
def __init__(self, provider, model, fallback_models, max_retries, api_key, api_base, ...):
self._client_cache: dict[str, AnyLLM] = {}
def get_client(self, provider: str) -> AnyLLM:
"""Get or lazily create an AnyLLM client for the given provider."""
# Cache key freezes provider + credentials so differently-configured
# clients for the same provider do not collide.
cache_key = self._freeze_cache_key(provider, api_key, api_base)
if cache_key not in self._client_cache:
self._client_cache[cache_key] = AnyLLM.create(
provider,
api_key=api_key,
api_base=api_base,
**self._client_args
)
return self._client_cache[cache_key]
def run_chat_sync(self, messages_payload, tools_payload, ...):
"""Run a synchronous chat request with per-model retries and fallback."""
# Outer loop: primary model then fallbacks; inner loop: retries on the
# same model, bounded by max_attempts().
for provider_name, model_id, client in self.iter_clients(model, provider):
for attempt in range(self.max_attempts()):
try:
response = client.completion( # any-llm's unified completion interface
model=model_id,
messages=messages_payload,
tools=tools_payload,
stream=stream,
reasoning_effort=reasoning_effort,
**self._decide_kwargs_for_provider(provider_name, max_tokens, kwargs),
)
except Exception as exc:
# Classify the failure; either retry the same model or
# break out to the next fallback model.
outcome = self._handle_attempt_error(exc, provider_name, model_id, attempt)
if outcome.decision is AttemptDecision.RETRY_SAME_MODEL:
continue
break
else:
# Success path: the callback may still ask for a retry by
# returning the RETRY sentinel (identity-compared).
result = on_response(response, provider_name, model_id, attempt)
if result is self.RETRY:
continue
return result
Return 封装
# republic/clients/chat.py
class ChatClient:
@staticmethod
def _extract_text(response: Any) -> str:
"""Extract the text content from an any-llm response.

Defensive getattr-based access: tolerates plain strings, missing
choices, and a missing/None message, always returning a str.
"""
if isinstance(response, str):
return response
choices = getattr(response, "choices", None)
if not choices:
return ""
message = getattr(choices[0], "message", None)
if message is None:
return ""
# "or ''" normalizes a None content field to an empty string.
return getattr(message, "content", "") or ""
@staticmethod
def _extract_tool_calls(response: Any) -> list[dict[str, Any]]:
"""Extract tool calls from an any-llm response.

Returns a list of {"function": {"name", "arguments"}} dicts, adding
an "id" key only when the provider supplied a call id.
"""
choices = getattr(response, "choices", None)
if not choices:
return []
message = getattr(choices[0], "message", None)
if message is None:
return []
# "or []" normalizes a None tool_calls field to an empty list.
tool_calls = getattr(message, "tool_calls", None) or []
calls: list[dict[str, Any]] = []
for tool_call in tool_calls:
entry: dict[str, Any] = {
"function": {
"name": tool_call.function.name,
"arguments": tool_call.function.arguments,
}
}
call_id = getattr(tool_call, "id", None)
if call_id:
entry["id"] = call_id
calls.append(entry)
return calls
@staticmethod
def _extract_usage(response: Any) -> dict[str, Any] | None:
"""Extract the usage payload from an any-llm response, if present."""
usage = getattr(response, "usage", None)
if usage is None:
return None
# Pydantic-model usage objects are serialized via model_dump().
if hasattr(usage, "model_dump"):
return usage.model_dump()
# NOTE(review): excerpt truncated here — the non-pydantic fallback
# path is elided in the quoted source.
# ...
流式响应处理
class ToolCallAssembler:
"""Incremental merger for streamed tool-call deltas.

Streaming responses deliver tool calls in fragments; this class keys
each call (via _resolve_key, elided from this excerpt), preserves
first-seen order, and concatenates argument fragments per call.
"""
def __init__(self):
# Accumulated calls keyed by the resolved identity of each tool call.
self._calls: dict[object, dict[str, Any]] = {}
# Keys in first-seen order, so finalize() preserves stream order.
self._order: list[object] = []
# Maps stream-reported indices back to resolved keys.
self._index_to_key: dict[Any, object] = {}
def add_deltas(self, tool_calls: list[Any]):
"""Merge one chunk's tool-call deltas into the accumulated state."""
for position, tool_call in enumerate(tool_calls):
key = self._resolve_key(tool_call, position)
if key not in self._calls:
self._order.append(key)
self._calls[key] = {"function": {"name": "", "arguments": ""}}
entry = self._calls[key]
# Merge the delta: name overwrites when present; arguments append.
func = getattr(tool_call, "function", None)
if func:
name = getattr(func, "name", None)
if name:
entry["function"]["name"] = name
arguments = getattr(func, "arguments", None)
if arguments:
entry["function"]["arguments"] = entry["function"].get("arguments", "") + arguments
def finalize(self) -> list[dict[str, Any]]:
# Emit fully-assembled calls in the order they first appeared.
return [self._calls[key] for key in self._order]