Skip to content

子代理委派与确定性 Pipeline

本章目标:

  1. 弄懂「LLM 自由委派子代理」(task 工具 + SubagentExecutor + 双线程池 + 协作式取消)如何把一段任务隔离到独立上下文异步跑完
  2. 弄懂「确定性 DAG 编排」(weekly_report / meeting_aggregator(_realtime) / ba_report)如何用 LangGraph StateGraph 把固定流程写成无 LLM 自主决策的管线
  3. 能判断一个新需求该走哪种模式,并知道两者的并发上限、超时、注册方式各自约束

TL;DR

a-cdm 引擎里有两套正交的"任务编排"机制。① 子代理委派:Lead Agent 通过 task 工具把一段子任务委派给 SubagentExecutor,在后台线程池里跑一个独立的小 agent(独立上下文、独立工具子集),Lead 端轮询 5 秒拿结果并推 SSE 事件——适合"流程不确定、需要 LLM 边做边判断"的探索/并行场景,硬限 MAX_CONCURRENT_SUBAGENTS=3、15 分钟超时、不可嵌套。② 确定性 Pipeline:把固定业务流程写成 LangGraph StateGraph 的线性/分支 DAG(节点是普通 Python 函数,LLM 只在某些节点被当工具调一次),注册进 langgraph.json / aegra.jsongraphs 段,经标准 /threads/{id}/runs?assistant_id=xxx 触发——适合定时报告、批处理、端到端生成这类"步骤写死、不要 agent 自由发挥"的任务。两者共用 LangGraph 运行时与 checkpointer,但设计取舍完全相反:前者把控制权交给 LLM,后者把控制权写进图结构。

Overview(为什么需要两种模式)

一个 AI 系统里,任务的"控制流由谁决定"有两种极端答案,各解决不同问题。

问题一:Lead Agent 的上下文会被探索性子任务污染。 比如用户让 Lead 写一份调研报告,Lead 需要分别调研财报、负面新闻、行业趋势。如果 Lead 自己一步步做,三条线的中间产物(网页原文、报错、试错)全堆进它的对话历史,token 爆炸且互相干扰。直觉做法是"让 Lead 自己 honor 一个子流程",但 LLM 没有真正的"子调用栈"。答案是子代理委派:task 工具把一段任务连同一个干净的 prompt 丢给一个全新的小 agent,它在自己的上下文里跑完,只把最终结论(一段文本)回传给 Lead。Lead 的上下文只多了一行结果,中间噪声全留在子代理那边。这条机制还顺带支持并行——Lead 一次发 3 个 task 调用,三个子代理同时在后台线程池跑。

问题二:有些流程根本不该让 LLM 自由发挥。 周报生成("取纪要 → 拼 prompt → 调 LLM → 渲染邮件 HTML → 存 artifact")步骤是死的,每一步该干啥写死在代码里最稳。如果硬塞进 agent 框架,等于花 agent 的 overhead(工具装配、中间件链、循环检测)去跑一条直线,还要承担 LLM 自由发挥跑偏的风险。答案是确定性 Pipeline:直接用 LangGraph StateGraph 把这条流程画成 DAG,节点是普通函数,边写死,LLM 只在 call_llm 那个节点被当成一次普通调用。它和 agent 共用 LangGraph Server / Aegra 运行时和 checkpointer,所以照样能 /threads/{id}/runs 触发、照样能持久化状态,但没有"agent 自主决策"这层。

Architecture

两套机制的核心参与方:

组件模式职责Source
task_tool子代理LLM 可见的委派工具,组装 executor、后台启动、轮询、推 SSEdeer-flow/backend/packages/harness/deerflow/tools/builtins/task_tool.py:22-126
SubagentExecutor子代理子代理执行引擎:建小 agent、astream、协作式取消、超时deer-flow/backend/packages/harness/deerflow/subagents/executor.py:128-529
SubagentConfig子代理子代理定义(prompt / 工具白名单 / model / max_turns / timeout)deer-flow/backend/packages/harness/deerflow/subagents/config.py:6-28
registry子代理内置子代理注册表 + config.yaml 覆盖合并deer-flow/backend/packages/harness/deerflow/subagents/registry.py:13-99
SubagentLimitMiddleware子代理after_model 截断超出 MAX_CONCURRENT_SUBAGENTS 的并行 task 调用deer-flow/backend/packages/harness/deerflow/agents/middlewares/subagent_limit_middleware.py:24-75
make_pipeline(各 pipeline)PipelineLangGraph StateGraph 工厂,签名 make_xxx(config),注册进 graphs 段deer-flow/backend/packages/harness/deerflow/pipelines/weekly_report/pipeline.py:27-54
langgraph.json graphs 段两者把 lead_agent 与 4 条 pipeline 一起注册成可触发的 graphdeer-flow/backend/langgraph.json:8-13

