LLM 抽象层中的 Middleware/Callback 系统设计
相关主题: [[error-handling-retries]], [[streaming-patterns]], [[observability-telemetry]]
概述
本文分析两个主流 LLM 抽象框架中的中间件和回调系统设计:LangChain 和 Pydantic-AI。这些模式对于构建可观测、可扩展和可调试的 LLM 应用程序至关重要。
核心概念
1. 回调系统架构
LangChain: 分层 Mixin-Based 设计
LangChain 采用精密的 Mixin 架构实现回调:
BaseCallbackHandler
├── LLMManagerMixin (on_llm_start, on_llm_new_token, on_llm_end, on_llm_error)
├── ChainManagerMixin (on_chain_start, on_chain_end, on_chain_error)
├── ToolManagerMixin (on_tool_start, on_tool_end, on_tool_error)
├── RetrieverManagerMixin (on_retriever_start, on_retriever_end, on_retriever_error)
├── CallbackManagerMixin (on_llm_start, on_chat_model_start, on_chain_start, on_tool_start, on_retriever_start)
└── RunManagerMixin (on_text, on_retry, on_custom_event)
关键设计原则:
- 关注点分离: 每个 Mixin 处理特定组件类型(LLM、Chain、Tool、Retriever)
- 生命周期钩子: 每个组件都有
start、end和error回调 - 继承链: Handler 可以继承多个 Mixin 来组合行为
- 异步支持: 通过单独的
AsyncCallbackHandler类支持异步操作
位置: langchain/libs/core/langchain_core/callbacks/base.py:455-505
class BaseCallbackHandler(
LLMManagerMixin,
ChainManagerMixin,
ToolManagerMixin,
RetrieverManagerMixin,
CallbackManagerMixin,
RunManagerMixin,
):
"""基础回调处理器。"""
raise_error: bool = False
run_inline: bool = False
Pydantic-AI: 事件流 + 可观测性
Pydantic-AI 采用不同的方法:
- 事件流处理器:
EventStreamHandler用于流式事件处理 - OpenTelemetry 可观测性: 通过
InstrumentationSettings内置可观测性支持 - 基于图的执行: Agent 作为图节点运行
位置: pydantic-ai/pydantic_ai_slim/pydantic_ai/agent/abstract.py:47-52
EventStreamHandler: TypeAlias = Callable[
[RunContext[AgentDepsT], AsyncIterable[_messages.AgentStreamEvent]], Awaitable[None]
]
"""接收 Agent RunContext 和事件异步可迭代对象的函数。"""
2. 回调管理器模式
LangChain: 集中式管理器与运行上下文
CallbackManager 作为中央分发器:
class CallbackManager(BaseCallbackManager):
"""LangChain 的回调管理器。"""
def on_llm_start(
self,
serialized: dict[str, Any],
prompts: list[str],
run_id: UUID | None = None,
**kwargs: Any,
) -> list[CallbackManagerForLLMRun]:
# 分发到所有处理器
handle_event(self.handlers, "on_llm_start", "ignore_llm", ...)
# 返回运行管理器用于追踪
return [CallbackManagerForLLMRun(...)]
关键特性:
- Handler 注册: 动态添加/移除处理器
- 继承性: Handler 可以是可继承的(传播到子级)或本地的
- 标签与元数据: 附加上下文信息到运行
- 父子关系: 通过
parent_run_id实现层级运行追踪
位置: langchain/libs/core/langchain_core/callbacks/manager.py:1302-1408
Pydantic-AI: 基于图节点的执行
Pydantic-AI 在节点级别集成回调:
class UserPromptNode(AgentNode[DepsT, NodeRunEndT]):
"""处理用户提示和指令的节点。"""
async def run(self, ctx: GraphRunContext[...]) -> ModelRequestNode | CallToolsNode:
# 执行流经节点,每个节点可以发出事件
return ModelRequestNode(...)
位置: pydantic-ai/pydantic_ai_slim/pydantic_ai/_agent_graph.py:140-180
3. 中间件链机制
LangChain: 带事件分发的 Handler 链
LangChain 使用函数式事件分发模式:
def handle_event(
handlers: list[BaseCallbackHandler],
event_name: str,
ignore_condition_name: str | None,
*args: Any,
**kwargs: Any,
) -> None:
"""CallbackManager 的通用事件处理器。"""
coros: list[Coroutine] = []
for handler in handlers:
try:
if ignore_condition_name is None or not getattr(handler, ignore_condition_name):
event = getattr(handler, event_name)(*args, **kwargs)
if asyncio.iscoroutine(event):
coros.append(event)
except NotImplementedError:
# chat_model_start -> llm_start 的回退处理
if event_name == "on_chat_model_start":
handle_event([handler], "on_llm_start", ...)
except Exception as e:
logger.warning("回调错误: %s", e)
if handler.raise_error:
raise
位置: langchain/libs/core/langchain_core/callbacks/manager.py:254-335
关键设计模式:
- 忽略条件: Handler 可以声明
ignore_*属性来跳过事件 - 回退链:
on_chat_model_start回退到on_llm_start - 异步聚合: 收集协程并适当执行
- 错误隔离: 一个 handler 的错误不会破坏其他 handler
Pydantic-AI: 包装器模型模式
Pydantic-AI 使用装饰器/包装器模式实现中间件:
@dataclass(init=False)
class WrapperModel(Model):
"""包装另一个模型的模型。用作基类。"""
wrapped: Model
async def request(self, messages, model_settings, model_request_parameters):
# 预处理
result = await self.wrapped.request(messages, model_settings, model_request_parameters)
# 后处理
return result
@asynccontextmanager
async def request_stream(self, messages, model_settings, model_request_parameters, run_context):
async with self.wrapped.request_stream(...) as response_stream:
# 可以拦截/修改流
yield response_stream
位置: pydantic-ai/pydantic_ai_slim/pydantic_ai/models/wrapper.py
4. 关键回调点
LangChain 回调生命周期
| 组件 | 开始 | 新 Token | 结束 | 错误 |
|---|---|---|---|---|
| LLM | on_llm_start | on_llm_new_token | on_llm_end | on_llm_error |
| Chat Model | on_chat_model_start | on_llm_new_token | on_llm_end | on_llm_error |
| Chain | on_chain_start | - | on_chain_end | on_chain_error |
| Tool | on_tool_start | - | on_tool_end | on_tool_error |
| Retriever | on_retriever_start | - | on_retriever_end | on_retriever_error |
| Agent | on_agent_action | - | on_agent_finish | (通过 chain/tool) |
| Retry | on_retry | - | - | - |
| Custom | on_custom_event | - | - | - |