Skip to content

AI 消息流与流式渲染

本章目标:

  1. 讲清前端如何用 langgraph-sdkuseStream 接入后端 SSE 流,以及 values / messages-tuple / custom / end 四类事件在前端各自的消费职责。
  2. 解释为什么 AI 文本必须按 message id 拼接(concat)而不是整段替换,以及前端如何对历史、实时流、乐观消息三路做去重合并。
  3. 梳理 core/streamdown 流式 Markdown 渲染与 components/ai-elements 消息组件如何把 delta 文本渐进式呈现给用户。

TL;DR

前端通过 getAPIClient() 拿到经 CSRF/stream_mode 包装的 langgraph-sdk 单例 client,useThreadStream 用 SDK 的 useStream hook 订阅 lead_agent 的运行。SDK 在 wire 层用 messages-tuple 投递 AI token delta(每片只含本次新增 token,需按 id 累加),useStream 内部维护累加器把同一 id 的 chunk concat 成完整消息,前端再用 mergeMessages 把后端历史、实时 thread.messages、乐观消息三路按 identity(tool_call_idid)去重合并。最终消息经 getMessageGroups 分组,交给 Streamdown(core/streamdown 配置 remark/rehype 插件)做流式 Markdown 渲染。这与后端第 15 章 StreamBridge 的 SSE 生产侧严格对应。

Overview

为什么要按 id 拼接 delta 而非整段替换

LangGraph 的 messages mode 给出的是 delta:每个 AIMessageChunk.content 只包含这一次新 yield 的 token,不是从头的累计文本——这一语义在后端设计文档里被明确写出 STREAMING.md:177。如果前端把每片 chunk 当成"完整消息整段替换",屏幕上只会看到最后一个 BPE token(例如末尾的 "world!"),而非完整回复。

正确的做法是:上游发增量,下游负责按消息 id 累加。同一条 AI 消息的所有 chunk 共享同一个 id;消费者把它们 concat 起来才能重建完整文本。后端 STREAMING.md 对此点明:Gateway 路径里前端 useStream React hook 自己维护累加器 STREAMING.md:179。前端这一侧的累加由 @langchain/langgraph-sdk/reactuseStream 内部完成,业务代码只面对"已经拼好的 thread.messages"。

但前端还面临 SDK 解决不了的第二层去重:同一条消息可能同时出现在三个来源——后端分页历史 useThreadHistory、实时流 thread.messages、本地乐观消息 optimisticMessages。三路必须按 message identity 去重合并,否则用户会看到重复气泡。这就是 mergeMessages + dedupeMessagesByIdentity 存在的原因 hooks.ts:95-126

Architecture

消息流管线分四段:接入(client 包装)→ 订阅(useStream + stream_mode 分流)→ 合并去重(mergeMessages)→ 渲染(group + Streamdown)

  • Client 包装:getAPIClient() 缓存一个 langgraph-sdk Client 单例,通过 onRequest 钩子注入 X-CSRF-Token,并猴补丁 runs.stream/runs.joinStream 以过滤后端不支持的 stream mode api-client.ts:34-72
  • stream mode 白名单:sanitizeRunStreamOptionsSUPPORTED_RUN_STREAM_MODES 白名单(含 values/messages/messages-tuple/custom 等)裁掉不支持的 mode 并 warn 一次 stream-mode.ts:1-68
  • 订阅 hook:useThreadStream 用 SDK 的 useStream<AgentThreadState> 订阅 assistantId: "lead_agent",挂载 onCreated/onUpdateEvent/onCustomEvent/onLangChainEvent/onError/onFinish 回调 hooks.ts:222-360
  • 合并去重:mergeMessages(history, thread.messages, optimisticMessages) 输出统一消息列表 hooks.ts:630-647
  • 渲染:getMessageGroups 把消息归组,MessageListItemMarkdownContentMessageResponse(Streamdown 的 memo 包装)落地 message.tsx:307-320

Source 列表:

