主题
子代理委派系统
本章目标:
- 讲清为什么 DeerFlow 用
task工具把长任务委派给子代理(subagent),而不是让单个 lead agent 一路死扛- 拆解后台执行引擎
SubagentExecutor:调度线程池(scheduler,3 worker)+ 持久化隔离事件循环、15 分钟超时、协作式取消- 给出注册自定义子代理的最小模板与约束清单,并解释并发上限(
MAX_CONCURRENT_SUBAGENTS=3)如何被中间件强制执行
TL;DR
DeerFlow 通过 task 工具把"复杂、多步、产出冗长"的任务委派给独立上下文中的子代理。子代理由 SubagentExecutor.execute_async() 提交到 _scheduler_pool(3 worker 的 ThreadPoolExecutor)调度,真正的协程运行在一条长期存活的"隔离事件循环"线程上,带 15 分钟(900 秒)超时与协作式取消。task_tool 后台每 5 秒轮询一次结果,并把 task_started / task_running / task_completed / task_failed / task_timed_out 事件经 StreamWriter 推给前端。并发由 SubagentLimitMiddleware 在 after_model 阶段截断超过 3 个的并行 task 调用强制保证。子代理的 token 用量由 SubagentTokenCollector 收集并回灌父级 RunJournal。
Overview
为什么需要"子代理委派",而不是单个 agent 死扛?核心是上下文隔离与职责分工。
一个长任务(例如"先探索代码库,再批量改 10 个文件,再跑测试")若全部塞进 lead agent 的单条对话,会带来三个问题:上下文窗口被探索阶段的冗长输出污染、推理链过长导致质量下降、无法并行。task 工具的文档字符串把使用场景说得很清楚:用于"复杂多步任务""产出冗长的任务""需要把上下文与主对话隔离"以及"并行研究/探索" task_tool.py:197-205。
委派的本质是:lead agent 调用 task(description, prompt, subagent_type),系统在独立的 agent 实例 + 独立的消息历史里跑这个子任务,只把最终结果摘要回灌给父对话。子代理拥有与父代理相同的 sandbox 环境(沙箱状态、thread_data 透传),但带有自己的 system prompt、工具白名单、技能白名单和模型 task_tool.py:233-261。为防止无限嵌套,子代理默认禁用 task 工具本身 config.py:31,且创建子代理工具集时强制 subagent_enabled=False task_tool.py:273-281。
Architecture
整个委派系统由三层构成:工具层(task_tool 负责参数解析、轮询、SSE 事件)、执行引擎层(SubagentExecutor 负责后台调度与生命周期)、注册/配置层(registry + config 负责子代理定义解析与覆盖)。并发上限由独立的 SubagentLimitMiddleware 在中间件链中强制。
值得强调的是后台执行的实际线程模型。代码中只有一个进程级线程池 _scheduler_pool(max_workers=3,线程名前缀 subagent-scheduler-),用于调度与编排后台任务 executor.py:88-89。真正的 _aexecute 协程并不在该池里同步跑,而是通过 asyncio.run_coroutine_threadsafe 提交到一条长期存活的隔离事件循环线程(subagent-persistent-loop)执行 executor.py:91-97 executor.py:713-721。这条持久 loop 复用一个长生命周期事件循环,避免每次执行都新建/关闭绑定异步资源(如 httpx 客户端)的临时 loop executor.py:595-602。
⚠️ 说明:
backend/CLAUDE.md描述为"双线程池_scheduler_pool(3)+_execution_pool(3)"。回源码核实:executor.py中仅定义了_scheduler_pool(max_workers=3)executor.py:89,不存在名为_execution_pool的对象;执行侧由持久化隔离事件循环线程承担。本章以源码为准。
并发上限的强制点不在执行引擎里,而在中间件链:SubagentLimitMiddleware 在 lead agent 的 after_model / aafter_model 钩子中检查最后一条 AIMessage 的 tool_calls,若 task 调用数超过 max_concurrent(默认 MAX_CONCURRENT_SUBAGENTS=3,被 clamp 到区间 [2,4]),则截断多余调用 subagent_limit_middleware.py:41-72。它仅在 subagent_enabled=True 时被追加进中间件链 agent.py:301-305。
Source:
- 调度线程池定义 executor.py:88-89
- 持久化隔离事件循环 executor.py:150-178
- 并发常量
MAX_CONCURRENT_SUBAGENTSexecutor.py:749 - 截断中间件 subagent_limit_middleware.py:25-76
- 中间件链装配 agent.py:301-305
Components / Subsystems
task 工具(task_tool)
职责:解析 description / prompt / subagent_type,解析子代理配置,创建 SubagentExecutor,提交后台执行,然后在后台代替 LLM 轮询结果并发 SSE 事件,最后把结果摘要作为字符串返回给父对话。
关键实现:task_tool 是一个 @tool("task", parse_docstring=True) 的 async 工具 task_tool.py:169-176。它用 tool_call_id 作为 task_id 提升可追溯性 task_tool.py:298-299,轮询间隔 5 秒,轮询上限为 (timeout + 60) // 5 作为安全网 task_tool.py:305-306。当父 run 被取消(asyncio.CancelledError)时,它会向后台任务发出协作式取消信号,并 shielded 等待子代理到达终态,以确保 token 用量被回灌 task_tool.py:397-419。
SubagentExecutor(后台执行引擎)
职责:按配置过滤工具、加载技能、构建初始 state、创建子代理、流式执行并收集 AI 消息与 token 用量,管理后台任务生命周期与超时。
关键类:SubagentExecutor executor.py:224-237。它在初始化时即用 _filter_tools 按 config.tools(白名单)与 config.disallowed_tools(黑名单)过滤工具 executor.py:194-221 executor.py:268-273。execute_async() 创建 PENDING 的 SubagentResult 存入全局 _background_tasks 字典(由 _background_tasks_lock 保护),再把 run_task 提交给 _scheduler_pool executor.py:677-746。_aexecute 用 agent.astream(stream_mode="values") 流式执行,逐块捕获 AIMessage,并在每个迭代边界检查 cancel_event 实现协作式取消 executor.py:471-484。子代理 agent 复用与 lead agent 共享的中间件组合 build_subagent_runtime_middlewares,但 thinking_enabled=False executor.py:282-297。
registry(注册表)
职责:按名称解析子代理配置,实现 Codex 式的三层配置叠加。
关键函数:get_subagent_config(name) 的解析顺序为:① 内置子代理(BUILTIN_SUBAGENTS)→ ② config.yaml 的 custom_agents 段 → ③ agents 段的 per-agent 覆盖(timeout / max_turns / model / skills)registry.py:50-116。注意全局默认(顶层 timeout_seconds / max_turns)只作用于内置子代理,不覆盖自定义子代理自带的值 registry.py:72-99。get_available_subagent_names() 还会在 host bash 不被允许时从可见列表里剔除 bash 子代理 registry.py:150-165。
builtins(内置子代理)
职责:提供两个开箱即用的子代理定义,注册在 BUILTIN_SUBAGENTS 字典 builtins/init.py:12-15。
general-purpose:复杂多步任务的通用 agent,tools=None(继承父级全部工具),禁用task / ask_clarification / present_files,max_turns=100general_purpose.py:5-50。bash:命令执行专家,仅给沙箱工具["bash","ls","read_file","write_file","str_replace"],max_turns=60bash_agent.py:5-50。
token_collector(子代理用量归并)
职责:作为 LangChain BaseCallbackHandler,在子代理内部收集每次 LLM 调用的 token 用量,执行完后回灌父级 RunJournal。
关键类:SubagentTokenCollector token_collector.py:15-23。它在 on_llm_end 钩子用 run_id 去重,提取 usage_metadata 累加为记录列表 token_collector.py:24-59。_aexecute 把它作为 callbacks 注入 run_config,执行结束用 snapshot_records() 写入 result.token_usage_records executor.py:436-445。task_tool 通过 _report_subagent_usage 找到 runtime callbacks 中带 record_external_llm_usage_records 的 journal 并回灌,且用 usage_reported 标志保证每个子代理只上报一次 task_tool.py:128-146。
Data Flow
下面这条时序覆盖从 lead agent 调用 task() 到结果回灌的完整链路。
子代理本身也是一个状态机,理解其状态有助于排查"为什么任务卡在 RUNNING"。
Implementation Details
后台任务的调度与超时控制是引擎的核心。下面是 execute_async 内 run_task 闭包的关键片段:
python
def run_task():
with _background_tasks_lock:
_background_tasks[task_id].status = SubagentStatus.RUNNING
_background_tasks[task_id].started_at = datetime.now()
result_holder = _background_tasks[task_id]
try:
# 直接提交到持久隔离 loop,后台路径不再创建临时 loop
execution_future = _submit_to_isolated_loop_in_context(
parent_context,
lambda: self._aexecute(task, result_holder),
)
try:
exec_result = execution_future.result(timeout=self.config.timeout_seconds)
# ... 写回 status/result/ai_messages ...
except FuturesTimeoutError:
with _background_tasks_lock:
if _background_tasks[task_id].status == SubagentStatus.RUNNING:
_background_tasks[task_id].status = SubagentStatus.TIMED_OUT
result_holder.cancel_event.set() # 协作式取消信号
execution_future.cancel()解读:run_task 跑在 _scheduler_pool 的 worker 上,它把真正的协程通过 _submit_to_isolated_loop_in_context 提交给持久隔离 loop,然后用 future.result(timeout=...) 阻塞等待。超时不会强杀线程(子代理线程无法被 Future.cancel() 强制中断),而是设置 cancel_event 由 _aexecute 在下一次 astream 迭代边界协作式退出 executor.py:706-746 executor.py:752-768。
并发截断的关键逻辑在 SubagentLimitMiddleware._truncate_task_calls:
python
task_indices = [i for i, tc in enumerate(tool_calls) if tc.get("name") == "task"]
if len(task_indices) <= self.max_concurrent:
return None
indices_to_drop = set(task_indices[self.max_concurrent :])
truncated_tool_calls = [tc for i, tc in enumerate(tool_calls) if i not in indices_to_drop]
updated_msg = clone_ai_message_with_tool_calls(last_msg, truncated_tool_calls)
return {"messages": [updated_msg]}解读:它只在 after_model 后看最后一条 AIMessage,统计 name == "task" 的调用并保留前 max_concurrent 个,用相同 message id 回写 AIMessage 触发替换 subagent_limit_middleware.py:54-68。这种"动作层硬截断"比"提示词软约束"更可靠。
速查表
SSE 事件类型矩阵
| 事件类型 | 触发时机 | 关键字段 | Source |
|---|---|---|---|
task_started | 后台任务提交后立即发送 | task_id, description | task_tool.py:312 |
task_running | 每捕获一条新 AIMessage | message, message_index, total_messages | task_tool.py:336-344 |
task_completed | 状态变为 COMPLETED | result, usage | task_tool.py:353 |
task_failed | 状态变为 FAILED 或任务消失 | error, usage | task_tool.py:360 |
task_cancelled | 状态变为 CANCELLED | error, usage | task_tool.py:367 |
task_timed_out | 状态变为 TIMED_OUT 或轮询超限 | error, usage | task_tool.py:374 task_tool.py:395 |
内置子代理对比
| 维度 | general-purpose | bash | Source |
|---|---|---|---|
| 定位 | 复杂多步、探索+修改 | bash 命令执行专家 | general_purpose.py:7-15 bash_agent.py:7-15 |
tools | None(继承父级全部) | 仅 bash,ls,read_file,write_file,str_replace | general_purpose.py:46 bash_agent.py:46 |
disallowed_tools | task,ask_clarification,present_files | task,ask_clarification,present_files | general_purpose.py:47 bash_agent.py:47 |
max_turns | 100 | 60 | general_purpose.py:49 bash_agent.py:49 |
| 运行时可见性 | 始终可见 | host bash 不允许时被剔除 | registry.py:163-165 |
扩展指南
注册自定义子代理(最小模板)
自定义子代理通过 config.yaml 的 subagents.custom_agents 段声明,无需写代码。registry 会在内置查找失败后回退到该段构造 SubagentConfig registry.py:22-47。最小模板:
yaml
subagents:
enabled: true # 总开关,否则 task 工具不挂载
timeout_seconds: 900 # 全局默认(仅作用于内置子代理)
custom_agents:
code-reviewer:
description: "代码审查专家;当需要审查 diff 时委派给它" # 必填
system_prompt: "你是代码审查专家,只读分析,输出问题清单。" # 必填
tools: ["bash", "ls", "read_file"] # 可选;None=继承父级全部
disallowed_tools: ["task", "ask_clarification", "present_files"]
skills: null # 可选;None=继承全部启用技能,[]=不加载
model: "inherit" # 可选;'inherit' 用父级模型
max_turns: 50
timeout_seconds: 600
agents: # 可选:对已有(含内置)子代理做 per-agent 覆盖
code-reviewer:
model: "gpt-4o"约束清单(从校验逻辑读出)
description与system_prompt为必填字段;tools/skills默认为None(继承),disallowed_tools默认["task","ask_clarification","present_files"]subagents_config.py:34-68。max_turns与timeout_seconds受 Pydanticge=1约束,必须 ≥ 1;model若设置须min_length=1subagents_config.py:13-67。- 子代理名不要与内置名(
general-purpose/bash)冲突:get_subagent_config先查内置,内置优先 registry.py:65-69;get_subagent_names合并时同名 custom 不会再追加 registry.py:142-147。 - 全局默认(顶层
timeout_seconds/max_turns)只覆盖内置子代理,自定义子代理保留自带值;模型/技能没有全局默认,只能 per-agent 覆盖 registry.py:72-112。 - 强烈建议保留
disallowed_tools含task,否则子代理可再委派造成嵌套;系统在创建子代理工具集时已强制subagent_enabled=False兜底 task_tool.py:273-281。
Configuration
| 配置项 | 默认值 | 作用 | Source |
|---|---|---|---|
MAX_CONCURRENT_SUBAGENTS | 3 | 单次模型响应允许的并行 task 调用上限 | executor.py:749 |
max_concurrent_subagents(runtime cfg) | 3 | 经 configurable 传入,clamp 到 [2,4] | agent.py:304-305 subagent_limit_middleware.py:16-22 |
_scheduler_pool workers | 3 | 后台任务调度线程池大小 | executor.py:89 |
subagents.timeout_seconds | 900(15 分钟) | 子代理执行超时(全局默认,作用于内置) | subagents_config.py:74-78 |
SubagentConfig.timeout_seconds | 900 | 单子代理超时默认 | config.py:35 |
SubagentConfig.max_turns | 50 | agent 递归上限(recursion_limit) | config.py:34 executor.py:441-442 |
subagents.enabled | — | 子代理委派总开关;关闭则 task 工具与截断中间件均不挂载 | agent.py:302-305 |
| 轮询间隔 / 上限 | 5s / (timeout+60)//5 | task_tool 后台轮询节奏与安全网 | task_tool.py:305-306 task_tool.py:380-396 |
Common Pitfalls / Tips
- 超额并行被静默截断:当 LLM 一次生成超过 3 个
task调用,SubagentLimitMiddleware会丢弃多余的,只保留前 3 个并打warning日志Truncated N excess task tool call(s),被丢弃的 task 不会执行也不会报错给模型 subagent_limit_middleware.py:63-68。 - 取消是协作式的,不会立即生效:
cancel_event只在astream的迭代边界被检查,单次迭代内的长时工具调用不会被中途打断,要等下一个 chunk yield executor.py:472-475。 - 超时是双层保险:执行侧
future.result(timeout)设 TIMED_OUT;task_tool还有(timeout+60)//5的轮询上限作为兜底,防止后台任务卡死时父对话永远等待 task_tool.py:389-396。 bash子代理可能不可见:host bash 未被允许时get_available_subagent_names会剔除bash,且task_tool对subagent_type == "bash"二次校验,返回禁用提示 task_tool.py:221-224。- 终态才清理:
cleanup_background_task只删除处于终态(COMPLETED/FAILED/CANCELLED/TIMED_OUT)或已有completed_at的任务,避免与后台执行器写入竞争 executor.py:805-828。 - token 用量只回灌一次:
_report_subagent_usage用result.usage_reported防重复;若 runtime callbacks 里没有带record_external_llm_usage_records的 journal,用量不会被记录(仅 debug 日志)task_tool.py:128-146。
References
- backend/packages/harness/deerflow/subagents/executor.py — 后台执行引擎、调度池、隔离 loop、超时与取消
- backend/packages/harness/deerflow/tools/builtins/task_tool.py —
task工具:配置解析、轮询、SSE 事件、用量回灌 - backend/packages/harness/deerflow/subagents/registry.py — 三层配置叠加与可见性过滤
- backend/packages/harness/deerflow/subagents/config.py —
SubagentConfig数据类与模型解析 - backend/packages/harness/deerflow/config/subagents_config.py —
config.yaml子代理配置 Schema 与覆盖逻辑 - backend/packages/harness/deerflow/agents/middlewares/subagent_limit_middleware.py — 并发截断中间件
- backend/packages/harness/deerflow/subagents/token_collector.py — 子代理 token 用量收集器
- backend/packages/harness/deerflow/subagents/builtins/general_purpose.py —
general-purpose内置定义 - backend/packages/harness/deerflow/subagents/builtins/bash_agent.py —
bash内置定义
Related Pages
| 章节 | 关系 |
|---|---|
| 12-中间件链机制 | SubagentLimitMiddleware 是中间件链的一环,本章的并发截断依赖其装配顺序 |
| 20-工具系统与内置工具 | task 工具由 get_available_tools 装配,子代理工具集复用同一套工具解析 |
| 17-沙箱系统架构 | 子代理透传父级 sandbox_state,与父代理共享同一 thread 沙箱环境 |
| 35-可观测性-Tracing与Token用量 | SubagentTokenCollector 收集的用量回灌父级 RunJournal,纳入全局 token 统计 |