Skip to content

嵌入式 Python 客户端

本章目标:

  1. 理解为什么 DeerFlow 需要一个与 Gateway API 对等、但无需启动任何 HTTP 进程的进程内客户端 DeerFlowClient
  2. 掌握 chat()(同步累积 delta)与 stream()(订阅 LangGraph stream_mode)的工作机制,以及两者与 Gateway 流式管线的关系。
  3. 学会用 DeerFlowClient 把 DeerFlow 智能体嵌入自己的 Python 程序,并理解 checkpointerreset_agent、lazy agent 等关键约束。

TL;DR

DeerFlowClient 是 DeerFlow 的进程内客户端,直接 import 与 Gateway 相同的 deerflow 模块,无需 FastAPI / LangGraph Server / Gateway 进程即可运行智能体。stream() 订阅 LangGraph stream_mode=["values", "messages", "custom"] 并 yield StreamEvent,chat() 在其之上按消息 id 累积 delta 返回最终 AI 文本。所有 dict 返回值与 Gateway 的 Pydantic 响应模型保持一致,由 tests/test_client.py::TestGatewayConformance 强制校验。与 HTTP 模式相比,上传接收本地 Path 而非 UploadFile,产物 get_artifact() 返回 (bytes, mime_type) 而非 HTTP Response。内部 agent 懒创建并按配置键缓存,reset_agent() 可强制重建。

Overview

为什么需要一个与 Gateway 对等的进程内客户端?

DeerFlow 的标准运行路径是 Gateway:run_agentStreamBridgesse_consumer,通过 HTTP/SSE 对外提供服务。但有大量场景并不需要 HTTP 边界:批处理脚本、CLI 工具、数据管线、单元测试、把 DeerFlow 当作库嵌入到别的 Python 程序中。为这些场景强行拉起一个 FastAPI + Nginx + LangGraph 进程栈既笨重又引入网络/序列化开销。

DeerFlowClient 的定位是:与 Gateway 平行的、同步的、进程内的 create_agent() 工厂消费者,而不是 Gateway 的包装层。它的设计明确说明了为什么不复用 Gateway 的 run_agent:run_agentasync def 且用 agent.astream();而本客户端是同步生成器,用 agent.stream(),让调用方能直接写 for event in client.stream(...) 而不触碰 asyncio;Gateway 事件要经 serialize() 做 SSE 线序列化,而本客户端直接 yield Python 数据结构;StreamBridge 是跨 HTTP 边界的 asyncio 队列(Last-Event-ID 重放、心跳、多订阅扇出),单一进程内调用方完全不需要这些 client.py:521-551

为保证「HTTP 模式」与「嵌入模式」对调用方等价,所有 dict 返回值都对齐 Gateway 的 Pydantic 响应 schema,这样消费方代码在两种模式下行为一致 client.py:798-803

Architecture

DeerFlowClient 与 Gateway 是两条平行路径,共享同一个 create_agent() 智能体工厂与同一套配置/数据目录,但各自有独立的执行模型(同步 vs 异步)与传输层(进程内 dict vs SSE)。

  • 入口与文档:DeerFlowClient 类定义说明其无需 LangGraph Server / Gateway 进程 client.py:80-95
  • 智能体构造:_ensure_agent() 用与 make_lead_agent 相同的 create_chat_model + _get_tools + _build_middlewares + apply_prompt_template + ThreadState 装配 create_agent() client.py:230-252
  • checkpointer:未显式传入时回退到 deerflow.runtime.checkpointer.get_checkpointer(),与 Gateway 共享同一持久化后端 client.py:242-248
  • schema 一致性:StreamEvent.type 取值与 LangGraph SSE 协议对齐,使消费者可在 HTTP 流与嵌入模式间切换而不改事件处理逻辑 client.py:62-78

Components / Subsystems

chat() — 同步累积 delta

职责:对 stream() 的便捷封装,按消息 id 累积 delta messages-tuple 事件,返回最后一条完成的 AI 消息文本;中间 AI 消息(如 planner 草稿)被丢弃 client.py:762-792

关键方法:chat(message, *, thread_id=None, **kwargs)。内部用 chunks: dict[str, list[str]]id 分桶,只对 event.type == "messages-tuple"data["type"] == "ai" 且有非空 content 的事件追加,最后 "".join 末条 id 的分片——用 per-id 列表 + 一次性 join 避免长响应上 str + str 的 O(n²) 开销 client.py:781-792

stream() — 订阅 LangGraph stream_mode

职责:发送一条用户消息,增量 yield StreamEvent,直到智能体完成本轮 client.py:489-569