注意一个关键对称:两者在 langgraph.json 里是平级注册的(lead_agent 是 agent,其余 4 个是 pipeline),都通过同一个 POST /threads/{id}/runs + assistant_id 触发,共用 LangGraph 顶层 checkpointer。区别全在 graph 内部:agent graph 的节点会循环调 LLM 并由 LLM 决定下一步,pipeline graph 的节点是写死的函数序列。

Components / Subsystems(子代理委派)

task 工具:委派入口

职责:LLM 唯一能看见的委派接口。它不直接跑子代理,而是组装 SubagentExecutor、后台启动、并在 Lead 端代替 LLM 轮询结果。

关键函数:task_tool(runtime, description, prompt, subagent_type, tool_call_id, max_turns)(deer-flow/backend/packages/harness/deerflow/tools/builtins/task_tool.py:22-30)。

实现要点:

  • 防嵌套:子代理装配工具时显式 subagent_enabled=False(task_tool.py:111),所以子代理拿不到 task 工具,无法再委派下一层。general-purpose / bashdisallowed_tools 也都含 "task"(general_purpose.py:47)。
  • task_id 复用 tool_call_id:executor.execute_async(prompt, task_id=tool_call_id)(task_tool.py:126),让前端 SSE 事件能和具体那次工具调用对应上。
  • 后端轮询代替 LLM 轮询:task_status 工具不暴露给 LLM(tools/tools.py:18-21),task_tool 内部 while True 每 5 秒 get_background_task_result 拉状态(task_tool.py:142-197),把新产生的 AI 消息逐条以 task_running 事件 writer(...) 推出去(task_tool.py:157-172)。LLM 视角里这就是一个"阻塞到出结果"的普通工具。
  • 协作式取消传播:父任务被取消时捕获 asyncio.CancelledError,调 request_cancel_background_task(task_id) 给后台线程发信号,并起一个 deferred cleanup task 等它真正终止(task_tool.py:211-248)。

SubagentExecutor:执行引擎

职责:真正把一个 SubagentConfig + task 文本变成一次独立 agent 运行。

关键类:SubagentExecutor(deer-flow/backend/packages/harness/deerflow/subagents/executor.py:128)。

实现要点:

  • 工具过滤:_filter_tools(executor.py:83-110)按 config 的 allowlist / denylist 裁剪父代理传入的全量工具。general-purpose 继承全部工具但去掉 task/ask_clarification/present_files;bash 只留 bash/ls/read_file/write_file/str_replace
  • 复用 Lead 的中间件链:_create_agentbuild_subagent_runtime_middlewares(lazy_init=True)(executor.py:174-185),子代理和 Lead 共用同一套运行时中间件组合,行为一致。子代理强制 thinking_enabled=False(executor.py:172)。
  • 三个线程池:_scheduler_pool(3 worker,调度编排)、_execution_pool(3 worker,带超时的实际执行)、_isolated_loop_pool(3 worker,从已在运行的事件循环里发起同步执行时用,隔离事件循环避免 httpx 等 async 原语跨 loop 冲突)(executor.py:73-80)。
  • 事件循环隔离:execute() 检测到当前线程已有 running event loop,就把活儿丢进 _isolated_loop_pool,在一个全新 loop 里跑 _aexecute,跑完彻底清理 pending task / asyncgens(executor.py:380-448)。这是为了让子代理里能安全用 MCP 这类 async 工具,不和父 agent 的 loop 抢 httpx client。
  • 协作式取消:_aexecuteagent.astream(stream_mode="values") 流式跑,每个 chunk 边界检查 cancel_event(executor.py:251-272)。线程池里的线程无法被 Future.cancel() 强杀,所以靠这个 event 在迭代边界优雅停——代价是单个长工具调用要等到它产出下一个 chunk 才会被打断(代码注释明确写了这一点,executor.py:261-264)。
  • 超时:execute_async 把执行 future result(timeout=self.config.timeout_seconds),超时则置 TIMED_OUT 状态并 set() cancel_event + future.cancel()(executor.py:511-520)。

