Skip to content

子代理委派系统

本章目标:

  • 讲清为什么 DeerFlow 用 task 工具把长任务委派给子代理(subagent),而不是让单个 lead agent 一路死扛
  • 拆解后台执行引擎 SubagentExecutor:调度线程池(scheduler,3 worker)+ 持久化隔离事件循环、15 分钟超时、协作式取消
  • 给出注册自定义子代理的最小模板与约束清单,并解释并发上限(MAX_CONCURRENT_SUBAGENTS=3)如何被中间件强制执行

TL;DR

DeerFlow 通过 task 工具把"复杂、多步、产出冗长"的任务委派给独立上下文中的子代理。子代理由 SubagentExecutor.execute_async() 提交到 _scheduler_pool(3 worker 的 ThreadPoolExecutor)调度,真正的协程运行在一条长期存活的"隔离事件循环"线程上,带 15 分钟(900 秒)超时与协作式取消。task_tool 后台每 5 秒轮询一次结果,并把 task_started / task_running / task_completed / task_failed / task_timed_out 事件经 StreamWriter 推给前端。并发由 SubagentLimitMiddlewareafter_model 阶段截断超过 3 个的并行 task 调用强制保证。子代理的 token 用量由 SubagentTokenCollector 收集并回灌父级 RunJournal。

Overview

为什么需要"子代理委派",而不是单个 agent 死扛?核心是上下文隔离职责分工

一个长任务(例如"先探索代码库,再批量改 10 个文件,再跑测试")若全部塞进 lead agent 的单条对话,会带来三个问题:上下文窗口被探索阶段的冗长输出污染、推理链过长导致质量下降、无法并行。task 工具的文档字符串把使用场景说得很清楚:用于"复杂多步任务""产出冗长的任务""需要把上下文与主对话隔离"以及"并行研究/探索" task_tool.py:197-205

委派的本质是:lead agent 调用 task(description, prompt, subagent_type),系统在独立的 agent 实例 + 独立的消息历史里跑这个子任务,只把最终结果摘要回灌给父对话。子代理拥有与父代理相同的 sandbox 环境(沙箱状态、thread_data 透传),但带有自己的 system prompt、工具白名单、技能白名单和模型 task_tool.py:233-261。为防止无限嵌套,子代理默认禁用 task 工具本身 config.py:31,且创建子代理工具集时强制 subagent_enabled=False task_tool.py:273-281

Architecture

整个委派系统由三层构成:工具层(task_tool 负责参数解析、轮询、SSE 事件)、执行引擎层(SubagentExecutor 负责后台调度与生命周期)、注册/配置层(registry + config 负责子代理定义解析与覆盖)。并发上限由独立的 SubagentLimitMiddleware 在中间件链中强制。

值得强调的是后台执行的实际线程模型。代码中只有一个进程级线程池 _scheduler_pool(max_workers=3,线程名前缀 subagent-scheduler-),用于调度与编排后台任务 executor.py:88-89。真正的 _aexecute 协程并不在该池里同步跑,而是通过 asyncio.run_coroutine_threadsafe 提交到一条长期存活的隔离事件循环线程(subagent-persistent-loop)执行 executor.py:91-97 executor.py:713-721。这条持久 loop 复用一个长生命周期事件循环,避免每次执行都新建/关闭绑定异步资源(如 httpx 客户端)的临时 loop executor.py:595-602

⚠️ 说明:backend/CLAUDE.md 描述为"双线程池 _scheduler_pool(3)+ _execution_pool(3)"。回源码核实:executor.py 中仅定义了 _scheduler_pool(max_workers=3)executor.py:89,不存在名为 _execution_pool 的对象;执行侧由持久化隔离事件循环线程承担。本章以源码为准。

并发上限的强制点不在执行引擎里,而在中间件链:SubagentLimitMiddleware 在 lead agent 的 after_model / aafter_model 钩子中检查最后一条 AIMessage 的 tool_calls,若 task 调用数超过 max_concurrent(默认 MAX_CONCURRENT_SUBAGENTS=3,被 clamp 到区间 [2,4]),则截断多余调用 subagent_limit_middleware.py:41-72。它仅在 subagent_enabled=True 时被追加进中间件链 agent.py:301-305

