Skip to content

AI 消息流与 SSE 基础设施

本章目标:

  1. 看懂前端两条 AI 流式管道——LangGraph SDK useStream(chat 路径,直连引擎)与 useAITaskStream(BFF SSE 长任务衬底),分别解决什么场景、为什么不能合一
  2. 掌握通用 SSE 解析器 sse-parser 的边界拆分算法,以及 run-metadata-storage 如何用 sessionStorage 实现刷新/切页后自动 reattach 续流
  3. 理解 insight-engine client 的"双形态 response"(L1 cache 命中走 JSON、miss 走 SSE)如何被三函数契约统一,以及 installLangGraphInterceptor 怎样兜住 SDK 的 thread auto-create 缺口

TL;DR

前端有两套互不替代的 AI 流式基础设施:① chat 路径用 LangGraph SDK 官方 useStream(core/threads/hooks.ts:215),直连 Aegra 运行时,靠 SDK 自带 reconnectOnMount + streamResumable 做断线续流;② acdm SSE 长任务(insight 聚合 / 未来 weekly_report)走 acdm-backend BFF 的 text/event-stream,用自研衬底 core/ai-task——useAITaskStream 管六态生命周期、run-metadata-storage 用 sessionStorage 持久化 {run_id, thread_id}sse-parser 把 fetch ReadableStream 拆成 AsyncIterable<SseMessage>。两套都做"刷新页面继续看进度",但前者是直连引擎、后者是 BFF 协议适配版。insight-engine client 提供 submit/attach/cancel 三函数,并处理"L1 cache 命中返 JSON、miss 返 SSE"的双形态 response。installLangGraphInterceptor 在 SDK client 上打补丁,默认补 ifExists:"do_nothing" 防 thread 重复创建撞 409。

Overview(为什么需要两套流式管道)

一个 AI 应用的"消息流"要回答的核心问题是:LLM 生成要几十秒甚至几分钟,这期间用户刷新页面 / 切走再回来,凭什么还能看到进度而不是从头再跑一遍?

直觉做法是发一个普通 HTTP 请求等响应,但对 AI 长任务这有三个致命问题:

  1. 超时:Nginx / 浏览器对挂起请求有超时(通常 30-60s),insight 聚合跑 9 节点 pipeline 对多场会议串行 AI 抽取要几分钟,普通请求必断。
  2. 无进度:用户盯着转圈几分钟,不知道卡在哪一步,以为挂了。
  3. 不可恢复:刷新页面 = 请求断 = 任务白跑(后端可能还在跑,但前端永远拿不到结果了)。

答案是 SSE(Server-Sent Events)+ run 持久化:服务端持续 yield 进度事件,前端把 run_id 存 sessionStorage;页面重载后凭 run_id 重新 attach 到那个还在跑的 run 继续收事件。

但 a-cdm 有两类长流场景,走了两条不同管道,这是本章最关键的认知:

为什么不合一?chat 路径是用户对话,thread 是稳定 ID、有完整 LangGraph 协议(messages / artifacts / todos / custom events),用 SDK 官方 useStream 最省事且能复用 SDK 的 joinStream 续流。acdm 长任务是确定性 pipeline(9 个固定节点),走 acdm-backend BFF(要做项目鉴权 + L1 cache),BFF 暴露的是简化版 SSE 协议(meta/progress/result/error 四种事件),不是完整 LangGraph 协议——所以需要一套业务无关的 SSE 衬底(core/ai-task)而非套 SDK。frontend/CLAUDE.md 把这套定性为"BFF 协议适配版"。

Architecture

