第 5 篇:对话循环 — query.ts 如何驱动一次完整的 AI 交互

本篇是《深入 Claude Code 源码》系列的第 5 篇。我们将深入 query.ts 这个 1729 行的核心文件,揭示一次完整的 AI 对话是如何被驱动的——从消息组装、API 调用、工具执行到错误恢复,理解这个 Agent 运行时的”心跳”。

为什么需要理解对话循环?

如果把 Claude Code 比作一个人体,那 query.ts 就是它的心脏——对话循环的编排入口。当然,心脏需要血管系统才能工作:重试逻辑在 services/api/withRetry.ts,工具执行在 services/tools/,停止钩子在 query/stopHooks.ts,环境配置在 query/config.ts。本篇会覆盖这个完整的”循环系统”,而不仅仅是 query.ts 一个文件。每一次用户提问,都会触发这个循环:

1
用户输入 → 组装消息 → 调用 API → 模型返回 → 执行工具 → 结果回传 → 模型继续...

这个循环看似简单,但实际的工程复杂度远超预期。一个生产级的 AI 对话循环需要处理:

  • 流式响应:模型的回复是逐 token 流回的,工具调用可能在流的中途就开始执行
  • 多层压缩:对话历史可能随时超出上下文窗口,需要多种策略自动压缩
  • 错误恢复:API 过载、上下文太长、输出被截断……每种错误都有专门的恢复路径
  • 模型降级:主模型不可用时,自动切换到 fallback 模型
  • 并发工具执行:只读工具可以并行,写入工具必须串行

本篇将从宏观到微观,层层展开这个循环的设计。


一、全局视角:AsyncGenerator 驱动的状态机

1.1 query() 的签名

query.ts 的核心是两个嵌套的 AsyncGenerator 函数:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
// query.ts:219-239
export async function* query(
params: QueryParams,
): AsyncGenerator<
| StreamEvent
| RequestStartEvent
| Message
| TombstoneMessage
| ToolUseSummaryMessage,
Terminal
> {
const consumedCommandUuids: string[] = []
const terminal = yield* queryLoop(params, consumedCommandUuids)
// 正常退出时通知已消费的命令
for (const uuid of consumedCommandUuids) {
notifyCommandLifecycle(uuid, 'completed')
}
return terminal
}

query() 是一个薄包装层,真正的逻辑在 queryLoop() 中。这个分层设计的目的是:命令生命周期通知只在正常退出时执行。如果 queryLoop() 抛出异常或被 .return() 关闭,for...of 循环不会执行——这正是预期行为,因为异常意味着命令没有成功完成。

1.2 为什么用 AsyncGenerator?

query() 返回 AsyncGenerator 而非 Promise,这是一个关键的架构决策。AsyncGenerator 让对话循环可以:

  1. 流式产出事件:每个中间结果(流式 token、工具执行进度、压缩通知)通过 yield 逐个产出,调用方(REPL 或 SDK)实时消费
  2. 双向通信:调用方可以通过 .return() 随时终止循环(如用户按 Ctrl+C)
  3. 延迟计算:只在调用方拉取时才推进循环,天然的背压控制

1.3 QueryParams:对话循环的输入契约

1
2
3
4
5
6
7
8
9
10
11
12
13
14
// query.ts:181-199
export type QueryParams = {
messages: Message[] // 对话历史
systemPrompt: SystemPrompt // 系统提示词
userContext: { [k: string]: string } // 用户上下文(注入到消息前面)
systemContext: { [k: string]: string } // 系统上下文(追加到系统提示词后面)
canUseTool: CanUseToolFn // 权限检查函数
toolUseContext: ToolUseContext // 运行时上下文容器
fallbackModel?: string // 降级模型
querySource: QuerySource // 查询来源标识
maxTurns?: number // 最大轮次限制
taskBudget?: { total: number } // API task_budget
deps?: QueryDeps // 依赖注入(测试用)
}

其中 QueryDeps 是一个精心设计的依赖注入接口:

