
ThreadState and State Management

Goals of this chapter:

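  1. Explain why ThreadState extends LangChain's AgentState with seven business fields: sandbox / thread_data / title / artifacts / todos / uploaded_files / viewed_images.
  2. Break down the semantics and edge cases of the two custom reducers: merge_artifacts (deduplicating merge) and merge_viewed_images (merge, where an empty dict means "clear").
  3. Reconstruct how, during one turn, middlewares and tools read and write these fields according to the state-schema contract, and how the fields flow between nodes.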

TL;DR

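ThreadState is the graph-state schema of DeerFlow's lead agent. It inherits from LangChain's AgentState and declares seven additional fields that carry conversation-level business context: five are NotRequired, and two are wrapped in Annotated thread_state.py:48-55. Those two, artifacts and viewed_images, are bound to custom reducers via Annotated[..., reducer], so updates from parallel tool calls are merged without conflict (deduplication / clearing) instead of overwriting each other thread_state.py:52-55. Middlewares read and write these fields in their before_agent / before_model / after_model hooks through their own state_schema definitions (all compatible with ThreadState), while tools write back via Command(update=...). ThreadState is registered as the graph state type through create_agent(..., state_schema=ThreadState) agent.py:426.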

Overview

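LangGraph graph state is essentially a TypedDict with reducers: each key may declare a merge function via Annotated[Type, reducer], and the partial update returned by a node is merged into the old value key by key through that reducer (a key without a reducer simply takes the new value). LangChain's AgentState already provides base keys such as messages (with the add_messages reducer). DeerFlow's lead agent does more than "messages in, messages out": throughout the conversation lifecycle it must also carry the sandbox handle, per-thread data directories, an auto-generated title, the list of produced artifacts, a todo list, metadata of uploaded files, and the base64 data of images already viewed. None of these are messages, yet they must be shared across multiple middlewares, tools, and turns, so they have to live in graph state rather than be passed around as local variables.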
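The per-key merge rule can be sketched in plain Python. apply_update and append_messages are hypothetical names: LangGraph implements this internally with channels, and the real add_messages also replaces messages by ID, which the simplified append here ignores. The observable rule is the same, though: keys with a reducer combine the old and new values, keys without one take the new value.

```python
from typing import Any, Callable

Reducer = Callable[[Any, Any], Any]


def apply_update(state: dict, update: dict, reducers: dict[str, Reducer]) -> dict:
    """Hypothetical helper: return a new state with `update` merged in key by key."""
    merged = dict(state)
    for key, value in update.items():
        reducer = reducers.get(key)
        if reducer is None:
            merged[key] = value  # no reducer: the new value replaces the old one
        else:
            merged[key] = reducer(state.get(key), value)  # reducer combines old and new
    return merged


def append_messages(existing, new):
    # Simplified stand-in for add_messages: plain append, no ID-based replacement
    return (existing or []) + new


reducers = {"messages": append_messages}
state = {"messages": ["hi"], "title": None}
state = apply_update(state, {"messages": ["hello!"], "title": "Greeting"}, reducers)
print(state)  # {'messages': ['hi', 'hello!'], 'title': 'Greeting'}
```

Keys absent from the update are left untouched, which is why a node can return only the fields it cares about.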

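Why extend AgentState instead of opening a separate channel: middleware hooks (before_agent / before_model / after_model) and tools (Command(update=...)) can only read and write graph state. Only when the business fields live in the same schema can they be persisted (by the checkpointer), included in streamed snapshots (stream_mode=["values"]), and restored across turns thread_state.py:48-55.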

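Why custom reducers are needed: the present_files and view_image tools may run in parallel with other tools (several tool calls in one turn). Without a reducer a key holds a single value, so under "last write wins" semantics the tool that returns later would wipe out the artifacts written by the earlier one (and two writes to a reducer-less key within the same step are rejected by LangGraph with InvalidUpdateError). merge_artifacts concatenates both lists and deduplicates them while preserving order, so file paths produced by parallel tools are not lost. merge_viewed_images merges dictionaries and keeps one special convention: a middleware writing {} means "done, clear the image cache", which keeps base64 data from piling up indefinitely in subsequent LLM calls thread_state.py:21-45.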
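To see what the reducer buys, the sketch below folds the updates of two parallel present_files calls into the same starting value, once with overwrite semantics and once with a function that mirrors merge_artifacts. The file paths are made up, and overwrite and fold are illustrative helpers rather than LangGraph APIs.

```python
def merge_artifacts(existing, new):
    # Mirrors the reducer shown under Implementation Details
    if existing is None:
        return new or []
    if new is None:
        return existing
    return list(dict.fromkeys(existing + new))


def overwrite(existing, new):
    # Illustrative: what a reducer-less key keeps, i.e. only the latest value
    return new


def fold(initial, updates, reducer):
    """Illustrative helper: apply a sequence of updates to one key."""
    value = initial
    for update in updates:
        value = reducer(value, update)
    return value


# Hypothetical outputs of two parallel present_files calls in one turn
parallel_updates = [
    ["/mnt/user-data/outputs/report.md", "/mnt/user-data/outputs/data.csv"],
    ["/mnt/user-data/outputs/chart.png", "/mnt/user-data/outputs/report.md"],
]

print(fold([], parallel_updates, overwrite))
# ['/mnt/user-data/outputs/chart.png', '/mnt/user-data/outputs/report.md']
print(fold([], parallel_updates, merge_artifacts))
# ['/mnt/user-data/outputs/report.md', '/mnt/user-data/outputs/data.csv', '/mnt/user-data/outputs/chart.png']
```

With overwrite, data.csv disappears; with the reducer, every path survives exactly once, in first-seen order.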

Architecture

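ThreadState is the "data contract" of the whole lead-agent subsystem. It has three layers: the base class AgentState (provided by LangChain), the extension fields (seven business keys), and the custom reducers (two merge functions). Middlewares generally do not reuse ThreadState directly; each defines a sub-schema that declares only the fields it cares about (with a comment stating "Compatible with the ThreadState schema"), and these schemas are merged by field name when the graph is assembled. This "stacking of multiple schemas" is something LangGraph's state-schema mechanism allows.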
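A small typing experiment illustrates what "merging by field name" means and why re-declaring a key can drop its reducer. All schemas here are hypothetical stand-ins (ParentState plays ThreadState, InheritingState plays ViewImageMiddlewareState), merge_images is a simplified reducer, and reducer_of / field_union are illustrative helpers, not LangGraph APIs. How LangGraph resolves two conflicting declarations of the same key is not modeled.

```python
from typing import Annotated, TypedDict, get_type_hints


def merge_images(existing, new):
    # Simplified reducer for the demo (no empty-dict clearing)
    return {**(existing or {}), **(new or {})}


class ParentState(TypedDict):
    # Stand-in for ThreadState: the reducer lives in the Annotated metadata
    viewed_images: Annotated[dict, merge_images]


class InheritingState(ParentState):
    # Like ViewImageMiddlewareState: inherits the annotated field unchanged
    pass


class RedeclaringState(TypedDict):
    # A "minimal" sub-schema that re-declares the key as a plain dict
    viewed_images: dict


class TitleOnlyState(TypedDict):
    title: str


def reducer_of(schema, key):
    """Illustrative helper: the reducer attached to `key` via Annotated, or None."""
    hint = get_type_hints(schema, include_extras=True)[key]
    metadata = getattr(hint, "__metadata__", ())
    return metadata[0] if metadata else None


def field_union(*schemas):
    """Illustrative helper: union of field names across schemas, first-seen order."""
    keys = {}
    for schema in schemas:
        keys.update(dict.fromkeys(get_type_hints(schema)))
    return list(keys)


print(reducer_of(InheritingState, "viewed_images") is merge_images)  # True
print(reducer_of(RedeclaringState, "viewed_images"))  # None
print(field_union(TitleOnlyState, InheritingState))  # ['title', 'viewed_images']
```

The re-declared schema carries the same key name but no reducer metadata, which is the situation the pitfall about ViewImageMiddlewareState warns against.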

Source files:

  • backend/packages/harness/deerflow/agents/thread_state.py: the single place where ThreadState, SandboxState, ThreadDataState, ViewedImageData, and the two reducers are defined thread_state.py:1-56
  • backend/packages/harness/deerflow/agents/__init__.py: exports ThreadState and SandboxState __init__.py:5
  • backend/packages/harness/deerflow/agents/lead_agent/agent.py: injects ThreadState into create_agent(state_schema=...) agent.py:426
  • backend/packages/harness/deerflow/agents/middlewares/thread_data_middleware.py: writes thread_data thread_data_middleware.py:113-118
  • backend/packages/harness/deerflow/agents/middlewares/uploads_middleware.py: writes uploaded_files uploads_middleware.py:292-295
  • backend/packages/harness/deerflow/agents/middlewares/view_image_middleware.py: reads viewed_images view_image_middleware.py:103
  • backend/packages/harness/deerflow/agents/middlewares/title_middleware.py: reads/writes title title_middleware.py:75-89
  • backend/packages/harness/deerflow/sandbox/middleware.py: writes/reads sandbox middleware.py:64-73
  • backend/packages/harness/deerflow/tools/builtins/present_file_tool.py: writes artifacts present_file_tool.py:116-121
  • backend/packages/harness/deerflow/tools/builtins/view_image_tool.py: writes viewed_images view_image_tool.py:158-162
  • backend/packages/harness/deerflow/sandbox/tools.py: tool-side reads of sandbox / thread_data, and the lazy write of sandbox tools.py:997-1023

Components / Subsystems

Each field is described below: meaning, type, reducer, and who reads and writes it.

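sandbox (NotRequired[SandboxState | None])

Holds the sandbox handle bound to the current thread. SandboxState contains only sandbox_id (an optional str) thread_state.py:6-7. No custom reducer (default overwrite semantics). With lazy_init=False, SandboxMiddleware writes {"sandbox": {"sandbox_id": sandbox_id}} in before_agent, then reads it back in after_agent to release the sandbox middleware.py:58-73. With the default lazy_init=True, initialization is deferred to the first sandbox tool call: ensure_sandbox_initialized() assigns runtime.state["sandbox"] in place, and the value persists across tool calls tools.py:1080-1102. Other readers include is_local_sandbox() and sandbox_from_runtime() tools.py:1006-1048.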
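The lazy path can be pictured as below. Runtime, acquire_sandbox, and this body of ensure_sandbox_initialized are hypothetical simplifications (the real function in tools.py works with the tool runtime and a sandbox provider); the point is only that the sandbox id is written into runtime.state in place instead of being returned as an update.

```python
from dataclasses import dataclass, field
from itertools import count

_ids = count(1)


def acquire_sandbox(thread_id: str) -> str:
    # Hypothetical provider call that returns a fresh sandbox id
    return f"sandbox-{thread_id}-{next(_ids)}"


@dataclass
class Runtime:
    # Hypothetical stand-in for the tool runtime: only the mutable state dict matters here
    state: dict = field(default_factory=dict)
    thread_id: str = "t1"


def ensure_sandbox_initialized(runtime: Runtime) -> str:
    """Simplified sketch: acquire a sandbox on first use and cache its id in runtime.state."""
    sandbox = runtime.state.get("sandbox")
    if sandbox and sandbox.get("sandbox_id"):
        return sandbox["sandbox_id"]  # already initialized: reuse it
    sandbox_id = acquire_sandbox(runtime.thread_id)
    runtime.state["sandbox"] = {"sandbox_id": sandbox_id}  # in-place write, no Command
    return sandbox_id


rt = Runtime()
first = ensure_sandbox_initialized(rt)
second = ensure_sandbox_initialized(rt)
print(first == second, rt.state)  # True {'sandbox': {'sandbox_id': 'sandbox-t1-1'}}
```

The second call finds the cached id, so only one sandbox is acquired per thread in this sketch.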

thread_data (NotRequired[ThreadDataState | None])

Carries the physical paths of the thread's three directories: workspace_path / uploads_path / outputs_path thread_state.py:10-13. No custom reducer. The only writer is ThreadDataMiddleware.before_agent, which resolves thread_id and user_id and returns {"thread_data": {**paths}} thread_data_middleware.py:113-118. Readers are mainly tools: get_thread_data(runtime) returns runtime.state.get("thread_data") tools.py:997-1003, and present_files uses outputs_path to check that a path belongs to the thread present_file_tool.py:58-61.

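title (NotRequired[str | None])

The thread's auto-generated title. No custom reducer. TitleMiddleware._should_generate_title first reads state.get("title") and skips generation if a title already exists; after the first complete exchange, after_model (local fallback) or aafter_model (LLM-generated) returns {"title": ...} to write it back title_middleware.py:75-89 title_middleware.py:146-184.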

artifacts (Annotated[list[str], merge_artifacts])

List of virtual paths of user-visible artifacts, bound to the merge_artifacts reducer thread_state.py:52. The only writer is the present_files tool, which writes normalized /mnt/user-data/outputs/* paths back via Command(update={"artifacts": normalized_paths, ...}); its code comment states "State updates are handled by a reducer to prevent conflicts", so parallel calls are supported present_file_tool.py:103 present_file_tool.py:116-121.
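A simplified picture of the tool side: check that thread_data is present, map each path under outputs_path to the virtual /mnt/user-data/outputs/ prefix, and return an update for the reducer to merge. The plain dict stands in for Command(update=...), and normalize, its error messages, and the rejection of ".." are assumptions, not the tool's actual code.

```python
from pathlib import PurePosixPath

VIRTUAL_OUTPUTS = PurePosixPath("/mnt/user-data/outputs")


def normalize(path: str, outputs_path: str) -> str:
    """Hypothetical helper: map a host or virtual path to /mnt/user-data/outputs/..."""
    p = PurePosixPath(path)
    if ".." in p.parts:
        # PurePosixPath does not resolve "..", so refuse it instead of risking an escape
        raise ValueError(f"Path must not contain '..': {path}")
    for root in (VIRTUAL_OUTPUTS, PurePosixPath(outputs_path)):
        if p != root and p.is_relative_to(root):
            return str(VIRTUAL_OUTPUTS / p.relative_to(root))
    raise ValueError(f"Not under the outputs directory: {path}")


def present_files(filepaths: list[str], state: dict) -> dict:
    """Sketch of the tool body; the returned dict stands in for Command(update=...)."""
    outputs_path = (state.get("thread_data") or {}).get("outputs_path")
    if not outputs_path:
        raise ValueError("Thread outputs path is not available")
    normalized_paths = [normalize(fp, outputs_path) for fp in filepaths]
    # No read-modify-write of state["artifacts"]: merge_artifacts does the merging
    return {"artifacts": normalized_paths}


state = {"thread_data": {"outputs_path": "/srv/threads/t1/outputs"}}
print(present_files(["/srv/threads/t1/outputs/report.md", "/mnt/user-data/outputs/chart.png"], state))
# {'artifacts': ['/mnt/user-data/outputs/report.md', '/mnt/user-data/outputs/chart.png']}
```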

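todos (NotRequired[list | None])

The todo list. No custom reducer (the same-named key is managed by the PlanningState of LangChain's TodoListMiddleware). DeerFlow's own todo_middleware.py takes effect only in plan mode and reads state.get("todos") for context-loss detection and exit decisions todo_middleware.py:123-124 todo_middleware.py:293. ThreadState declares the key so that the graph-state schema is guaranteed to include it.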

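uploaded_files (NotRequired[list[dict] | None])

Metadata of files newly uploaded in the current turn. No custom reducer (overwrite semantics, recomputed every turn). UploadsMiddleware.before_agent parses the new files from additional_kwargs.files of the last HumanMessage, scans historical files, and returns {"uploaded_files": new_files, "messages": messages} uploads_middleware.py:292-295.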

viewed_images (Annotated[dict[str, ViewedImageData], merge_viewed_images])

A mapping image_path -> {base64, mime_type}, bound to the merge_viewed_images reducer thread_state.py:55. After reading an image, the view_image tool writes Command(update={"viewed_images": {image_path: {...}}}), and the reducer merges it with the images already present view_image_tool.py:158-162. ViewImageMiddleware.before_model uses state.get("viewed_images", {}) to build a multimodal HumanMessage containing the base64 data and injects it before the LLM call view_image_middleware.py:103-127. ViewImageMiddlewareState inherits directly from ThreadState to keep the reducer annotation view_image_middleware.py:15-16.

Data Flow

This section traces how middlewares and tools read and write ThreadState during one turn, using a turn that calls view_image and present_files as the example.

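Key point: once the view_image tool writes viewed_images, the reducer merges it into graph state; at the next before_model, ViewImageMiddleware sees the full set of images and injects base64 multimodal messages before the LLM call. If a middleware later writes an empty dict, merge_viewed_images takes its clearing branch, so the base64 data does not keep consuming the token budget.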
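The viewed_images life cycle within a turn can be replayed with the reducer alone. The three updates stand for a view_image result, a second (parallel) view_image result, and a middleware writing {} once the images have been injected; the page does not name which middleware performs the clear, so that step is illustrative, as are the file names and placeholder image data.

```python
def merge_viewed_images(existing, new):
    # Same branches as the reducer under Implementation Details
    if existing is None:
        return new or {}
    if new is None:
        return existing
    if len(new) == 0:
        return {}  # empty dict is the "clear" sentinel
    return {**existing, **new}


img = {"base64": "AAAA", "mime_type": "image/png"}  # placeholder image data
state = None  # key not yet present in graph state
timeline = [
    ("view_image(a.png)", {"a.png": img}),
    ("view_image(b.png)", {"b.png": img}),
    ("middleware clears", {}),
]
for step, update in timeline:
    state = merge_viewed_images(state, update)
    print(f"{step:<18} -> {sorted(state)}")
# view_image(a.png)  -> ['a.png']
# view_image(b.png)  -> ['a.png', 'b.png']
# middleware clears  -> []
```

Returning None (or omitting the key) instead of {} in the last step would have left both images in place.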

Implementation Details

merge_artifacts: an order-preserving, deduplicating merge; if either side is None, the result falls back to the other side:

```python
def merge_artifacts(existing: list[str] | None, new: list[str] | None) -> list[str]:
    """Reducer for artifacts list - merges and deduplicates artifacts."""
    if existing is None:
        return new or []
    if new is None:
        return existing
    # Use dict.fromkeys to deduplicate while preserving order
    return list(dict.fromkeys(existing + new))
```

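Notes: dict.fromkeys(existing + new) relies on dicts preserving insertion order (guaranteed since Python 3.7). It concatenates both lists, then deduplicates with the path strings as keys, which removes duplicate paths produced by parallel present_files calls while keeping the order of first appearance thread_state.py:21-28.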
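The reason for dict.fromkeys over the more obvious set is ordering: a set deduplicates too, but its iteration order for strings depends on hashing and can differ between runs, so artifact lists would shuffle. A quick check with made-up paths:

```python
paths = [
    "/mnt/user-data/outputs/report.md",
    "/mnt/user-data/outputs/chart.png",
    "/mnt/user-data/outputs/report.md",  # duplicate, e.g. from a parallel call
    "/mnt/user-data/outputs/data.csv",
]

ordered = list(dict.fromkeys(paths))  # dedup, keeps first-seen order (Python 3.7+)
unordered = list(set(paths))          # dedup, but order depends on string hashing

print(ordered)
# ['/mnt/user-data/outputs/report.md', '/mnt/user-data/outputs/chart.png', '/mnt/user-data/outputs/data.csv']
print(sorted(unordered) == sorted(ordered))  # True: same members, order not guaranteed
```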

merge_viewed_images: a dictionary merge, with the empty dict as a "clear" sentinel:

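```python
from typing import TypedDict


class ViewedImageData(TypedDict):
    base64: str
    mime_type: str


def merge_viewed_images(
    existing: dict[str, ViewedImageData] | None,
    new: dict[str, ViewedImageData] | None,
) -> dict[str, ViewedImageData]:
    if existing is None:
        return new or {}
    if new is None:
        return existing
    # Special case: empty dict means clear all viewed images
    if len(new) == 0:
        return {}
    # Merge dictionaries, new values override existing ones for same keys
    return {**existing, **new}
```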

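Notes: the normal path is {**existing, **new}, where a new value overrides the old one for the same image_path key. When len(new) == 0, however, it returns {}: a deliberate convention (spelled out in the docstring) that lets a middleware actively clear the cache after injecting the images. The regression test test_view_image_middleware.py covers this scenario with state = {"viewed_images": {}} thread_state.py:31-45 test_view_image_middleware.py:156.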

Quick reference

| Field | Type | Reducer | Writer(s) | Reader(s) | Source |
|---|---|---|---|---|---|
| sandbox | NotRequired[SandboxState \| None] | none (overwrite) | SandboxMiddleware.before_agent (eager) / ensure_sandbox_initialized (lazy) | is_local_sandbox / sandbox_from_runtime / SandboxMiddleware.after_agent | thread_state.py:49 middleware.py:58-83 |
| thread_data | NotRequired[ThreadDataState \| None] | none (overwrite) | ThreadDataMiddleware.before_agent | get_thread_data / present_files | thread_state.py:50 thread_data_middleware.py:113-118 |
| title | NotRequired[str \| None] | none (overwrite) | TitleMiddleware.(a)after_model | TitleMiddleware._should_generate_title | thread_state.py:51 title_middleware.py:75-89 |
| artifacts | Annotated[list[str], merge_artifacts] | merge_artifacts (order-preserving dedup) | present_files tool | consumers of streamed values snapshots | thread_state.py:52 present_file_tool.py:116-121 |
| todos | NotRequired[list \| None] | none (managed by PlanningState) | TodoListMiddleware (LangChain) | todo_middleware (detection / exit decision) | thread_state.py:53 todo_middleware.py:123-124 |
| uploaded_files | NotRequired[list[dict] \| None] | none (overwrite, recomputed each turn) | UploadsMiddleware.before_agent | consumers of streamed values snapshots | thread_state.py:54 uploads_middleware.py:292-295 |
| viewed_images | Annotated[dict[str, ViewedImageData], merge_viewed_images] | merge_viewed_images (merge / empty dict clears) | view_image tool | ViewImageMiddleware.before_model | thread_state.py:55 view_image_tool.py:158-162 |
| messages (inherited) | provided by AgentState | add_messages (built into LangGraph) | all middlewares / model / tools | all middlewares / model | thread_state.py:3 thread_state.py:48 |

Common Pitfalls / Tips

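  • Don't reuse ThreadState as a middleware schema unless you need its reducer annotations: most middlewares define a minimal sub-schema (for example, TitleMiddlewareState declares only title), and LangGraph merges schemas by field name. ViewImageMiddlewareState, however, must inherit from ThreadState; otherwise the Annotated reducer annotation on viewed_images is lost and parallel writes degrade to overwrites view_image_middleware.py:15-16.
  • Writing an empty dict to viewed_images means "clear", not "no update": to leave viewed_images unchanged, omit the key from the returned update instead of returning {}; the latter triggers the clearing branch of merge_viewed_images thread_state.py:41-43.
  • artifacts is deduplicated by its reducer, so tools can safely run in parallel: the present_files code comment states that it may run in parallel with other tools; rely on merge_artifacts rather than doing your own read-modify-write present_file_tool.py:103.
  • The lazy sandbox write relies on in-place assignment: with lazy_init=True (the default), the value does not come from a middleware return value; instead ensure_sandbox_initialized() assigns runtime.state["sandbox"] = ... directly, relying on the mutability of the tool runtime's state in LangGraph to persist across tool calls tools.py:1101-1102.
  • Missing thread_data makes present_files fail: the tool validates path ownership against outputs_path, so if ThreadDataMiddleware has not written thread_data before the tool runs, present_files raises "Thread outputs path is not available" present_file_tool.py:58-61.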

References

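| Page | Relationship |
|---|---|
| 10-LeadAgent与Agent工厂.md | The LeadAgent factory registers ThreadState via create_agent(state_schema=ThreadState); this chapter defines the state contract the factory consumes |
| 12-中间件链机制.md | Each middleware reads and writes this chapter's fields in its hooks through a sub-schema compatible with ThreadState; the middleware chain is the execution framework for state flow |
| 27-文件上传与文档转换.md | The uploaded_files field carries uploaded-file metadata; this chapter covers how it is stored, read, and written in graph state, while that chapter covers the upload and conversion pipeline |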

Internal company reference · Secondary analysis auto-generated from the source code by claude-wiki-gen