主题
IM 通道系统
本章目标:
- 讲清楚
app/channels为什么用langgraph-sdk走 Gateway 同一条路径,而不是直接调用内部 agent runtime。- 拆解
message_bus(异步 pub/sub)与manager(核心 dispatcher)如何把外部聊天平台的消息桥接到 DeerFlow thread。- 对比 Slack/Telegram 的
runs.wait一次性回复与 Feishu 的runs.stream卡片增量更新两种交付模式,并给出自定义 Channel 的最小实现模板。
TL;DR
app/channels 把飞书 / Slack / Telegram / 钉钉 / 企业微信 / 微信 / Discord 等外部 IM 平台桥接到 DeerFlow agent。每个平台实现一个 Channel 子类,把收到的消息封装成 InboundMessage 投入 MessageBus 的异步队列;ChannelManager 的单一 dispatch loop 消费队列,通过 langgraph-sdk HTTP client(注入进程内 internal auth + CSRF)在 Gateway 上创建 / 复用 thread,再用 runs.wait(非流式平台)或 runs.stream(飞书 / 企业微信)拿到 agent 输出,经 OutboundMessage 回调发回平台。ChannelStore 用单个 JSON 文件把 channel:chat[:topic] 持久化映射到 thread_id,实现多轮会话续聊。
Overview
DeerFlow 的 agent runtime 内嵌在 Gateway 进程里(端口 8001),Gateway 同时暴露一套 LangGraph 兼容的 HTTP API。IM 通道系统面临一个设计抉择:进程内已经能 import 到 agent,为什么还要绕一圈走 HTTP?
答案是「路径统一」。channels 与前端 Web UI 使用完全相同的 langgraph-sdk 客户端入口与 thread/run 语义——ChannelManager 在文件头就声明了这个意图(backend/app/channels/manager.py:1),client 通过 langgraph_sdk.get_client(url=...) 创建(backend/app/channels/manager.py:636-649)。这样做的收益:
- thread 由 Gateway 服务端统一管理:
client.threads.create()创建的 thread 与 Web UI 创建的 thread 走同一套持久化、checkpoint、清理逻辑,IM 会话和网页会话天然可互通同一个thread_id。 - 中间件链 / 鉴权 / 限流不重复实现:channels 不需要自己拼装 agent、加载工具、跑中间件,所有逻辑落在 Gateway 一侧。
- 解耦平台与 agent:
MessageBus在 channels 与 dispatcher 之间插入异步队列,平台收消息只管publish_inbound,dispatcher 不感知平台细节。
代价是引入了「同进程 HTTP 自调用」需要解决鉴权:Gateway 默认要求浏览器 session cookie + CSRF 双提交,而 channel worker 没有浏览器会话。系统用进程本地 internal token(X-DeerFlow-Internal-Token,进程启动时 secrets.token_urlsafe(32) 生成,backend/app/gateway/internal_auth.py:11)加一对自洽的 CSRF cookie/header 解决,详见「Implementation Details」。
Architecture
整个系统由两条解耦的数据通路构成,中间隔着 MessageBus:
- Inbound 通路(平台 → agent):
Channel.start()监听平台事件,在平台回调里把消息封装为InboundMessage,await bus.publish_inbound(msg)投入asyncio.Queue(backend/app/channels/message_bus.py:131-140)。ChannelManager._dispatch_loop()是唯一消费者,每条消息asyncio.create_task并发处理,受asyncio.Semaphore(max_concurrency=5)限流(backend/app/channels/manager.py:676-694)。 - Outbound 通路(agent → 平台):dispatcher 把 agent 输出封装为
OutboundMessage,bus.publish_outbound(msg)遍历所有已注册回调(backend/app/channels/message_bus.py:160-173)。每个Channel在start()里bus.subscribe_outbound(self._on_outbound),_on_outbound只处理msg.channel_name == self.name的消息,先发文本再传附件(backend/app/channels/base.py:91-112)。
Source 列表:
| 文件 | 角色 |
|---|---|
| message_bus.py | 异步 pub/sub 中枢,定义 InboundMessage / OutboundMessage / ResolvedAttachment |
| manager.py | 核心 dispatcher,thread 生命周期 + runs.wait/stream + 命令路由 |
| store.py | JSON 持久化 channel:chat[:topic] → thread_id |
| base.py | 抽象 Channel,定义平台实现契约 |
| service.py | 从 config.yaml 读取配置,按 registry 懒加载并管理生命周期 |
| commands.py | 单一权威命令集 KNOWN_CHANNEL_COMMANDS |
Components / Subsystems
message_bus —— 异步 pub/sub 中枢
职责:在 channels 与 dispatcher 之间插入无锁解耦层。inbound 用单个 asyncio.Queue 实现「多生产者(各平台)→ 单消费者(dispatch loop)」;outbound 用回调列表实现「单生产者(dispatcher)→ 多订阅者(各 channel)」。
关键类型与方法:
InboundMessage(dataclass,backend/app/channels/message_bus.py:29-58):携带channel_name、chat_id、user_id、text、msg_type、thread_ts、topic_id、files、metadata。topic_id是会话续聊的关键——同一chat_id内相同topic_id复用同一 DeerFlow thread;None时每条消息新建 thread(一次性问答)。OutboundMessage(dataclass,backend/app/channels/message_bus.py:82-107):带is_final标志区分流式中间帧与终帧,attachments是已解析为宿主路径的ResolvedAttachment列表。MessageBus.publish_outbound对每个回调单独 try/except,单个 channel 回调异常不影响其它(backend/app/channels/message_bus.py:169-173)。
manager —— 核心 dispatcher
职责:消费 inbound 队列,管理 thread 生命周期,调用 Gateway 的 LangGraph API,把结果回投 outbound。核心类 ChannelManager(backend/app/channels/manager.py:541-547)。
- thread 复用 / 新建:
_handle_chat先store.get_thread_id,无则_create_thread调client.threads.create()并写回 store(backend/app/channels/manager.py:730-757)。 - 流式 / 非流式分流:
_channel_supports_streaming判断后,流式走_handle_streaming_chat,否则走runs.wait(backend/app/channels/manager.py:778-806)。 - 并发冲突保护:
multitask_strategy="reject",ConflictError或含 "already running a task" 的异常被_is_thread_busy_error识别,回发「会话忙」提示(backend/app/channels/manager.py:119-124)。 - 安全防护:
_resolve_attachments只允许/mnt/user-data/outputs/前缀,并二次校验解析后路径仍在 outputs 目录内,防止路径穿越外泄上传文件 / workspace(backend/app/channels/manager.py:356-403)。
store —— JSON 会话映射持久化
职责:把 IM 会话与 DeerFlow thread 的映射落到单个 JSON 文件,实现网关重启后仍能续聊。核心类 ChannelStore(backend/app/channels/store.py:16-44)。
- key 规则:有
topic_id时channel:chat:topic,否则channel:chat(backend/app/channels/store.py:74-78)。 - 原子写:写临时文件后
Path.replace()替换,失败清理临时文件,并用threading.Lock保护写入(backend/app/channels/store.py:56-70)。 - 默认路径:
{base_dir}/channels/store.json(backend/app/channels/store.py:36-42)。
base —— 抽象 Channel
职责:定义所有平台实现的契约。核心类 Channel(ABC)(backend/app/channels/base.py:14-22)。抽象方法 start / stop / send 必须实现;supports_streaming 默认 False;send_file / receive_file 默认空实现可选覆写;_make_inbound 是构造 InboundMessage 的便捷工厂(backend/app/channels/base.py:68-89)。
service —— 生命周期管理
职责:从 config.yaml 的 channels 段读取配置,按 _CHANNEL_REGISTRY 用 resolve_class 懒加载已启用的 channel 类,启动 ChannelManager 与各 channel。核心类 ChannelService(backend/app/channels/service.py:55-94)。
- 配置了凭据但
enabled: false时,start()会打 warning 提醒用户去开启(backend/app/channels/service.py:106-117)。 - 单例:
get_channel_service/start_channel_service/stop_channel_service(backend/app/channels/service.py:209-232)。
commands —— 权威命令集
职责:KNOWN_CHANNEL_COMMANDS 单一来源,channel 解析器与 dispatcher 共用,新增 / 删除命令只改这一处(backend/app/channels/commands.py:11-20)。
Data Flow
下图以飞书消息为例,展示从平台事件到卡片增量更新的完整链路(Slack/Telegram 走 runs.wait 单次回复分支):
关键节点说明:
- 飞书在
_prepare_inbound里先异步加表情、起 running 卡片,再publish_inbound,不阻塞入队(backend/app/channels/feishu.py:578-583)。 - 流式分支用
STREAM_UPDATE_MIN_INTERVAL_SECONDS = 0.35节流,避免过于频繁地 patch 卡片(backend/app/channels/manager.py:37、backend/app/channels/manager.py:880-899)。 - Slack 在
_handle_message_event投递前,SDK 线程里同步加 eyes 表情并发 "Working on it..." 占位回复(backend/app/channels/slack.py:259-264)。
Implementation Details
内部鉴权 + CSRF 注入
channel worker 没有浏览器 session,但 Gateway 对状态变更请求(创建 thread / run)要求鉴权 + CSRF 双提交。ChannelManager 在懒初始化 langgraph-sdk client 时,同时注入三样东西:
python
def _get_client(self):
if self._client is None:
from langgraph_sdk import get_client
self._client = get_client(
url=self._langgraph_url,
headers={
**create_internal_auth_headers(), # X-DeerFlow-Internal-Token
CSRF_HEADER_NAME: self._csrf_token, # X-CSRF-Token
"Cookie": f"{CSRF_COOKIE_NAME}={self._csrf_token}", # csrf_token cookie
},
)
return self._client解读:
self._csrf_token = generate_csrf_token()在ChannelManager.__init__生成一次(backend/app/channels/manager.py:570)。同一个 token 同时放进 cookie 与 header,天然满足CSRFMiddleware的 double-submit 校验(secrets.compare_digest(cookie_token, header_token),backend/app/gateway/csrf_middleware.py:199-203)——不依赖浏览器会话。X-DeerFlow-Internal-Token由auth_middleware校验:is_valid_internal_auth_token通过则注入合成内部用户(id=DEFAULT_USER_ID、system_role="internal"),跳过 access_token cookie 要求(backend/app/gateway/auth_middleware.py:80-84、backend/app/gateway/internal_auth.py:24-26)。token 是进程启动时随机生成的,只有同进程调用方能持有。
飞书卡片增量 patch
飞书流式的核心是「一张卡片原地刷新」:_send_card_message 用 thread_ts(源消息 id)做 key 查 _running_card_ids,有 running 卡片就 _update_card patch 同一张;非终帧 patch 失败则继续抛出(等下一帧重试),终帧 patch 失败才降级为新回复;终帧时弹出缓存并加 DONE 表情(backend/app/channels/feishu.py:511-552)。卡片 JSON 必须设 config.update_multi=true 才允许飞书 patch API 原地更新(backend/app/channels/feishu.py:406-410)。
速查表
平台 transport / 流式方式 / 难度矩阵
| 平台 | transport | 流式方式(manager) | supports_streaming | 难度 | Source |
|---|---|---|---|---|---|
| feishu | lark-oapi WebSocket 长连接 | runs.stream(卡片增量 patch) | True | 高(独立线程 + loop patch + 卡片) | feishu.py:28-69、manager.py:40-48 |
| slack | slack-sdk Socket Mode(WebSocket) | runs.wait(单次回复) | False | 中 | slack.py:35-96、manager.py:44 |
| telegram | bot polling | runs.wait | False | 中 | ⚠️本次未深入,建议补读 backend/app/channels/telegram.py;能力声明见 manager.py:45 |
| dingtalk | client_id/secret 长连接 | runs.wait(可选 AI Card 流式) | False | 中高 | ⚠️本次未深入,建议补读 backend/app/channels/dingtalk.py;能力声明见 manager.py:41 |
| wecom(企业微信) | HTTP | runs.stream | True | 高 | ⚠️本次未深入,建议补读 backend/app/channels/wecom.py;能力声明见 manager.py:47 |
| wechat(微信) | bot_token | runs.wait | False | 高 | ⚠️本次未深入,建议补读 backend/app/channels/wechat.py;能力声明见 manager.py:46 |
| discord | bot_token | runs.wait | False | 中 | ⚠️本次未深入,建议补读 backend/app/channels/discord.py;能力声明见 manager.py:42 |
注:_channel_supports_streaming 优先取运行中 channel 实例的 supports_streaming,否则回退到 CHANNEL_CAPABILITIES 静态表(backend/app/channels/manager.py:575-584)。
命令清单
KNOWN_CHANNEL_COMMANDS = /bootstrap、/new、/status、/models、/memory、/help(backend/app/channels/commands.py:11-20)。
| 命令 | 行为 | 处理位置 |
|---|---|---|
/bootstrap [text] | 转成 CHAT 消息并附带 is_bootstrap=True context,进入 agent 引导建立流程 | manager.py:951-957 |
/new | threads.create() 新建 thread 并写 store,重置会话 | manager.py:959-971 |
/status | 返回当前 thread_id 或「无活跃会话」 | manager.py:972-974 |
/models | GET {gateway_url}/api/models 列出模型 | manager.py:975-976 |
/memory | GET {gateway_url}/api/memory 返回 fact 数 | manager.py:977-978 |
/help | 返回命令帮助文本 | manager.py:979-988 |
扩展指南
要新增一个平台,继承 Channel 实现三个抽象方法即可。约束清单从 base.Channel 抽象读出:
- 必须实现:
start()(监听平台事件并bus.subscribe_outbound(self._on_outbound))、stop()(优雅停止并bus.unsubscribe_outbound)、send(msg)(把文本发回平台)。 - 可选覆写:
supports_streaming(返回True让 manager 走runs.stream)、send_file(msg, attachment)(返回bool,默认False即不支持附件上传)、receive_file(msg, thread_id)(下载入站附件并改写msg.text为沙箱路径,默认原样返回)。 - 入站封装:用
self._make_inbound(...)构造InboundMessage,设置topic_id决定续聊语义,再await self.bus.publish_inbound(inbound)。 - 出站过滤:不要自己实现 outbound 分发,基类
_on_outbound已按msg.channel_name == self.name过滤并自动处理文本 + 附件顺序(backend/app/channels/base.py:91-112)。 - 注册:在
service._CHANNEL_REGISTRY加"myim": "app.channels.myim:MyIMChannel",在_CHANNEL_CREDENTIAL_KEYS声明凭据键(backend/app/channels/service.py:20-39)。 - 构造约定:
_start_channel以channel_cls(bus=self.bus, config=config)实例化,并向config注入channel_store键(backend/app/channels/service.py:169-180)。
最小模板:
python
from app.channels.base import Channel
from app.channels.message_bus import MessageBus, OutboundMessage
class MyIMChannel(Channel):
def __init__(self, bus: MessageBus, config: dict) -> None:
super().__init__(name="myim", bus=bus, config=config)
async def start(self) -> None:
if self._running:
return
self._running = True
self.bus.subscribe_outbound(self._on_outbound)
# 启动平台监听;收到消息时:
# inbound = self._make_inbound(chat_id=..., user_id=..., text=...,
# thread_ts=..., )
# inbound.topic_id = <会话标识或 None>
# await self.bus.publish_inbound(inbound)
async def stop(self) -> None:
self._running = False
self.bus.unsubscribe_outbound(self._on_outbound)
async def send(self, msg: OutboundMessage) -> None:
# 用 msg.chat_id / msg.thread_ts 把 msg.text 发回平台
...Configuration
配置位于 config.yaml 的 channels 段(config.example.yaml:932-983):
| 键 | 说明 | 默认 | Source |
|---|---|---|---|
langgraph_url | LangGraph 兼容 Gateway API base URL,用于 thread/run | http://localhost:8001/api | manager.py:26、config.example.yaml:937 |
gateway_url | 辅助命令(/models /memory)的 Gateway API URL | http://localhost:8001 | manager.py:27、config.example.yaml:939 |
session | 全通道默认 assistant_id / config / context | — | config.example.yaml:949-956 |
feishu.app_id / app_secret | 🔐 飞书应用凭据(token,建议用 $FEISHU_APP_ID 环境变量引用) | — | config.example.yaml:958-963 |
slack.bot_token / app_token | 🔐 Slack xoxb- / xapp- token(敏感,用 $SLACK_BOT_TOKEN) | — | config.example.yaml:965-969 |
telegram.bot_token | 🔐 Telegram bot token(敏感) | — | config.example.yaml:971-974 |
dingtalk.client_id / client_secret / card_template_id | 🔐 钉钉凭据 + 可选 AI Card 流式模板 | — | config.example.yaml:1028-1031 |
环境变量覆盖:DEER_FLOW_CHANNELS_LANGGRAPH_URL / DEER_FLOW_CHANNELS_GATEWAY_URL 优先级高于 config.yaml 缺省值(backend/app/channels/service.py:41-52)。
安全标注:所有 app_secret / bot_token / app_token / client_secret / card_template_id 均为敏感凭据,config.example.yaml 一律用 $ENV_VAR 形式引用,切勿把明文 token 提交到仓库。
Common Pitfalls/Tips
- Docker 内 localhost ≠ 宿主机:IM channels 跑在
gateway容器内时,localhost指向容器自身。Docker Compose 下必须把langgraph_url设为http://gateway:8001/api、gateway_url设为http://gateway:8001(容器 DNS 名),或用DEER_FLOW_CHANNELS_*环境变量(config.example.yaml:940-946)。 - 配了凭据但忘了
enabled: true:service.start()会专门 warning 提示,看不到 channel 启动时先查日志这条(backend/app/channels/service.py:106-117)。 - 飞书 SDK 与 uvloop 冲突:lark-oapi 在 import 时捕获模块级 event loop,uvicorn 用 uvloop 时该 loop 是主线程已运行的 uvloop,
Client.start()内run_until_complete()会RuntimeError。_run_ws专门起独立线程 + 新 event loop 并打补丁_ws_client_mod.loop规避(backend/app/channels/feishu.py:139-174)。 - 附件只能来自 outputs 目录:agent 产物想发回 IM,虚拟路径必须以
/mnt/user-data/outputs/开头,否则被_resolve_attachments拒绝并打 warning(防外泄上传文件 / workspace)(backend/app/channels/manager.py:372-385)。 - store 是单 JSON 文件:高并发场景下注释明确建议换正式数据库后端;损坏时
_load容错为「重新开始」而非崩溃(backend/app/channels/store.py:31-54)。 - topic_id 决定续聊还是一次性:飞书用
root_id or msg_id当 topic(回复同 thread 续聊,新消息开新 topic);Slack 用thread_ts。topic_id=None时每条消息都新建 thread(backend/app/channels/feishu.py:676-688、backend/app/channels/slack.py:247-257)。
References
- backend/app/channels/message_bus.py —— 异步 pub/sub 与消息类型定义
- backend/app/channels/manager.py —— 核心 dispatcher、runs.wait/stream、命令路由
- backend/app/channels/store.py —— 会话映射 JSON 持久化
- backend/app/channels/base.py —— 抽象 Channel 契约
- backend/app/channels/service.py —— 生命周期与配置加载
- backend/app/channels/feishu.py —— 飞书 WebSocket + 卡片增量 patch 实现
- backend/app/channels/slack.py —— Slack Socket Mode 实现
- backend/app/gateway/internal_auth.py —— 进程本地 internal token
- backend/app/gateway/csrf_middleware.py —— CSRF double-submit 校验
Related Pages
| 页面 | 关系 |
|---|---|
| 13-Gateway-API与路由体系.md | channels 通过 langgraph-sdk 调用本页所述的 Gateway LangGraph API 与 /api/models /api/memory |
| 14-鉴权-CSRF与授权.md | 本章的 internal token + CSRF 注入正是该章鉴权/CSRF 机制的「内部调用方」用例 |
| 15-Runtime运行时与StreamBridge.md | runs.wait / runs.stream 背后是该章的 Runtime 与 StreamBridge |
| 16-持久化与存储层.md | ChannelStore 的 JSON 原子写是该章持久化策略在 IM 域的具体落地 |