1
2
3
4
5
6
7
// query/deps.ts:21-31
export type QueryDeps = {
callModel: typeof queryModelWithStreaming // API 调用
microcompact: typeof microcompactMessages // 微压缩
autocompact: typeof autoCompactIfNeeded // 自动压缩
uuid: () => string // UUID 生成
}

生产环境使用 productionDeps() 返回真实实现,测试环境则注入 fake。这比 spyOn 模块 mock 更干净——callModelautocompact 在 6-8 个测试文件中被 spy,依赖注入消除了这些重复样板。

1.4 State:循环的可变状态

每次迭代共享的可变状态被封装在一个 State 类型中:

1
2
3
4
5
6
7
8
9
10
11
12
13
// query.ts:204-217
type State = {
messages: Message[] // 当前消息数组
toolUseContext: ToolUseContext // 工具执行上下文
autoCompactTracking: AutoCompactTrackingState | undefined
maxOutputTokensRecoveryCount: number // 输出截断恢复计数
hasAttemptedReactiveCompact: boolean // 是否已尝试反应式压缩
maxOutputTokensOverride: number | undefined
pendingToolUseSummary: Promise<ToolUseSummaryMessage | null> | undefined
stopHookActive: boolean | undefined
turnCount: number // 轮次计数
transition: Continue | undefined // 上一次迭代为何继续
}

注意 transition 字段——它记录了上一次迭代为什么 continue。这不仅仅用于调试,还用于控制恢复逻辑:比如 collapse_drain_retry 后如果仍然 413(上下文太长),就不再重复 drain 而是 fall through 到 reactive compact。

循环中有 7+ 个 continue 站点,每个站点都通过 state = { ... } 写入新状态。但需要注意,这不是一个高度形式化的单层闭环状态机——它更像是主循环 + 若干恢复 continue 点 + 多个早退出口的混合结构。除了下表中的 continue 站点,还有 attemptWithFallback 驱动的内层 while 循环、异常路径、abort 早退(return { reason: 'aborted_streaming' })以及多种正常终止分支(return { reason: 'completed' / 'image_error' / 'prompt_too_long' / ... }):

continue 站点 transition.reason 触发条件
上下文坍缩排空 collapse_drain_retry prompt-too-long 时排空暂存的坍缩摘要
反应式压缩重试 reactive_compact_retry 413 错误触发全量压缩后重试
输出 token 升级 max_output_tokens_escalate 8k 默认限制命中,升级到 64k
输出截断多轮恢复 max_output_tokens_recovery 输出被截断,注入恢复消息重试(最多 3 次)
Stop Hook 阻塞 stop_hook_blocking 停止钩子返回阻塞错误
Token Budget 续行 token_budget_continuation token 预算未耗尽,继续执行
工具执行后下一轮 next_turn 正常的工具结果回传

二、循环的完整时序

下面用一个 Mermaid 时序图展示一次包含工具调用的完整对话循环:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
sequenceDiagram
participant User as 用户/REPL
participant Q as queryLoop()
participant Pre as 预处理管线
participant API as deps.callModel()
participant Tool as 工具执行
participant Post as 后处理

User->>Q: query(params)

loop while(true)
Q->>Pre: 消息预处理
Pre->>Pre: applyToolResultBudget()
Pre->>Pre: snipCompact (裁剪旧历史)
Pre->>Pre: microcompact (微压缩)
Pre->>Pre: contextCollapse (上下文坍缩)
Pre->>Pre: autocompact (自动压缩)
Pre-->>Q: messagesForQuery

Q->>API: for await (message of callModel(...))
loop 流式响应
API-->>Q: yield StreamEvent / AssistantMessage
Q-->>User: yield message (实时流出)

alt 流中发现 tool_use block (且 streaming gate 启用)
Q->>Tool: streamingToolExecutor.addTool(block)
Note over Tool: 工具在流式传输期间开始执行<br/>(受 runtime gate 控制,非默认行为)
Tool-->>Q: getCompletedResults()
Q-->>User: yield tool result
end
end

