Skip to content

IM 通道系统

本章目标:

  1. 讲清楚 app/channels 为什么用 langgraph-sdk 走 Gateway 同一条路径,而不是直接调用内部 agent runtime。
  2. 拆解 message_bus(异步 pub/sub)与 manager(核心 dispatcher)如何把外部聊天平台的消息桥接到 DeerFlow thread。
  3. 对比 Slack/Telegram 的 runs.wait 一次性回复与 Feishu 的 runs.stream 卡片增量更新两种交付模式,并给出自定义 Channel 的最小实现模板。

TL;DR

app/channels 把飞书 / Slack / Telegram / 钉钉 / 企业微信 / 微信 / Discord 等外部 IM 平台桥接到 DeerFlow agent。每个平台实现一个 Channel 子类,把收到的消息封装成 InboundMessage 投入 MessageBus 的异步队列;ChannelManager 的单一 dispatch loop 消费队列,通过 langgraph-sdk HTTP client(注入进程内 internal auth + CSRF)在 Gateway 上创建 / 复用 thread,再用 runs.wait(非流式平台)或 runs.stream(飞书 / 企业微信)拿到 agent 输出,经 OutboundMessage 回调发回平台。ChannelStore 用单个 JSON 文件把 channel:chat[:topic] 持久化映射到 thread_id,实现多轮会话续聊。

Overview

DeerFlow 的 agent runtime 内嵌在 Gateway 进程里(端口 8001),Gateway 同时暴露一套 LangGraph 兼容的 HTTP API。IM 通道系统面临一个设计抉择:进程内已经能 import 到 agent,为什么还要绕一圈走 HTTP?

答案是「路径统一」。channels 与前端 Web UI 使用完全相同langgraph-sdk 客户端入口与 thread/run 语义——ChannelManager 在文件头就声明了这个意图(backend/app/channels/manager.py:1),client 通过 langgraph_sdk.get_client(url=...) 创建(backend/app/channels/manager.py:636-649)。这样做的收益:

  • thread 由 Gateway 服务端统一管理:client.threads.create() 创建的 thread 与 Web UI 创建的 thread 走同一套持久化、checkpoint、清理逻辑,IM 会话和网页会话天然可互通同一个 thread_id
  • 中间件链 / 鉴权 / 限流不重复实现:channels 不需要自己拼装 agent、加载工具、跑中间件,所有逻辑落在 Gateway 一侧。
  • 解耦平台与 agent:MessageBus 在 channels 与 dispatcher 之间插入异步队列,平台收消息只管 publish_inbound,dispatcher 不感知平台细节。

代价是引入了「同进程 HTTP 自调用」需要解决鉴权:Gateway 默认要求浏览器 session cookie + CSRF 双提交,而 channel worker 没有浏览器会话。系统用进程本地 internal token(X-DeerFlow-Internal-Token,进程启动时 secrets.token_urlsafe(32) 生成,backend/app/gateway/internal_auth.py:11)加一对自洽的 CSRF cookie/header 解决,详见「Implementation Details」。

Architecture

整个系统由两条解耦的数据通路构成,中间隔着 MessageBus:

Source 列表:

文件角色
message_bus.py异步 pub/sub 中枢,定义 InboundMessage / OutboundMessage / ResolvedAttachment
manager.py核心 dispatcher,thread 生命周期 + runs.wait/stream + 命令路由
store.pyJSON 持久化 channel:chat[:topic]thread_id
base.py抽象 Channel,定义平台实现契约
service.pyconfig.yaml 读取配置,按 registry 懒加载并管理生命周期
commands.py单一权威命令集 KNOWN_CHANNEL_COMMANDS

Components / Subsystems

message_bus —— 异步 pub/sub 中枢

职责:在 channels 与 dispatcher 之间插入无锁解耦层。inbound 用单个 asyncio.Queue 实现「多生产者(各平台)→ 单消费者(dispatch loop)」;outbound 用回调列表实现「单生产者(dispatcher)→ 多订阅者(各 channel)」。

