结构化错误分类与自动重试设计
Related topics: [[pydantic-ai-agent-graph]], [[langchain-runnable]], [[llm-error-handling]]
Overview
本文分析五个 LLM 框架中的结构化错误分类与自动重试机制:
| 框架 | 语言 | 定位 |
|---|---|---|
| pydantic-ai | Python | 结构化输出优先的 Agent 框架 |
| langchain | Python | 通用 LLM 编排框架 |
| pi-mono | TypeScript | VSCode 扩展 AI Agent 框架 |
| kosong | Python | 轻量级 Chat Provider 库 |
| republic | Python | 统一接口 LLM 客户端 |
核心关注点:错误层次结构设计、重试策略实现、错误恢复机制以及 Callback 系统中的错误传播。
Key Concepts
1. 错误分类层次结构 (Error Hierarchy)
pydantic-ai 的错误层次
Exception
├── ModelRetry # 工具函数重试信号
├── CallDeferred # 延迟工具调用
├── ApprovalRequired # 需要人工审批
├── UserError # 开发者使用错误
└── AgentRunError # Agent 运行期错误基类
├── UsageLimitExceeded # 用量限制超出
├── ConcurrencyLimitExceeded # 并发限制超出
├── UnexpectedModelBehavior # 模型异常行为
│ └── ContentFilterError # 内容过滤触发
├── ModelAPIError # 模型 API 错误基类
│ └── ModelHTTPError # HTTP 错误 (4xx/5xx)
└── IncompleteToolCall # 工具调用不完整
关键设计原则:
- 分层明确: `UserError`(开发者错误) vs `AgentRunError`(运行时错误)
- 可恢复性标记: `ModelRetry` 表示可重试,`CallDeferred`/`ApprovalRequired` 表示需要外部干预
- 上下文丰富: `ModelHTTPError` 包含 status_code、body、model_name
# pydantic-ai/pydantic_ai_slim/pydantic_ai/exceptions.py
class ModelHTTPError(ModelAPIError):
    """Error raised when a model provider responds with a 4xx or 5xx status.

    Carries the HTTP status code and raw response body alongside the model
    name, so callers can inspect the failure in detail.
    """

    # HTTP status code of the failed provider response (4xx or 5xx).
    status_code: int
    # Raw response body, if the provider returned one.
    body: object | None

    def __init__(self, status_code: int, model_name: str, body: object | None = None):
        # Compose the human-readable message handed to the base class.
        message = f'status_code: {status_code}, model_name: {model_name}, body: {body}'
        self.status_code = status_code
        self.body = body
        super().__init__(model_name=model_name, message=message)
langchain 的错误层次
Exception
└── LangChainException
├── TracerException
├── OutputParserException # 输出解析错误 (可发送到 LLM 修复)
└── ContextOverflowError # 上下文溢出
关键设计特点:
- ErrorCode 枚举: 标准化错误代码 (`OUTPUT_PARSING_FAILURE`、`MODEL_RATE_LIMIT` 等)
- 可修复标记: `OutputParserException.send_to_llm` 允许将错误反馈给模型
# langchain/libs/core/langchain_core/exceptions.py
class OutputParserException(ValueError, LangChainException):
    """Raised when output parsing fails; may optionally be fed back to the LLM.

    NOTE(review): excerpt — the upstream __init__ also forwards `error` to
    super().__init__(); this snippet only shows the attribute assignments.
    """
    def __init__(
        self,
        error: Any,
        observation: str | None = None,
        llm_output: str | None = None,
        send_to_llm: bool = False,
    ):
        self.observation = observation
        self.llm_output = llm_output
        self.send_to_llm = send_to_llm  # whether to feed the error back to the LLM for repair
