05-对话循环
第 5 篇:对话循环 — query.ts 如何驱动一次完整的 AI 交互
本篇是《深入 Claude Code 源码》系列的第 5 篇。我们将深入
query.ts这个 1729 行的核心文件,揭示一次完整的 AI 对话是如何被驱动的——从消息组装、API 调用、工具执行到错误恢复,理解这个 Agent 运行时的”心跳”。
为什么需要理解对话循环?
如果把 Claude Code 比作一个人体,那 query.ts 就是它的心脏——对话循环的编排入口。当然,心脏需要血管系统才能工作:重试逻辑在 services/api/withRetry.ts,工具执行在 services/tools/,停止钩子在 query/stopHooks.ts,环境配置在 query/config.ts。本篇会覆盖这个完整的”循环系统”,而不仅仅是 query.ts 一个文件。每一次用户提问,都会触发这个循环:
1 | 用户输入 → 组装消息 → 调用 API → 模型返回 → 执行工具 → 结果回传 → 模型继续... |
这个循环看似简单,但实际的工程复杂度远超预期。一个生产级的 AI 对话循环需要处理:
- 流式响应:模型的回复是逐 token 流回的,工具调用可能在流的中途就开始执行
- 多层压缩:对话历史可能随时超出上下文窗口,需要多种策略自动压缩
- 错误恢复:API 过载、上下文太长、输出被截断……每种错误都有专门的恢复路径
- 模型降级:主模型不可用时,自动切换到 fallback 模型
- 并发工具执行:只读工具可以并行,写入工具必须串行
本篇将从宏观到微观,层层展开这个循环的设计。
一、全局视角:AsyncGenerator 驱动的状态机
1.1 query() 的签名
query.ts 的核心是两个嵌套的 AsyncGenerator 函数:
1 | // query.ts:219-239 |
query() 是一个薄包装层,真正的逻辑在 queryLoop() 中。这个分层设计的目的是:命令生命周期通知只在正常退出时执行。如果 queryLoop() 抛出异常或被 .return() 关闭,for...of 循环不会执行——这正是预期行为,因为异常意味着命令没有成功完成。
1.2 为什么用 AsyncGenerator?
query() 返回 AsyncGenerator 而非 Promise,这是一个关键的架构决策。AsyncGenerator 让对话循环可以:
- 流式产出事件:每个中间结果(流式 token、工具执行进度、压缩通知)通过
yield逐个产出,调用方(REPL 或 SDK)实时消费 - 双向通信:调用方可以通过
.return()随时终止循环(如用户按 Ctrl+C) - 延迟计算:只在调用方拉取时才推进循环,天然的背压控制
1.3 QueryParams:对话循环的输入契约
1 | // query.ts:181-199 |
其中 QueryDeps 是一个精心设计的依赖注入接口:
1 | // query/deps.ts:21-31 |
生产环境使用 productionDeps() 返回真实实现,测试环境则注入 fake。这比 spyOn 模块 mock 更干净——callModel 和 autocompact 在 6-8 个测试文件中被 spy,依赖注入消除了这些重复样板。
1.4 State:循环的可变状态
每次迭代共享的可变状态被封装在一个 State 类型中:
1 | // query.ts:204-217 |
注意 transition 字段——它记录了上一次迭代为什么 continue。这不仅仅用于调试,还用于控制恢复逻辑:比如 collapse_drain_retry 后如果仍然 413(上下文太长),就不再重复 drain 而是 fall through 到 reactive compact。
循环中有 7+ 个 continue 站点,每个站点都通过 state = { ... } 写入新状态。但需要注意,这不是一个高度形式化的单层闭环状态机——它更像是主循环 + 若干恢复 continue 点 + 多个早退出口的混合结构。除了下表中的 continue 站点,还有 attemptWithFallback 驱动的内层 while 循环、异常路径、abort 早退(return { reason: 'aborted_streaming' })以及多种正常终止分支(return { reason: 'completed' / 'image_error' / 'prompt_too_long' / ... }):
| continue 站点 | transition.reason | 触发条件 |
|---|---|---|
| 上下文坍缩排空 | collapse_drain_retry |
prompt-too-long 时排空暂存的坍缩摘要 |
| 反应式压缩重试 | reactive_compact_retry |
413 错误触发全量压缩后重试 |
| 输出 token 升级 | max_output_tokens_escalate |
8k 默认限制命中,升级到 64k |
| 输出截断多轮恢复 | max_output_tokens_recovery |
输出被截断,注入恢复消息重试(最多 3 次) |
| Stop Hook 阻塞 | stop_hook_blocking |
停止钩子返回阻塞错误 |
| Token Budget 续行 | token_budget_continuation |
token 预算未耗尽,继续执行 |
| 工具执行后下一轮 | next_turn |
正常的工具结果回传 |
二、循环的完整时序
下面用一个 Mermaid 时序图展示一次包含工具调用的完整对话循环:
1 | sequenceDiagram |
三、消息预处理管线
在每次 API 调用之前,消息需要经过一条多阶段的预处理管线。这条管线的设计遵循一个原则:越轻量的压缩越先执行,越重的压缩越后执行。
3.1 管线各阶段
1 | // query.ts:365-468(简化) |
为什么这个顺序很重要?
- snip 和 microcompact 是本地操作,不需要 API 调用,几乎零耗时。它们先执行,可能就让 token 数降到了 autocompact 的阈值以下
- context collapse 在 autocompact 之前执行,原因是:如果坍缩就足以将 token 数降到阈值以下,就不需要 autocompact 的全量摘要,保留了更细粒度的上下文
- autocompact 最重——它需要一次完整的 API 调用来生成对话摘要
3.2 系统提示词的最终组装
1 | // query.ts:449-451 |
systemContext 被追加到系统提示词末尾,而 userContext 在调用 API 时被前置到消息数组的开头(prependUserContext(messagesForQuery, userContext))。这种分离确保了:
- systemContext(如 MCP 指令、Agent 规则)作为系统提示词的一部分,享受 prompt cache
- userContext(如会话特定的上下文)作为用户消息,不影响系统提示词的缓存命中率
四、API 调用与流式响应处理
4.1 调用模型
API 调用通过 deps.callModel() 发起。callModel 的生产实现是 queryModelWithStreaming(定义在 services/api/claude.ts:752),它本身也是一个 AsyncGenerator:
1 | // services/api/claude.ts:752-780 |
withStreamingVCR 是一个”录像带”中间件——在调试模式下录制/回放 API 响应,用于测试和问题复现。
4.2 withRetry:面向不可靠网络的重试层
在 queryModel 内部,真正的 API 调用被 withRetry 包裹。withRetry 本身也是一个 AsyncGenerator——它通过 yield 产出重试状态消息(如 “API error, retrying in 2s…”),调用方可以在 UI 上实时显示:
1 | // services/api/withRetry.ts:170-178 |
重试策略有几个关键设计:
1. 区分前台与后台查询
1 | // services/api/withRetry.ts:62-82 |
529(过载)错误只对前台查询重试。后台查询(如标题生成、工具摘要)立即放弃——在容量级联时,每次重试都是 3-10 倍的网关放大,而用户根本看不到这些后台任务的失败。
2. 指数退避 + Retry-After
1 | // services/api/withRetry.ts:530-548 |
基础延迟 500ms,指数增长到 32s 上限,加 25% 抖动。如果 API 返回了 Retry-After header,则优先遵循服务端指示。
3. 529 连续 3 次后的 Fallback 路径(有条件门槛)
1 | // services/api/withRetry.ts:326-365 |
注意这里的门槛:不是所有 529 都计入 fallback 计数器。只有在满足特定模型/订阅条件时(FALLBACK_FOR_ALL_PRIMARY_MODELS 环境变量,或者非 Claude AI 订阅用户使用非自定义 Opus 模型),529 才会递增 consecutive529Errors。源码中的 TODO 注释也暗示 isNonCustomOpusModel 检查可能是 Claude Code 早期硬编码 Opus 模型时的历史遗留。满足条件且连续 3 次后,抛出 FallbackTriggeredError,交由 queryLoop() 中的 fallback 处理逻辑接管(见下文 5.1)。
4. Persistent Retry 模式
对于无人值守会话(CLAUDE_CODE_UNATTENDED_RETRY),429/529 无限重试,退避上限 5 分钟,并每 30 秒 yield 一个心跳消息防止宿主环境判定会话空闲:
1 | // services/api/withRetry.ts:96-98 |
4.3 流式响应处理
queryLoop() 通过 for await...of 消费流式响应:
1 | // query.ts:659-863(核心流程简化) |
这段代码中有一个重要概念:暂扣(withhold)。当收到 prompt-too-long 或 max_output_tokens 等可恢复的错误时,不立即 yield 给调用方——而是等流结束后尝试恢复。如果恢复成功,调用方永远不会看到这个错误;如果恢复失败,再 yield 出去。
源码注释解释了为什么必须暂扣 max_output_tokens 错误(
query.ts:166-179):如果提前 yield 给 SDK 调用方(如 Cowork/Desktop),它们会在看到 error 字段时终止会话——而此时恢复循环可能还在成功运行。
五、错误恢复:7 层防御
5.1 Fallback 模型切换
当 withRetry 抛出 FallbackTriggeredError 时,queryLoop() 的内层 try...catch 接管:
1 | // query.ts:893-953(简化) |
注意第 4 步的 stripSignatureBlocks()——这是一个精妙的细节。Thinking 签名是模型绑定的:capybara 模型产生的 protected-thinking block 如果回放给 opus 模型会触发 400 错误。所以降级时必须剥离。
5.2 Prompt-Too-Long(413)的三级恢复
当 API 返回 prompt-too-long 错误时,恢复策略按成本递增的顺序尝试:
1 | Level 1: Context Collapse Drain(零 API 成本) |
1 | // query.ts:1085-1183(简化) |
hasAttemptedReactiveCompact 标志位防止无限循环:如果压缩后再次 413,说明压缩后的上下文仍然太长,继续压缩无意义。
5.3 Max-Output-Tokens 的两阶段恢复
当模型输出被截断时(stop_reason === 'max_output_tokens'),恢复分两步:
阶段 1:升级输出限制
如果当前使用的是 8k 默认限制(CAPPED_DEFAULT_MAX_TOKENS),先尝试升级到 64k(ESCALATED_MAX_TOKENS),不注入任何恢复消息,纯粹重试同一请求:
1 | // query.ts:1199-1221 |
阶段 2:多轮恢复
如果 64k 也不够,注入一条恢复消息让模型从断点继续,最多 3 次:
1 | // query.ts:1223-1251 |
恢复消息的措辞很有讲究——“no apology, no recap”。如果不加这个约束,模型会在每次恢复时说”抱歉,让我继续之前的工作”,浪费大量输出 token。
六、工具执行
6.1 两种执行模式
模型返回 tool_use block 后,有两种执行路径:
1 | // query.ts:1380-1382 |
| 模式 | 实现 | 特点 |
|---|---|---|
| Streaming Tool Execution | StreamingToolExecutor |
工具在 API 流式传输期间就开始执行 |
| Batch Tool Execution | runTools() |
等 API 响应完成后批量执行 |
Streaming 模式通过 Statsig Feature Gate(tengu_streaming_tool_execution2)控制。在这种模式下,每个 tool_use block 一到达就被加入执行队列:
1 | // query.ts:838-844 |
6.2 并发安全分区
无论哪种模式,工具执行都遵循相同的并发规则。toolOrchestration.ts 中的 partitionToolCalls() 将工具调用分为两类批次:
1 | // services/tools/toolOrchestration.ts:91-100 |
- Concurrent-safe 批次(如 Grep、Glob、FileRead):使用
runToolsConcurrently()并行执行,最大并发度 10 - Non-concurrent 批次(如 FileEdit、BashTool):使用
runToolsSerially()串行执行
分区算法保持工具的原始顺序——连续的 read-only 工具合并为一个并发批次,遇到写入工具就开始新的串行批次:
1 | [Grep, Glob, FileRead, FileEdit, Grep, FileRead] |
6.3 StreamingToolExecutor 的丢弃与取消机制
当发生流式降级(streaming fallback)或模型切换时,已经在执行的工具结果必须被丢弃:
1 | // services/tools/StreamingToolExecutor.ts:69-71 |
discard() 的作用是让失败尝试的工具结果整体沉没——设置标志位后,getCompletedResults() 和 getRemainingResults() 都直接返回空(StreamingToolExecutor.ts:412-415, 453-456),不再产出任何结果。调用方随后创建一个全新的 StreamingToolExecutor 实例来服务降级后的请求。
真正生成合成错误 tool_result(synthetic error message)的是另一套机制——getAbortReason()(StreamingToolExecutor.ts:210-231)。当工具执行期间检测到 abort 信号时,getAbortReason() 根据 this.discarded、this.hasErrored、abortController.signal.aborted 返回不同的取消原因(streaming_fallback、sibling_error、user_interrupted),然后 executeTool() 用这个原因创建合成的错误块(StreamingToolExecutor.ts:276-291)。
这两套机制的分工是:
discard():在流结束后调用,让已完成但未 yield 的结果沉默消失getAbortReason():在工具执行过程中检查,为正在运行或排队的工具生成符合 API 协议的 tool_result 错误块
七、附件注入:Memory、Skill、Command
工具执行完成后、下一次循环开始前,queryLoop() 会注入一系列”附件”(attachment messages):
1 | // query.ts:1580-1628(简化) |
其中 Memory 预取使用了 ES2024 的 using 关键字(显式资源管理):
1 | // query.ts:301-304 |
using 确保无论循环如何退出(正常返回、异常、.return()),预取资源都会被正确清理(dispose)。预取在循环入口处启动,在每次迭代的后处理阶段非阻塞地消费——如果预取还没完成就跳过,下一次迭代再消费。
八、QueryConfig:不可变的环境快照
queryLoop() 入口处会快照一次环境配置:
1 | // query/config.ts:15-27 |
为什么要快照?因为 Statsig Feature Gate 的值在会话期间可能变化(CACHED_MAY_BE_STALE),但对话循环的一次执行应该保持一致的行为。快照将”当前 gate 值”固化为不可变数据,避免了循环中途 gate 翻转导致的不一致问题。
注意 QueryConfig 刻意排除了 feature() gate。源码注释(query/config.ts:14)解释了原因:feature() 是编译期常量,必须保持内联在 if (feature('...')) 中,这样 Bun 的 Dead Code Elimination 才能将整个分支从构建产物中删除。如果把 feature() 的值存入 QueryConfig,就破坏了 DCE 的条件——编译器无法确定 config.gates.someFeature 是 true 还是 false。
九、feature() 在 query.ts 中的运用
query.ts 顶部有大量 feature() 门控的条件加载:
1 | // query.ts:15-21 |
这些 feature() + require() 组合确保了:
- 外部版不包含这些模块的代码——
feature()为false时,require()被 DCE 删除 - 使用
require()而非import——静态import无论条件如何都会被 bundler 纳入依赖图 - 类型安全通过
as typeof import()保留——不丢失 TypeScript 类型信息
在 query.ts 中,通过 feature() 门控的模块有:
| feature gate | 模块 | 功能 |
|---|---|---|
REACTIVE_COMPACT |
reactiveCompact.js |
413 错误后的反应式压缩 |
CONTEXT_COLLAPSE |
contextCollapse/index.js |
上下文坍缩(分段摘要) |
HISTORY_SNIP |
snipCompact.js |
历史消息裁剪 |
EXPERIMENTAL_SKILL_SEARCH |
skillSearch/prefetch.js |
技能发现预取 |
TEMPLATES |
jobs/classifier.js |
模板任务分类 |
BG_SESSIONS |
taskSummary.js |
后台会话摘要 |
TOKEN_BUDGET |
tokenBudget.js |
Token 预算控制 |
十、可迁移的设计模式
模式 1:AsyncGenerator 状态机
用 while(true) + yield + state = { ..., transition } 实现显式状态机。transition 字段记录跳转原因,既方便调试,又能在后续迭代中根据前一次的跳转原因决定行为(如避免重复恢复)。
适用场景:任何需要多轮交互、错误恢复、可中断的长运行任务。相比递归调用(早期 Claude Code 就是递归版),while(true) 状态机没有栈溢出风险,且 State 的所有字段一目了然。
模式 2:暂扣-恢复(Withhold-Recover)
在流式处理中,对可恢复的错误不立即向调用方报告——先暂扣,尝试恢复。恢复成功则调用方无感知;恢复失败再暴露错误。这避免了下游消费者对中间状态的错误反应。
适用场景:任何流式 API 的消费层。例如前端 WebSocket 消息处理、数据管道中的错误重试等。
模式 3:依赖注入的最小接口
QueryDeps 只有 4 个方法(callModel、microcompact、autocompact、uuid),使用 typeof realFunction 保持签名同步。这比 mock 整个模块或使用 DI 框架轻量得多。
适用场景:任何需要单元测试的核心业务逻辑。关键是只注入真正需要被替换的 I/O 边界,而不是所有依赖。
下一篇预告
我们将深入 services/compact/ 目录,揭示 Claude Code 如何通过 Microcompact、Full Compact、Session Memory Compact 等多种策略,在有限的上下文窗口内维持”无限”对话。你会看到 token 预算管理的四级告警机制、熔断器设计,以及 compact 后的上下文重建策略。
全部内容请关注 https://github.com/luyao618/Claude-Code-Source-Study (求一颗免费的小星星)