alt needsFollowUp = false (模型未请求工具)
Q->>Post: handleStopHooks()
alt stop hook 返回 blockingErrors
Post-->>Q: 注入 blocking error 消息
Q->>Q: state = { ..., transition: 'stop_hook_blocking' }
Note over Q: continue → 回到 while(true) 顶部
else preventContinuation
Post-->>Q: 直接终止
Q-->>User: return Terminal
else 正常通过
Q-->>User: return Terminal
end
else needsFollowUp = true (模型请求了工具)
Q->>Tool: getRemainingResults() / runTools()
Tool-->>Q: yield 工具结果
Q->>Post: 注入 attachments (memory, skills, commands)
Q->>Q: state = { messages: [..., assistantMsgs, toolResults], transition: 'next_turn' }
Note over Q: continue → 回到 while(true) 顶部
end
end

三、消息预处理管线

在每次 API 调用之前,消息需要经过一条多阶段的预处理管线。这条管线的设计遵循一个原则:越轻量的压缩越先执行,越重的压缩越后执行

3.1 管线各阶段

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
// query.ts:365-468(简化)
// 1. 工具结果预算裁剪
messagesForQuery = await applyToolResultBudget(messagesForQuery, ...)

// 2. 历史裁剪(snip compact)
if (feature('HISTORY_SNIP')) {
const snipResult = snipModule!.snipCompactIfNeeded(messagesForQuery)
messagesForQuery = snipResult.messages
}

// 3. 微压缩(microcompact)
const microcompactResult = await deps.microcompact(messagesForQuery, ...)
messagesForQuery = microcompactResult.messages

// 4. 上下文坍缩(context collapse)
if (feature('CONTEXT_COLLAPSE') && contextCollapse) {
const collapseResult = await contextCollapse.applyCollapsesIfNeeded(...)
messagesForQuery = collapseResult.messages
}

// 5. 自动压缩(autocompact)
const { compactionResult } = await deps.autocompact(messagesForQuery, ...)

为什么这个顺序很重要?

  • snipmicrocompact 是本地操作,不需要 API 调用,几乎零耗时。它们先执行,可能就让 token 数降到了 autocompact 的阈值以下
  • context collapse 在 autocompact 之前执行,原因是:如果坍缩就足以将 token 数降到阈值以下,就不需要 autocompact 的全量摘要,保留了更细粒度的上下文
  • autocompact 最重——它需要一次完整的 API 调用来生成对话摘要

3.2 系统提示词的最终组装

1
2
3
4
// query.ts:449-451
const fullSystemPrompt = asSystemPrompt(
appendSystemContext(systemPrompt, systemContext),
)

systemContext 被追加到系统提示词末尾,而 userContext 在调用 API 时被前置到消息数组的开头(prependUserContext(messagesForQuery, userContext))。这种分离确保了:

  • systemContext(如 MCP 指令、Agent 规则)作为系统提示词的一部分,享受 prompt cache
  • userContext(如会话特定的上下文)作为用户消息,不影响系统提示词的缓存命中率

四、API 调用与流式响应处理

4.1 调用模型

API 调用通过 deps.callModel() 发起。callModel 的生产实现是 queryModelWithStreaming(定义在 services/api/claude.ts:752),它本身也是一个 AsyncGenerator:

1
2
3
4
5
6
7
8
9
10
11
// services/api/claude.ts:752-780
export async function* queryModelWithStreaming({
messages, systemPrompt, thinkingConfig, tools, signal, options,
}: { ... }): AsyncGenerator<
StreamEvent | AssistantMessage | SystemAPIErrorMessage,
void
> {
return yield* withStreamingVCR(messages, async function* () {
yield* queryModel(messages, systemPrompt, thinkingConfig, tools, signal, options)
})
}

withStreamingVCR 是一个”录像带”中间件——在调试模式下录制/回放 API 响应,用于测试和问题复现。

4.2 withRetry:面向不可靠网络的重试层

queryModel 内部,真正的 API 调用被 withRetry 包裹。withRetry 本身也是一个 AsyncGenerator——它通过 yield 产出重试状态消息(如 “API error, retrying in 2s…”),调用方可以在 UI 上实时显示:

1
2
3
4
5
6
7
8
9
10
11
12
// services/api/withRetry.ts:170-178
export async function* withRetry<T>(
getClient: () => Promise<Anthropic>,
operation: (client: Anthropic, attempt: number, context: RetryContext) => Promise<T>,
options: RetryOptions,
): AsyncGenerator<SystemAPIErrorMessage, T> {
const maxRetries = getMaxRetries(options)
// ...
for (let attempt = 1; attempt <= maxRetries + 1; attempt++) {
// ...
}
}

重试策略有几个关键设计:

1. 区分前台与后台查询

1
2
3
4
5
6
7
8
// services/api/withRetry.ts:62-82
const FOREGROUND_529_RETRY_SOURCES = new Set<QuerySource>([
'repl_main_thread',
'sdk',
'agent:custom',
'compact',
// ...
])

529(过载)错误只对前台查询重试。后台查询(如标题生成、工具摘要)立即放弃——在容量级联时,每次重试都是 3-10 倍的网关放大,而用户根本看不到这些后台任务的失败。

2. 指数退避 + Retry-After

1
2
3
4
5
6
7
8
9
10
11
12
13
14
// services/api/withRetry.ts:530-548
export function getRetryDelay(
attempt: number,
retryAfterHeader?: string | null,
maxDelayMs = 32000,
): number {
if (retryAfterHeader) {
const seconds = parseInt(retryAfterHeader, 10)
if (!isNaN(seconds)) return seconds * 1000
}
const baseDelay = Math.min(BASE_DELAY_MS * Math.pow(2, attempt - 1), maxDelayMs)
const jitter = Math.random() * 0.25 * baseDelay
return baseDelay + jitter
}

基础延迟 500ms,指数增长到 32s 上限,加 25% 抖动。如果 API 返回了 Retry-After header,则优先遵循服务端指示。

3. 529 连续 3 次后的 Fallback 路径(有条件门槛)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
// services/api/withRetry.ts:326-365
if (
is529Error(error) &&
(process.env.FALLBACK_FOR_ALL_PRIMARY_MODELS ||
(!isClaudeAISubscriber() && isNonCustomOpusModel(options.model)))
) {
consecutive529Errors++
if (consecutive529Errors >= MAX_529_RETRIES) {
if (options.fallbackModel) {
throw new FallbackTriggeredError(options.model, options.fallbackModel)
}
// 无 fallback 模型时,对外部用户直接报错
if (process.env.USER_TYPE === 'external' && !isPersistentRetryEnabled()) {
throw new CannotRetryError(new Error(REPEATED_529_ERROR_MESSAGE), retryContext)
}
}
}

注意这里的门槛:不是所有 529 都计入 fallback 计数器。只有在满足特定模型/订阅条件时(FALLBACK_FOR_ALL_PRIMARY_MODELS 环境变量,或者非 Claude AI 订阅用户使用非自定义 Opus 模型),529 才会递增 consecutive529Errors。源码中的 TODO 注释也暗示 isNonCustomOpusModel 检查可能是 Claude Code 早期硬编码 Opus 模型时的历史遗留。满足条件且连续 3 次后,抛出 FallbackTriggeredError,交由 queryLoop() 中的 fallback 处理逻辑接管(见下文 5.1)。

4. Persistent Retry 模式

对于无人值守会话(CLAUDE_CODE_UNATTENDED_RETRY),429/529 无限重试,退避上限 5 分钟,并每 30 秒 yield 一个心跳消息防止宿主环境判定会话空闲:

1
2
3
4
// services/api/withRetry.ts:96-98
const PERSISTENT_MAX_BACKOFF_MS = 5 * 60 * 1000 // 5 分钟
const PERSISTENT_RESET_CAP_MS = 6 * 60 * 60 * 1000 // 6 小时
const HEARTBEAT_INTERVAL_MS = 30_000 // 30 秒

4.3 流式响应处理