SubagentConfig 与 registry:子代理定义

职责:声明有哪些子代理、各自的 prompt / 工具边界 / 资源上限,并允许 config.yaml 在运行时覆盖。

关键点:

  • 内置只有两个:general-purpose(复杂多步、需探索+动作)和 bash(命令执行专家),注册在 BUILTIN_SUBAGENTS dict(subagents/builtins/__init__.py:12-15)。
  • SubagentConfig(subagents/config.py:6-28)默认 disallowed_tools=["task"]model="inherit"(用父代理模型)、max_turns=50timeout_seconds=900
  • get_subagent_config(name)(registry.py:13-63)取内置定义后,用 dataclasses.replace 叠加 config.yaml 的 per-agent 覆盖(timeout / max_turns / model),做到改资源上限不改代码。
  • get_available_subagent_names()(registry.py:84-99)关键安全约束:host bash 未放开时,bash 子代理从可用列表里被剔除,task_tool 也会二次拦截(task_tool.py:69-70)。

Data Flow:一次子代理委派的完整时序

文字拆解关键步骤:

  1. LLM 在 SUBAGENT MODE 下被系统提示要求"分解 → 并行委派 → 综合"(agents/lead_agent/prompt.py:190-265),发出一个或多个 task 调用。
  2. task_tool 校验子代理类型和 host bash 权限,组装 SubagentExecutor(注意把父的 sandbox/thread_data/thread_id 透传进去,子代理和父共用同一个 sandbox 目录)。
  3. execute_async_scheduler_pool 里起 run_task,后者再 submit 到 _execution_pool 并带 900s 超时;_aexecuteastream 跑子 agent。
  4. task_tool 主体不退出,while True 每 5 秒拉一次结果,把子代理新产出的 AI 消息逐条 task_running 推前端(实时可见子代理在干啥),并在终态推 task_completed/failed/timed_out/cancelled
  5. 三类终止路径:正常完成回最后一条 AIMessage 文本;超时由线程池 result(timeout=...) 触发置 TIMED_OUT;父任务取消则发协作式取消信号并延迟清理。

Components / Subsystems(确定性 Pipeline)

四条 pipeline 都是 make_xxx(config) 工厂返回 StateGraph(...).compile(),但 DAG 形态差异明显,正好覆盖"确定性编排"的几种典型形态。

weekly_report:最纯的线性 DAG

5 节点一条直线,无分支无 LLM 决策:fetch → prompt → llm → email → artifact → END(pipelines/weekly_report/pipeline.py:39-50)。llm 节点只是把拼好的 prompt 调一次模型,模型不持有控制权——它产出的内容直接进 email 节点渲染 HTML。这是"流程死、只借 LLM 做一次内容生成"的典型,也是和 agent 模式最鲜明的对比:agent 会循环回到 LLM 让它决定下一步,这里 LLM 之后强制走 email。注释还标出了演进路径(V3 在 fetch/prompt 间插并行抽取节点,V4 才升级成 agent)。

meeting_aggregator:带依赖注入的 6 节点 DAG

load_atoms → relate → reduce_per_kind → build_timeline → mark_conflicts → finalize → END(pipelines/meeting_aggregator/pipeline.py:85-91)。亮点是用 closure 做依赖注入:make_pipeline 接收 atom_source / project_auth / embedding_service 三个接口实现(默认 in-memory),用闭包包成 LangGraph 标准 (state, config) 签名的节点(pipeline.py:67-74)。这样切换数据源/权限/embedding 实现完全不动节点代码,只换 make_pipeline 的注入参数。load_atoms 节点内部还做入口鉴权(从 config.configurable 取 user_sub 校验项目权限,pipelines/meeting_aggregator/nodes.py:55-68)——确定性 pipeline 同样要授权,只是授权写在节点而非中间件链。

meeting_aggregator_realtime:9 节点(3 前置抽取 + 6 复用)