组件职责入口文件Source
useThreadStreamchat 路径主 hook:封装 SDK useStream,管 optimistic 消息 / 文件上传 / 提交core/threads/hooks.tsdeer-flow/frontend/src/core/threads/hooks.ts:139-604
installLangGraphInterceptor在 SDK client 上打补丁:ifExists 默认值 + runs.stream 前兜底建 threadcore/api/api-client.tsdeer-flow/frontend/src/core/api/api-client.ts:24-59
sanitizeRunStreamOptions过滤 SDK 不支持的 streamMode,防 Aegra 报错core/api/stream-mode.tsdeer-flow/frontend/src/core/api/stream-mode.ts:36-68
useAITaskStreamBFF 长任务生命周期 hook(六态)+ 自动 reattach + 显式 cancelcore/ai-task/use-ai-task-stream.tsdeer-flow/frontend/src/core/ai-task/use-ai-task-stream.ts:87-303
sse-parser通用 SSE 解析:边界拆分 + 单条 parse + iterateSse 异步迭代器core/ai-task/sse-parser.tsdeer-flow/frontend/src/core/ai-task/sse-parser.ts:17-104
run-metadata-storagesessionStorage 持久化 {run_id, thread_id, started_at_ms},容错损坏 entrycore/ai-task/run-metadata-storage.tsdeer-flow/frontend/src/core/ai-task/run-metadata-storage.ts:33-101
insight-engine clientsubmitAggregate / attachStream / cancelRun 三函数 + 双形态 responsecore/insight-engine/client.tsdeer-flow/frontend/src/core/insight-engine/client.ts:123-228
InsightEnginePanel当前唯一 useAITaskStream consumer:四象限 UI + 进度条 + reattach bannercomponents/.../InsightEnginePanel.tsxdeer-flow/frontend/src/components/workspace/acdm/insight-engine/InsightEnginePanel.tsx:94-393

useAITaskStream 的设计哲学是 KISS + 业务无关注入:hook 本身不知道"insight 聚合"是什么,业务方注入 submit / attach / cancel 三个函数 + onProgress / onResult / onError 三个回调,hook 只管 SSE 生命周期(use-ai-task-stream.ts:20-22 注释明示"只做 lifecycle,不解析业务 payload")。

Components / Subsystems

sse-parser:通用 SSE 解析器

职责:把 fetch 返回的 Response.body(ReadableStream)拆成一条条 SseMessage

关键函数:

  • findSseSeparator(buf)(sse-parser.ts:17):找 SSE 事件分隔符。SSE 规范用空行分隔事件,但换行可能是 \n\n\r\n\r\n,这个函数取两者中位置靠前那个,并返回分隔符长度(2 或 4),供切 buffer 用。
  • parseSseMessage(raw)(sse-parser.ts:33):解析单条事件文本。逐行扫 id: / event: / data: 前缀;多行 data:\n 拼接;只剥 data: 后单个 leading space(贴合 SSE spec);最后 JSON.parse data,失败则原样返字符串。data 为空返 null(跳过纯注释/心跳)。
  • iterateSse(resp)(sse-parser.ts:69):核心异步生成器。用 reader.read() 增量读 + TextDecoder({stream:true}) 处理跨 chunk 的多字节字符,循环找分隔符切出完整事件后 yieldtry/finally 确保 reader.releaseLock()

实现要点——为什么要自己拆边界:fetchReadableStream 不保证一次 read() 正好一条 SSE 事件,可能半条、可能多条粘连。iterateSse 维护一个 buf 字符串累积,每次解码后循环找分隔符(while (sep !== -1)),把完整事件逐个 yield 出去,剩余不完整的留在 buf 等下次:

ts
// 摘自 sse-parser.ts:79-91
while (true) {
  const { done, value } = await reader.read();
  if (done) break;
  buf += decoder.decode(value, { stream: true });
  let sep = findSseSeparator(buf);
  while (sep !== -1) {
    const raw = buf.slice(0, sep.idx);
    buf = buf.slice(sep.idx + sep.len);
    const evt = parseSseMessage(raw);
    sep = findSseSeparator(buf);
    if (evt) yield evt;
  }
}

SseMessage 带可选 id 字段(sse-parser.ts:29-31,注释明示"SSE 标准 id 字段,用于断线重连")——这是为将来基于 Last-Event-ID 的精细续流预留的接口。

run-metadata-storage:reattach 的状态底座

职责:把"哪个 run 正在跑"持久化到 sessionStorage,使页面重载后能找回。

为什么是 sessionStorage 而非 localStorage:run-metadata-storage.ts:13-14 注释说明——关 tab 自动失效正好符合"本会话内继续"语义,跨 session 恢复不在当前范围。