关键类型与方法:

  • InboundMessage(dataclass,backend/app/channels/message_bus.py:29-58):携带 channel_namechat_iduser_idtextmsg_typethread_tstopic_idfilesmetadatatopic_id 是会话续聊的关键——同一 chat_id 内相同 topic_id 复用同一 DeerFlow thread;None 时每条消息新建 thread(一次性问答)。
  • OutboundMessage(dataclass,backend/app/channels/message_bus.py:82-107):带 is_final 标志区分流式中间帧与终帧,attachments 是已解析为宿主路径的 ResolvedAttachment 列表。
  • MessageBus.publish_outbound 对每个回调单独 try/except,单个 channel 回调异常不影响其它(backend/app/channels/message_bus.py:169-173)。

manager —— 核心 dispatcher

职责:消费 inbound 队列,管理 thread 生命周期,调用 Gateway 的 LangGraph API,把结果回投 outbound。核心类 ChannelManager(backend/app/channels/manager.py:541-547)。

store —— JSON 会话映射持久化

职责:把 IM 会话与 DeerFlow thread 的映射落到单个 JSON 文件,实现网关重启后仍能续聊。核心类 ChannelStore(backend/app/channels/store.py:16-44)。

base —— 抽象 Channel

职责:定义所有平台实现的契约。核心类 Channel(ABC)(backend/app/channels/base.py:14-22)。抽象方法 start / stop / send 必须实现;supports_streaming 默认 False;send_file / receive_file 默认空实现可选覆写;_make_inbound 是构造 InboundMessage 的便捷工厂(backend/app/channels/base.py:68-89)。

service —— 生命周期管理

职责:从 config.yamlchannels 段读取配置,按 _CHANNEL_REGISTRYresolve_class 懒加载已启用的 channel 类,启动 ChannelManager 与各 channel。核心类 ChannelService(backend/app/channels/service.py:55-94)。

commands —— 权威命令集

职责:KNOWN_CHANNEL_COMMANDS 单一来源,channel 解析器与 dispatcher 共用,新增 / 删除命令只改这一处(backend/app/channels/commands.py:11-20)。

Data Flow

下图以飞书消息为例,展示从平台事件到卡片增量更新的完整链路(Slack/Telegram 走 runs.wait 单次回复分支):

关键节点说明:

Implementation Details

内部鉴权 + CSRF 注入

channel worker 没有浏览器 session,但 Gateway 对状态变更请求(创建 thread / run)要求鉴权 + CSRF 双提交。ChannelManager 在懒初始化 langgraph-sdk client 时,同时注入三样东西:

python
def _get_client(self):
    if self._client is None:
        from langgraph_sdk import get_client
        self._client = get_client(
            url=self._langgraph_url,
            headers={
                **create_internal_auth_headers(),               # X-DeerFlow-Internal-Token
                CSRF_HEADER_NAME: self._csrf_token,              # X-CSRF-Token
                "Cookie": f"{CSRF_COOKIE_NAME}={self._csrf_token}",  # csrf_token cookie
            },
        )
    return self._client

来源:backend/app/channels/manager.py:636-649

解读:

飞书卡片增量 patch

飞书流式的核心是「一张卡片原地刷新」:_send_card_messagethread_ts(源消息 id)做 key 查 _running_card_ids,有 running 卡片就 _update_card patch 同一张;非终帧 patch 失败则继续抛出(等下一帧重试),终帧 patch 失败才降级为新回复;终帧时弹出缓存并加 DONE 表情(backend/app/channels/feishu.py:511-552)。卡片 JSON 必须设 config.update_multi=true 才允许飞书 patch API 原地更新(backend/app/channels/feishu.py:406-410)。

速查表

平台 transport / 流式方式 / 难度矩阵