Source:

Components / Subsystems

task 工具(task_tool)

职责:解析 description / prompt / subagent_type,解析子代理配置,创建 SubagentExecutor,提交后台执行,然后在后台代替 LLM 轮询结果并发 SSE 事件,最后把结果摘要作为字符串返回给父对话。

关键实现:task_tool 是一个 @tool("task", parse_docstring=True) 的 async 工具 task_tool.py:169-176。它用 tool_call_id 作为 task_id 提升可追溯性 task_tool.py:298-299,轮询间隔 5 秒,轮询上限为 (timeout + 60) // 5 作为安全网 task_tool.py:305-306。当父 run 被取消(asyncio.CancelledError)时,它会向后台任务发出协作式取消信号,并 shielded 等待子代理到达终态,以确保 token 用量被回灌 task_tool.py:397-419

SubagentExecutor(后台执行引擎)

职责:按配置过滤工具、加载技能、构建初始 state、创建子代理、流式执行并收集 AI 消息与 token 用量,管理后台任务生命周期与超时。

关键类:SubagentExecutor executor.py:224-237。它在初始化时即用 _filter_toolsconfig.tools(白名单)与 config.disallowed_tools(黑名单)过滤工具 executor.py:194-221 executor.py:268-273execute_async() 创建 PENDINGSubagentResult 存入全局 _background_tasks 字典(由 _background_tasks_lock 保护),再把 run_task 提交给 _scheduler_pool executor.py:677-746_aexecuteagent.astream(stream_mode="values") 流式执行,逐块捕获 AIMessage,并在每个迭代边界检查 cancel_event 实现协作式取消 executor.py:471-484。子代理 agent 复用与 lead agent 共享的中间件组合 build_subagent_runtime_middlewares,但 thinking_enabled=False executor.py:282-297

registry(注册表)

职责:按名称解析子代理配置,实现 Codex 式的三层配置叠加。

关键函数:get_subagent_config(name) 的解析顺序为:① 内置子代理(BUILTIN_SUBAGENTS)→ ② config.yamlcustom_agents 段 → ③ agents 段的 per-agent 覆盖(timeout / max_turns / model / skills)registry.py:50-116。注意全局默认(顶层 timeout_seconds / max_turns)只作用于内置子代理,不覆盖自定义子代理自带的值 registry.py:72-99get_available_subagent_names() 还会在 host bash 不被允许时从可见列表里剔除 bash 子代理 registry.py:150-165

builtins(内置子代理)

职责:提供两个开箱即用的子代理定义,注册在 BUILTIN_SUBAGENTS 字典 builtins/init.py:12-15

  • general-purpose:复杂多步任务的通用 agent,tools=None(继承父级全部工具),禁用 task / ask_clarification / present_files,max_turns=100 general_purpose.py:5-50
  • bash:命令执行专家,仅给沙箱工具 ["bash","ls","read_file","write_file","str_replace"],max_turns=60 bash_agent.py:5-50

token_collector(子代理用量归并)

职责:作为 LangChain BaseCallbackHandler,在子代理内部收集每次 LLM 调用的 token 用量,执行完后回灌父级 RunJournal。

关键类:SubagentTokenCollector token_collector.py:15-23。它在 on_llm_end 钩子用 run_id 去重,提取 usage_metadata 累加为记录列表 token_collector.py:24-59_aexecute 把它作为 callbacks 注入 run_config,执行结束用 snapshot_records() 写入 result.token_usage_records executor.py:436-445task_tool 通过 _report_subagent_usage 找到 runtime callbacks 中带 record_external_llm_usage_records 的 journal 并回灌,且用 usage_reported 标志保证每个子代理只上报一次 task_tool.py:128-146

Data Flow

下面这条时序覆盖从 lead agent 调用 task() 到结果回灌的完整链路。

子代理本身也是一个状态机,理解其状态有助于排查"为什么任务卡在 RUNNING"。

Implementation Details

后台任务的调度与超时控制是引擎的核心。下面是 execute_asyncrun_task 闭包的关键片段:

python
def run_task():
    with _background_tasks_lock:
        _background_tasks[task_id].status = SubagentStatus.RUNNING
        _background_tasks[task_id].started_at = datetime.now()
        result_holder = _background_tasks[task_id]
    try:
        # 直接提交到持久隔离 loop,后台路径不再创建临时 loop
        execution_future = _submit_to_isolated_loop_in_context(
            parent_context,
            lambda: self._aexecute(task, result_holder),
        )
        try:
            exec_result = execution_future.result(timeout=self.config.timeout_seconds)
            # ... 写回 status/result/ai_messages ...
        except FuturesTimeoutError:
            with _background_tasks_lock:
                if _background_tasks[task_id].status == SubagentStatus.RUNNING:
                    _background_tasks[task_id].status = SubagentStatus.TIMED_OUT
            result_holder.cancel_event.set()   # 协作式取消信号
            execution_future.cancel()

解读:run_task 跑在 _scheduler_pool 的 worker 上,它把真正的协程通过 _submit_to_isolated_loop_in_context 提交给持久隔离 loop,然后用 future.result(timeout=...) 阻塞等待。超时不会强杀线程(子代理线程无法被 Future.cancel() 强制中断),而是设置 cancel_event_aexecute 在下一次 astream 迭代边界协作式退出 executor.py:706-746 executor.py:752-768

并发截断的关键逻辑在 SubagentLimitMiddleware._truncate_task_calls:

python
task_indices = [i for i, tc in enumerate(tool_calls) if tc.get("name") == "task"]
if len(task_indices) <= self.max_concurrent:
    return None
indices_to_drop = set(task_indices[self.max_concurrent :])
truncated_tool_calls = [tc for i, tc in enumerate(tool_calls) if i not in indices_to_drop]
updated_msg = clone_ai_message_with_tool_calls(last_msg, truncated_tool_calls)
return {"messages": [updated_msg]}

解读:它只在 after_model 后看最后一条 AIMessage,统计 name == "task" 的调用并保留前 max_concurrent 个,用相同 message id 回写 AIMessage 触发替换 subagent_limit_middleware.py:54-68。这种"动作层硬截断"比"提示词软约束"更可靠。

速查表

SSE 事件类型矩阵

事件类型触发时机关键字段Source
task_started后台任务提交后立即发送task_id, descriptiontask_tool.py:312
task_running每捕获一条新 AIMessagemessage, message_index, total_messagestask_tool.py:336-344
task_completed状态变为 COMPLETEDresult, usagetask_tool.py:353
task_failed状态变为 FAILED 或任务消失error, usagetask_tool.py:360
task_cancelled状态变为 CANCELLEDerror, usagetask_tool.py:367
task_timed_out状态变为 TIMED_OUT 或轮询超限error, usagetask_tool.py:374 task_tool.py:395

内置子代理对比

维度general-purposebashSource
定位复杂多步、探索+修改bash 命令执行专家general_purpose.py:7-15 bash_agent.py:7-15
toolsNone(继承父级全部)bash,ls,read_file,write_file,str_replacegeneral_purpose.py:46 bash_agent.py:46
disallowed_toolstask,ask_clarification,present_filestask,ask_clarification,present_filesgeneral_purpose.py:47 bash_agent.py:47
max_turns10060general_purpose.py:49 bash_agent.py:49
运行时可见性始终可见host bash 不允许时被剔除registry.py:163-165

扩展指南

注册自定义子代理(最小模板)

自定义子代理通过 config.yamlsubagents.custom_agents 段声明,无需写代码。registry 会在内置查找失败后回退到该段构造 SubagentConfig registry.py:22-47。最小模板:

yaml
subagents:
  enabled: true            # 总开关,否则 task 工具不挂载
  timeout_seconds: 900     # 全局默认(仅作用于内置子代理)
  custom_agents:
    code-reviewer:
      description: "代码审查专家;当需要审查 diff 时委派给它"   # 必填
      system_prompt: "你是代码审查专家,只读分析,输出问题清单。" # 必填
      tools: ["bash", "ls", "read_file"]   # 可选;None=继承父级全部
      disallowed_tools: ["task", "ask_clarification", "present_files"]
      skills: null          # 可选;None=继承全部启用技能,[]=不加载
      model: "inherit"      # 可选;'inherit' 用父级模型
      max_turns: 50
      timeout_seconds: 600
  agents:                   # 可选:对已有(含内置)子代理做 per-agent 覆盖
    code-reviewer:
      model: "gpt-4o"

