主题
ThreadState 与状态管理
本章目标:
- 讲清
ThreadState为什么要在 LangGraphAgentState之上扩展sandbox/thread_data/title/artifacts/todos/uploaded_files/viewed_images七个业务字段。- 拆解两个自定义 reducer——
merge_artifacts(去重合并)与merge_viewed_images(合并 + 空字典清空)——的语义与边界。- 还原一次回合中各中间件与工具如何按
state schema约定读写这些字段并跨节点流转。
TL;DR
ThreadState 是 DeerFlow lead agent 的图状态 schema,继承自 LangChain 的 AgentState,额外声明七个 NotRequired 字段承载会话级业务上下文 thread_state.py:48-55。其中 artifacts 与 viewed_images 用 Annotated[..., reducer] 绑定自定义 reducer,使并行工具调用的更新可被无冲突合并(去重 / 清空)而非互相覆盖 thread_state.py:52-55。中间件通过各自定义的 state_schema(均与 ThreadState 兼容)在 before_agent / before_model / after_model 钩子读写这些字段,工具则经 Command(update=...) 写回。ThreadState 被注册为 create_agent(..., state_schema=ThreadState) 的图状态类型 agent.py:426。
Overview
LangGraph 的图状态本质是一个带 reducer 的 TypedDict:每个键可选地用 Annotated[Type, reducer] 声明合并函数,节点返回的局部更新会按键经 reducer 与旧值合并。LangChain 的 AgentState 已提供 messages(add_messages reducer)等基础键。DeerFlow 的 lead agent 不止做"消息进、消息出",还需要在整条对话生命周期内携带:沙箱句柄、每线程数据目录、自动生成的标题、产出物清单、待办列表、上传文件元数据、已查看图片的 base64。这些数据不是消息,却需要在多个中间件、多个工具、多个回合之间共享——因此必须进入图状态而非靠局部变量传递。
为什么要扩展 AgentState 而非另开通道:中间件钩子(before_agent/before_model/after_model)和工具(Command(update=...))只能读写图状态。把业务字段并入同一个 schema,它们才能被持久化(checkpointer)、被流式快照(stream_mode=["values"])、被跨回合恢复 thread_state.py:48-55。
为什么需要自定义 reducer:present_files 与 view_image 工具可与其他工具并行执行(同一回合多个 tool call)。若用默认"后写覆盖"语义,后返回的工具会清掉前一个工具写入的 artifacts。merge_artifacts 将两侧列表拼接并保序去重,保证并行工具产出的文件路径不丢失;merge_viewed_images 则用字典合并,并保留一个特殊约定:中间件写入 {} 表示"已处理完毕,清空图片缓存",避免 base64 数据无限累积进入后续 LLM 调用 thread_state.py:21-45。
Architecture
ThreadState 是整个 lead agent 子系统的"数据契约"。其结构分三层:基类 AgentState(LangChain 提供)、扩展字段(七个业务键)、自定义 reducer(两个合并函数)。每个中间件不直接复用 ThreadState,而是定义一个仅声明自己关心字段的子 schema(注释标明"Compatible with the ThreadState schema"),LangGraph 在装配图时按字段名归并这些 schema——这是 LangGraph state schema 机制允许的"多 schema 叠加"。
Source 列表:
backend/packages/harness/deerflow/agents/thread_state.py——ThreadState、SandboxState、ThreadDataState、ViewedImageData、两个 reducer 的唯一定义处 thread_state.py:1-56backend/packages/harness/deerflow/agents/__init__.py—— 对外导出ThreadState、SandboxStateinit.py:5backend/packages/harness/deerflow/agents/lead_agent/agent.py—— 将ThreadState注入create_agent(state_schema=...)agent.py:426backend/packages/harness/deerflow/agents/middlewares/thread_data_middleware.py—— 写thread_datathread_data_middleware.py:113-118backend/packages/harness/deerflow/agents/middlewares/uploads_middleware.py—— 写uploaded_filesuploads_middleware.py:292-295backend/packages/harness/deerflow/agents/middlewares/view_image_middleware.py—— 读viewed_imagesview_image_middleware.py:103backend/packages/harness/deerflow/agents/middlewares/title_middleware.py—— 读/写titletitle_middleware.py:75-89backend/packages/harness/deerflow/sandbox/middleware.py—— 写/读sandboxmiddleware.py:64-73backend/packages/harness/deerflow/tools/builtins/present_file_tool.py—— 写artifactspresent_file_tool.py:116-121backend/packages/harness/deerflow/tools/builtins/view_image_tool.py—— 写viewed_imagesview_image_tool.py:158-162backend/packages/harness/deerflow/sandbox/tools.py—— 工具侧读sandbox/thread_data,惰性写sandboxtools.py:997-1023
Components / Subsystems
下面逐字段说明:含义、类型、reducer、读写方。
sandbox(NotRequired[SandboxState | None])
承载当前线程绑定的沙箱句柄。SandboxState 仅含 sandbox_id(可选 str)thread_state.py:6-7。无自定义 reducer(默认覆盖语义)。SandboxMiddleware 在 lazy_init=False 时于 before_agent 写入 {"sandbox": {"sandbox_id": sandbox_id}},在 after_agent 读出并释放 middleware.py:58-73。默认 lazy_init=True 时则推迟到首个沙箱工具调用:ensure_sandbox_initialized() 直接对 runtime.state["sandbox"] 原地赋值并持久化跨工具调用 tools.py:1080-1102。读取方还包括 is_local_sandbox()、sandbox_from_runtime() tools.py:1006-1048。
thread_data(NotRequired[ThreadDataState | None])
携带本线程三大目录的物理路径:workspace_path / uploads_path / outputs_path thread_state.py:10-13。无自定义 reducer。唯一写入方是 ThreadDataMiddleware.before_agent,它解析 thread_id 与 user_id 后返回 {"thread_data": {**paths}} thread_data_middleware.py:113-118。读取方主要是工具:get_thread_data(runtime) 返回 runtime.state.get("thread_data") tools.py:997-1003,present_files 用 outputs_path 校验路径归属 present_file_tool.py:58-61。
title(NotRequired[str | None])
线程自动标题。无自定义 reducer。TitleMiddleware._should_generate_title 先读 state.get("title"),已有标题则跳过;首轮完整对话后由 after_model(本地兜底)或 aafter_model(LLM 生成)返回 {"title": ...} 写回 title_middleware.py:75-89 title_middleware.py:146-184。
artifacts(Annotated[list[str], merge_artifacts])
用户可见产出物的虚拟路径列表,绑定 merge_artifacts reducer thread_state.py:52。唯一写入方是 present_files 工具,它把规范化后的 /mnt/user-data/outputs/* 路径以 Command(update={"artifacts": normalized_paths, ...}) 写回,注释明确"State updates are handled by a reducer to prevent conflicts",支持并行调用 present_file_tool.py:103 present_file_tool.py:116-121。
todos(NotRequired[list | None])
待办列表,无自定义 reducer(由 LangChain TodoListMiddleware 的 PlanningState 管理同名键)。DeerFlow 侧的 todo_middleware.py 仅在 plan mode 下生效,读 state.get("todos") 用于上下文丢失检测与退出判定 todo_middleware.py:123-124 todo_middleware.py:293。ThreadState 声明该键以保证图状态 schema 包含它。
uploaded_files(NotRequired[list[dict] | None])
本回合新上传文件的元数据列表。无自定义 reducer(覆盖语义,每回合重算)。UploadsMiddleware.before_agent 从最后一条 HumanMessage 的 additional_kwargs.files 解析新文件,扫描历史文件,并返回 {"uploaded_files": new_files, "messages": messages} uploads_middleware.py:292-295。
viewed_images(Annotated[dict[str, ViewedImageData], merge_viewed_images])
image_path -> {base64, mime_type} 映射,绑定 merge_viewed_images reducer thread_state.py:55。view_image 工具读图后写 Command(update={"viewed_images": {image_path: {...}}}),reducer 与已有图片合并 view_image_tool.py:158-162。ViewImageMiddleware.before_model 读 state.get("viewed_images", {}) 构造含 base64 的多模态 HumanMessage 注入 LLM 调用前 view_image_middleware.py:103-127。ViewImageMiddlewareState 直接继承 ThreadState 以保留 reducer 注解 view_image_middleware.py:15-16。
Data Flow
下面是一次回合中各中间件 / 工具对 ThreadState 的读写时序(以含 view_image 与 present_files 调用的回合为例)。
关键点:view_image 工具写入 viewed_images 后,reducer 把它合并进图状态;下一次 before_model 时 ViewImageMiddleware 能读到完整图片集合,从而在 LLM 调用前注入 base64 多模态消息。若后续某中间件写入空字典,merge_viewed_images 触发清空分支,防止 base64 数据反复进入 token 预算。
Implementation Details
merge_artifacts——保序去重合并,任一侧为 None 时退化为另一侧:
python
def merge_artifacts(existing: list[str] | None, new: list[str] | None) -> list[str]:
"""Reducer for artifacts list - merges and deduplicates artifacts."""
if existing is None:
return new or []
if new is None:
return existing
# Use dict.fromkeys to deduplicate while preserving order
return list(dict.fromkeys(existing + new))解读:dict.fromkeys(existing + new) 利用 Python 3.7+ 字典保序特性,先拼接两侧列表,再以路径字符串为键去重,既消除并行 present_files 调用产生的重复路径,又保留首次出现顺序 thread_state.py:21-28。
merge_viewed_images——字典合并,空字典作为"清空"哨兵:
python
def merge_viewed_images(existing, new):
if existing is None:
return new or {}
if new is None:
return existing
# Special case: empty dict means clear all viewed images
if len(new) == 0:
return {}
# Merge dictionaries, new values override existing ones for same keys
return {**existing, **new}解读:常规路径走 {**existing, **new},同 image_path 键以新值覆盖;但 len(new) == 0 时返回 {},这是为让中间件在处理完图片注入后主动清空缓存而设计的特殊约定(docstring 明确说明),回归测试 test_view_image_middleware.py 用 state = {"viewed_images": {}} 覆盖该场景 thread_state.py:31-45 test_view_image_middleware.py:156。
速查表
| 字段 | 类型 | reducer | 写方 | 读方 | Source |
|---|---|---|---|---|---|
sandbox | NotRequired[SandboxState | None] | 无(覆盖) | SandboxMiddleware.before_agent(eager)/ ensure_sandbox_initialized(lazy) | is_local_sandbox / sandbox_from_runtime / SandboxMiddleware.after_agent | thread_state.py:49 middleware.py:58-83 |
thread_data | NotRequired[ThreadDataState | None] | 无(覆盖) | ThreadDataMiddleware.before_agent | get_thread_data / present_files | thread_state.py:50 thread_data_middleware.py:113-118 |
title | NotRequired[str | None] | 无(覆盖) | TitleMiddleware.(a)after_model | TitleMiddleware._should_generate_title | thread_state.py:51 title_middleware.py:75-89 |
artifacts | Annotated[list[str], merge_artifacts] | merge_artifacts(保序去重) | present_files 工具 | 流式 values 快照消费方 | thread_state.py:52 present_file_tool.py:116-121 |
todos | NotRequired[list | None] | 无(由 PlanningState 管理) | TodoListMiddleware(LangChain) | todo_middleware 检测/退出判定 | thread_state.py:53 todo_middleware.py:123-124 |
uploaded_files | NotRequired[list[dict] | None] | 无(覆盖,逐回合重算) | UploadsMiddleware.before_agent | 流式 values 快照消费方 | thread_state.py:54 uploads_middleware.py:292-295 |
viewed_images | Annotated[dict[str, ViewedImageData], merge_viewed_images] | merge_viewed_images(合并 / 空字典清空) | view_image 工具 | ViewImageMiddleware.before_model | thread_state.py:55 view_image_tool.py:158-162 |
messages(继承) | AgentState 提供 | add_messages(LangChain 内置) | 全部中间件 / 模型 / 工具 | 全部中间件 / 模型 | thread_state.py:3 thread_state.py:48 |
Common Pitfalls / Tips
- 不要直接复用
ThreadState当中间件 schema,除非需要 reducer 注解:多数中间件定义最小子 schema(如TitleMiddlewareState只声明title),LangGraph 按字段名归并。但ViewImageMiddlewareState必须继承ThreadState,否则viewed_images的Annotatedreducer 注解会丢失,导致并行写入退化为覆盖 view_image_middleware.py:15-16。 viewed_images写空字典是"清空"语义,不是"无更新":若想"不修改"viewed_images,应不返回该键,而非返回{}——后者会触发merge_viewed_images的清空分支 thread_state.py:41-43。artifacts由 reducer 去重,工具可安全并行:present_files注释明确声明可与其他工具并行;依赖merge_artifacts而非自行读-改-写 present_file_tool.py:103。sandbox惰性写入靠原地赋值:lazy_init=True(默认)时不经中间件返回值,而由ensure_sandbox_initialized()直接runtime.state["sandbox"] = ...,这依赖 LangGraph 工具运行时 state 的可变性持久跨工具调用 tools.py:1101-1102。thread_data缺失会让present_files报错:工具用outputs_path做路径归属校验,若ThreadDataMiddleware未先于工具运行写入,present_files抛 "Thread outputs path is not available" present_file_tool.py:58-61。
References
- backend/packages/harness/deerflow/agents/thread_state.py ——
ThreadState及两个 reducer 定义 - backend/packages/harness/deerflow/agents/init.py —— 对外导出
ThreadState/SandboxState - backend/packages/harness/deerflow/agents/lead_agent/agent.py ——
create_agent(state_schema=ThreadState)注册 - backend/packages/harness/deerflow/agents/middlewares/thread_data_middleware.py ——
thread_data写入 - backend/packages/harness/deerflow/agents/middlewares/uploads_middleware.py ——
uploaded_files写入 - backend/packages/harness/deerflow/agents/middlewares/view_image_middleware.py ——
viewed_images读取与注入 - backend/packages/harness/deerflow/agents/middlewares/title_middleware.py ——
title读写 - backend/packages/harness/deerflow/sandbox/middleware.py ——
sandbox写入与释放 - backend/packages/harness/deerflow/tools/builtins/present_file_tool.py ——
artifacts写入工具 - backend/packages/harness/deerflow/tools/builtins/view_image_tool.py ——
viewed_images写入工具 - backend/packages/harness/deerflow/sandbox/tools.py —— 工具侧读
sandbox/thread_data,惰性写sandbox - backend/tests/test_view_image_middleware.py ——
viewed_images清空回归测试
Related Pages
| 页面 | 关系 |
|---|---|
| 10-LeadAgent与Agent工厂.md | ThreadState 由 LeadAgent 工厂经 create_agent(state_schema=ThreadState) 注册;本章是其状态契约的上游 |
| 12-中间件链机制.md | 各中间件用与 ThreadState 兼容的子 schema 在钩子中读写本章字段;中间件链是状态流转的执行框架 |
| 27-文件上传与文档转换.md | uploaded_files 字段承载上传文件元数据,本章描述其在图状态中的存储与读写,该章描述上传与转换管线 |