Skip to content

Runtime 运行时与 StreamBridge

本章目标:

  • 讲清 runtime/ 如何把一次 LangGraph 图执行包装成可后台运行、可中断、可重连的「Run」,以及 RunManager + run_agent + StreamBridge 三者的分工。
  • 拆解 StreamBridge 把 graph.astream(...) 的流式输出桥接到 HTTP SSE 的机制,包括 stream_mode(values / messages / custom / end)的语义与序列化路径。
  • 说明 checkpointer、journal、RunStore 三条持久化链路,以及 Gateway 重启后历史 Run 如何从持久化存储恢复给前端。

TL;DR

runtime/ 把每次对话执行抽象为一个 Run:RunManager 维护内存里的 RunRecord 注册表并把可序列化元数据落到 RunStore;run_agent() 在一个 asyncio.Task 里驱动 agent.astream(),把每个 chunk 经 serialize()publishStreamBridge;HTTP 层的 sse_consumer 反过来 subscribe 同一个 run_id,把事件转成 SSE 帧推给浏览器。StreamBridge 用每 Run 一份的内存事件日志解耦生产者与消费者,支持心跳、Last-Event-ID 重连与晚到订阅者补播。Gateway 重启会丢失内存 RunRecord,但 RunManager.aget() / list_by_thread() 会回退查 RunStore,journal 写入的消息/token 也留在 RunEventStore,因此历史 Run 仍可读。

Overview

DeerFlow 的 Agent 本质是一张 LangGraph 图,执行入口是 agent.astream(...) 这个异步生成器。但浏览器需要的是一条 HTTP 长连接上的 SSE(Server-Sent Events)流,而且现实场景要求:用户刷新页面后能重新「join」正在跑的 Run、点停止按钮能中断、一次提交可以后台跑完即使没人在听、网络抖动后用 Last-Event-ID 续传。直接把 astream 的协程绑死在一个 HTTP 请求的生命周期上无法满足这些需求——请求一断,生成器就被取消,Run 也就死了。

runtime/ 的设计因此采用生产者/消费者解耦,对齐 LangGraph Platform 的 Queue + StreamManager 架构 stream_bridge/base.py:1-6:

  • 生产者:run_agent() 在独立 asyncio.Task 里跑图,不依附任何 HTTP 请求,把事件 publishStreamBridge
  • 缓冲层:StreamBridge 为每个 run_id 维护一份带时间窗的事件日志,任意数量的消费者可以在任意时刻订阅,晚到的也能补播。
  • 消费者:每个 HTTP SSE 端点 subscribe 一次,把事件翻译成 SSE 帧;消费者断开不影响生产者。

WHY 这样分层:把「跑图」和「推流」拆开,Run 的生命周期就由 RunManager + StreamBridge 共同管理,而不是被某个 HTTP 请求绑架。这让 POST /runs(后台跑)、POST /runs/stream(跑+推)、GET /runs/{id}/join(中途加入)三种 API 形态共用同一套底层机制 services.py:332-353

Architecture

Source 列表(本章核实过行号的文件):

文件角色
runtime/__init__.py运行时公共 API 再导出
runtime/runs/manager.pyRunManager + RunRecord,内存注册表 + RunStore 回退
runtime/runs/worker.pyrun_agent() 后台执行 + RunContext + 序列化分发
runtime/runs/schemas.pyRunStatus / DisconnectMode 枚举
runtime/stream_bridge/base.pyStreamBridge 抽象 + StreamEvent + 哨兵
runtime/stream_bridge/memory.pyMemoryStreamBridge 内存事件日志实现
runtime/stream_bridge/async_provider.pymake_stream_bridge 异步上下文工厂
runtime/checkpointer/async_provider.pymake_checkpointer 异步检查点工厂
runtime/journal.pyRunJournal LangChain 回调,捕获事件/token
runtime/events/__init__.pyRunEventStore 再导出
runtime/serialization.pyserialize() LangChain 对象→JSON
runtime/converters.pyLangChain→OpenAI 消息格式转换
runtime/user_context.py请求级 user_id ContextVar
app/gateway/services.pyRun 创建 + sse_consumer
app/gateway/routers/thread_runs.pyRun REST + join SSE 路由