约束清单(从校验逻辑读出)

  • descriptionsystem_prompt 为必填字段;tools/skills 默认为 None(继承),disallowed_tools 默认 ["task","ask_clarification","present_files"] subagents_config.py:34-68
  • max_turnstimeout_seconds 受 Pydantic ge=1 约束,必须 ≥ 1;model 若设置须 min_length=1 subagents_config.py:13-67
  • 子代理名不要与内置名(general-purpose / bash)冲突:get_subagent_config 先查内置,内置优先 registry.py:65-69;get_subagent_names 合并时同名 custom 不会再追加 registry.py:142-147
  • 全局默认(顶层 timeout_seconds / max_turns)只覆盖内置子代理,自定义子代理保留自带值;模型/技能没有全局默认,只能 per-agent 覆盖 registry.py:72-112
  • 强烈建议保留 disallowed_toolstask,否则子代理可再委派造成嵌套;系统在创建子代理工具集时已强制 subagent_enabled=False 兜底 task_tool.py:273-281

Configuration

配置项默认值作用Source
MAX_CONCURRENT_SUBAGENTS3单次模型响应允许的并行 task 调用上限executor.py:749
max_concurrent_subagents(runtime cfg)3configurable 传入,clamp 到 [2,4]agent.py:304-305 subagent_limit_middleware.py:16-22
_scheduler_pool workers3后台任务调度线程池大小executor.py:89
subagents.timeout_seconds900(15 分钟)子代理执行超时(全局默认,作用于内置)subagents_config.py:74-78
SubagentConfig.timeout_seconds900单子代理超时默认config.py:35
SubagentConfig.max_turns50agent 递归上限(recursion_limit)config.py:34 executor.py:441-442
subagents.enabled子代理委派总开关;关闭则 task 工具与截断中间件均不挂载agent.py:302-305
轮询间隔 / 上限5s / (timeout+60)//5task_tool 后台轮询节奏与安全网task_tool.py:305-306 task_tool.py:380-396

Common Pitfalls / Tips

  • 超额并行被静默截断:当 LLM 一次生成超过 3 个 task 调用,SubagentLimitMiddleware 会丢弃多余的,只保留前 3 个并打 warning 日志 Truncated N excess task tool call(s),被丢弃的 task 不会执行也不会报错给模型 subagent_limit_middleware.py:63-68
  • 取消是协作式的,不会立即生效:cancel_event 只在 astream 的迭代边界被检查,单次迭代内的长时工具调用不会被中途打断,要等下一个 chunk yield executor.py:472-475
  • 超时是双层保险:执行侧 future.result(timeout) 设 TIMED_OUT;task_tool 还有 (timeout+60)//5 的轮询上限作为兜底,防止后台任务卡死时父对话永远等待 task_tool.py:389-396
  • bash 子代理可能不可见:host bash 未被允许时 get_available_subagent_names 会剔除 bash,且 task_toolsubagent_type == "bash" 二次校验,返回禁用提示 task_tool.py:221-224
  • 终态才清理:cleanup_background_task 只删除处于终态(COMPLETED/FAILED/CANCELLED/TIMED_OUT)或已有 completed_at 的任务,避免与后台执行器写入竞争 executor.py:805-828
  • token 用量只回灌一次:_report_subagent_usageresult.usage_reported 防重复;若 runtime callbacks 里没有带 record_external_llm_usage_records 的 journal,用量不会被记录(仅 debug 日志)task_tool.py:128-146

References

章节关系
12-中间件链机制SubagentLimitMiddleware 是中间件链的一环,本章的并发截断依赖其装配顺序
20-工具系统与内置工具task 工具由 get_available_tools 装配,子代理工具集复用同一套工具解析
17-沙箱系统架构子代理透传父级 sandbox_state,与父代理共享同一 thread 沙箱环境
35-可观测性-Tracing与Token用量SubagentTokenCollector 收集的用量回灌父级 RunJournal,纳入全局 token 统计

公司内部参考 · 由 claude-wiki-gen 基于源码自动生成的二次分析