queryLoop() 通过 for await...of 消费流式响应:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
// query.ts:659-863(核心流程简化)
for await (const message of deps.callModel({ ... })) {
// 1. 处理流式降级
if (streamingFallbackOccured) {
// 清空所有已收集的消息,重置工具执行器
// yield tombstone 标记已废弃的消息
}

// 2. 暂扣可恢复的错误消息
let withheld = false
if (isPromptTooLongMessage(message)) withheld = true
if (isWithheldMaxOutputTokens(message)) withheld = true
if (!withheld) yield yieldMessage // 正常消息实时流出

// 3. 收集 assistant 消息和 tool_use blocks
if (message.type === 'assistant') {
assistantMessages.push(message)
const toolBlocks = message.message.content.filter(c => c.type === 'tool_use')
if (toolBlocks.length > 0) {
toolUseBlocks.push(...toolBlocks)
needsFollowUp = true
}
}

// 4. 流式工具执行:工具在 API 流式传输期间就开始执行
if (streamingToolExecutor) {
for (const toolBlock of msgToolUseBlocks) {
streamingToolExecutor.addTool(toolBlock, message)
}
for (const result of streamingToolExecutor.getCompletedResults()) {
yield result.message
toolResults.push(...)
}
}
}

这段代码中有一个重要概念:暂扣(withhold)。当收到 prompt-too-long 或 max_output_tokens 等可恢复的错误时,不立即 yield 给调用方——而是等流结束后尝试恢复。如果恢复成功,调用方永远不会看到这个错误;如果恢复失败,再 yield 出去。

源码注释解释了为什么必须暂扣 max_output_tokens 错误(query.ts:166-179):如果提前 yield 给 SDK 调用方(如 Cowork/Desktop),它们会在看到 error 字段时终止会话——而此时恢复循环可能还在成功运行。


五、错误恢复:7 层防御

5.1 Fallback 模型切换

withRetry 抛出 FallbackTriggeredError 时,queryLoop() 的内层 try...catch 接管:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
// query.ts:893-953(简化)
} catch (innerError) {
if (innerError instanceof FallbackTriggeredError && fallbackModel) {
currentModel = fallbackModel

// 1. 为已废弃的 assistant 消息生成 tool_result 错误块
yield* yieldMissingToolResultBlocks(assistantMessages, 'Model fallback triggered')

// 2. 清空所有已收集的状态
assistantMessages.length = 0
toolResults.length = 0
toolUseBlocks.length = 0

// 3. 丢弃流式工具执行器的挂起结果
if (streamingToolExecutor) {
streamingToolExecutor.discard()
streamingToolExecutor = new StreamingToolExecutor(...)
}

// 4. 剥离 thinking 签名块(防止跨模型 400 错误)
if (process.env.USER_TYPE === 'ant') {
messagesForQuery = stripSignatureBlocks(messagesForQuery)
}

// 5. 通知用户
yield createSystemMessage(
`Switched to ${renderModelName(fallbackModel)} due to high demand...`
)

attemptWithFallback = true
continue // 重新执行内层 while 循环
}
throw innerError
}

注意第 4 步的 stripSignatureBlocks()——这是一个精妙的细节。Thinking 签名是模型绑定的:capybara 模型产生的 protected-thinking block 如果回放给 opus 模型会触发 400 错误。所以降级时必须剥离。

5.2 Prompt-Too-Long(413)的三级恢复

当 API 返回 prompt-too-long 错误时,恢复策略按成本递增的顺序尝试:

1
2
3
4
5
Level 1: Context Collapse Drain(零 API 成本)
↓ 失败
Level 2: Reactive Compact(一次 API 调用做全量摘要)
↓ 失败
Level 3: Surface Error(向用户报告错误)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
// query.ts:1085-1183(简化)
if (isWithheld413) {
// Level 1: 排空暂存的上下文坍缩摘要
if (feature('CONTEXT_COLLAPSE') && state.transition?.reason !== 'collapse_drain_retry') {
const drained = contextCollapse.recoverFromOverflow(messagesForQuery, querySource)
if (drained.committed > 0) {
state = { ..., transition: { reason: 'collapse_drain_retry' } }
continue // 重试 API 调用
}
}
}