关键文件
Client 包装frontend/src/core/api/api-client.ts
stream modefrontend/src/core/api/stream-mode.ts
订阅与合并frontend/src/core/threads/hooks.ts
消息分组/抽取frontend/src/core/messages/utils.ts
流式渲染frontend/src/core/streamdown/plugins.ts, frontend/src/components/ai-elements/message.tsx
后端对应backend/packages/harness/deerflow/runtime/stream_bridge/base.py

Components / Subsystems

core/api —— langgraph-sdk client 接入

  • 职责:构造并缓存 langgraph-sdk Client 单例,为每个状态变更请求即时注入 X-CSRF-Token,并把不支持的 stream mode 在出站前裁掉。
  • 关键文件:
    • getAPIClient(isMock)mock/default 缓存 client,createCompatibleClientonRequest: injectCsrfHeader 构造 api-client.ts:34-72
    • injectCsrfHeader 仅对状态变更方法读 csrf_token cookie 并设头,处理登录/登出 cookie 轮换 api-client.ts:21-32
    • sanitizeRunStreamOptionsSUPPORTED_RUN_STREAM_MODES 集合过滤 streamMode,warnUnsupportedStreamModes 对每个被丢弃的 mode 仅 warn 一次 stream-mode.ts:36-68

core/threads —— useStream 订阅与三路合并

  • 职责:用 SDK useStream 订阅运行,把不同 stream mode 的事件路由到对应回调;把后端历史、实时流、乐观消息合并去重;管理乐观消息生命周期与 token usage baseline。
  • 关键文件:
    • useThreadStream 配置 useStream<AgentThreadState>({ client, assistantId: "lead_agent", reconnectOnMount: true, fetchStateHistory: { limit: 1 } }),thread.submit 发送 human 消息并带 context hooks.ts:222-227hooks.ts:566-611
    • messageIdentity 优先用 tool:${tool_call_id},否则 message:${id},作为去重键 hooks.ts:52-64
    • mergeMessages 从尾部扫描找历史与实时流的重叠后缀,dedupeMessagesByIdentity 保留每个 identity 的最后一次出现(让实时消息覆盖历史) hooks.ts:66-126
    • useThreadHistory 按 run 分页拉 /runs/{run_id}/messages,过滤掉 callermiddleware: 开头的消息 hooks.ts:709-730

core/messages —— 消息分组与内容抽取

  • 职责:把扁平消息列表归组为 human / assistant / processing / clarification / subagent / present-files,并从结构化或字符串 content 中抽取正文与 reasoning。
  • 关键文件:
    • getMessageGroups 单遍扫描消息,累积连续中间态 AI 消息到 assistant:processing 组,带 content 且无 tool call 的再单独开一个 assistant 气泡 utils.ts:36-128
    • extractContentFromMessage 处理 content 为 string 或数组(text/image_url)两种形态,并剥离内联 <think> reasoning utils.ts:216-240
    • extractReasoningContentFromMessage 兼容 additional_kwargs.reasoning_content、Anthropic 风格 thinking part、以及内联 <think> 三种 reasoning 载体 utils.ts:242-262
    • isHiddenFromUIMessage 隐藏 hide_from_uisummary/loop_warning/todo_reminder 等控制消息 utils.ts:29-34utils.ts:371-377

core/streamdown —— 流式 Markdown 渲染插件

  • 职责:为 streamdown 库提供 remark/rehype 插件预设,区分普通正文、reasoning、人类消息三套配置,并支持 CJK 分词淡入动画。
  • 关键文件:
    • streamdownPlugins 启用 remarkGfm + remarkMath(单美元行内数学)+ rehypeRaw + rehypeKatex plugins.ts:9-18
    • reasoningPlugins 从基础预设里移除 rehypeRaw,防止 LLM 幻觉的 HTML 标签(如 <simd>)被渲染成 DOM plugins.ts:31-38
    • humanMessagePlugins 不含 autolink,避免 URL 渗入相邻文本 plugins.ts:40-50
    • rehypeSplitWordsIntoSpans 在流式时把英文按词、CJK 整段包进 animate-fade-in<span>,实现逐词淡入;useRehypeSplitWordsIntoSpans(isLoading) 仅在 loading 时启用 index.ts(rehype):9-56

