主题
可观测性:Tracing 与 Token 用量
本章目标:
- 讲清 DeerFlow 如何同时接入 LangSmith 与 Langfuse 两套 tracing provider:在 model 创建时挂载 callback、凭证缺失时 fail fast。
- 拆解
TokenUsageMiddleware如何记录 LLM token 用量,并把子代理(subagent)用量按tool_call_id缓存、再按 message 位置归并回 dispatchingAIMessage。- 说明
RunJournal的 caller 分桶累计与 Gatewaytoken-usage聚合端点如何把单次 run 的用量沉淀为 thread 级报表。
TL;DR
DeerFlow 在 create_chat_model() 内部统一调用 build_tracing_callbacks(),把所有"显式启用且凭证完整"的 provider 的 LangChain callback 追加到模型实例上;LangSmith 与 Langfuse 可同时启用,任一被显式开启却缺凭证会直接抛 ValueError 而非静默降级。Token 计量走两条互补链路:TokenUsageMiddleware 在 after_model 钩子中读取 usage_metadata 并写步骤归因(attribution);子代理的 token 由 SubagentTokenCollector 在 subagent 进程内收集,经 task 工具按 tool_call_id 缓存,再由 TokenUsageMiddleware 按 message 位置归并回发起委派的那条 AIMessage。RunJournal 按 caller(lead_agent / subagent / middleware)分桶累计,最终由 Gateway GET /api/threads/{id}/token-usage 做 thread 级聚合。
Overview
为什么一个 agent harness 必须内建 tracing 与精确的 token 归并,而不是交给上层应用去做?
一次 DeerFlow 对话不是单次 LLM 调用,而是一棵执行树:lead agent 多轮工具调用、若干并发 subagent 各自跑独立的 LangGraph、加上 title/memory 等 middleware 内部的 LLM 调用。每个节点都在烧 token,但它们分散在不同的 LangChain run、不同的 Python 线程、不同的回调上下文里。如果只在最外层累加,会丢掉 subagent 的用量;如果简单按 run_id 累加,LangChain 对同一响应可能多次触发 on_llm_end 导致重复计数 journal.py:283。因此 harness 必须自己持有归并逻辑,而不能把它推给上层。
更棘手的是"归属"问题:前端要展示"这条 AI 消息(发起了一次 task 委派)花了多少 token,其中 subagent 占多少"。subagent 的用量在它自己跑完时才知道,而那时 dispatching AIMessage 早已生成并入库。DeerFlow 的解法是:subagent 用量按 tool_call_id 缓存,等到下一轮 TokenUsageMiddleware 触发时,从尾部 ToolMessage 反向定位到带该 tool_call_id 的 AIMessage,把 token 合并写回它的 usage_metadata token_usage_middleware.py:275-314。
tracing 同理:把 callback 挂在 model 创建处而非 graph 入口,意味着 lead agent、subagent、middleware 用 create_chat_model() 拿到的每个模型实例都自动带上 tracing,无需在每个调用点手工注入 factory.py:152-156。
Architecture
可观测性由两个相互独立又在 model 工厂处交汇的子系统组成:
- Tracing 双 provider:
tracing/factory.py读取tracing_config.py解析出的TracingConfig,为每个"enabled 且 configured"的 provider 构造一个 LangChain callback,在create_chat_model()末尾追加到模型实例。 - Token 计量链路:
SubagentTokenCollector(子代理进程内收集)→task工具(按tool_call_id缓存 + 上报RunJournal)→TokenUsageMiddleware(归并回AIMessage+ 记录步骤归因)→RunJournal(按 caller 分桶累计)→ Gateway 聚合端点(thread 级报表)。
Source:
- backend/packages/harness/deerflow/tracing/factory.py — 双 provider callback 构造
- backend/packages/harness/deerflow/config/tracing_config.py — 环境变量解析与 fail-fast 校验
- backend/packages/harness/deerflow/models/factory.py — model 创建处挂 callback
- backend/packages/harness/deerflow/agents/middlewares/token_usage_middleware.py — token 归并 + 步骤归因
- backend/packages/harness/deerflow/subagents/token_collector.py — 子代理 token 收集
- backend/packages/harness/deerflow/tools/builtins/task_tool.py —
tool_call_id缓存 - backend/packages/harness/deerflow/runtime/journal.py — caller 分桶累计
- backend/app/gateway/routers/thread_runs.py — thread 级聚合端点
Components / Subsystems
Tracing provider 工厂
职责:为每个显式启用且凭证完整的 tracing provider 构造一个 LangChain callback;启用但缺凭证时 fail fast。
关键函数:build_tracing_callbacks() 先调用 validate_enabled_tracing_providers() 校验,再取 get_enabled_tracing_providers()(仅返回 is_configured 的 provider),逐个构造 callback;构造失败时包成 RuntimeError 抛出而非吞掉 factory.py:32-54。LangSmith 用 LangChainTracer(project_name=...) factory.py:12-15;Langfuse 先用 Langfuse(secret_key, public_key, host) 初始化客户端单例,再返回 LangfuseCallbackHandler(public_key=...) 挂到该客户端 factory.py:18-29。
配置类:TracingConfig 持有 LangSmithTracingConfig 与 LangfuseTracingConfig;enabled_providers 属性按 is_configured(enabled + 凭证齐全)返回可用 provider 列表,两者可同时进入列表 tracing_config.py:50-76。get_tracing_config() 用双重检查锁缓存单例,从环境变量解析 tracing_config.py:107-129。挂载点在 create_chat_model() 末尾,把 callback 追加到 model_instance.callbacks factory.py:152-156。
TokenUsageMiddleware
职责:在 after_model / aafter_model 钩子中,(1) 把子代理用量归并回 dispatching AIMessage 的 usage_metadata,(2) 记录最新 AIMessage 的 token 日志,(3) 给该消息写步骤归因 token_usage_attribution(供前端展示哪一步花了多少 token)。
关键类:TokenUsageMiddleware(AgentMiddleware),核心逻辑在 _apply();after_model / aafter_model 均委托给它 token_usage_middleware.py:267-358。_has_tool_call() 判断某 AIMessage 是否含指定 tool_call_id,用于反向定位 dispatching 消息 token_usage_middleware.py:220-228。_build_attribution() 把每个 tool_call 描述成 action(subagent / search / todo / tool 等)并推断步骤类型 token_usage_middleware.py:231-264。该中间件仅在 token_usage.enabled 为真时挂入链路 agent.py:279-281。
SubagentTokenCollector 与 token_collector 缓存
职责:SubagentTokenCollector 是挂在 subagent LangGraph 上的轻量回调,逐个 on_llm_end 累计该 subagent 内部 LLM 调用的 token,并按 run_id 去重防止重复计数;subagent 结束后记录被转交给父 RunJournal token_collector.py:24-59。
关键类:SubagentTokenCollector(BaseCallbackHandler),caller 形如 subagent:{name},snapshot_records() 返回累计记录副本 token_collector.py:15-63。executor 在每次 subagent 执行时新建一个 collector,放进 run_config["callbacks"] 并附 tags=[collector_caller] executor.py:431-445。task 工具侧维护 _subagent_usage_cache: dict[str, dict[str, int]],以 tool_call_id 为键缓存用量,pop_cached_subagent_usage() 供 middleware 取走 task_tool.py:29-49。_cache_subagent_usage(..., enabled=cache_token_usage) 只在 token usage 启用时写缓存,subagent 在每个终态(完成/失败/取消/超时)都会调用一次 task_tool.py:350-377。
Data Flow
下图展示两条关键流程:model 创建时挂 tracing callback;subagent token 按 tool_call_id 缓存后按 message 位置归并回主 AIMessage。
归并的关键约束:一次模型响应可能并发派发多个 task,因此不能假设固定偏移。_apply() 从 messages[-2] 起向前走连续的 ToolMessage;对每个带缓存用量的 ToolMessage,再从它前一条起向前找到带相同 tool_call_id 的 AIMessage 作为 dispatch 目标;若同一条 AIMessage 已有累加(多 task),则在已有 update 上继续累加而非覆盖 token_usage_middleware.py:282-314。所有改动按 message 位置(dispatch_idx)排序后一次性作为 {"messages": [...]} 返回 token_usage_middleware.py:345-350。
Implementation Details
Tracing fail-fast 校验
启用但缺凭证会在 validate() 中直接抛 ValueError,而不是静默关闭 tracing —— 避免"以为开了 tracing 实际没数据"的隐性故障 tracing_config.py:38-47:
python
def validate(self) -> None:
if not self.enabled:
return
missing: list[str] = []
if not self.public_key:
missing.append("LANGFUSE_PUBLIC_KEY")
if not self.secret_key:
missing.append("LANGFUSE_SECRET_KEY")
if missing:
raise ValueError(f"Langfuse tracing is enabled but required settings are missing: {', '.join(missing)}")build_tracing_callbacks() 第一步就调 validate_enabled_tracing_providers(),所以任何用 create_chat_model() 创建模型的路径都会在缺凭证时立刻报错 factory.py:32-35。
subagent token 归并回 AIMessage
合并时优先取该 AIMessage 已有的 update(同响应多 task 累加),否则取原消息的 usage_metadata,再加上 subagent 三项 token token_usage_middleware.py:303-311:
python
existing_update = state_updates.get(dispatch_idx)
prev = existing_update.usage_metadata if existing_update else (getattr(candidate, "usage_metadata", None) or {})
merged = {
**prev,
"input_tokens": prev.get("input_tokens", 0) + subagent_usage["input_tokens"],
"output_tokens": prev.get("output_tokens", 0) + subagent_usage["output_tokens"],
"total_tokens": prev.get("total_tokens", 0) + subagent_usage["total_tokens"],
}
state_updates[dispatch_idx] = candidate.model_copy(update={"usage_metadata": merged})注意是 model_copy 生成新消息而非原地改,且按 message 位置(索引)归并而非 message id —— 与中间件链文档中"按 message position 而非 message id 归并"的描述一致。
速查表
LangSmith / Langfuse 配置项矩阵
| Provider | 启用开关(env) | 凭证 env | 其它 env(默认值) | Source |
|---|---|---|---|---|
| LangSmith | LANGSMITH_TRACING / LANGCHAIN_TRACING_V2 / LANGCHAIN_TRACING | LANGSMITH_API_KEY / LANGCHAIN_API_KEY | LANGSMITH_PROJECT(默认 deer-flow)、LANGSMITH_ENDPOINT(默认 https://api.smith.langchain.com) | tracing_config.py:116-121 |
| Langfuse | LANGFUSE_TRACING | LANGFUSE_PUBLIC_KEY + LANGFUSE_SECRET_KEY | LANGFUSE_BASE_URL(默认 https://cloud.langfuse.com) | tracing_config.py:122-127 |
Token 计量点矩阵
| 计量点 | 触发时机 | 去重键 | caller 归桶 | Source |
|---|---|---|---|---|
RunJournal.on_llm_end | 每次主链路 LLM 响应 | LangChain run_id | tag → lead_agent / subagent / middleware | journal.py:277-295 |
SubagentTokenCollector.on_llm_end | subagent 进程内每次 LLM 响应 | run_id(_counted_run_ids) | 固定 subagent:{name} | token_collector.py:32-59 |
RunJournal.record_external_llm_usage_records | subagent 终态上报 | source_run_id(_counted_external_source_ids) | record 的 caller 字段 | journal.py:418-446 |
TokenUsageMiddleware._apply | 每次 after_model | additional_kwargs[attribution] 相等则跳过 | 归并写回 AIMessage.usage_metadata | token_usage_middleware.py:316-350 |
Configuration
Tracing 环境变量
| Env | 含义 | 安全 | Source |
|---|---|---|---|
LANGSMITH_TRACING(或 LANGCHAIN_TRACING_V2 / LANGCHAIN_TRACING) | 取值为 1/true/yes/on 之一即视为开启;按出现顺序取第一个非空值 | — | tracing_config.py:86-95 |
LANGSMITH_API_KEY(或 LANGCHAIN_API_KEY) | LangSmith API key | 密钥,勿提交;启用却缺失将抛 ValueError | tracing_config.py:21-23 |
LANGSMITH_PROJECT / LANGSMITH_ENDPOINT | 项目名 / 端点,缺省 deer-flow 与官方 endpoint | — | tracing_config.py:119-120 |
LANGFUSE_TRACING | Langfuse 开启开关 | — | tracing_config.py:123 |
LANGFUSE_PUBLIC_KEY / LANGFUSE_SECRET_KEY | Langfuse 凭证对 | SECRET_KEY 为密钥,勿提交;启用却任一缺失将抛 ValueError | tracing_config.py:124-125 |
LANGFUSE_BASE_URL | Langfuse host,缺省 https://cloud.langfuse.com | — | tracing_config.py:126 |
.env.example 中 LangSmith 段全部默认注释掉(即默认关闭 tracing).env.example:33-37。
Token usage 配置
config.yaml 经 TokenUsageConfig 解析,enabled 默认 True,在 AppConfig.token_usage 上以 default_factory 暴露 token_usage_config.py:4-7 app_config.py:87。它同时门控两处:TokenUsageMiddleware 是否挂入链路 agent.py:279-281,以及 subagent 用量是否写 _subagent_usage_cache task_tool.py:34-45。
Gateway thread 级聚合端点
GET /api/threads/{thread_id}/token-usage(需 threads:read 权限、owner 校验)调用 run_store.aggregate_tokens_by_thread(),只统计 status 为 success/error 的已完成 run,返回 total_tokens、total_input_tokens、total_output_tokens、total_runs、by_model、by_caller(lead_agent/subagent/middleware) thread_runs.py:394-400 memory.py:86-105。SQL 后端用单条 GROUP BY model_name 查询实现同样的聚合 sql.py:222-257。单次 run 的累计来自 RunJournal.get_completion_data() 返回的 caller 分桶字段 journal.py:487-500。
Common Pitfalls/Tips
- 缺凭证 fail fast,不是静默降级:
LANGSMITH_TRACING=true但没设LANGSMITH_API_KEY会抛LangSmith tracing is enabled but LANGSMITH_API_KEY (or LANGCHAIN_API_KEY) is not set.;Langfuse 同理。先想"开了 tracing 但没数据"会被立刻报错挡住 tracing_config.py:21-23 tracing_config.py:38-47。 - tracing 默认关闭:
.env.example把 LangSmith 段整段注释,docker-compose 文件中未发现LANGSMITH_*/LANGFUSE_*注入;若要在容器里开 tracing 需自行通过环境变量传入 .env.example:33-37。 - 两可同时启用:LangSmith 与 Langfuse 互不排斥,
enabled_providers会同时返回两者,build_tracing_callbacks()把两个 callback 都挂上 tracing_config.py:70-76。 - token 重复计数防护靠去重键:LangChain 可能对同一响应多次触发
on_llm_end,RunJournal按run_id去重、external records 按source_run_id去重;改动这两处时务必保留去重集合 journal.py:283 journal.py:424-425。 - 关闭 token usage 会同时关掉缓存与归并:
token_usage.enabled=false时_subagent_usage_cache不写入,TokenUsageMiddleware也不挂链,subagent 用量将无法归并回AIMessagetask_tool.py:43-45 agent.py:279-281。 - OpenAI 兼容网关需
stream_usage:LangChain 仅在无自定义 base_url 时默认开stream_usage,DeerFlow 在create_chat_model()里为 OpenAI 兼容端点默认补上,否则流式响应没有usage_metadata,token 计量会全空 factory.py:34-47 factory.py:141-148。 - 缓存在异常/取消时会清理:
task工具在CancelledError与一般Exception分支都会_subagent_usage_cache.pop(tool_call_id, None),避免泄漏未归并的脏缓存 task_tool.py:418-422。
References
- backend/packages/harness/deerflow/tracing/factory.py — 双 provider callback 工厂与 fail-fast 包装
- backend/packages/harness/deerflow/config/tracing_config.py — TracingConfig 与环境变量解析/校验
- backend/packages/harness/deerflow/models/factory.py — model 创建处挂 tracing callback、stream_usage 默认开启
- backend/packages/harness/deerflow/agents/middlewares/token_usage_middleware.py — token 归并与步骤归因
- backend/packages/harness/deerflow/subagents/token_collector.py — subagent 进程内 token 收集器
- backend/packages/harness/deerflow/tools/builtins/task_tool.py —
tool_call_id缓存与终态上报 - backend/packages/harness/deerflow/runtime/journal.py — caller 分桶累计与 completion data
- backend/app/gateway/routers/thread_runs.py — thread 级 token-usage 聚合端点
- backend/packages/harness/deerflow/config/token_usage_config.py — token usage 开关配置
Related Pages
| 章节 | 关系 |
|---|---|
| 12-中间件链机制 | TokenUsageMiddleware 在 17 个中间件链中的挂载顺序与门控条件 |
| 19-子代理委派系统 | subagent 执行引擎与 task 工具,本章 token 缓存的上游来源 |
| 05-模型配置与Model工厂 | create_chat_model() 工厂,本章 tracing callback 与 stream_usage 的挂载点 |
| 15-Runtime运行时与StreamBridge | RunJournal 所在的运行时层,token 分桶累计与 run 持久化 |