2. 自动重试机制 (Automatic Retry)
pydantic-ai: 基于 Tenacity 的 HTTP 传输层重试
# pydantic-ai/pydantic_ai_slim/pydantic_ai/retries.py
class RetryConfig(TypedDict, total=False):
    """Configuration for tenacity-based retrying.

    Mirrors the keyword arguments accepted by tenacity's ``retry`` decorator;
    every key is optional (``total=False``).
    """
    # Sleep function used between attempts (sync or async).
    sleep: Callable[[int | float], None | Awaitable[None]]
    stop: StopBaseT  # stop strategy (when to give up)
    wait: WaitBaseT  # wait strategy (how long to pause between attempts)
    retry: SyncRetryBaseT | RetryBaseT  # retry condition (which outcomes trigger a retry)
    before: Callable[[RetryCallState], None | Awaitable[None]]  # hook run before each attempt
    after: Callable[[RetryCallState], None | Awaitable[None]]  # hook run after each attempt
    reraise: bool  # whether to re-raise the last exception once retries are exhausted
class TenacityTransport(BaseTransport):
    """Synchronous HTTP transport with tenacity-based retry functionality."""
    def handle_request(self, request: Request) -> Response:
        # Wrap the real transport call in the configured tenacity retry policy.
        @retry(**self.config)
        def handle_request(req: Request) -> Response:
            response = self.wrapped.handle_request(req)
            # Attach the request so validators (and callers) can inspect it.
            response.request = req
            if self.validate_response:
                try:
                    self.validate_response(response)
                except Exception:
                    # Close the response before re-raising so the connection is
                    # released; the retry policy decides whether to try again.
                    response.close()
                    raise
            return response
        return handle_request(request)
Retry-After 支持:
def wait_retry_after(
    fallback_strategy: Callable[[RetryCallState], float] | None = None,
    max_wait: float = 300,
) -> Callable[[RetryCallState], float]:
    """Build a tenacity wait strategy that respects HTTP ``Retry-After`` headers.

    Args:
        fallback_strategy: Wait strategy used when no usable Retry-After header
            is present. Defaults to capped exponential backoff (1s, 2s, 4s, ...).
            (fix: the original called this unconditionally, crashing when None.)
        max_wait: Upper bound, in seconds, on any returned wait time.

    Returns:
        A callable mapping a tenacity ``RetryCallState`` to a wait in seconds.
    """
    if fallback_strategy is None:
        # Default fallback: exponential backoff capped at max_wait.
        def fallback_strategy(state: RetryCallState) -> float:
            return min(float(2 ** max(state.attempt_number - 1, 0)), max_wait)

    def wait_func(state: RetryCallState) -> float:
        exc = state.outcome.exception() if state.outcome else None
        # Short-circuit on None so the isinstance check is only reached for real errors.
        if exc is not None and isinstance(exc, HTTPStatusError):
            retry_after = exc.response.headers.get('retry-after')
            if retry_after:
                try:
                    # Delta-seconds form: "Retry-After: 120".
                    return min(float(int(retry_after)), max_wait)
                except ValueError:
                    # HTTP-date form: "Retry-After: Fri, 31 Dec 1999 23:59:59 GMT".
                    try:
                        retry_time = parsedate_to_datetime(retry_after)
                    except (TypeError, ValueError):
                        # Unparseable header — fall back to the default strategy.
                        return fallback_strategy(state)
                    if retry_time.tzinfo is None:
                        # parsedate_to_datetime may return a naive datetime; treat as UTC.
                        retry_time = retry_time.replace(tzinfo=timezone.utc)
                    # fix: 'now' was referenced but never defined in the original.
                    now = datetime.now(timezone.utc)
                    delay = (retry_time - now).total_seconds()
                    # Clamp: never negative (date in the past), never above max_wait.
                    return min(max(delay, 0.0), max_wait)
        return fallback_strategy(state)

    return wait_func