平台transport流式方式(manager)supports_streaming难度Source
feishulark-oapi WebSocket 长连接runs.stream(卡片增量 patch)True高(独立线程 + loop patch + 卡片)feishu.py:28-69manager.py:40-48
slackslack-sdk Socket Mode(WebSocket)runs.wait(单次回复)Falseslack.py:35-96manager.py:44
telegrambot pollingruns.waitFalse⚠️本次未深入,建议补读 backend/app/channels/telegram.py;能力声明见 manager.py:45
dingtalkclient_id/secret 长连接runs.wait(可选 AI Card 流式)False中高⚠️本次未深入,建议补读 backend/app/channels/dingtalk.py;能力声明见 manager.py:41
wecom(企业微信)HTTPruns.streamTrue⚠️本次未深入,建议补读 backend/app/channels/wecom.py;能力声明见 manager.py:47
wechat(微信)bot_tokenruns.waitFalse⚠️本次未深入,建议补读 backend/app/channels/wechat.py;能力声明见 manager.py:46
discordbot_tokenruns.waitFalse⚠️本次未深入,建议补读 backend/app/channels/discord.py;能力声明见 manager.py:42

注:_channel_supports_streaming 优先取运行中 channel 实例的 supports_streaming,否则回退到 CHANNEL_CAPABILITIES 静态表(backend/app/channels/manager.py:575-584)。

命令清单

KNOWN_CHANNEL_COMMANDS = /bootstrap/new/status/models/memory/help(backend/app/channels/commands.py:11-20)。

命令行为处理位置
/bootstrap [text]转成 CHAT 消息并附带 is_bootstrap=True context,进入 agent 引导建立流程manager.py:951-957
/newthreads.create() 新建 thread 并写 store,重置会话manager.py:959-971
/status返回当前 thread_id 或「无活跃会话」manager.py:972-974
/modelsGET {gateway_url}/api/models 列出模型manager.py:975-976
/memoryGET {gateway_url}/api/memory 返回 fact 数manager.py:977-978
/help返回命令帮助文本manager.py:979-988

扩展指南

要新增一个平台,继承 Channel 实现三个抽象方法即可。约束清单从 base.Channel 抽象读出:

  • 必须实现:start()(监听平台事件并 bus.subscribe_outbound(self._on_outbound))、stop()(优雅停止并 bus.unsubscribe_outbound)、send(msg)(把文本发回平台)。
  • 可选覆写:supports_streaming(返回 True 让 manager 走 runs.stream)、send_file(msg, attachment)(返回 bool,默认 False 即不支持附件上传)、receive_file(msg, thread_id)(下载入站附件并改写 msg.text 为沙箱路径,默认原样返回)。
  • 入站封装:用 self._make_inbound(...) 构造 InboundMessage,设置 topic_id 决定续聊语义,再 await self.bus.publish_inbound(inbound)
  • 出站过滤:不要自己实现 outbound 分发,基类 _on_outbound 已按 msg.channel_name == self.name 过滤并自动处理文本 + 附件顺序(backend/app/channels/base.py:91-112)。
  • 注册:在 service._CHANNEL_REGISTRY"myim": "app.channels.myim:MyIMChannel",在 _CHANNEL_CREDENTIAL_KEYS 声明凭据键(backend/app/channels/service.py:20-39)。
  • 构造约定:_start_channelchannel_cls(bus=self.bus, config=config) 实例化,并向 config 注入 channel_store 键(backend/app/channels/service.py:169-180)。

最小模板:

python
from app.channels.base import Channel
from app.channels.message_bus import MessageBus, OutboundMessage

class MyIMChannel(Channel):
    def __init__(self, bus: MessageBus, config: dict) -> None:
        super().__init__(name="myim", bus=bus, config=config)

    async def start(self) -> None:
        if self._running:
            return
        self._running = True
        self.bus.subscribe_outbound(self._on_outbound)
        # 启动平台监听;收到消息时:
        #   inbound = self._make_inbound(chat_id=..., user_id=..., text=...,
        #                                thread_ts=..., )
        #   inbound.topic_id = <会话标识或 None>
        #   await self.bus.publish_inbound(inbound)

    async def stop(self) -> None:
        self._running = False
        self.bus.unsubscribe_outbound(self._on_outbound)

    async def send(self, msg: OutboundMessage) -> None:
        # 用 msg.chat_id / msg.thread_ts 把 msg.text 发回平台
        ...