与 chat 路径的差异(run-metadata-storage.ts:5-11 注释)是理解两套管道的关键:

  • deer-flow chat 路径用 key 前缀 lg:stream:${assistant_id}:${thread_id}单个 run_id 字符串——它的场景 thread 稳定,只有 run_id 动。
  • acdm 长任务 thread + run 都按业务 scope 派生(每组会议一个新 run),所以用 taskKey 直接作 key,存 {run_id, thread_id, started_at_ms} JSON 对象(thread_id 必存,attach API 需要)。

容错设计:parseStoredEntry(run-metadata-storage.ts:84-101)三字段全校验——run_id/thread_id 非空字符串、started_at_ms 是 finite 正数,任一不满足视为损坏,get 时静默 removeItem 清掉返 null(:53-56)。set 时字段不全直接 noop 不写半份脏数据(:60-61)。SSR 环境(无 window)返 no-op stub 避免 ReferenceError(:34-45)。

注意 chat 路径的 getRunMetadataStorage(hooks.ts:85-114)还做了一层 normalizeStoredRunId——SDK 可能把整个 URL(含 /runs/<id> 路径或 ?run_id= query)存进去,这个函数把它归一化成纯 run_id,这是适配 LangGraph SDK 内部存储格式的兼容层。

useAITaskStream:六态生命周期机

职责:管 SSE 长任务的完整生命周期 + 自动 reattach + 显式 cancel。

六个状态(use-ai-task-stream.ts:33-39):idle / submitting / streaming / reattaching / completed / errored

三处 AbortController 协同(use-ai-task-stream.ts:117):abortRef 持有当前 in-flight fetch 的 controller。submit 时先 abort() 旧的再建新的(防 force 重发时旧 stream 还在 dispatch);cancel / unmount 时 abort()(:199-201:284-287)。mountedRef(:119)防 React StrictMode 双 effect 导致 attach 调两次时第二次污染状态。

回调用 ref 不进 deps(use-ai-task-stream.ts:108-111):callbacksRef 每次 effect 同步最新回调,但不放进 useCallback 的 deps 数组,避免业务方传内联函数导致 hook 反复重建。这是 React 性能惯用法。

consumeStream(use-ai-task-stream.ts:125-194)是 submit 流和 attach 流共用的消费循环:for await 迭代 SseMessage,按 evt.event 分发——progress 转发 onProgress;resultonResult + 清 storage + 转 completed;erroronError + 清 storage + 转 erroredAbortError 被显式吞掉(:173)——用户主动 cancel 或 unmount 不算错误。流自然结束没收到 result/error 不报错(:168-169,可能是连接断而非失败)。

insight-engine client:三函数 + 双形态 response

职责:封装 acdm-backend insight 聚合 BFF 的三个端点,适配 useAITaskStream 的三函数契约。

双形态 response 是这里最巧的设计submitAggregate(client.ts:123-162)发 POST 后看 Content-Type:

  • application/json → L1 cache 命中,后端直接返完整结果,不跑 pipeline。client 把它包成 AggregateL1HitError 抛出(:151)——用异常做控制流让调用方走 fast path。
  • text/event-stream → cache miss / force,走 SSE。先 readMetaEvent(:292-308)读首个 meta 事件拿 run_id/thread_id,剩余迭代器作 stream 返回。

为什么用异常表达 L1 hit?因为 submit 契约要求返 {run_id, thread_id, stream},L1 hit 时没有真 run。InsightEnginePaneltaskSubmit(InsightEnginePanel.tsx:184-203)catch 这个异常后,伪造一个只 yield 一条 result 事件的 async generator + 假 run_id "l1-cache-hit",让 useAITaskStream 无差别走完 lifecycle 直接 completed。这样 UI 层完全不用区分"命中缓存"和"真跑了 pipeline"两种路径。

attachStream(client.ts:173-206)返回一个懒执行的 AsyncIterable:首次 next() 才发 fetch。它不读 meta 事件(reattach 时前端已知 run_id 不需重发)。BFF 端 HTTP 状态映射:410 = run/thread Gone(LangGraph 24h 清理或 server 重启,insights_router.py:46),caller 收到后清 storage 转 idle;403 = forbidden_run_owner(防越权 reattach 别人的 run)。