if (isWithheld413 || isWithheldMedia) {
// Level 2: 反应式压缩
const compacted = await reactiveCompact.tryReactiveCompact({
hasAttempted: hasAttemptedReactiveCompact,
// ...
})
if (compacted) {
state = { ..., hasAttemptedReactiveCompact: true, transition: { reason: 'reactive_compact_retry' } }
continue
}

// Level 3: 恢复失败,暴露错误
yield lastMessage
return { reason: 'prompt_too_long' }
}

hasAttemptedReactiveCompact 标志位防止无限循环:如果压缩后再次 413,说明压缩后的上下文仍然太长,继续压缩无意义。

5.3 Max-Output-Tokens 的两阶段恢复

当模型输出被截断时(stop_reason === 'max_output_tokens'),恢复分两步:

阶段 1:升级输出限制

如果当前使用的是 8k 默认限制(CAPPED_DEFAULT_MAX_TOKENS),先尝试升级到 64k(ESCALATED_MAX_TOKENS),不注入任何恢复消息,纯粹重试同一请求:

1
2
3
4
5
6
// query.ts:1199-1221
if (capEnabled && maxOutputTokensOverride === undefined) {
state = { ..., maxOutputTokensOverride: ESCALATED_MAX_TOKENS,
transition: { reason: 'max_output_tokens_escalate' } }
continue
}

阶段 2:多轮恢复

如果 64k 也不够,注入一条恢复消息让模型从断点继续,最多 3 次:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
// query.ts:1223-1251
if (maxOutputTokensRecoveryCount < MAX_OUTPUT_TOKENS_RECOVERY_LIMIT) {
const recoveryMessage = createUserMessage({
content: `Output token limit hit. Resume directly — no apology, no recap...`,
isMeta: true,
})
state = {
messages: [...messagesForQuery, ...assistantMessages, recoveryMessage],
maxOutputTokensRecoveryCount: maxOutputTokensRecoveryCount + 1,
transition: { reason: 'max_output_tokens_recovery', attempt: ... },
// ...
}
continue
}

恢复消息的措辞很有讲究——“no apology, no recap”。如果不加这个约束,模型会在每次恢复时说”抱歉,让我继续之前的工作”,浪费大量输出 token。


六、工具执行

6.1 两种执行模式

模型返回 tool_use block 后,有两种执行路径:

1
2
3
4
// query.ts:1380-1382
const toolUpdates = streamingToolExecutor
? streamingToolExecutor.getRemainingResults()
: runTools(toolUseBlocks, assistantMessages, canUseTool, toolUseContext)
模式 实现 特点
Streaming Tool Execution StreamingToolExecutor 工具在 API 流式传输期间就开始执行
Batch Tool Execution runTools() 等 API 响应完成后批量执行

Streaming 模式通过 Statsig Feature Gate(tengu_streaming_tool_execution2)控制。在这种模式下,每个 tool_use block 一到达就被加入执行队列:

1
2
3
4
5
6
// query.ts:838-844
if (streamingToolExecutor && !toolUseContext.abortController.signal.aborted) {
for (const toolBlock of msgToolUseBlocks) {
streamingToolExecutor.addTool(toolBlock, message)
}
}

6.2 并发安全分区

无论哪种模式,工具执行都遵循相同的并发规则。toolOrchestration.ts 中的 partitionToolCalls() 将工具调用分为两类批次:

1
2
3
4
5
6
7
8
// services/tools/toolOrchestration.ts:91-100
function partitionToolCalls(toolUseMessages, toolUseContext): Batch[] {
return toolUseMessages.reduce((acc, toolUse) => {
const tool = findToolByName(toolUseContext.options.tools, toolUse.name)
const isConcurrencySafe = tool?.isConcurrencySafe(parsedInput)
// ...
}, [])
}
  • Concurrent-safe 批次(如 Grep、Glob、FileRead):使用 runToolsConcurrently() 并行执行,最大并发度 10
  • Non-concurrent 批次(如 FileEdit、BashTool):使用 runToolsSerially() 串行执行