在 meeting_aggregator 前面叠 3 个实时抽取节点:fetch_files → convert_md → extract_atoms → [load_atoms → relate → ... → finalize](pipelines/meeting_aggregator_realtime/pipeline.py:110-119)。后 6 节点直接 import 复用 meeting_aggregator 的实现而不是 fork(pipeline.py:43-50)。load_atoms 节点的 closure 在每次 run 时才读 state.extracted_atoms 实例化 InlineAtomSource——因为 make_pipeline 时还不知道 atom 数据,数据是前置抽取节点跑出来塞进 state 的(pipeline.py:74-89)。这体现确定性 pipeline 的可组合性:节点是纯函数,跨 pipeline 复用零成本。

ba_report:从 interrupt DAG 退化为 3 节点单步执行器

ba_report 有两套实现,反映了一个真实的设计演进:

  • 旧版 pipeline.py(已 deprecated):8 步骤 + 8 confirm 节点的 human-in-the-loop DAG,每个 confirm 节点 interrupt_before 等用户确认,confirm 后用 should_continue 条件边决定"重试当前步 / 进下一步"(pipelines/ba_report/pipeline.py:163-243)。模块顶部 warnings.warn(... DeprecationWarning)(pipeline.py:63-69)明确它不在活跃路径。
  • 现役 step_runner.py:langgraph.json 实际注册的是 deerflow.pipelines.ba_report.step_runner:make_step_runner(langgraph.json:13),只有 3 节点 prompt_builder → agent → polish → END(step_runner.py:399-414),单步执行、无 interrupt、状态完全隔离,每次调用独立、结果直接回 acdm-backend 持久化(step_runner.py:5-9)。

step_runneragent 节点有意思:它内部按环境变量 BA_AGENT_STEPS 决定该步走 Skill 模式(纯 LLM 一次 ainvoke,step_runner.py:111-180)还是 Agent 模式(create_react_agent 带文件工具,step_runner.py:183-372)。这是"确定性 pipeline 节点内部嵌一个受限 agent"的混合形态——外层流程仍写死(prompt→agent→polish),只是某个节点视配置决定要不要给 LLM 工具。它和顶层子代理委派的区别:这里没有 task 工具、没有后台线程池、没有并发上限,就是节点内一次有界的 agent.ainvoke(step_runner.py:296-299,带 600s asyncio.wait_for)。

两种模式的设计区别(核心)

维度子代理委派(task)确定性 Pipeline
控制权归属LLM 决定要不要委派、委派什么、几个并行写死在 graph 的 add_edge 里,LLM 无决策权
适用场景探索/并行调研、隔离上下文、流程不确定定时报告、批处理、端到端生成、流程固定
实现载体task 工具 + SubagentExecutor + 双线程池StateGraph DAG,节点=普通函数
并发模型后台线程池,硬限 3 并发,15min 超时LangGraph 引擎调度,DAG 拓扑决定
嵌套禁止(子代理 subagent_enabled=False)节点内可嵌受限 agent(如 ba_report agent 节点)
触发方式LLM 在对话中调工具POST /threads/{id}/runs?assistant_id=xxx
注册位置不单独注册(随 lead_agent 工具装配)langgraph.json / aegra.json graphs 段
取消语义协作式(cancel_event,迭代边界检查)LangGraph 标准 run 取消
状态隔离子代理独立 ThreadState,共享父 sandbox每次 run 独立 state(ba_report 强调完全隔离)
Sourcetask_tool.py:22-126 / executor.py:128-529langgraph.json:8-13 / weekly_report/pipeline.py:27-54

一句话总结取舍:子代理模式把"下一步做什么"的决策权交给 LLM,换取灵活性,代价是不可预测 + 需要并发/超时/取消的工程兜底;Pipeline 模式把决策权写进图结构,换取确定性和低开销,代价是流程一变就要改代码。a-cdm 没有"二选一",而是按任务性质分流——会话式探索走 Lead Agent + 子代理,固定的报告/聚合生成走 pipeline。

速查表:子代理与 Pipeline 全清单

内置子代理:

子代理工具范围modelmax_turns默认 timeout关键约束Source
general-purpose继承全部,去掉 task/ask_clarification/present_filesinherit100900s不可委派、不可澄清deer-flow/backend/packages/harness/deerflow/subagents/builtins/general_purpose.py:5-50
bash仅 bash/ls/read_file/write_file/str_replaceinherit60900shost bash 未放开时不可用deer-flow/backend/packages/harness/deerflow/subagents/builtins/bash_agent.py:5-50