installLangGraphInterceptor:SDK 补丁层

职责:在 LangGraph SDK client 上 monkey-patch 四个方法,兜住 SDK 内部行为缺口。

两个关键补丁(api-client.ts:24-59):

  1. threads.createifExists 时自动补 "do_nothing"(:30-33:61-75)——useStream 内部 submit 会隐式调 threads.create 不传 ifExists,撞到前端 pre-create 已建的同 threadId 会返 409;补 do_nothing 幂等化。
  2. runs.stream / runs.wait 前若 threadId 非空,先 ensureThread(:35-51)——兜住 SDK 在 stream 内部 auto-create 不带 metadata 的缺口。ensureThread 失败只 console.warn 不阻断(:77-91),让真错误由 stream 自己 surface 给 onError

runs.joinStream(:53-58)和 runs.stream 都过 sanitizeRunStreamOptions——Aegra 0.9.4 不支持 SDK 全部 streamMode,这个 sanitizer(stream-mode.ts:36-68)过滤掉不支持的 mode 并去重打一次 warn,防 Aegra 报错。client 是按 mock/default 缓存的单例(:109-120)。

Data Flow

chat 路径:submit + 断线 reattach(SDK 内建)

chat 路径的续流是 SDK 内建:useStreamreconnectOnMount: () => runMetadataStorage(hooks.ts:219-221)+ submit 时 streamResumable: true(hooks.ts:539)。SDK 自己把 run_id 写进 lg:stream:* key,mount 时读回调 joinStream 续流。前端只提供存储实现 + 归一化逻辑。

acdm SSE 长任务:六态流转(自研衬底)

文字说明 reattach 路径(本章重点)——用户刷新页面后:

  1. InsightEnginePanel 重 mount,useAITaskStream 的 mount effect(use-ai-task-stream.ts:262)跑。
  2. storage.get(taskKey) 读 sessionStorage(:264)。taskKeyprojectId + sorted(atomIds) 派生(InsightEnginePanel.tsx:178-182),保证同一组会议跨 mount 同 key。
  3. 有 entry → 恢复 runId/threadId/startedAtMs,setIsReattached(true),状态先 reattaching(:265-272)。
  4. attachFn(→ attachStreamcancelFn 不涉及),拿到懒 AsyncIterable(:273-277)。
  5. 立即转 streaming(:280),consumeStream 开始 for await 收 BFF 续发的 progress/result/error(:281)。
  6. UI 渲染 <ReattachBanner>(InsightEnginePanel.tsx:287-289)提示"已恢复进行中任务,发起于 X 分 Y 秒前"——startedAtMs 取自 storage 而非当前时刻,所以计时连续。
  7. 收到 result → 清 storage(:139)、转 completed、渲染四象限结果。若 run 已不存在(BFF 返 410),attachStream fetch 抛错被 consumeStream catch → errored → 清 storage。

速查表

两套流式管道对比

维度chat 路径acdm SSE 长任务Source
入口 hookuseThreadStreamuseAITaskStreamhooks.ts:139 / use-ai-task-stream.ts:87
底层流LangGraph SDK useStreamfetch + iterateSsehooks.ts:215 / sse-parser.ts:69
后端Aegra(直连,LangGraph 协议)acdm-backend BFF(text/event-stream)api-client.ts:101-107 / insights_router.py:104
续流机制SDK reconnectOnMount + joinStream自研 sessionStorage + attachStreamhooks.ts:219 / use-ai-task-stream.ts:262
storage keylg:stream:${assistant}:${thread} 单值taskKey{run_id,thread_id,started_at_ms} JSONhooks.ts:85-114 / run-metadata-storage.ts:48-71
事件类型messages / updates / custom / langchainmeta / progress / result / errorhooks.ts:234-308 / use-ai-task-stream.ts:134-165
当前 consumer所有 chat 页InsightEnginePanel(未来 weekly_report)hooks.ts:139 / InsightEnginePanel.tsx:224

