Skip to content

可观测性:Tracing 与 Token 用量

本章目标:

  1. 讲清 DeerFlow 如何同时接入 LangSmith 与 Langfuse 两套 tracing provider:在 model 创建时挂载 callback、凭证缺失时 fail fast。
  2. 拆解 TokenUsageMiddleware 如何记录 LLM token 用量,并把子代理(subagent)用量按 tool_call_id 缓存、再按 message 位置归并回 dispatching AIMessage
  3. 说明 RunJournal 的 caller 分桶累计与 Gateway token-usage 聚合端点如何把单次 run 的用量沉淀为 thread 级报表。

TL;DR

DeerFlow 在 create_chat_model() 内部统一调用 build_tracing_callbacks(),把所有"显式启用且凭证完整"的 provider 的 LangChain callback 追加到模型实例上;LangSmith 与 Langfuse 可同时启用,任一被显式开启却缺凭证会直接抛 ValueError 而非静默降级。Token 计量走两条互补链路:TokenUsageMiddlewareafter_model 钩子中读取 usage_metadata 并写步骤归因(attribution);子代理的 token 由 SubagentTokenCollector 在 subagent 进程内收集,经 task 工具按 tool_call_id 缓存,再由 TokenUsageMiddleware 按 message 位置归并回发起委派的那条 AIMessageRunJournal 按 caller(lead_agent / subagent / middleware)分桶累计,最终由 Gateway GET /api/threads/{id}/token-usage 做 thread 级聚合。

Overview

为什么一个 agent harness 必须内建 tracing 与精确的 token 归并,而不是交给上层应用去做?

一次 DeerFlow 对话不是单次 LLM 调用,而是一棵执行树:lead agent 多轮工具调用、若干并发 subagent 各自跑独立的 LangGraph、加上 title/memory 等 middleware 内部的 LLM 调用。每个节点都在烧 token,但它们分散在不同的 LangChain run、不同的 Python 线程、不同的回调上下文里。如果只在最外层累加,会丢掉 subagent 的用量;如果简单按 run_id 累加,LangChain 对同一响应可能多次触发 on_llm_end 导致重复计数 journal.py:283。因此 harness 必须自己持有归并逻辑,而不能把它推给上层。

更棘手的是"归属"问题:前端要展示"这条 AI 消息(发起了一次 task 委派)花了多少 token,其中 subagent 占多少"。subagent 的用量在它自己跑完时才知道,而那时 dispatching AIMessage 早已生成并入库。DeerFlow 的解法是:subagent 用量按 tool_call_id 缓存,等到下一轮 TokenUsageMiddleware 触发时,从尾部 ToolMessage 反向定位到带该 tool_call_idAIMessage,把 token 合并写回它的 usage_metadata token_usage_middleware.py:275-314

tracing 同理:把 callback 挂在 model 创建处而非 graph 入口,意味着 lead agent、subagent、middleware 用 create_chat_model() 拿到的每个模型实例都自动带上 tracing,无需在每个调用点手工注入 factory.py:152-156

Architecture

可观测性由两个相互独立又在 model 工厂处交汇的子系统组成:

  • Tracing 双 provider:tracing/factory.py 读取 tracing_config.py 解析出的 TracingConfig,为每个"enabled 且 configured"的 provider 构造一个 LangChain callback,在 create_chat_model() 末尾追加到模型实例。
  • Token 计量链路:SubagentTokenCollector(子代理进程内收集)→ task 工具(按 tool_call_id 缓存 + 上报 RunJournal)→ TokenUsageMiddleware(归并回 AIMessage + 记录步骤归因)→ RunJournal(按 caller 分桶累计)→ Gateway 聚合端点(thread 级报表)。

Source:

Components / Subsystems

Tracing provider 工厂

职责:为每个显式启用且凭证完整的 tracing provider 构造一个 LangChain callback;启用但缺凭证时 fail fast。