Components / Subsystems

RunManager —— Run 注册表与生命周期

职责:在内存里维护 dict[run_id, RunRecord],所有改动用 asyncio.Lock 串行化;当注入了 RunStore 时,把可序列化元数据同步落库,使历史 Run 跨进程重启可读 manager.py:42-53

关键类:RunRecord(可变记录,含 task / abort_event / abort_action / model_name)manager.py:21-39

  • create_or_reject() 在同一把锁内原子地完成「检查在飞 Run + 创建新 Run」,消除了分离 has_inflight + create 的 TOCTOU 竞态;reject 策略下若线程已有 pending/running 则抛 ConflictError,interrupt/rollback 则先取消在飞 Run manager.py:224-290
  • cancel()abort_event.set() 并对 task.cancel(),action="interrupt" 保留检查点,action="rollback" 触发回滚 manager.py:199-222
  • aget()list_by_thread() 在内存查不到时回退到 RunStore,并把 store dict 转回只读 RunRecord(on_disconnect 固定为 cancel,无 task)manager.py:117-167

worker / run_agent —— 后台执行引擎

职责:在 asyncio.Task 内驱动 agent.astream(stream_mode=[...]),逐 chunk 序列化后 publish 到 bridge;管理状态机迁移、journal、回滚、finally 收尾 worker.py:1-14

关键类:RunContext(冻结 dataclass,聚合 checkpointer / store / event_store / thread_store / app_config,避免 run_agent 参数膨胀)worker.py:71-86

  • 启动时若有 checkpointer,先 aget_tuple() 深拷贝一份 pre-run 快照,供 rollback 恢复 worker.py:181-197
  • 手动注入 runtime.context(thread_id / run_id / app_config)到 config['configurable']['__pregel_runtime'],因为不走官方 context= 参数 worker.py:213-220
  • 每个循环迭代检查 record.abort_event.is_set(),被置位则停止流式 worker.py:286-302

StreamBridge —— SSE 桥接缓冲

职责:抽象出 publish / publish_end / subscribe / cleanup 四个原语解耦生产者与消费者;StreamEvent.id 单调递增用作 SSE id: 字段支撑 Last-Event-ID 重连 base.py:16-69

关键类:MemoryStreamBridge,每 run_id 一份 _RunStream(events 列表 + asyncio.Condition + ended 标志 + start_offset)memory.py:17-35

  • publish 追加事件,超过 queue_maxsize(默认 256)时丢弃最旧的并抬高 start_offset,然后 notify_all 唤醒订阅者 memory.py:68-77
  • subscribe 是异步生成器:无新事件时 wait_for(condition.wait(), timeout=heartbeat_interval),超时则吐 HEARTBEAT_SENTINEL,生产者结束后吐 END_SENTINEL;落后于保留窗时从 start_offset 续播并告警 memory.py:85-123
  • make_stream_bridge 是异步上下文管理器,无配置或 type=="memory" 时回退 MemoryStreamBridge;redis 标记为未实现 async_provider.py:28-55

checkpointer —— 图状态持久化

职责:make_checkpointer() 异步上下文管理器为 caller 生命周期产出一个 LangGraph Checkpointer,优先级为「legacy checkpointer: 段 > 统一 database: 段 > 默认 InMemorySaver」;支持 memory / sqlite / postgres 三种后端,sqlite/postgres 各调 saver.setup() async_provider.py:125-160run_agent 在 finally 还会从检查点读 title 同步到 threads_meta.display_name worker.py:381-392

journal —— 事件与 token 捕获