BFF SSE 事件类型(event: 字段)

event何时发data 内容前端处理Source
metaSSE 流首 chunk(仅 submit,attach 不发){run_id, thread_id, started_at}readMetaEvent 剥离,不进 consumeStreamclient.ts:292-308
progress每个 pipeline 节点完成{node, completed, total}onProgress → 进度条use-ai-task-stream.ts:134-135
resultpipeline 完成 / L1 hitAggregateResult(payload+stale)onResult + 清 storage + completeduse-ai-task-stream.ts:136-147
errorpipeline 失败{error, message}onError + 清 storage + erroreduse-ai-task-stream.ts:148-165

useAITaskStream 状态来源

状态触发startedAtMs 来源isReattachedSource
submittingsubmit() 调用,等 submitFnfalseuse-ai-task-stream.ts:203
streamingsubmitFn 返回 / reattach 后Date.now() / storage 值false / true:220:280
reattachingmount 检出 storage entrystorage started_at_mstrue:268-272
completedresult 事件清 null清 false:142-146
errorederror / stream 异常 / submit 失败清 null清 false:156-163
idle显式 cancel()清 null清 false:253-258

扩展指南:接入一个新的 SSE 长任务

core/ai-task 设计成业务无关衬底,接新长任务(如未来的 weekly_report stream)无需重写 lifecycle,只需提供三函数 + 一个 taskKey:

ts
// 1. client 侧实现三函数(参考 insight-engine/client.ts)
async function submitMyTask(req, signal): Promise<{run_id, thread_id, stream}> {
  const resp = await fetch("/api/acdm/.../my-task", {
    method: "POST", body: JSON.stringify(req),
    credentials: "include", signal,
  });
  await throwIfHttpError(resp);
  const iter = iterateSse(resp);              // 复用通用解析器
  const meta = await readMetaEvent(iter);     // 读首 meta 事件
  return { run_id: meta.run_id, thread_id: meta.thread_id, stream: iter };
}
function attachMyTask({run_id, thread_id, signal}): AsyncIterable<SseMessage> { /* 懒 fetch */ }
function cancelMyTask({run_id, thread_id}): Promise<void> { /* POST cancel */ }

// 2. 组件里一行接入,自动获得 reattach + cancel
const task = useAITaskStream<MyResult>({
  taskKey: `ai-task:my-task:${scopeId}`,      // 同 scope 跨 mount 必须同 key
  submit: submitMyTask,
  attach: attachMyTask,
  cancel: cancelMyTask,
  onProgress: (p) => setProgress(p),
  onResult: (r) => setResult(r),
  onError: (e) => setError(e),
});

约束清单(从代码读出):

  • BFF 必须发 meta 首事件run_id/thread_id,否则 readMetaEvent(client.ts:296-307)抛 "SSE first event expected 'meta'"。attach 端点不发 meta。
  • 三函数签名固定:submit({force,signal})AITaskSubmitResultattach({run_id,thread_id,signal})AsyncIterable<SseMessage>cancel({run_id,thread_id})Promise<void>(use-ai-task-stream.ts:55-64)。
  • taskKey 必须按业务 scope 稳定派生:相同任务跨 mount 必须同 key 才能 reattach;不同 scope 必须不同 key 防串(InsightEnginePanel 用 projectId + sorted(atomIds))。
  • 结果事件 event: 名必须是 progress/result/error,衬底硬编码这三个名分发(use-ai-task-stream.ts:134-165),其他 event 名被忽略。
  • storage 字段三件套必齐全:run_id/thread_id 非空 string + started_at_ms 正数,缺一视损坏被清(run-metadata-storage.ts:99)。

Configuration

