上下文擦除与转换能力对比
Related topics: [[session-history-management]], [[republic-anchor-mechanism]]
Overview
本文对比分析五个框架的 上下文擦除/转换能力,即如何在发送给 LLM 之前对历史消息进行过滤、精简或转换。
★ Insight ─────────────────────────────────────
上下文擦除的基石是强类型的消息 Streaming
只有当消息有明确的类型结构(如 TapeEntry.kind、ModelMessage.part_kind),才能实现精准的擦除逻辑:
- 擦除工具结果但保留工具调用
- 精简长代码块但保留签名
- 去重重复的文件读取记录
─────────────────────────────────────────────────
1. Republic: TapeContext.select 钩子
核心机制
# tape/context.py
@dataclass(frozen=True)
class TapeContext:
anchor: AnchorSelector = LAST_ANCHOR # 从哪里切片
select: Callable[[Sequence[TapeEntry], TapeContext], list[dict]] | None = None # 如何转换
工作流程
Tape (不可变)
↓
_slice_after_anchor() # 空间切片
↓
select(entries, context) # 内容转换/擦除
↓
messages → LLM
场景 A: 擦除工具结果
def prune_tool_results(entries: Sequence[TapeEntry], context: TapeContext):
"""擦除工具返回,只保留调用"""
messages = []
for entry in entries:
if entry.kind == "message":
messages.append(entry.payload)
elif entry.kind == "tool_call":
messages.append(entry.payload)
# 故意跳过 tool_result
return messages
# 使用
ctx = TapeContext(anchor=LAST_ANCHOR, select=prune_tool_results)
tape.chat("Check status again", context=ctx)
场景 B: 去重文件读取
def deduplicate_file_reads(entries: Sequence[TapeEntry], context: TapeContext):
"""只保留最后一次文件读取"""
last_file_reads: dict[str, TapeEntry] = {}
result = []
for entry in entries:
if entry.kind == "tool_result":
tool_name = entry.meta.get("tool_name")
if tool_name == "read_file":
file_path = entry.payload.get("path")
last_file_reads[file_path] = entry # 覆盖旧的
else:
result.append(entry.payload)
else:
result.append(entry.payload)
# 添加最后一次读取
for entry in last_file_reads.values():
result.append(entry.payload)
return result
设计哲学
| 原则 | 说明 |
|---|---|
| Immutable Tape | 原始磁带永远记录所有动作(审计证据) |
| Dynamic View | TapeContext 只是一个"滤镜",让 LLM 看到干净版本 |
| Separation of Concerns | Anchor 负责"空间",select 负责"内容" |
2. Kimi CLI: Compaction 协议
核心机制
# soul/compaction.py
@runtime_checkable
class Compaction(Protocol):
async def compact(self, messages: Sequence[Message], llm: LLM) -> Sequence[Message]:
"""将消息序列压缩为新的消息序列"""
...
SimpleCompaction 实现
class SimpleCompaction:
def __init__(self, max_preserved_messages: int = 2):
self.max_preserved_messages = max_preserved_messages
async def compact(self, messages: Sequence[Message], llm: LLM) -> Sequence[Message]:
compact_message, to_preserve = self.prepare(messages)
if compact_message is None:
return to_preserve
# 使用 LLM 进行智能压缩
result = await kosong.step(
chat_provider=llm.chat_provider,
system_prompt="You are a helpful assistant that compacts conversation context.",
toolset=EmptyToolset(),
history=[compact_message],
)
# 构建压缩后的消息
content = [system("Previous context has been compacted. Here is the compaction output:")]
content.extend(part for part in result.message.content if not isinstance(part, ThinkPart))
compacted_messages = [Message(role="user", content=content)]
compacted_messages.extend(to_preserve)
return compacted_messages
def prepare(self, messages: Sequence[Message]) -> PrepareResult:
"""准备压缩:保留最后 N 条消息,其余的待压缩"""
# 从后往前找最后 N 条 user/assistant 消息
# ...