模板字段依据:backend/app/channels/base.py:24-89

Configuration

配置位于 config.yamlchannels 段(config.example.yaml:932-983):

说明默认Source
langgraph_urlLangGraph 兼容 Gateway API base URL,用于 thread/runhttp://localhost:8001/apimanager.py:26config.example.yaml:937
gateway_url辅助命令(/models /memory)的 Gateway API URLhttp://localhost:8001manager.py:27config.example.yaml:939
session全通道默认 assistant_id / config / contextconfig.example.yaml:949-956
feishu.app_id / app_secret🔐 飞书应用凭据(token,建议用 $FEISHU_APP_ID 环境变量引用)config.example.yaml:958-963
slack.bot_token / app_token🔐 Slack xoxb- / xapp- token(敏感,用 $SLACK_BOT_TOKEN)config.example.yaml:965-969
telegram.bot_token🔐 Telegram bot token(敏感)config.example.yaml:971-974
dingtalk.client_id / client_secret / card_template_id🔐 钉钉凭据 + 可选 AI Card 流式模板config.example.yaml:1028-1031

环境变量覆盖:DEER_FLOW_CHANNELS_LANGGRAPH_URL / DEER_FLOW_CHANNELS_GATEWAY_URL 优先级高于 config.yaml 缺省值(backend/app/channels/service.py:41-52)。

安全标注:所有 app_secret / bot_token / app_token / client_secret / card_template_id 均为敏感凭据,config.example.yaml 一律用 $ENV_VAR 形式引用,切勿把明文 token 提交到仓库。

Common Pitfalls/Tips

  • Docker 内 localhost ≠ 宿主机:IM channels 跑在 gateway 容器内时,localhost 指向容器自身。Docker Compose 下必须把 langgraph_url 设为 http://gateway:8001/apigateway_url 设为 http://gateway:8001(容器 DNS 名),或用 DEER_FLOW_CHANNELS_* 环境变量(config.example.yaml:940-946)。
  • 配了凭据但忘了 enabled: true:service.start() 会专门 warning 提示,看不到 channel 启动时先查日志这条(backend/app/channels/service.py:106-117)。
  • 飞书 SDK 与 uvloop 冲突:lark-oapi 在 import 时捕获模块级 event loop,uvicorn 用 uvloop 时该 loop 是主线程已运行的 uvloop,Client.start()run_until_complete()RuntimeError_run_ws 专门起独立线程 + 新 event loop 并打补丁 _ws_client_mod.loop 规避(backend/app/channels/feishu.py:139-174)。
  • 附件只能来自 outputs 目录:agent 产物想发回 IM,虚拟路径必须以 /mnt/user-data/outputs/ 开头,否则被 _resolve_attachments 拒绝并打 warning(防外泄上传文件 / workspace)(backend/app/channels/manager.py:372-385)。
  • store 是单 JSON 文件:高并发场景下注释明确建议换正式数据库后端;损坏时 _load 容错为「重新开始」而非崩溃(backend/app/channels/store.py:31-54)。
  • topic_id 决定续聊还是一次性:飞书用 root_id or msg_id 当 topic(回复同 thread 续聊,新消息开新 topic);Slack 用 thread_tstopic_id=None 时每条消息都新建 thread(backend/app/channels/feishu.py:676-688backend/app/channels/slack.py:247-257)。

References

页面关系
13-Gateway-API与路由体系.mdchannels 通过 langgraph-sdk 调用本页所述的 Gateway LangGraph API 与 /api/models /api/memory
14-鉴权-CSRF与授权.md本章的 internal token + CSRF 注入正是该章鉴权/CSRF 机制的「内部调用方」用例
15-Runtime运行时与StreamBridge.mdruns.wait / runs.stream 背后是该章的 Runtime 与 StreamBridge
16-持久化与存储层.mdChannelStore 的 JSON 原子写是该章持久化策略在 IM 域的具体落地

公司内部参考 · 由 claude-wiki-gen 基于源码自动生成的二次分析