Config默认值含义影响Source
assistantId"lead_agent"chat 路径绑定的 Aegra assistant决定 chat 走哪个 agent graphhooks.ts:217
reconnectOnMountstorage getter / falseSDK 续流开关有 storage 才 mount 续流hooks.ts:219-221
streamResumabletrue(submit 时)run 可恢复配合 reconnect 续流hooks.ts:539
fetchStateHistory{ limit: 1 }mount 拉历史 state 条数重 mount 恢复最新 statehooks.ts:222
recursion_limit1000LangGraph 图递归上限防深度子代理链中断hooks.ts:541
taskKeyai-task:insight-aggregate:${pid}::${sorted ids}长任务唯一标识 + storage key决定 reattach 边界InsightEnginePanel.tsx:178-182
SUPPORTED_RUN_STREAM_MODES9 种Aegra 支持的 streamMode 白名单过滤不支持的 modestream-mode.ts:1-11

Common Pitfalls / 实战 Tips

  • chat 续流和长任务续流是两套,storage key 不同:chat 用 lg:stream:* 单值 + URL 归一化;长任务用 taskKey JSON 对象。别把 core/ai-task 那套套到 chat 上(协议不同,chat 是完整 LangGraph 协议)。代码注释 run-metadata-storage.ts:5-11 明确这个分野。
  • L1 cache 命中走的是异常控制流:submitAggregate 命中缓存抛 AggregateL1HitError(client.ts:105-112),不是返回值。漏 catch 会被当真错误进 erroredInsightEnginePanel.tsx:188-200 的伪 generator 模式是标准接法。
  • reattach 计时取 storage 的 started_at_ms 不是当前时刻:ReattachBanner 显示"发起于 X 分前"靠这个,所以刷新页面后计时连续不归零(use-ai-task-stream.ts:269InsightEnginePanel.tsx:432-439)。
  • stream 自然结束无 result 不报错:consumeStream(use-ai-task-stream.ts:168-169)对"流断但没收到 result/error"故意不报错,因为可能只是连接断、后端还在跑——下次 mount 仍能 reattach。这是设计不是 bug。
  • JWT 过期在 sendMessage 处提前拦:chat 路径 hooks.ts:356-369acdmUser.sub 为空时直接跳 SSO,不让请求撞 401 才发现。
  • ensureThread 失败只 warn 不阻断:api-client.ts:86-90 故意吞 ensureThread 错误,让真错误由 stream 的 onError surface——别在这里加抛错,会掩盖真实失败原因。

References

  • deer-flow/frontend/src/core/ai-task/use-ai-task-stream.ts:1-303 — 六态生命周期 hook(本章核心,业务无关衬底)
  • deer-flow/frontend/src/core/ai-task/sse-parser.ts:1-104 — 通用 SSE 解析(findSseSeparator / parseSseMessage / iterateSse)
  • deer-flow/frontend/src/core/ai-task/run-metadata-storage.ts:1-101 — sessionStorage 持久化 + 损坏容错
  • deer-flow/frontend/src/core/insight-engine/client.ts:1-308 — 三函数 + 双形态 response + L1HitError 控制流
  • deer-flow/frontend/src/core/threads/hooks.ts:139-604 — chat 路径 useThreadStream(SDK useStream 封装)
  • deer-flow/frontend/src/core/api/api-client.ts:1-125 — installLangGraphInterceptor SDK 补丁层
  • deer-flow/frontend/src/core/api/stream-mode.ts:1-68 — streamMode sanitizer(Aegra 兼容)
  • deer-flow/frontend/src/components/workspace/acdm/insight-engine/InsightEnginePanel.tsx:94-393 — 唯一 useAITaskStream consumer
  • acdm-backend/app/api/insights_router.py:55-208 — BFF 端 aggregate/attach_stream(SSE 协议另一端)
PageRelationship
前端技术栈与架构本章是该章 core/ 业务逻辑层中流式管道的深入展开
proxy 鉴权守卫与 API 反代本章 fetch credentials:include 经该章 proxy 反代到 BFF
项目工作台与工作区本章 chat 路径 useThreadStream 服务于该章工作区聊天
会议洞察引擎本章 insight-engine client 是该章 9 节点 pipeline 的前端入口
Aegra 运行时与 LangGraph本章 chat 路径直连该章 Aegra,SDK streamMode 受其约束
请求生命周期与服务拓扑本章两条流式路径对应该章不同服务拓扑分支

公司内部参考 · 由 claude-wiki-gen 基于源码自动生成的二次分析