注册的 graph(langgraph.json graphs 段):

graph类型DAG / 形态Source
lead_agentagentmake_lead_agent,工具循环deer-flow/backend/langgraph.json:9
weekly_reportpipeline5 节点线性deer-flow/backend/langgraph.json:10
meeting_aggregatorpipeline6 节点 + 依赖注入deer-flow/backend/langgraph.json:11
meeting_aggregator_realtimepipeline9 节点(3 抽取 + 6 复用)deer-flow/backend/langgraph.json:12
ba_reportpipeline3 节点单步执行器(step_runner)deer-flow/backend/langgraph.json:13

子代理执行状态机:

状态含义Source
PENDINGexecute_async 已登记,未真正起跑deer-flow/backend/packages/harness/deerflow/subagents/executor.py:29
RUNNINGrun_task 已置 + astream 进行中deer-flow/backend/packages/harness/deerflow/subagents/executor.py:30
COMPLETED正常完成,result 为最后一条 AIMessage 文本deer-flow/backend/packages/harness/deerflow/subagents/executor.py:31
FAILED执行抛异常deer-flow/backend/packages/harness/deerflow/subagents/executor.py:32
CANCELLEDcancel_event 触发(父任务取消)deer-flow/backend/packages/harness/deerflow/subagents/executor.py:33
TIMED_OUT超过 timeout_secondsdeer-flow/backend/packages/harness/deerflow/subagents/executor.py:34

扩展指南

加一个新子代理

python
# deer-flow/backend/packages/harness/deerflow/subagents/builtins/my_agent.py
from deerflow.subagents.config import SubagentConfig

MY_AGENT_CONFIG = SubagentConfig(
    name="my-agent",
    description="什么时候 Lead 该委派给我(写给 LLM 看)",
    system_prompt="你是 ... 完成任务后返回简洁结论",
    tools=["bash", "read_file"],          # None=继承全部;给 list=白名单
    disallowed_tools=["task"],            # 必须含 task,否则会递归嵌套
    model="inherit",                       # inherit=用父模型
    max_turns=40,
    timeout_seconds=600,
)
# 然后在 subagents/builtins/__init__.py 的 BUILTIN_SUBAGENTS dict 注册一行

约束(从代码校验读出):

  • disallowed_tools 必须含 "task",否则子代理能再发委派造成递归(task_tool.py:110-111 已经在装配层强制 subagent_enabled=False 兜底,但 config 层也应声明)。
  • 想让 config.yaml 能覆盖 timeout/max_turns/model,无需额外代码,registry.get_subagent_config 会自动叠加(registry.py:30-61)。
  • 若新子代理依赖 host bash,记得它会被 get_available_subagent_names 在未放开 host bash 时剔除(registry.py:91-99)——参照 bash 子代理的处理。

加一条新 Pipeline

python
# deer-flow/backend/packages/harness/deerflow/pipelines/my_flow/pipeline.py
from langgraph.graph import END, StateGraph
from .state import MyFlowState
from .nodes import step_a, step_b

def make_pipeline(config=None):           # 签名必须是 make_xxx(config)
    g = StateGraph(MyFlowState)
    g.add_node("a", step_a)
    g.add_node("b", step_b)
    g.set_entry_point("a")
    g.add_edge("a", "b")
    g.add_edge("b", END)
    return g.compile()                     # 不要自己设 checkpointer,顶层会注入

约束:

  • 工厂签名必须 make_xxx(config),且要在 langgraph.jsonaegra.json 两个文件graphs 段都加一行(生产 Aegra runtime 读 aegra.json,本地 dev 读 langgraph.json,meeting_aggregator_realtime/pipeline.py:25-29 注释明确点出这一点)。
  • 节点函数若要拿 config(取 user_sub 等),参数注解必须是 RunnableConfig,LangGraph 才会传——dict 注解实测不传(meeting_aggregator/pipeline.py:67-71 注释)。
  • 不要在 compile() 里自己设 checkpointer,pipeline 是一次性任务,LangGraph Server / Aegra 顶层 checkpointer 会自动注入做 thread 持久化(weekly_report/pipeline.py:52-54)。
  • 需要 human-in-the-loop 就 compile(interrupt_before=[...])(参考已 deprecated 的 ba_report/pipeline.py:232-243),但注意现役 ba_report 已改回无 interrupt 的单步模式,新 pipeline 上 interrupt 前先确认调用方支持 resume。