关键方法:stream(message, *, thread_id=None, **kwargs)。订阅 stream_mode=["values", "messages", "custom"] client.py:633-638。各模式处理:

  • custom:原样 yield 为 StreamEvent(type="custom")(来自 StreamWriter)client.py:645-647
  • messages:发出 (message_chunk, metadata),AI 文本以 delta 形式逐 token 发出,每个 delta 带稳定 id,消费者需按 id 累积才能重建全文 client.py:506-519
  • values:每个图节点完成后的完整状态快照;已通过 messages 模式发过的 AI 文本不会从快照重新合成,避免重复投递 client.py:692-758

流结束时 yield StreamEvent(type="end", data={"usage": cumulative_usage}) client.py:760

Gateway 对等方法

职责:在不启动 Gateway 的前提下提供与 Gateway API 路由器同名同形的配置/数据查询与变更能力。

关键方法:

  • 模型:list_models() 返回 {"models": [...], "token_usage": {...}},声明对齐 ModelsListResponse schema client.py:798-822;get_model(name) 对齐 ModelResponse client.py:869-889
  • MCP:get_mcp_config() / update_mcp_config() 读写 extensions_config.json,后者写入后置空 _agentreload_extensions_config(),使下次调用重建智能体 client.py:895-937
  • 技能:list_skills() / get_skill() / update_skill() / install_skill();update_skill() 同样使缓存 agent 失效 client.py:824-845 client.py:965-1028
  • 记忆:get_memory() / reload_memory() / get_memory_config() / get_memory_status() 等,通过 get_effective_user_id() 做 per-user 隔离 client.py:847-1107
  • 上传:upload_files() / list_uploads() / delete_upload(),upload_files 先全量校验避免半途上传 client.py:1113-1244
  • 产物:get_artifact(thread_id, path) 返回 (bytes, mime_type),内部经 resolve_virtual_path 防路径穿越 client.py:1250-1278

TestGatewayConformance 把每个 dict 返回值喂进对应 Gateway Pydantic 模型——若 Gateway 新增必填字段而客户端没给,ValidationError 会让 CI 立即捕获漂移 test_client.py:2302-2308

Agent 生命周期(lazy create + reset)

职责:延迟创建并按配置键缓存内部智能体,外部变更后可强制重建。

关键方法:构造函数仅加载配置,把 agent 创建推迟到首次调用(self._agent = None)client.py:160-162_ensure_agent()(model_name, thinking_enabled, is_plan_mode, subagent_enabled, agent_name, available_skills) 组成配置键,键不变则复用,键变则重建 client.py:210-251reset_agent()_agent_agent_config_key 置空,强制下次调用重建——用于记忆/技能等外部变更后刷新系统提示与工具集 client.py:164-172

Data Flow

下图展示 client.stream() 如何订阅 LangGraph 的 stream_mode 并把每个 chunk 转换为 StreamEvent yield 出去。

Implementation Details

chat() 的 per-id delta 累积

python
chunks: dict[str, list[str]] = {}
last_id: str = ""
for event in self.stream(message, thread_id=thread_id, **kwargs):
    if event.type == "messages-tuple" and event.data.get("type") == "ai":
        msg_id = event.data.get("id") or ""
        delta = event.data.get("content", "")
        if delta:
            chunks.setdefault(msg_id, []).append(delta)
            last_id = msg_id
return "".join(chunks.get(last_id, ()))

解读:stream()messages 模式下发的是 token 级 delta,同一条 AI 消息会被拆成多个同 id 的事件。chat()id 分桶收集,只 join 最后一个 id 的分片——这就是文档所说「丢弃中间 AI 消息(如 planner 草稿),只返回最终 id 的累积文本」的实现 client.py:781-792

lazy agent 的配置键复用

python
key = (
    cfg.get("model_name"),
    cfg.get("thinking_enabled"),
    cfg.get("is_plan_mode"),
    cfg.get("subagent_enabled"),
    self._agent_name,
    frozenset(self._available_skills) if self._available_skills is not None else None,
)
if self._agent is not None and self._agent_config_key == key:
    return

解读:智能体创建涉及模型实例化、工具装配、中间件链、系统提示渲染,代价不低。_ensure_agent 用这 6 个会影响 agent 行为的参数构成 key,只有 key 变化才重建——长期运行进程里多次 stream() 同配置只构建一次。系统提示(含日期、记忆、技能上下文)在首次创建时生成并随 key 缓存,故记忆/技能更新后需 reset_agent() 才能刷新 client.py:86-95 client.py:210-251

速查表

Gateway 对等方法 × 返回格式矩阵(均对齐 Gateway Pydantic 响应模型):

