主题
嵌入式 Python 客户端
本章目标:
- 理解为什么 DeerFlow 需要一个与 Gateway API 对等、但无需启动任何 HTTP 进程的进程内客户端
DeerFlowClient。- 掌握
chat()(同步累积 delta)与stream()(订阅 LangGraphstream_mode)的工作机制,以及两者与 Gateway 流式管线的关系。- 学会用
DeerFlowClient把 DeerFlow 智能体嵌入自己的 Python 程序,并理解checkpointer、reset_agent、lazy agent 等关键约束。
TL;DR
DeerFlowClient 是 DeerFlow 的进程内客户端,直接 import 与 Gateway 相同的 deerflow 模块,无需 FastAPI / LangGraph Server / Gateway 进程即可运行智能体。stream() 订阅 LangGraph stream_mode=["values", "messages", "custom"] 并 yield StreamEvent,chat() 在其之上按消息 id 累积 delta 返回最终 AI 文本。所有 dict 返回值与 Gateway 的 Pydantic 响应模型保持一致,由 tests/test_client.py::TestGatewayConformance 强制校验。与 HTTP 模式相比,上传接收本地 Path 而非 UploadFile,产物 get_artifact() 返回 (bytes, mime_type) 而非 HTTP Response。内部 agent 懒创建并按配置键缓存,reset_agent() 可强制重建。
Overview
为什么需要一个与 Gateway 对等的进程内客户端?
DeerFlow 的标准运行路径是 Gateway:run_agent → StreamBridge → sse_consumer,通过 HTTP/SSE 对外提供服务。但有大量场景并不需要 HTTP 边界:批处理脚本、CLI 工具、数据管线、单元测试、把 DeerFlow 当作库嵌入到别的 Python 程序中。为这些场景强行拉起一个 FastAPI + Nginx + LangGraph 进程栈既笨重又引入网络/序列化开销。
DeerFlowClient 的定位是:与 Gateway 平行的、同步的、进程内的 create_agent() 工厂消费者,而不是 Gateway 的包装层。它的设计明确说明了为什么不复用 Gateway 的 run_agent:run_agent 是 async def 且用 agent.astream();而本客户端是同步生成器,用 agent.stream(),让调用方能直接写 for event in client.stream(...) 而不触碰 asyncio;Gateway 事件要经 serialize() 做 SSE 线序列化,而本客户端直接 yield Python 数据结构;StreamBridge 是跨 HTTP 边界的 asyncio 队列(Last-Event-ID 重放、心跳、多订阅扇出),单一进程内调用方完全不需要这些 client.py:521-551。
为保证「HTTP 模式」与「嵌入模式」对调用方等价,所有 dict 返回值都对齐 Gateway 的 Pydantic 响应 schema,这样消费方代码在两种模式下行为一致 client.py:798-803。
Architecture
DeerFlowClient 与 Gateway 是两条平行路径,共享同一个 create_agent() 智能体工厂与同一套配置/数据目录,但各自有独立的执行模型(同步 vs 异步)与传输层(进程内 dict vs SSE)。
- 入口与文档:
DeerFlowClient类定义说明其无需 LangGraph Server / Gateway 进程 client.py:80-95。 - 智能体构造:
_ensure_agent()用与make_lead_agent相同的create_chat_model+_get_tools+_build_middlewares+apply_prompt_template+ThreadState装配create_agent()client.py:230-252。 - checkpointer:未显式传入时回退到
deerflow.runtime.checkpointer.get_checkpointer(),与 Gateway 共享同一持久化后端 client.py:242-248。 - schema 一致性:
StreamEvent.type取值与 LangGraph SSE 协议对齐,使消费者可在 HTTP 流与嵌入模式间切换而不改事件处理逻辑 client.py:62-78。
Components / Subsystems
chat() — 同步累积 delta
职责:对 stream() 的便捷封装,按消息 id 累积 delta messages-tuple 事件,返回最后一条完成的 AI 消息文本;中间 AI 消息(如 planner 草稿)被丢弃 client.py:762-792。
关键方法:chat(message, *, thread_id=None, **kwargs)。内部用 chunks: dict[str, list[str]] 按 id 分桶,只对 event.type == "messages-tuple" 且 data["type"] == "ai" 且有非空 content 的事件追加,最后 "".join 末条 id 的分片——用 per-id 列表 + 一次性 join 避免长响应上 str + str 的 O(n²) 开销 client.py:781-792。
stream() — 订阅 LangGraph stream_mode
职责:发送一条用户消息,增量 yield StreamEvent,直到智能体完成本轮 client.py:489-569。
关键方法:stream(message, *, thread_id=None, **kwargs)。订阅 stream_mode=["values", "messages", "custom"] client.py:633-638。各模式处理:
custom:原样 yield 为StreamEvent(type="custom")(来自StreamWriter)client.py:645-647。messages:发出(message_chunk, metadata),AI 文本以 delta 形式逐 token 发出,每个 delta 带稳定id,消费者需按id累积才能重建全文 client.py:506-519。values:每个图节点完成后的完整状态快照;已通过messages模式发过的 AI 文本不会从快照重新合成,避免重复投递 client.py:692-758。
流结束时 yield StreamEvent(type="end", data={"usage": cumulative_usage}) client.py:760。
Gateway 对等方法
职责:在不启动 Gateway 的前提下提供与 Gateway API 路由器同名同形的配置/数据查询与变更能力。
关键方法:
- 模型:
list_models()返回{"models": [...], "token_usage": {...}},声明对齐ModelsListResponseschema client.py:798-822;get_model(name)对齐ModelResponseclient.py:869-889。 - MCP:
get_mcp_config()/update_mcp_config()读写extensions_config.json,后者写入后置空_agent并reload_extensions_config(),使下次调用重建智能体 client.py:895-937。 - 技能:
list_skills()/get_skill()/update_skill()/install_skill();update_skill()同样使缓存 agent 失效 client.py:824-845 client.py:965-1028。 - 记忆:
get_memory()/reload_memory()/get_memory_config()/get_memory_status()等,通过get_effective_user_id()做 per-user 隔离 client.py:847-1107。 - 上传:
upload_files()/list_uploads()/delete_upload(),upload_files先全量校验避免半途上传 client.py:1113-1244。 - 产物:
get_artifact(thread_id, path)返回(bytes, mime_type),内部经resolve_virtual_path防路径穿越 client.py:1250-1278。
TestGatewayConformance 把每个 dict 返回值喂进对应 Gateway Pydantic 模型——若 Gateway 新增必填字段而客户端没给,ValidationError 会让 CI 立即捕获漂移 test_client.py:2302-2308。
Agent 生命周期(lazy create + reset)
职责:延迟创建并按配置键缓存内部智能体,外部变更后可强制重建。
关键方法:构造函数仅加载配置,把 agent 创建推迟到首次调用(self._agent = None)client.py:160-162。_ensure_agent() 用 (model_name, thinking_enabled, is_plan_mode, subagent_enabled, agent_name, available_skills) 组成配置键,键不变则复用,键变则重建 client.py:210-251。reset_agent() 把 _agent 和 _agent_config_key 置空,强制下次调用重建——用于记忆/技能等外部变更后刷新系统提示与工具集 client.py:164-172。
Data Flow
下图展示 client.stream() 如何订阅 LangGraph 的 stream_mode 并把每个 chunk 转换为 StreamEvent yield 出去。
Implementation Details
chat() 的 per-id delta 累积
python
chunks: dict[str, list[str]] = {}
last_id: str = ""
for event in self.stream(message, thread_id=thread_id, **kwargs):
if event.type == "messages-tuple" and event.data.get("type") == "ai":
msg_id = event.data.get("id") or ""
delta = event.data.get("content", "")
if delta:
chunks.setdefault(msg_id, []).append(delta)
last_id = msg_id
return "".join(chunks.get(last_id, ()))解读:stream() 在 messages 模式下发的是 token 级 delta,同一条 AI 消息会被拆成多个同 id 的事件。chat() 按 id 分桶收集,只 join 最后一个 id 的分片——这就是文档所说「丢弃中间 AI 消息(如 planner 草稿),只返回最终 id 的累积文本」的实现 client.py:781-792。
lazy agent 的配置键复用
python
key = (
cfg.get("model_name"),
cfg.get("thinking_enabled"),
cfg.get("is_plan_mode"),
cfg.get("subagent_enabled"),
self._agent_name,
frozenset(self._available_skills) if self._available_skills is not None else None,
)
if self._agent is not None and self._agent_config_key == key:
return解读:智能体创建涉及模型实例化、工具装配、中间件链、系统提示渲染,代价不低。_ensure_agent 用这 6 个会影响 agent 行为的参数构成 key,只有 key 变化才重建——长期运行进程里多次 stream() 同配置只构建一次。系统提示(含日期、记忆、技能上下文)在首次创建时生成并随 key 缓存,故记忆/技能更新后需 reset_agent() 才能刷新 client.py:86-95 client.py:210-251。
速查表
Gateway 对等方法 × 返回格式矩阵(均对齐 Gateway Pydantic 响应模型):
| 类别 | 方法 | 返回格式 | Source |
|---|---|---|---|
| 模型 | list_models() | {"models": [...], "token_usage": {...}} | client.py:798-822 |
| 模型 | get_model(name) | {name, model, display_name, ...} 或 None | client.py:869-889 |
| MCP | get_mcp_config() / update_mcp_config() | {"mcp_servers": {...}} | client.py:895-937 |
| 技能 | list_skills() / get_skill() / update_skill() | {"skills": [...]} / {name, enabled, ...} | client.py:824-1013 |
| 技能 | install_skill(path) | {success, skill_name, message} | client.py:1015-1028 |
| 记忆 | get_memory() / get_memory_config() / get_memory_status() | dict | client.py:847-1107 |
| 上传 | upload_files(thread_id, files) | {"success": true, "files": [...], "message": ...} | client.py:1113-1210 |
| 上传 | list_uploads() / delete_upload() | {"files": [...], "count": N} / {success, message} | client.py:1212-1244 |
| 产物 | get_artifact(thread_id, path) | (bytes, mime_type) 元组(非 HTTP Response) | client.py:1250-1278 |
| 会话 | chat() / stream() | str / Generator[StreamEvent] | client.py:489-792 |
与 HTTP 模式的关键区别:upload_files 接收本地 Path 而非 HTTP UploadFile,并在拷贝前拒绝目录路径 client.py:1136-1145;get_artifact 返回 (bytes, mime_type) 而非 HTTP Response client.py:1250-1278。
扩展指南
把 DeerFlowClient 嵌入自己的 Python 程序的最小模板:
python
from deerflow.client import DeerFlowClient
from langgraph.checkpoint.memory import InMemorySaver
# 1. 多轮对话必须显式或回退提供 checkpointer
client = DeerFlowClient(
config_path="config.yaml", # None 则用默认解析顺序
checkpointer=InMemorySaver(), # 不传则回退到 get_checkpointer()
model_name=None, # 覆盖 config 默认模型
thinking_enabled=True,
)
# 2. 一次性问答(同步)
print(client.chat("分析这篇论文", thread_id="paper-1"))
# 3. 流式消费(同步生成器,无需 asyncio)
for event in client.stream("继续", thread_id="paper-1"):
if event.type == "messages-tuple" and event.data.get("type") == "ai":
print(event.data.get("content", ""), end="")
elif event.type == "end":
print("\nusage:", event.data["usage"])
# 4. 外部改了记忆/技能后,强制重建内部 agent
client.update_skill("web-search", enabled=True) # 自动失效缓存 agent
client.reset_agent() # 其他外部变更后手动刷新约束:
- 多轮对话必须有
checkpointer。没有 checkpointer 时每次chat()/stream()都是无状态的,thread_id仅用于文件隔离(uploads/artifacts),不保留对话上下文 client.py:86-89。未显式传入时会回退到deerflow.runtime.checkpointer.get_checkpointer()client.py:242-248。 - 系统提示在 agent 首建时生成并缓存到配置键变化。日期、记忆、技能上下文不会随后续调用自动刷新;长期运行进程在外部状态变更后须调
reset_agent()client.py:90-95。 update_mcp_config()与update_skill()写配置后会自动置空缓存 agent,无需手动reset_agent()client.py:934-937 client.py:1000-1002。agent_name须匹配AGENT_NAME_PATTERN,否则构造函数抛ValueErrorclient.py:148-149。
Common Pitfalls / Tips
- 不要把
messages模式的 delta 当完整文本。AI 文本是逐 token 的 delta,每个事件带稳定id,消费者必须按id累积才能重建全文;chat()已替你做了这件事,需要每个 delta 时才直接用stream()client.py:506-519。 values快照不会重复合成已流式的 AI 文本。已通过messages模式发过的 id 会被记入streamed_ids,values路径只(防御性地)采集其 usage 并跳过重新合成事件,避免消费者看到重复文本 client.py:702-715。- usage 只按 id 计一次。同一消息 id 在
messages终块和values快照里携带相同累计usage_metadata,_account_usage用counted_usage_ids确保只在先到者上计入 cumulative,end 事件的usage不会翻倍 client.py:586-617。 - values 内重复消息按 id 去重。
seen_ids保证同一 id 的消息不会被处理两次 client.py:695-700。 - 两条路径必须在订阅哪些 stream_mode 上保持一致。该不变量由
tests/test_client.py::test_messages_mode_emits_token_deltas强制(而非共享常量,因 Graph/SDK/HTTP 三层命名不同无法共享字符串)test_client.py:436-438。
References
- backend/packages/harness/deerflow/client.py —
DeerFlowClient全部实现 - backend/tests/test_client.py —
TestGatewayConformance、TestStream、token delta 回归 - backend/docs/STREAMING.md — Gateway 与 DeerFlowClient 平行路径设计
- backend/CLAUDE.md — Embedded Client 架构与 Gateway 对等方法表
- backend/packages/harness/deerflow/agents/lead_agent/agent.py —
_build_middlewares,与 client 共享的智能体装配
Related Pages
| 页面 | 关系 |
|---|---|
| 15-Runtime运行时与StreamBridge.md | Gateway 的异步流式管线 run_agent → StreamBridge,与本章嵌入路径平行对照 |
| 13-Gateway-API与路由体系.md | 本章对等方法所对齐的 Gateway 路由器与 Pydantic 响应模型来源 |
| 29-AI消息流与流式渲染.md | 前端如何消费 messages-tuple/values 事件,与本章 StreamEvent 协议一致 |
| 36-测试策略与质量保障.md | TestGatewayConformance 与 token delta 回归测试在整体测试策略中的位置 |