主题
Runtime 运行时与 StreamBridge
本章目标:
- 讲清
runtime/如何把一次 LangGraph 图执行包装成可后台运行、可中断、可重连的「Run」,以及RunManager+run_agent+StreamBridge三者的分工。- 拆解 StreamBridge 把
graph.astream(...)的流式输出桥接到 HTTP SSE 的机制,包括stream_mode(values / messages / custom / end)的语义与序列化路径。- 说明 checkpointer、journal、RunStore 三条持久化链路,以及 Gateway 重启后历史 Run 如何从持久化存储恢复给前端。
TL;DR
runtime/ 把每次对话执行抽象为一个 Run:RunManager 维护内存里的 RunRecord 注册表并把可序列化元数据落到 RunStore;run_agent() 在一个 asyncio.Task 里驱动 agent.astream(),把每个 chunk 经 serialize() 后 publish 到 StreamBridge;HTTP 层的 sse_consumer 反过来 subscribe 同一个 run_id,把事件转成 SSE 帧推给浏览器。StreamBridge 用每 Run 一份的内存事件日志解耦生产者与消费者,支持心跳、Last-Event-ID 重连与晚到订阅者补播。Gateway 重启会丢失内存 RunRecord,但 RunManager.aget() / list_by_thread() 会回退查 RunStore,journal 写入的消息/token 也留在 RunEventStore,因此历史 Run 仍可读。
Overview
DeerFlow 的 Agent 本质是一张 LangGraph 图,执行入口是 agent.astream(...) 这个异步生成器。但浏览器需要的是一条 HTTP 长连接上的 SSE(Server-Sent Events)流,而且现实场景要求:用户刷新页面后能重新「join」正在跑的 Run、点停止按钮能中断、一次提交可以后台跑完即使没人在听、网络抖动后用 Last-Event-ID 续传。直接把 astream 的协程绑死在一个 HTTP 请求的生命周期上无法满足这些需求——请求一断,生成器就被取消,Run 也就死了。
runtime/ 的设计因此采用生产者/消费者解耦,对齐 LangGraph Platform 的 Queue + StreamManager 架构 stream_bridge/base.py:1-6:
- 生产者:
run_agent()在独立asyncio.Task里跑图,不依附任何 HTTP 请求,把事件publish到StreamBridge。 - 缓冲层:
StreamBridge为每个run_id维护一份带时间窗的事件日志,任意数量的消费者可以在任意时刻订阅,晚到的也能补播。 - 消费者:每个 HTTP SSE 端点
subscribe一次,把事件翻译成 SSE 帧;消费者断开不影响生产者。
WHY 这样分层:把「跑图」和「推流」拆开,Run 的生命周期就由 RunManager + StreamBridge 共同管理,而不是被某个 HTTP 请求绑架。这让 POST /runs(后台跑)、POST /runs/stream(跑+推)、GET /runs/{id}/join(中途加入)三种 API 形态共用同一套底层机制 services.py:332-353。
Architecture
Source 列表(本章核实过行号的文件):
| 文件 | 角色 |
|---|---|
runtime/__init__.py | 运行时公共 API 再导出 |
runtime/runs/manager.py | RunManager + RunRecord,内存注册表 + RunStore 回退 |
runtime/runs/worker.py | run_agent() 后台执行 + RunContext + 序列化分发 |
runtime/runs/schemas.py | RunStatus / DisconnectMode 枚举 |
runtime/stream_bridge/base.py | StreamBridge 抽象 + StreamEvent + 哨兵 |
runtime/stream_bridge/memory.py | MemoryStreamBridge 内存事件日志实现 |
runtime/stream_bridge/async_provider.py | make_stream_bridge 异步上下文工厂 |
runtime/checkpointer/async_provider.py | make_checkpointer 异步检查点工厂 |
runtime/journal.py | RunJournal LangChain 回调,捕获事件/token |
runtime/events/__init__.py | RunEventStore 再导出 |
runtime/serialization.py | serialize() LangChain 对象→JSON |
runtime/converters.py | LangChain→OpenAI 消息格式转换 |
runtime/user_context.py | 请求级 user_id ContextVar |
app/gateway/services.py | Run 创建 + sse_consumer |
app/gateway/routers/thread_runs.py | Run REST + join SSE 路由 |
Components / Subsystems
RunManager —— Run 注册表与生命周期
职责:在内存里维护 dict[run_id, RunRecord],所有改动用 asyncio.Lock 串行化;当注入了 RunStore 时,把可序列化元数据同步落库,使历史 Run 跨进程重启可读 manager.py:42-53。
关键类:RunRecord(可变记录,含 task / abort_event / abort_action / model_name)manager.py:21-39。
create_or_reject()在同一把锁内原子地完成「检查在飞 Run + 创建新 Run」,消除了分离has_inflight+create的 TOCTOU 竞态;reject策略下若线程已有 pending/running 则抛ConflictError,interrupt/rollback则先取消在飞 Run manager.py:224-290。cancel()把abort_event.set()并对task.cancel(),action="interrupt"保留检查点,action="rollback"触发回滚 manager.py:199-222。aget()与list_by_thread()在内存查不到时回退到RunStore,并把 store dict 转回只读RunRecord(on_disconnect固定为cancel,无task)manager.py:117-167。
worker / run_agent —— 后台执行引擎
职责:在 asyncio.Task 内驱动 agent.astream(stream_mode=[...]),逐 chunk 序列化后 publish 到 bridge;管理状态机迁移、journal、回滚、finally 收尾 worker.py:1-14。
关键类:RunContext(冻结 dataclass,聚合 checkpointer / store / event_store / thread_store / app_config,避免 run_agent 参数膨胀)worker.py:71-86。
- 启动时若有 checkpointer,先
aget_tuple()深拷贝一份 pre-run 快照,供rollback恢复 worker.py:181-197。 - 手动注入
runtime.context(thread_id/run_id/app_config)到config['configurable']['__pregel_runtime'],因为不走官方context=参数 worker.py:213-220。 - 每个循环迭代检查
record.abort_event.is_set(),被置位则停止流式 worker.py:286-302。
StreamBridge —— SSE 桥接缓冲
职责:抽象出 publish / publish_end / subscribe / cleanup 四个原语解耦生产者与消费者;StreamEvent.id 单调递增用作 SSE id: 字段支撑 Last-Event-ID 重连 base.py:16-69。
关键类:MemoryStreamBridge,每 run_id 一份 _RunStream(events 列表 + asyncio.Condition + ended 标志 + start_offset)memory.py:17-35。
publish追加事件,超过queue_maxsize(默认 256)时丢弃最旧的并抬高start_offset,然后notify_all唤醒订阅者 memory.py:68-77。subscribe是异步生成器:无新事件时wait_for(condition.wait(), timeout=heartbeat_interval),超时则吐HEARTBEAT_SENTINEL,生产者结束后吐END_SENTINEL;落后于保留窗时从start_offset续播并告警 memory.py:85-123。make_stream_bridge是异步上下文管理器,无配置或type=="memory"时回退MemoryStreamBridge;redis标记为未实现 async_provider.py:28-55。
checkpointer —— 图状态持久化
职责:make_checkpointer() 异步上下文管理器为 caller 生命周期产出一个 LangGraph Checkpointer,优先级为「legacy checkpointer: 段 > 统一 database: 段 > 默认 InMemorySaver」;支持 memory / sqlite / postgres 三种后端,sqlite/postgres 各调 saver.setup() async_provider.py:125-160。run_agent 在 finally 还会从检查点读 title 同步到 threads_meta.display_name worker.py:381-392。
journal —— 事件与 token 捕获
职责:RunJournal 是 BaseCallbackHandler,作为 LangChain 回调注入到 config['callbacks'],把回调数据标准化成 RunEvent 写入 RunEventStore,并在内存累加 token journal.py:1-16。
- 只在
on_llm_end记完整消息,不实现on_llm_new_token;on_chat_model_start提取首条 human 消息(比on_chain_start可靠,内容未被检查点裁剪)journal.py:172-217。 - token 按 LangChain
run_id去重累加,按 caller(lead_agent/subagent:/middleware:)分桶;record_external_llm_usage_records按source_run_id去重接收子代理外部用量 journal.py:277-298 journal.py:405-446。 - 写缓冲达
flush_threshold(20)时_flush_sync,无事件循环则保留到 workerfinally里的flush()一次性落库 journal.py:347-372。
Data Flow
Gateway 重启后历史 Run 恢复:进程重启会清空 RunManager._runs 内存字典。GET /threads/{tid}/runs 调 list_by_thread(),内存没命中就回退查 RunStore,把 store dict 还原成只读 RunRecord 返给前端 thread_runs.py:180-194。Run 详情走 aget() 同理 manager.py:117-129。消息与 token 由 journal 早已写入 RunEventStore,因此重启后历史对话仍完整可读;但 StreamBridge 是纯内存,重启前未结束 Run 的实时流无法再 join,因为内存事件日志和 asyncio.Task 都已丢失。
Implementation Details
run_agent 把请求的 stream_mode 列表翻译成 LangGraph 合法模式后,单模式与多模式走两条 astream 路径,核心序列化分发如下 worker.py:283-309:
python
if len(lg_modes) == 1 and not stream_subgraphs:
single_mode = lg_modes[0]
async for chunk in agent.astream(graph_input, config=runnable_config, stream_mode=single_mode):
if record.abort_event.is_set():
break
sse_event = _lg_mode_to_sse_event(single_mode)
await bridge.publish(run_id, sse_event, serialize(chunk, mode=single_mode))
else:
async for item in agent.astream(graph_input, config=runnable_config,
stream_mode=lg_modes, subgraphs=stream_subgraphs):
if record.abort_event.is_set():
break
mode, chunk = _unpack_stream_item(item, lg_modes, stream_subgraphs)
if mode is None:
continue
sse_event = _lg_mode_to_sse_event(mode)
await bridge.publish(run_id, sse_event, serialize(chunk, mode=mode))解读:单模式时 astream 直接吐裸 chunk;多模式或子图时吐 (mode, chunk) 元组,由 _unpack_stream_item 解包(子图额外带 namespace 前缀)。serialize(chunk, mode=mode) 按模式分派——messages 模式把 (chunk, metadata) 元组拆开各自序列化,values 模式剥掉 __pregel_* / __interrupt__ 内部键以对齐 LangGraph Platform 输出,其余走递归 model_dump() 兜底 serialization.py:67-78。注意 events 模式被显式跳过,因为它需要 astream_events(),无法与 values 快照同时产出(Python 公共 API 限制)worker.py:9-13。
消费端 sse_consumer 把哨兵翻译成 SSE,并在 finally 实现 on_disconnect 语义 services.py:368-388:
python
last_event_id = request.headers.get("Last-Event-ID")
async for entry in bridge.subscribe(record.run_id, last_event_id=last_event_id):
if await request.is_disconnected():
break
if entry is HEARTBEAT_SENTINEL:
yield ": heartbeat\n\n"; continue
if entry is END_SENTINEL:
yield format_sse("end", None, event_id=entry.id or None); return
yield format_sse(entry.event, entry.data, event_id=entry.id or None)
finally:
if record.status in (RunStatus.pending, RunStatus.running):
if record.on_disconnect == DisconnectMode.cancel:
await run_mgr.cancel(record.run_id)速查表
run_agent 把客户端请求的 stream_mode 归一化(messages-tuple → LangGraph messages),无有效模式回退 values worker.py:256-278:
| stream_mode | astream 产出 | SSE event | 序列化处理 | Source |
|---|---|---|---|---|
values | 完整 state 快照 dict | values | serialize_channel_values,剥 __pregel_* / __interrupt__ | serialization.py:45-56 |
messages / messages-tuple | (message_chunk, metadata) | messages | serialize_messages_tuple,拆元组分别序列化(AI 文本为增量) | serialization.py:59-64 |
custom | StreamWriter 自定义负载 | custom | 递归 model_dump() 兜底 | serialization.py:67-78 |
updates / checkpoints / tasks / debug | LangGraph 原生结构 | 同名 | 递归兜底 | worker.py:41 |
events | —(不支持) | — | 显式跳过,需 astream_events 无法与 values 共存 | worker.py:154-159 |
end(哨兵) | publish_end 后 | end | 无 data,流终止符 | services.py:378-380 |
Run 状态机:
Common Pitfalls / Tips
- per-
run_id事件 ID 去重 / 续传:MemoryStreamBridge用{ts}-{seq}(seq从 0 起的每 run 计数器)做单调 ID;Last-Event-ID在保留缓冲里找不到时从最早保留事件重播并告警,而非报错 memory.py:45-64。客户端需自行按消息id拼接增量并对重复事件去重。 - buffer 溢出会丢早期事件:超过
queue_maxsize(默认 256)后最旧事件被删,晚到订阅者只能看到保留窗内的;调高stream_bridge.queue_maxsize可扩大窗口 memory.py:73-76。 - cleanup 有 60s 延迟:worker
finally里asyncio.create_task(bridge.cleanup(run_id, delay=60)),给晚到 / 重连的 join 留 60 秒补播窗口;此后run_id的内存事件被清,再 join 拿不到历史流 worker.py:402-403。 - journal token 双计防护:LangChain 可能对同一响应多次触发
on_llm_end,journal 用_counted_llm_run_ids集合按 LangChainrun_id去重;子代理外部用量用_counted_external_source_ids按source_run_id去重 journal.py:283-298。 get()与aget()别混用:内部需要 livetask/abort_event(如 cancel、join)必须用同步get()(只查内存);只读展示用异步aget()(带 RunStore 回退,但拿不到活 task)manager.py:113-129。join_run/cancel路由用的就是同步get()thread_runs.py:217-244。- rollback 依赖 pre-run 快照:若启动时检查点快照捕获失败(
snapshot_capture_failed),rollback 会被跳过仅记告警,线程状态不会被还原 worker.py:436-438。
References
backend/packages/harness/deerflow/runtime/runs/manager.py— RunManager / RunRecord / 状态机 / RunStore 回退backend/packages/harness/deerflow/runtime/runs/worker.py— run_agent / RunContext / 序列化分发 / rollbackbackend/packages/harness/deerflow/runtime/stream_bridge/memory.py— MemoryStreamBridge 事件日志 / 心跳 / 重连backend/packages/harness/deerflow/runtime/stream_bridge/base.py— StreamBridge 抽象 / StreamEvent / 哨兵backend/packages/harness/deerflow/runtime/checkpointer/async_provider.py— make_checkpointer 多后端工厂backend/packages/harness/deerflow/runtime/journal.py— RunJournal 回调 / token 去重 / flushbackend/packages/harness/deerflow/runtime/serialization.py— serialize 按 stream_mode 分派backend/app/gateway/services.py— Run 创建 / sse_consumer / on_disconnectbackend/app/gateway/routers/thread_runs.py— Run REST / join SSE / 历史恢复读路径
Related Pages
| 页面 | 关系 |
|---|---|
| 13-Gateway-API与路由体系.md | 上游:/api/langgraph/* 路由如何调用 RunManager + sse_consumer 暴露 Run API |
| 16-持久化与存储层.md | 下游:RunStore / RunEventStore / Checkpointer 的存储后端实现与持久化细节 |
| 11-ThreadState与状态管理.md | 相关:values 模式序列化的 ThreadState 结构与 reducer |
| 29-AI消息流与流式渲染.md | 相关:前端如何消费 SSE、按消息 id 拼接 messages 增量与去重 |
| 32-嵌入式Python客户端.md | 相关:DeerFlowClient.stream() 走相同 stream_mode 语义的进程内并行路径 |