职责:RunJournalBaseCallbackHandler,作为 LangChain 回调注入到 config['callbacks'],把回调数据标准化成 RunEvent 写入 RunEventStore,并在内存累加 token journal.py:1-16

  • 只在 on_llm_end 记完整消息,不实现 on_llm_new_token;on_chat_model_start 提取首条 human 消息(比 on_chain_start 可靠,内容未被检查点裁剪)journal.py:172-217
  • token 按 LangChain run_id 去重累加,按 caller(lead_agent / subagent: / middleware:)分桶;record_external_llm_usage_recordssource_run_id 去重接收子代理外部用量 journal.py:277-298 journal.py:405-446
  • 写缓冲达 flush_threshold(20)时 _flush_sync,无事件循环则保留到 worker finally 里的 flush() 一次性落库 journal.py:347-372

Data Flow

Gateway 重启后历史 Run 恢复:进程重启会清空 RunManager._runs 内存字典。GET /threads/{tid}/runslist_by_thread(),内存没命中就回退查 RunStore,把 store dict 还原成只读 RunRecord 返给前端 thread_runs.py:180-194。Run 详情走 aget() 同理 manager.py:117-129。消息与 token 由 journal 早已写入 RunEventStore,因此重启后历史对话仍完整可读;但 StreamBridge 是纯内存,重启前未结束 Run 的实时流无法再 join,因为内存事件日志和 asyncio.Task 都已丢失。

Implementation Details

run_agent 把请求的 stream_mode 列表翻译成 LangGraph 合法模式后,单模式与多模式走两条 astream 路径,核心序列化分发如下 worker.py:283-309:

python
if len(lg_modes) == 1 and not stream_subgraphs:
    single_mode = lg_modes[0]
    async for chunk in agent.astream(graph_input, config=runnable_config, stream_mode=single_mode):
        if record.abort_event.is_set():
            break
        sse_event = _lg_mode_to_sse_event(single_mode)
        await bridge.publish(run_id, sse_event, serialize(chunk, mode=single_mode))
else:
    async for item in agent.astream(graph_input, config=runnable_config,
                                     stream_mode=lg_modes, subgraphs=stream_subgraphs):
        if record.abort_event.is_set():
            break
        mode, chunk = _unpack_stream_item(item, lg_modes, stream_subgraphs)
        if mode is None:
            continue
        sse_event = _lg_mode_to_sse_event(mode)
        await bridge.publish(run_id, sse_event, serialize(chunk, mode=mode))

解读:单模式时 astream 直接吐裸 chunk;多模式或子图时吐 (mode, chunk) 元组,由 _unpack_stream_item 解包(子图额外带 namespace 前缀)。serialize(chunk, mode=mode) 按模式分派——messages 模式把 (chunk, metadata) 元组拆开各自序列化,values 模式剥掉 __pregel_* / __interrupt__ 内部键以对齐 LangGraph Platform 输出,其余走递归 model_dump() 兜底 serialization.py:67-78。注意 events 模式被显式跳过,因为它需要 astream_events(),无法与 values 快照同时产出(Python 公共 API 限制)worker.py:9-13

消费端 sse_consumer 把哨兵翻译成 SSE,并在 finally 实现 on_disconnect 语义 services.py:368-388:

python
last_event_id = request.headers.get("Last-Event-ID")
async for entry in bridge.subscribe(record.run_id, last_event_id=last_event_id):
    if await request.is_disconnected():
        break
    if entry is HEARTBEAT_SENTINEL:
        yield ": heartbeat\n\n"; continue
    if entry is END_SENTINEL:
        yield format_sse("end", None, event_id=entry.id or None); return
    yield format_sse(entry.event, entry.data, event_id=entry.id or None)
finally:
    if record.status in (RunStatus.pending, RunStatus.running):
        if record.on_disconnect == DisconnectMode.cancel:
            await run_mgr.cancel(record.run_id)

速查表

run_agent 把客户端请求的 stream_mode 归一化(messages-tuple → LangGraph messages),无有效模式回退 values worker.py:256-278:

stream_modeastream 产出SSE event序列化处理Source
values完整 state 快照 dictvaluesserialize_channel_values,剥 __pregel_* / __interrupt__serialization.py:45-56
messages / messages-tuple(message_chunk, metadata)messagesserialize_messages_tuple,拆元组分别序列化(AI 文本为增量)serialization.py:59-64
customStreamWriter 自定义负载custom递归 model_dump() 兜底serialization.py:67-78
updates / checkpoints / tasks / debugLangGraph 原生结构同名递归兜底worker.py:41
events—(不支持)显式跳过,需 astream_events 无法与 values 共存worker.py:154-159
end(哨兵)publish_endend无 data,流终止符services.py:378-380

Run 状态机:

Common Pitfalls / Tips

  • per-run_id 事件 ID 去重 / 续传:MemoryStreamBridge{ts}-{seq}(seq 从 0 起的每 run 计数器)做单调 ID;Last-Event-ID 在保留缓冲里找不到时从最早保留事件重播并告警,而非报错 memory.py:45-64。客户端需自行按消息 id 拼接增量并对重复事件去重。
  • buffer 溢出会丢早期事件:超过 queue_maxsize(默认 256)后最旧事件被删,晚到订阅者只能看到保留窗内的;调高 stream_bridge.queue_maxsize 可扩大窗口 memory.py:73-76
  • cleanup 有 60s 延迟:worker finallyasyncio.create_task(bridge.cleanup(run_id, delay=60)),给晚到 / 重连的 join 留 60 秒补播窗口;此后 run_id 的内存事件被清,再 join 拿不到历史流 worker.py:402-403
  • journal token 双计防护:LangChain 可能对同一响应多次触发 on_llm_end,journal 用 _counted_llm_run_ids 集合按 LangChain run_id 去重;子代理外部用量用 _counted_external_source_idssource_run_id 去重 journal.py:283-298
  • get()aget() 别混用:内部需要 live task / abort_event(如 cancel、join)必须用同步 get()(只查内存);只读展示用异步 aget()(带 RunStore 回退,但拿不到活 task)manager.py:113-129join_run / cancel 路由用的就是同步 get() thread_runs.py:217-244
  • rollback 依赖 pre-run 快照:若启动时检查点快照捕获失败(snapshot_capture_failed),rollback 会被跳过仅记告警,线程状态不会被还原 worker.py:436-438

References

  • backend/packages/harness/deerflow/runtime/runs/manager.py — RunManager / RunRecord / 状态机 / RunStore 回退
  • backend/packages/harness/deerflow/runtime/runs/worker.py — run_agent / RunContext / 序列化分发 / rollback
  • backend/packages/harness/deerflow/runtime/stream_bridge/memory.py — MemoryStreamBridge 事件日志 / 心跳 / 重连
  • backend/packages/harness/deerflow/runtime/stream_bridge/base.py — StreamBridge 抽象 / StreamEvent / 哨兵
  • backend/packages/harness/deerflow/runtime/checkpointer/async_provider.py — make_checkpointer 多后端工厂
  • backend/packages/harness/deerflow/runtime/journal.py — RunJournal 回调 / token 去重 / flush
  • backend/packages/harness/deerflow/runtime/serialization.py — serialize 按 stream_mode 分派
  • backend/app/gateway/services.py — Run 创建 / sse_consumer / on_disconnect
  • backend/app/gateway/routers/thread_runs.py — Run REST / join SSE / 历史恢复读路径
页面关系
13-Gateway-API与路由体系.md上游:/api/langgraph/* 路由如何调用 RunManager + sse_consumer 暴露 Run API
16-持久化与存储层.md下游:RunStore / RunEventStore / Checkpointer 的存储后端实现与持久化细节
11-ThreadState与状态管理.md相关:values 模式序列化的 ThreadState 结构与 reducer
29-AI消息流与流式渲染.md相关:前端如何消费 SSE、按消息 id 拼接 messages 增量与去重
32-嵌入式Python客户端.md相关:DeerFlowClient.stream() 走相同 stream_mode 语义的进程内并行路径

公司内部参考 · 由 claude-wiki-gen 基于源码自动生成的二次分析