关键函数:build_tracing_callbacks() 先调用 validate_enabled_tracing_providers() 校验,再取 get_enabled_tracing_providers()(仅返回 is_configured 的 provider),逐个构造 callback;构造失败时包成 RuntimeError 抛出而非吞掉 factory.py:32-54。LangSmith 用 LangChainTracer(project_name=...) factory.py:12-15;Langfuse 先用 Langfuse(secret_key, public_key, host) 初始化客户端单例,再返回 LangfuseCallbackHandler(public_key=...) 挂到该客户端 factory.py:18-29

配置类:TracingConfig 持有 LangSmithTracingConfigLangfuseTracingConfig;enabled_providers 属性按 is_configured(enabled + 凭证齐全)返回可用 provider 列表,两者可同时进入列表 tracing_config.py:50-76get_tracing_config() 用双重检查锁缓存单例,从环境变量解析 tracing_config.py:107-129。挂载点在 create_chat_model() 末尾,把 callback 追加到 model_instance.callbacks factory.py:152-156

TokenUsageMiddleware

职责:在 after_model / aafter_model 钩子中,(1) 把子代理用量归并回 dispatching AIMessageusage_metadata,(2) 记录最新 AIMessage 的 token 日志,(3) 给该消息写步骤归因 token_usage_attribution(供前端展示哪一步花了多少 token)。

关键类:TokenUsageMiddleware(AgentMiddleware),核心逻辑在 _apply();after_model / aafter_model 均委托给它 token_usage_middleware.py:267-358_has_tool_call() 判断某 AIMessage 是否含指定 tool_call_id,用于反向定位 dispatching 消息 token_usage_middleware.py:220-228_build_attribution() 把每个 tool_call 描述成 action(subagent / search / todo / tool 等)并推断步骤类型 token_usage_middleware.py:231-264。该中间件仅在 token_usage.enabled 为真时挂入链路 agent.py:279-281

SubagentTokenCollector 与 token_collector 缓存

职责:SubagentTokenCollector 是挂在 subagent LangGraph 上的轻量回调,逐个 on_llm_end 累计该 subagent 内部 LLM 调用的 token,并按 run_id 去重防止重复计数;subagent 结束后记录被转交给父 RunJournal token_collector.py:24-59

关键类:SubagentTokenCollector(BaseCallbackHandler),caller 形如 subagent:{name},snapshot_records() 返回累计记录副本 token_collector.py:15-63。executor 在每次 subagent 执行时新建一个 collector,放进 run_config["callbacks"] 并附 tags=[collector_caller] executor.py:431-445task 工具侧维护 _subagent_usage_cache: dict[str, dict[str, int]],以 tool_call_id 为键缓存用量,pop_cached_subagent_usage() 供 middleware 取走 task_tool.py:29-49_cache_subagent_usage(..., enabled=cache_token_usage) 只在 token usage 启用时写缓存,subagent 在每个终态(完成/失败/取消/超时)都会调用一次 task_tool.py:350-377

Data Flow

下图展示两条关键流程:model 创建时挂 tracing callback;subagent token 按 tool_call_id 缓存后按 message 位置归并回主 AIMessage

归并的关键约束:一次模型响应可能并发派发多个 task,因此不能假设固定偏移。_apply()messages[-2] 起向前走连续的 ToolMessage;对每个带缓存用量的 ToolMessage,再从它前一条起向前找到带相同 tool_call_idAIMessage 作为 dispatch 目标;若同一条 AIMessage 已有累加(多 task),则在已有 update 上继续累加而非覆盖 token_usage_middleware.py:282-314。所有改动按 message 位置(dispatch_idx)排序后一次性作为 {"messages": [...]} 返回 token_usage_middleware.py:345-350

Implementation Details

Tracing fail-fast 校验

启用但缺凭证会在 validate() 中直接抛 ValueError,而不是静默关闭 tracing —— 避免"以为开了 tracing 实际没数据"的隐性故障 tracing_config.py:38-47:

python
def validate(self) -> None:
    if not self.enabled:
        return
    missing: list[str] = []
    if not self.public_key:
        missing.append("LANGFUSE_PUBLIC_KEY")
    if not self.secret_key:
        missing.append("LANGFUSE_SECRET_KEY")
    if missing:
        raise ValueError(f"Langfuse tracing is enabled but required settings are missing: {', '.join(missing)}")

build_tracing_callbacks() 第一步就调 validate_enabled_tracing_providers(),所以任何用 create_chat_model() 创建模型的路径都会在缺凭证时立刻报错 factory.py:32-35

subagent token 归并回 AIMessage

合并时优先取该 AIMessage 已有的 update(同响应多 task 累加),否则取原消息的 usage_metadata,再加上 subagent 三项 token token_usage_middleware.py:303-311:

python
existing_update = state_updates.get(dispatch_idx)
prev = existing_update.usage_metadata if existing_update else (getattr(candidate, "usage_metadata", None) or {})
merged = {
    **prev,
    "input_tokens": prev.get("input_tokens", 0) + subagent_usage["input_tokens"],
    "output_tokens": prev.get("output_tokens", 0) + subagent_usage["output_tokens"],
    "total_tokens": prev.get("total_tokens", 0) + subagent_usage["total_tokens"],
}
state_updates[dispatch_idx] = candidate.model_copy(update={"usage_metadata": merged})

注意是 model_copy 生成新消息而非原地改,且按 message 位置(索引)归并而非 message id —— 与中间件链文档中"按 message position 而非 message id 归并"的描述一致。

速查表

LangSmith / Langfuse 配置项矩阵