类别方法返回格式Source
模型list_models(){"models": [...], "token_usage": {...}}client.py:798-822
模型get_model(name){name, model, display_name, ...}Noneclient.py:869-889
MCPget_mcp_config() / update_mcp_config(){"mcp_servers": {...}}client.py:895-937
技能list_skills() / get_skill() / update_skill(){"skills": [...]} / {name, enabled, ...}client.py:824-1013
技能install_skill(path){success, skill_name, message}client.py:1015-1028
记忆get_memory() / get_memory_config() / get_memory_status()dictclient.py:847-1107
上传upload_files(thread_id, files){"success": true, "files": [...], "message": ...}client.py:1113-1210
上传list_uploads() / delete_upload(){"files": [...], "count": N} / {success, message}client.py:1212-1244
产物get_artifact(thread_id, path)(bytes, mime_type) 元组(非 HTTP Response)client.py:1250-1278
会话chat() / stream()str / Generator[StreamEvent]client.py:489-792

与 HTTP 模式的关键区别:upload_files 接收本地 Path 而非 HTTP UploadFile,并在拷贝前拒绝目录路径 client.py:1136-1145;get_artifact 返回 (bytes, mime_type) 而非 HTTP Response client.py:1250-1278

扩展指南

把 DeerFlowClient 嵌入自己的 Python 程序的最小模板:

python
from deerflow.client import DeerFlowClient
from langgraph.checkpoint.memory import InMemorySaver

# 1. 多轮对话必须显式或回退提供 checkpointer
client = DeerFlowClient(
    config_path="config.yaml",       # None 则用默认解析顺序
    checkpointer=InMemorySaver(),    # 不传则回退到 get_checkpointer()
    model_name=None,                 # 覆盖 config 默认模型
    thinking_enabled=True,
)

# 2. 一次性问答(同步)
print(client.chat("分析这篇论文", thread_id="paper-1"))

# 3. 流式消费(同步生成器,无需 asyncio)
for event in client.stream("继续", thread_id="paper-1"):
    if event.type == "messages-tuple" and event.data.get("type") == "ai":
        print(event.data.get("content", ""), end="")
    elif event.type == "end":
        print("\nusage:", event.data["usage"])

# 4. 外部改了记忆/技能后,强制重建内部 agent
client.update_skill("web-search", enabled=True)   # 自动失效缓存 agent
client.reset_agent()                              # 其他外部变更后手动刷新

约束:

  • 多轮对话必须有 checkpointer。没有 checkpointer 时每次 chat()/stream() 都是无状态的,thread_id 仅用于文件隔离(uploads/artifacts),不保留对话上下文 client.py:86-89。未显式传入时会回退到 deerflow.runtime.checkpointer.get_checkpointer() client.py:242-248
  • 系统提示在 agent 首建时生成并缓存到配置键变化。日期、记忆、技能上下文不会随后续调用自动刷新;长期运行进程在外部状态变更后须调 reset_agent() client.py:90-95
  • update_mcp_config()update_skill() 写配置后会自动置空缓存 agent,无需手动 reset_agent() client.py:934-937 client.py:1000-1002
  • agent_name 须匹配 AGENT_NAME_PATTERN,否则构造函数抛 ValueError client.py:148-149

Common Pitfalls / Tips

  • 不要把 messages 模式的 delta 当完整文本。AI 文本是逐 token 的 delta,每个事件带稳定 id,消费者必须按 id 累积才能重建全文;chat() 已替你做了这件事,需要每个 delta 时才直接用 stream() client.py:506-519
  • values 快照不会重复合成已流式的 AI 文本。已通过 messages 模式发过的 id 会被记入 streamed_ids,values 路径只(防御性地)采集其 usage 并跳过重新合成事件,避免消费者看到重复文本 client.py:702-715
  • usage 只按 id 计一次。同一消息 id 在 messages 终块和 values 快照里携带相同累计 usage_metadata,_account_usagecounted_usage_ids 确保只在先到者上计入 cumulative,end 事件的 usage 不会翻倍 client.py:586-617
  • values 内重复消息按 id 去重seen_ids 保证同一 id 的消息不会被处理两次 client.py:695-700
  • 两条路径必须在订阅哪些 stream_mode 上保持一致。该不变量由 tests/test_client.py::test_messages_mode_emits_token_deltas 强制(而非共享常量,因 Graph/SDK/HTTP 三层命名不同无法共享字符串)test_client.py:436-438

References

页面关系
15-Runtime运行时与StreamBridge.mdGateway 的异步流式管线 run_agentStreamBridge,与本章嵌入路径平行对照
13-Gateway-API与路由体系.md本章对等方法所对齐的 Gateway 路由器与 Pydantic 响应模型来源
29-AI消息流与流式渲染.md前端如何消费 messages-tuple/values 事件,与本章 StreamEvent 协议一致
36-测试策略与质量保障.mdTestGatewayConformance 与 token delta 回归测试在整体测试策略中的位置

公司内部参考 · 由 claude-wiki-gen 基于源码自动生成的二次分析