langchain: RunnableRetry 包装器
# langchain/libs/core/langchain_core/runnables/retry.py
class RunnableRetry(RunnableBindingBase[Input, Output]):
    """Retry a Runnable if it fails."""
    # Exception types that trigger a retry attempt.
    retry_exception_types: tuple[type[BaseException], ...] = (Exception,)
    # Use exponential backoff with jitter between attempts.
    wait_exponential_jitter: bool = True
    exponential_jitter_params: ExponentialJitterParams | None = None
    # Total number of attempts (initial call + retries).
    max_attempt_number: int = 3
    def _invoke(self, input_, run_manager, config, **kwargs):
        # NOTE(review): excerpt — only the happy path is shown; the upstream
        # _invoke has additional handling around this loop.
        for attempt in self._sync_retrying(reraise=True):
            with attempt:
                result = super().invoke(
                    input_,
                    # Patch the config so callbacks can observe the attempt number.
                    self._patch_config(config, run_manager, attempt.retry_state),
                    **kwargs,
                )
            # Record the successful result so tenacity stops retrying.
            if attempt.retry_state.outcome and not attempt.retry_state.outcome.failed:
                attempt.retry_state.set_result(result)
        return result
使用方式:
# Attach retry behavior while composing the chain.
chain = template | model.with_retry(
    retry_if_exception_type=(ValueError,),
    wait_exponential_jitter=True,
    stop_after_attempt=5,
)
3. 模型级 Fallback 机制
pydantic-ai: FallbackModel
# pydantic-ai/pydantic_ai_slim/pydantic_ai/models/fallback.py
class FallbackModel(Model):
    """A model that uses one or more fallback models upon failure."""

    # Candidate models, tried in order.
    models: list[Model]
    # Predicate deciding whether a given exception should trigger a fallback.
    _fallback_on: Callable[[Exception], bool]

    async def request(self, messages, model_settings, model_request_parameters):
        """Try each model in turn, returning the first successful response.

        Raises the offending exception immediately when the fallback predicate
        rejects it, or a FallbackExceptionGroup once every model has failed.
        """
        failures: list[Exception] = []
        for candidate in self.models:
            try:
                return await candidate.request(messages, model_settings, model_request_parameters)
            except Exception as exc:
                # Errors the predicate rejects propagate immediately.
                if not self._fallback_on(exc):
                    raise exc
                failures.append(exc)
        raise FallbackExceptionGroup('All models from FallbackModel failed', failures)
langchain: RunnableWithFallbacks
# langchain/libs/core/langchain_core/runnables/fallbacks.py
class RunnableWithFallbacks(RunnableSerializable[Input, Output]):
    """Runnable that can fallback to other Runnables if it fails."""
    runnable: Runnable[Input, Output]  # the primary runnable, tried first
    fallbacks: Sequence[Runnable[Input, Output]]  # tried in order after the primary fails
    # Exception types that trigger falling through to the next runnable.
    exceptions_to_handle: tuple[type[BaseException], ...] = (Exception,)
    exception_key: str | None = None  # input key under which the previous error is passed to the fallback
    def invoke(self, input, config=None, **kwargs):
        first_error = None
        last_error = None
        # NOTE(review): `self.runnables` is presumably a property yielding the
        # primary runnable followed by the fallbacks — not shown in this excerpt.
        for runnable in self.runnables:
            try:
                if self.exception_key and last_error is not None:
                    # Expose the previous failure to the next runnable's input.
                    input[self.exception_key] = last_error
                output = runnable.invoke(input, config, **kwargs)
            except self.exceptions_to_handle as e:
                # Remember the first failure (what gets raised at the end)
                # and the most recent one (what gets passed to the fallback).
                if first_error is None:
                    first_error = e
                last_error = e
            else:
                return output
        raise first_error
4. Agent 内部重试逻辑
pydantic-ai: GraphAgentState 管理重试计数
# pydantic-ai/pydantic_ai_slim/pydantic_ai/_agent_graph.py
@dataclasses.dataclass(kw_only=True)
class GraphAgentState:
    """State kept across the execution of the agent graph."""
    message_history: list[ModelMessage]
    usage: RunUsage
    retries: int = 0  # current retry count
    run_step: int = 0
    run_id: str
    def increment_retries(
        self,
        max_result_retries: int,
        error: BaseException | None = None,
        model_settings: ModelSettings | None = None,
    ) -> None:
        """Bump the retry counter and raise once the limit is exceeded.

        Raises:
            IncompleteToolCall: when the last model response was truncated by
                the token limit in the middle of a tool call.
            UnexpectedModelBehavior: when the retry budget is exhausted for
                any other reason (chained from ``error`` if given).
        """
        self.retries += 1
        if self.retries > max_result_retries:
            # Special case: the model hit its token limit mid tool call, so the
            # last response ends in a truncated ToolCallPart.
            if (
                self.message_history
                and isinstance(model_response := self.message_history[-1], ModelResponse)
                and model_response.finish_reason == 'length'
                and isinstance(model_response.parts[-1], ToolCallPart)
            ):
                # fix: dropped the redundant f-prefix (no placeholders) and the
                # unused `tool_call` walrus binding present in the original.
                raise IncompleteToolCall(
                    'Model token limit exceeded while generating a tool call'
                )
            message = f'Exceeded maximum retries ({max_result_retries})'
            raise UnexpectedModelBehavior(message) from error