分区算法保持工具的原始顺序——连续的 read-only 工具合并为一个并发批次,遇到写入工具就开始新的串行批次:

1
2
[Grep, Glob, FileRead, FileEdit, Grep, FileRead]
└── 并发批次 ──┘ └─ 串行 ─┘ └── 并发批次 ──┘

6.3 StreamingToolExecutor 的丢弃与取消机制

当发生流式降级(streaming fallback)或模型切换时,已经在执行的工具结果必须被丢弃:

1
2
3
4
// services/tools/StreamingToolExecutor.ts:69-71
discard(): void {
this.discarded = true
}

discard() 的作用是让失败尝试的工具结果整体沉没——设置标志位后,getCompletedResults()getRemainingResults() 都直接返回空(StreamingToolExecutor.ts:412-415, 453-456),不再产出任何结果。调用方随后创建一个全新的 StreamingToolExecutor 实例来服务降级后的请求。

真正生成合成错误 tool_result(synthetic error message)的是另一套机制——getAbortReason()StreamingToolExecutor.ts:210-231)。当工具执行期间检测到 abort 信号时,getAbortReason() 根据 this.discardedthis.hasErroredabortController.signal.aborted 返回不同的取消原因(streaming_fallbacksibling_erroruser_interrupted),然后 executeTool() 用这个原因创建合成的错误块(StreamingToolExecutor.ts:276-291)。

这两套机制的分工是:

  • discard():在流结束后调用,让已完成但未 yield 的结果沉默消失
  • getAbortReason():在工具执行过程中检查,为正在运行或排队的工具生成符合 API 协议的 tool_result 错误块

七、附件注入:Memory、Skill、Command

工具执行完成后、下一次循环开始前,queryLoop() 会注入一系列”附件”(attachment messages):

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
// query.ts:1580-1628(简化)
// 1. 通用附件(文件变更通知、queued commands 等)
for await (const attachment of getAttachmentMessages(
null, updatedToolUseContext, null, queuedCommandsSnapshot,
[...messagesForQuery, ...assistantMessages, ...toolResults], querySource,
)) {
yield attachment
toolResults.push(attachment)
}

// 2. Memory 预取结果
if (pendingMemoryPrefetch?.settledAt !== null && pendingMemoryPrefetch?.consumedOnIteration === -1) {
const memoryAttachments = filterDuplicateMemoryAttachments(
await pendingMemoryPrefetch.promise,
toolUseContext.readFileState,
)
for (const memAttachment of memoryAttachments) {
const msg = createAttachmentMessage(memAttachment)
yield msg
toolResults.push(msg)
}
pendingMemoryPrefetch.consumedOnIteration = turnCount - 1
}

// 3. Skill 发现结果
if (skillPrefetch && pendingSkillPrefetch) {
const skillAttachments = await skillPrefetch.collectSkillDiscoveryPrefetch(pendingSkillPrefetch)
for (const att of skillAttachments) {
const msg = createAttachmentMessage(att)
yield msg
toolResults.push(msg)
}
}

其中 Memory 预取使用了 ES2024 的 using 关键字(显式资源管理):

1
2
3
4
// query.ts:301-304
using pendingMemoryPrefetch = startRelevantMemoryPrefetch(
state.messages, state.toolUseContext,
)

using 确保无论循环如何退出(正常返回、异常、.return()),预取资源都会被正确清理(dispose)。预取在循环入口处启动,在每次迭代的后处理阶段非阻塞地消费——如果预取还没完成就跳过,下一次迭代再消费。


八、QueryConfig:不可变的环境快照

queryLoop() 入口处会快照一次环境配置:

1
2
3
4
5
6
7
8
9
10
// query/config.ts:15-27
export type QueryConfig = {
sessionId: SessionId
gates: {
streamingToolExecution: boolean // Statsig gate
emitToolUseSummaries: boolean // 环境变量
isAnt: boolean // 内部用户
fastModeEnabled: boolean // 快速模式
}
}