Common Pitfalls / 实战 Tips

  • 子代理取消不是即时的:cancel_event 只在 astream 的 chunk 边界检查,单个长工具调用要等它产出下一个 chunk 才会被打断(executor.py:261-264 代码注释原文)。别指望毫秒级停。
  • 超出 3 个并行 task 会被静默截断:SubagentLimitMiddleware.after_model 直接丢弃超出 MAX_CONCURRENT_SUBAGENTS(默认 3,clamp 到 [2,4])的 task 调用并 logger.warning(subagent_limit_middleware.py:54-67)。LLM 一口气发 5 个 task,只有前 3 个真跑。
  • bash 子代理常常"不存在":host bash 未放开时它从可用列表被剔除,LLM 调它会拿到 LOCAL_BASH_SUBAGENT_DISABLED_MESSAGE(task_tool.py:69-70)。这是设计,不是 bug。
  • 子代理和父共享 sandbox:task_tool 把父的 sandbox_state/thread_data/thread_id 透传给 executor(task_tool.py:92-122),子代理读写的是同一套 /mnt/user-data/* 目录,不是隔离文件系统——隔离的是对话上下文,不是磁盘。
  • ba_report 别再用 pipeline.py:它已 DeprecationWarning,活跃路径是 step_runner.py(langgraph.json:13 注册的就是 make_step_runner),改 ba_report 逻辑去改 step_runner。
  • pipeline 节点里的 LLM 不是 agent:weekly_report 的 llm 节点跑完强制走 email,LLM 没有"再想想"的回路。要 agent 那种自由发挥得显式在节点内建 agent(ba_report 的 Agent 模式),且那是节点内的有界 ainvoke,不享受顶层中间件链/子代理委派。

References

  • deer-flow/backend/packages/harness/deerflow/tools/builtins/task_tool.py:22-248 — task 工具:委派、后台轮询、SSE、协作式取消
  • deer-flow/backend/packages/harness/deerflow/subagents/executor.py:128-612 — SubagentExecutor:双线程池、事件循环隔离、超时、取消、后台任务表
  • deer-flow/backend/packages/harness/deerflow/subagents/registry.py:13-99 — 子代理注册表 + config.yaml 覆盖 + host bash 可见性
  • deer-flow/backend/packages/harness/deerflow/subagents/config.py:6-28 — SubagentConfig 字段与默认值
  • deer-flow/backend/packages/harness/deerflow/agents/middlewares/subagent_limit_middleware.py:24-75 — after_model 截断超限并行 task
  • deer-flow/backend/langgraph.json:8-13 — agent 与 4 条 pipeline 平级注册
  • deer-flow/backend/packages/harness/deerflow/pipelines/weekly_report/pipeline.py:27-54 — 最纯线性 DAG 工厂
  • deer-flow/backend/packages/harness/deerflow/pipelines/meeting_aggregator/pipeline.py:44-93 — closure 依赖注入式 6 节点 DAG
  • deer-flow/backend/packages/harness/deerflow/pipelines/meeting_aggregator_realtime/pipeline.py:55-121 — 9 节点(3 抽取 + 6 复用)DAG
  • deer-flow/backend/packages/harness/deerflow/pipelines/ba_report/step_runner.py:1-414 — 现役 3 节点单步执行器(Skill/Agent 双模式)
PageRelationship
MCP 集成与工具装配本章 task 工具由该章 get_available_toolssubagent_enabled 时装配
LeadAgent 设计与图构建本章子代理由该章 Lead Agent 通过 task 工具委派,subagent_enabled 在该章 _build_middlewares 解析
Agent 中间件链机制本章 SubagentLimitMiddleware / 子代理复用的中间件链在该章详解
沙箱架构与文件工具本章子代理共享父 sandbox,bash 子代理依赖该章 host bash 开关
Aegra 运行时与 LangGraph本章 pipeline 注册在 langgraph.json/aegra.json,由该章运行时触发
会议洞察引擎本章 meeting_aggregator(_realtime) pipeline 是该章引擎的图实现
BA 专家报告工作流本章 ba_report step_runner 是该章工作流的确定性 graph 载体

公司内部参考 · 由 claude-wiki-gen 基于源码自动生成的二次分析