输出验证重试:
# Handling output-validation failure inside _run_stream
# (excerpt — runs inside a loop within an async method).
try:
    validated = await tool_manager.validate_tool_call(call)
except UnexpectedModelBehavior as e:
    if final_result:
        # A valid result already exists, so skip the failed output tool.
        continue
    # Count the failure against the retry budget (raises once exhausted).
    ctx.state.increment_retries(
        ctx.deps.max_result_retries, error=e, model_settings=ctx.deps.model_settings
    )
    raise
5. Callback 系统中的错误传播
langchain: 错误回调接口
# langchain/libs/core/langchain_core/callbacks/base.py
class LLMManagerMixin:
    """Mixin declaring the LLM-scoped error callback hook."""
    def on_llm_error(
        self,
        error: BaseException,
        *,
        run_id: UUID,
        parent_run_id: UUID | None = None,
        tags: list[str] | None = None,
        **kwargs: Any,
    ) -> Any:
        """Run when LLM errors."""
class ChainManagerMixin:
    """Mixin declaring the chain-scoped error callback hook."""
    def on_chain_error(
        self,
        error: BaseException,
        *,
        run_id: UUID,
        parent_run_id: UUID | None = None,
        **kwargs: Any,
    ) -> Any:
        """Run when chain errors."""
class BaseCallbackHandler:
    """Base callback handler; exposes the retry-event hook."""
    def on_retry(
        self,
        retry_state: RetryCallState,
        *,
        run_id: UUID,
        parent_run_id: UUID | None = None,
        **kwargs: Any,
    ) -> Any:
        """Run on a retry event."""
6. 错误映射与转换
pydantic-ai: OpenAI 错误映射
# pydantic-ai/pydantic_ai_slim/pydantic_ai/models/openai.py
# Mapping OpenAI SDK errors onto pydantic-ai's error hierarchy
# (excerpt — runs inside an async method of the OpenAI model class).
try:
    response = await self.client.chat.completions.create(...)
except APIStatusError as e:
    # Azure content-filter hits are converted into a synthetic model
    # response rather than surfaced as an error.
    if model_response := _check_azure_content_filter(e, self.system, self.model_name):
        return model_response
    # 4xx/5xx become ModelHTTPError, preserving status, model name and body.
    if (status_code := e.status_code) >= 400:
        raise ModelHTTPError(status_code=status_code, model_name=self.model_name, body=e.body) from e
    raise
except APIConnectionError as e:
    # Network-level failures become the generic ModelAPIError.
    raise ModelAPIError(model_name=self.model_name, message=e.message) from e
langchain: 错误响应生成
# langchain/libs/core/langchain_core/language_models/chat_models.py
def _generate_response_from_error(error: BaseException) -> list[ChatGeneration]:
"""Generate a response from an error for tracing purposes."""
if hasattr(error, "response"):
response = error.response
metadata: dict = {}
if hasattr(response, "json"):
try:
metadata["body"] = response.json()
except Exception:
metadata["body"] = getattr(response, "text", None)
if hasattr(response, "headers"):
metadata["headers"] = dict(response.headers)
if hasattr(response, "status_code"):
metadata["status_code"] = response.status_code
if hasattr(error, "request_id"):
metadata["request_id"] = error.request_id
generations = [
ChatGeneration(message=AIMessage(content="", response_metadata=metadata))
]
else:
generations = []
return generations