components/ai-elements —— 消息 UI 组件

  • 职责:把分好组的消息渲染成气泡、reasoning 折叠面板、流式指示器,并对 Streamdown 做 children 浅比较 memo 化以减少重渲染。
  • 关键文件:
    • MessageResponseStreamdownmemo 包装,自定义比较器仅当 children(即文本)变化时才重渲染 message.tsx:307-320
    • ReasoningContentreasoningPlugins 渲染思考内容,ReasoningisStreaming 期间自动展开、结束后延时 1s 自动折叠并计算耗时 reasoning.tsx:70-92reasoning.tsx:171-184
    • MessageContent_ 按消息形态分支:上传占位(element === "task")→ 仅 reasoning → human → 普通 AI 正文,普通正文走 MarkdownContent 并把 rehypeSplitWordsIntoSpans 注入 rehype 链 message-list-item.tsx:262-336

Data Flow

四类事件在前端的落点:

  • messages-tuple:SDK 内部把同 id 的 delta concat 进 thread.messages,业务代码不直接处理 chunk。后端 wire 层这一 mode 名对应 Graph 层 messages、前端 useStreammessages-tuple,是三个协议层的别名 STREAMING.md:81-98
  • values:onUpdateEventSummarizationMiddleware.before_model 把被摘要消息搬入历史,并捕获 title 更新写入 query 缓存 hooks.ts:246-302
  • custom:onCustomEvent 处理 task_running(更新子任务最新消息)与 llm_retry(toast 提示) hooks.ts:303-331
  • end / 错误:onFinish / onError 重置 token usage baseline 并失效 threads/token-usage 查询 hooks.ts:332-359

合并后的扁平消息列表经 getMessageGroups 单遍扫描归组,流式过程中同一条 AI 消息会随 content 变长而在不同分支间流转:

Implementation Details

前端"按 id 拼接 delta"的去重不变式集中体现在 mergeMessages + dedupeMessagesByIdentity。SDK 已把同 id 的 token delta concat 进 thread.messages,前端再做跨来源去重——核心是"同一 identity 只保留最后出现的一份",从而让实时流覆盖历史快照:

ts
function dedupeMessagesByIdentity(messages: Message[]): Message[] {
  const lastIndexByIdentity = new Map<string, number>();
  messages.forEach((message, index) => {
    const identity = messageIdentity(message); // tool:<id> 或 message:<id>
    if (identity) lastIndexByIdentity.set(identity, index);
  });
  return messages.filter((message, index) => {
    const identity = messageIdentity(message);
    return !identity || lastIndexByIdentity.get(identity) === index;
  });
}

hooks.ts:66-80

解读:lastIndexByIdentity 记录每个 identity 最后一次出现的下标;filter 只保留"自己就是最后一次出现"的那条。因为 mergeMessages 的拼接顺序是 [历史(裁掉重叠后缀), thread.messages, optimistic] hooks.ts:121-125,实时流里同 id 的消息排在历史之后,自然成为"最后出现",于是历史中的旧副本被丢弃、实时(已 concat 完整文本的)版本胜出。useStream 把 streaming 中的 AI 消息持续重写为更长的字符串,但 id 不变,前端无需感知 chunk——它只看到 thread.messages 里那条消息的 content 在每次重渲染时变长。这正是"按 id 拼接"在前端的最终体现:SDK 负责 token 级 concat,dedupeMessagesByIdentity 负责跨来源唯一化。单元测试断言了"实时覆盖历史"与"按 tool_call_id 去重"两条性质 message-merge.test.ts:21-60

速查表

stream_mode后端语义SDK 层名称前端处理点Source
values每个 graph 节点完成后的完整 state 快照valuesonUpdateEvent:摘要搬历史、捕获 title;thread.values 更新hooks.ts:246-302
messagesLLM 每 chunk 的 (AIMessageChunk, meta) deltamessages-tupleuseStream 内部按 id concat 进 thread.messages,业务不直接处理STREAMING.md:75-98
customStreamWriter.write() 的任意 dictcustomonCustomEvent:task_runningupdateSubtask;llm_retrytoasthooks.ts:303-331
end(流结束)运行完成 sentinelonFinish 回调重置 token usage baseline,失效 threads/token-usage 查询hooks.ts:346-359
(错误)异常事件onError 回调清乐观消息、toast.error、重置 usage baselinehooks.ts:332-345
on_tool_end(LangChain 事件)工具结束onLangChainEvent转发 onToolEnd({ name, data })hooks.ts:238-245
不支持的 mode————sanitizeRunStreamOptions 在出站前裁掉并 warn 一次stream-mode.ts:36-68