Provider启用开关(env)凭证 env其它 env(默认值)Source
LangSmithLANGSMITH_TRACING / LANGCHAIN_TRACING_V2 / LANGCHAIN_TRACINGLANGSMITH_API_KEY / LANGCHAIN_API_KEYLANGSMITH_PROJECT(默认 deer-flow)、LANGSMITH_ENDPOINT(默认 https://api.smith.langchain.com)tracing_config.py:116-121
LangfuseLANGFUSE_TRACINGLANGFUSE_PUBLIC_KEY + LANGFUSE_SECRET_KEYLANGFUSE_BASE_URL(默认 https://cloud.langfuse.com)tracing_config.py:122-127

Token 计量点矩阵

计量点触发时机去重键caller 归桶Source
RunJournal.on_llm_end每次主链路 LLM 响应LangChain run_idtag → lead_agent / subagent / middlewarejournal.py:277-295
SubagentTokenCollector.on_llm_endsubagent 进程内每次 LLM 响应run_id(_counted_run_ids)固定 subagent:{name}token_collector.py:32-59
RunJournal.record_external_llm_usage_recordssubagent 终态上报source_run_id(_counted_external_source_ids)record 的 caller 字段journal.py:418-446
TokenUsageMiddleware._apply每次 after_modeladditional_kwargs[attribution] 相等则跳过归并写回 AIMessage.usage_metadatatoken_usage_middleware.py:316-350

Configuration

Tracing 环境变量

Env含义安全Source
LANGSMITH_TRACING(或 LANGCHAIN_TRACING_V2 / LANGCHAIN_TRACING)取值为 1/true/yes/on 之一即视为开启;按出现顺序取第一个非空值tracing_config.py:86-95
LANGSMITH_API_KEY(或 LANGCHAIN_API_KEY)LangSmith API key密钥,勿提交;启用却缺失将抛 ValueErrortracing_config.py:21-23
LANGSMITH_PROJECT / LANGSMITH_ENDPOINT项目名 / 端点,缺省 deer-flow 与官方 endpointtracing_config.py:119-120
LANGFUSE_TRACINGLangfuse 开启开关tracing_config.py:123
LANGFUSE_PUBLIC_KEY / LANGFUSE_SECRET_KEYLangfuse 凭证对SECRET_KEY 为密钥,勿提交;启用却任一缺失将抛 ValueErrortracing_config.py:124-125
LANGFUSE_BASE_URLLangfuse host,缺省 https://cloud.langfuse.comtracing_config.py:126

.env.example 中 LangSmith 段全部默认注释掉(即默认关闭 tracing).env.example:33-37

Token usage 配置

config.yamlTokenUsageConfig 解析,enabled 默认 True,在 AppConfig.token_usage 上以 default_factory 暴露 token_usage_config.py:4-7 app_config.py:87。它同时门控两处:TokenUsageMiddleware 是否挂入链路 agent.py:279-281,以及 subagent 用量是否写 _subagent_usage_cache task_tool.py:34-45

Gateway thread 级聚合端点

GET /api/threads/{thread_id}/token-usage(需 threads:read 权限、owner 校验)调用 run_store.aggregate_tokens_by_thread(),只统计 statussuccess/error 的已完成 run,返回 total_tokenstotal_input_tokenstotal_output_tokenstotal_runsby_modelby_caller(lead_agent/subagent/middleware) thread_runs.py:394-400 memory.py:86-105。SQL 后端用单条 GROUP BY model_name 查询实现同样的聚合 sql.py:222-257。单次 run 的累计来自 RunJournal.get_completion_data() 返回的 caller 分桶字段 journal.py:487-500

Common Pitfalls/Tips

  • 缺凭证 fail fast,不是静默降级:LANGSMITH_TRACING=true 但没设 LANGSMITH_API_KEY 会抛 LangSmith tracing is enabled but LANGSMITH_API_KEY (or LANGCHAIN_API_KEY) is not set.;Langfuse 同理。先想"开了 tracing 但没数据"会被立刻报错挡住 tracing_config.py:21-23 tracing_config.py:38-47
  • tracing 默认关闭:.env.example 把 LangSmith 段整段注释,docker-compose 文件中未发现 LANGSMITH_*/LANGFUSE_* 注入;若要在容器里开 tracing 需自行通过环境变量传入 .env.example:33-37
  • 两可同时启用:LangSmith 与 Langfuse 互不排斥,enabled_providers 会同时返回两者,build_tracing_callbacks() 把两个 callback 都挂上 tracing_config.py:70-76
  • token 重复计数防护靠去重键:LangChain 可能对同一响应多次触发 on_llm_end,RunJournalrun_id 去重、external records 按 source_run_id 去重;改动这两处时务必保留去重集合 journal.py:283 journal.py:424-425
  • 关闭 token usage 会同时关掉缓存与归并:token_usage.enabled=false_subagent_usage_cache 不写入,TokenUsageMiddleware 也不挂链,subagent 用量将无法归并回 AIMessage task_tool.py:43-45 agent.py:279-281
  • OpenAI 兼容网关需 stream_usage:LangChain 仅在无自定义 base_url 时默认开 stream_usage,DeerFlow 在 create_chat_model() 里为 OpenAI 兼容端点默认补上,否则流式响应没有 usage_metadata,token 计量会全空 factory.py:34-47 factory.py:141-148
  • 缓存在异常/取消时会清理:task 工具在 CancelledError 与一般 Exception 分支都会 _subagent_usage_cache.pop(tool_call_id, None),避免泄漏未归并的脏缓存 task_tool.py:418-422

References

章节关系
12-中间件链机制TokenUsageMiddleware 在 17 个中间件链中的挂载顺序与门控条件
19-子代理委派系统subagent 执行引擎与 task 工具,本章 token 缓存的上游来源
05-模型配置与Model工厂create_chat_model() 工厂,本章 tracing callback 与 stream_usage 的挂载点
15-Runtime运行时与StreamBridgeRunJournal 所在的运行时层,token 分桶累计与 run 持久化

公司内部参考 · 由 claude-wiki-gen 基于源码自动生成的二次分析