为什么要快照?因为 Statsig Feature Gate 的值在会话期间可能变化(CACHED_MAY_BE_STALE),但对话循环的一次执行应该保持一致的行为。快照将”当前 gate 值”固化为不可变数据,避免了循环中途 gate 翻转导致的不一致问题。

注意 QueryConfig 刻意排除了 feature() gate。源码注释(query/config.ts:14)解释了原因:feature() 是编译期常量,必须保持内联在 if (feature('...')) 中,这样 Bun 的 Dead Code Elimination 才能将整个分支从构建产物中删除。如果把 feature() 的值存入 QueryConfig,就破坏了 DCE 的条件——编译器无法确定 config.gates.someFeaturetrue 还是 false


九、feature() 在 query.ts 中的运用

query.ts 顶部有大量 feature() 门控的条件加载:

1
2
3
4
5
6
7
// query.ts:15-21
const reactiveCompact = feature('REACTIVE_COMPACT')
? (require('./services/compact/reactiveCompact.js') as typeof import('./services/compact/reactiveCompact.js'))
: null
const contextCollapse = feature('CONTEXT_COLLAPSE')
? (require('./services/contextCollapse/index.js') as typeof import('./services/contextCollapse/index.js'))
: null

这些 feature() + require() 组合确保了:

  1. 外部版不包含这些模块的代码——feature()false 时,require() 被 DCE 删除
  2. 使用 require() 而非 import——静态 import 无论条件如何都会被 bundler 纳入依赖图
  3. 类型安全通过 as typeof import() 保留——不丢失 TypeScript 类型信息

query.ts 中,通过 feature() 门控的模块有:

feature gate 模块 功能
REACTIVE_COMPACT reactiveCompact.js 413 错误后的反应式压缩
CONTEXT_COLLAPSE contextCollapse/index.js 上下文坍缩(分段摘要)
HISTORY_SNIP snipCompact.js 历史消息裁剪
EXPERIMENTAL_SKILL_SEARCH skillSearch/prefetch.js 技能发现预取
TEMPLATES jobs/classifier.js 模板任务分类
BG_SESSIONS taskSummary.js 后台会话摘要
TOKEN_BUDGET tokenBudget.js Token 预算控制

十、可迁移的设计模式

模式 1:AsyncGenerator 状态机

while(true) + yield + state = { ..., transition } 实现显式状态机。transition 字段记录跳转原因,既方便调试,又能在后续迭代中根据前一次的跳转原因决定行为(如避免重复恢复)。

适用场景:任何需要多轮交互、错误恢复、可中断的长运行任务。相比递归调用(早期 Claude Code 就是递归版),while(true) 状态机没有栈溢出风险,且 State 的所有字段一目了然。

模式 2:暂扣-恢复(Withhold-Recover)

在流式处理中,对可恢复的错误不立即向调用方报告——先暂扣,尝试恢复。恢复成功则调用方无感知;恢复失败再暴露错误。这避免了下游消费者对中间状态的错误反应。

适用场景:任何流式 API 的消费层。例如前端 WebSocket 消息处理、数据管道中的错误重试等。

模式 3:依赖注入的最小接口

QueryDeps 只有 4 个方法(callModelmicrocompactautocompactuuid),使用 typeof realFunction 保持签名同步。这比 mock 整个模块或使用 DI 框架轻量得多。

适用场景:任何需要单元测试的核心业务逻辑。关键是只注入真正需要被替换的 I/O 边界,而不是所有依赖。


下一篇预告

第 6 篇:上下文管理 — 无限对话的秘密

我们将深入 services/compact/ 目录,揭示 Claude Code 如何通过 Microcompact、Full Compact、Session Memory Compact 等多种策略,在有限的上下文窗口内维持”无限”对话。你会看到 token 预算管理的四级告警机制、熔断器设计,以及 compact 后的上下文重建策略。


全部内容请关注 https://github.com/luyao618/Claude-Code-Source-Study (求一颗免费的小星星)