Common Pitfalls / Tips

  • delta 不是累计文本:messages mode 每片只含本次新增 token,直接整段替换会只显示末尾几个字符;必须按 id 累加。前端这层由 useStream 代劳,但 core/messages 的内容抽取仍需正确处理 content 为数组的形态 utils.ts:216-240
  • values 快照不要重复合成:同一条 AI 消息既走 messages token 流又出现在 values 快照里;mergeMessages 的"保留最后出现"配合 SDK 的 streamed 跟踪,确保不出现重复气泡 hooks.ts:66-126。后端同侧不变式见 STREAMING.md:236-244
  • 历史与实时流重叠:历史是 thread.messages 的连续前缀;mergeMessages 从尾部扫描历史找"已在实时流里"的后缀并裁掉,避免重复加载 hooks.ts:104-125
  • 乐观 human 消息过早清除:AI 的 messages-tuple 可能先于 human 的 values 到达;清除乐观消息要等服务端 human 消息出现(humanMessageCount 增长)才执行,否则用户输入会瞬间消失 hooks.ts:411-426
  • reasoning 不要用 rehypeRaw:LLM 可能在思考里吐出形似 HTML 的 token(如 <simd>);reasoning 渲染用 reasoningPlugins(去掉 rehypeRaw)防止被当成真实 DOM 元素 plugins.ts:31-38
  • 逐词淡入仅 streaming 启用:useRehypeSplitWordsIntoSpans(isLoading) 只在 loading 时注入分词 span;非流式时关闭,避免静态消息的无谓 DOM 膨胀 index.ts(rehype):50-56
  • CSRF cookie 每请求读取:client 不在构造时固化 token,而是 onRequest 时即时读 cookie,以兼容登录/登出/改密时的 cookie 轮换 api-client.ts:10-32

References

  • frontend/src/core/threads/hooks.ts —— useThreadStream / mergeMessages / useThreadHistory,流订阅与三路合并的核心 L52-L126
  • frontend/src/core/api/api-client.ts —— langgraph-sdk client 单例与 CSRF 注入 L34-L72
  • frontend/src/core/api/stream-mode.ts —— stream mode 白名单与裁剪 L1-L68
  • frontend/src/core/messages/utils.ts —— 消息分组与内容/reasoning 抽取 L36-L262
  • frontend/src/core/streamdown/plugins.ts —— 流式 Markdown remark/rehype 插件预设 L9-L50
  • frontend/src/core/rehype/index.ts —— 流式逐词淡入 rehype 插件 L9-L56
  • frontend/src/components/ai-elements/message.tsx —— MessageResponse(Streamdown memo 包装)L305-L320
  • frontend/src/components/workspace/messages/message-list-item.tsx —— 消息形态分支与流式渲染落地 L262-L336
  • backend/docs/STREAMING.md —— delta vs cumulative 语义、多协议层命名、去重不变式 L75-L244
  • backend/packages/harness/deerflow/runtime/stream_bridge/base.py —— SSE 生产侧 StreamEvent/publish/subscribe 契约 L37-L72
章节关系
15-Runtime运行时与StreamBridge后端 SSE 生产侧:StreamBridge 如何 publish/subscribe 事件,与本章前端消费侧严格对应
28-前端技术栈与应用结构本章所处的 Next.js + langgraph-sdk 技术栈与 core/ 分层背景
30-工作区与聊天界面消费本章合并后消息列表的聊天 UI:MessageList/MessageGroup 渲染
32-嵌入式Python客户端另一条并行流式路径 DeerFlowClient.stream(),同样按 id 累加 delta,可对照前端理解

公司内部参考 · 由 claude-wiki-gen 基于源码自动生成的二次分析