Skip to content

会议洞察引擎

本章目标:

  1. 看懂跨会议聚合的 BFF endpoint(/projects/{id}/insights/aggregate)如何用 L1/L2 双层 cache 把"30 秒 + 多次 LLM"压成"50ms 命中"或"只重算聚合层"
  2. 弄清 acdm-backend insights_service 与 deer-flow meeting_aggregator_realtime 9 节点 pipeline 之间"BFF 准备数据 → harness 纯算 → BFF 写回 cache"的契约
  3. 掌握 SSE 流式聚合 + 断线 reattach(create + join_stream + cancel_on_disconnect=False)的设计,知道为什么切页/刷新不会丢结果

TL;DR

会议洞察引擎回答"用户勾 N 场会议,想看跨会议聚合出的实体/事实/决策/时间轴/冲突树(InsightTree)"。它由两层组成:acdm-backend 的 BFF(insights_service.py + insights_router.py)和 deer-flow 的 9 节点 LangGraph pipeline(meeting_aggregator_realtime)。BFF 用两层 cache 抗成本:L1(meeting_insight_cache,整棵 InsightTree,cache_key = sha256(sorted atom_ids + ALGO_VERSION + project_id))命中直接 200 JSON;L2(meeting_atom_cache,单会议 atom,PK = (project_id, meeting_id, extract_version))命中让 extract 节点 skip LLM。L1 miss 走 SSE:BFF 先下载 NC 文件 + markitdown 转 markdown 塞 meetings_input、预查 L2 塞 l2_cache_hits,再 runs.create + join_stream 转发 9 节点进度事件,pipeline 跑完 BFF 把 InsightTree 写 L1、新抽 atom 写 L2。SSE 用 cancel_on_disconnect=False + stream_resumable,客户端断开后 server 仍跑完,前端凭 sessionStorage 的 run_id 调 attach_stream 续看。

Overview(为什么要"双层 cache + BFF/harness 分层")

用户在工作台勾 5 场会议点"聚合",期望看到跨会议的实体合并、决策演进时间轴、事实冲突标记。直觉做法:每次请求都把这 5 场会议的纪要喂给 LLM 抽原子(atom),再跑关联/归并算法生成 InsightTree。这条直觉路径有三个致命问题:

  1. 慢且贵:每场会议一次 LLM 抽取(kimi-k2.6 + thinking,10KB 输入约 90-180 秒),5 场串行就是 7-15 分钟,且每次都重抽——哪怕纪要一个字没改。
  2. 同一组合反复算:用户切 tab 又切回来、刷新页面,同样的 atom_ids 组合被反复跑整条 pipeline。
  3. harness 不能碰业务 DB:deer-flow 的 harness 层有铁律——test_harness_boundary.py 强制 deerflow.* 不许 import app.*,所以 pipeline 里既不能开 SQLAlchemy session,也不能拿 Nextcloud 凭据下载文件。

答案是双层 cache + BFF/harness 职责切割:

  • L1 cache(meeting_insight_cache):缓存整棵 InsightTree。同一组 atom_ids(排序无关)+ 同 ALGO_VERSION + 同 project 命中 → 0 次 LLM,直接 200 JSON,p95 目标 < 50ms。
  • L2 cache(meeting_atom_cache):缓存单会议抽取出的 atom。L1 miss 但某些会议的 atom 没变(file_mtime 没动)→ 这些会议 skip LLM,只重跑后 6 个纯算节点(从 30s+ 降到 5-10s)。
  • BFF 干脏活:acdm-backend 的 insights_service 负责 NC 下载、markitdown 转换、L1/L2 读写、调 LangGraph;pipeline 只做纯计算,通过 state 字段拿输入(meetings_input / l2_cache_hits)、上报输出(insight_tree / new_atoms_to_cache),一行 DB / 一个 NC 请求都不碰。这正好复用了 weekly_report 验证过的 inline-contents 跨容器范式。

Architecture

引擎横跨两个容器:acdm-backend(8002,BFF + cache)和 deer-flow-langgraph(2024,Aegra runtime 跑 pipeline)。BFF 经 docker network 直调 LangGraph(绕 Caddy 公网 forward_auth,用 X-Auth-User-* header 注入身份)。

组件职责入口文件Source
insights_routerBFF endpoint:校权 + 入参校验 + L1 命中走 JSON / miss 走 SSEacdm-backend/app/api/insights_router.py:36acdm-backend/app/api/insights_router.py:55-379
insights_servicecache_key 计算、L1/L2 读写、pre/post-pipeline、SSE 转发、reattach/cancelacdm-backend/app/services/insights_service.py:1acdm-backend/app/services/insights_service.py:68-1026
langgraph_clientLangGraph SDK 单例 + Aegra Auth header + join_stream/cancel 封装acdm-backend/app/services/langgraph_client.py:84acdm-backend/app/services/langgraph_client.py:84-453
meeting_aggregator_realtime9 节点 StateGraph 工厂(3 前置抽取 + 6 复用聚合)deer-flow/.../meeting_aggregator_realtime/pipeline.py:55deer-flow/backend/packages/harness/deerflow/pipelines/meeting_aggregator_realtime/pipeline.py:55-121
meeting_aggregator(被复用)后 6 节点实现(load_atoms/relate/reduce/timeline/conflicts/finalize)deer-flow/.../meeting_aggregator/nodes.py:1deer-flow/backend/packages/harness/deerflow/pipelines/meeting_aggregator/nodes.py:29-311
ai_extract_worker异步 L2 backfill:文件上传后 fire-and-forget 单会议抽取acdm-backend/app/services/ai_extract_worker.py:66acdm-backend/app/services/ai_extract_worker.py:66-201
MeetingInsightCache / MeetingAtomCacheL1 / L2 cache 表 ORMacdm-backend/app/model/models.py:668acdm-backend/app/model/models.py:668-765

Components / Subsystems

insights_router(BFF endpoint)

职责:HTTP 入口,根据 cache 状态动态切 Content-Type

挂载前缀 /projects(acdm-backend/app/main.py:414-419),对外经 Caddy 走 /api/acdm/*。主 endpoint POST /{project_id}/insights/aggregate(acdm-backend/app/api/insights_router.py:55)依赖 require_project_access 校成员权(双层授权见第 20 章),get_current_user 拿 SSO 身份。

关键分支逻辑(acdm-backend/app/api/insights_router.py:74-122):

  1. validate_atom_ids fail-fast——任一 meeting 不属本 project 或既无 minutes 又无 transcript 文件 → 422(acdm-backend/app/api/insights_router.py:75-80)。
  2. force=falselookup_l1_cache,命中 → 直接返 ApiResponse[AggregateResponse](application/json),含 stale 标志(acdm-backend/app/api/insights_router.py:89-101)。
  3. force=true 或 L1 miss → 返 StreamingResponse(text/event-stream),并设 X-Accel-Buffering: no 防代理缓冲(acdm-backend/app/api/insights_router.py:104-122)。

辅助 endpoint 见下方速查表。_map_business_exc(acdm-backend/app/api/insights_router.py:39-52)把 BusinessException code 翻 HTTP status:42201/42202→422,40301/40302→403,50104→410。

insights_service(BFF 核心)

职责:cache_key 计算、L1/L2 读写、pre/post-pipeline、SSE 转发、reattach/cancel。

关键函数:

  • compute_l1_cache_key(acdm-backend/app/services/insights_service.py:68):sha256("|".join(sorted(atom_ids)) + "::" + algo_version + "::" + project_id)sorted 让勾选顺序无关命中;project_id 进 key 做硬隔离 + 防 hash 碰撞。
  • lookup_l1_cache(acdm-backend/app/services/insights_service.py:164):一次取 cache row,再 JOIN meeting_files 取 MAX(uploaded_at) + JOIN meeting_atom_cache(status='ok')取 MAX(computed_at),取二者最大值与 computed_at 比 → 检出 stale 即服务端 UPDATE stale=TRUE(幂等)并返 stale=true,仍返旧 payload(由用户决定何时重生成,不强制阻塞)。
  • write_l1_cache(acdm-backend/app/services/insights_service.py:253):INSERT ... ON CONFLICT(cache_key) DO UPDATE,force 重生成时覆盖旧 row + 重置 stale=False + 刷 computed_at
  • validate_atom_ids(acdm-backend/app/services/insights_service.py:90):一条 SQL JOIN meetings+calendars+meeting_files(category IN minutes/transcript),分出 invalid_ids(不存在/跨 project)与 missing_minutes_ids(没源文件),分别抛 42201/42202。

meeting_aggregator_realtime(9 节点 pipeline)

职责:N 场会议 → InsightTree 的纯计算 DAG。

DAG 线性串(deer-flow/.../meeting_aggregator_realtime/pipeline.py:110-119):

fetch_files → convert_md → extract_atoms → load_atoms → relate
  → reduce_per_kind → build_timeline → mark_conflicts → finalize → END

前 3 个节点(本 change 新增)+ 后 6 个节点(import 复用 meeting_aggregator,非 fork):

节点职责来源Source
fetch_filesmeetings_input[i].inline_minutes/inline_transcript 装进 raw_text_per_meeting新增deer-flow/.../meeting_aggregator_realtime/nodes/fetch.py:27-61
convert_md逐场逐类规范化 markdown(压 3+ 连续空行)新增deer-flow/.../meeting_aggregator_realtime/nodes/convert.py:34-42
extract_atomsL2 命中 skip / miss 跑 LLM 抽 4 类 atom + 组装 AtomBatch新增deer-flow/.../meeting_aggregator_realtime/nodes/extract.py:345-445
load_atomsclosure 实例化 InlineAtomSource(state.extracted_atoms) + 入口校权复用deer-flow/.../meeting_aggregator/nodes.py:29-74
relate算 atom-pair relevance(0.6 entity + 0.2 time + 0.2 embedding,> 0.4 进 candidate)复用deer-flow/.../meeting_aggregator/nodes.py:102-126
reduce_per_kind按 kind 跑 6 类聚合 + entity fingerprint 同义判别 + risk 升级复用deer-flow/.../meeting_aggregator/nodes.py:129-152
build_timeline把 evolution_chain 跨 kind 拍平按时间排序复用deer-flow/.../meeting_aggregator/nodes.py:155-200
mark_conflicts顶层汇总 fact 冲突(列 versions 不仲裁,诚实呈现)复用deer-flow/.../meeting_aggregator/nodes.py:203-242
finalize组装完整 InsightTree(scope/atoms/aggregated/timeline/conflicts/stats)复用deer-flow/.../meeting_aggregator/nodes.py:245-311

依赖注入巧妙处:make_pipeline 时还不知道有哪些 atom,所以 load_atoms_node 写成 closure——run 时才读 state.extracted_atoms 实例化 InlineAtomSource(deer-flow/.../meeting_aggregator_realtime/pipeline.py:75-89)。project_auth 默认 InMemoryProjectAuthService(allow_all=True)(真权限校验已在 BFF require_project_access 前置,这里只作日志兜底);embedding_service 默认 ZeroEmbeddingService(真 embedding 是 follow-up)。

extract_atoms 节点(L2 cache hit skip 的关键)

职责:逐场会议决策"用 cache 还是跑 LLM",并组装 AtomBatch。

核心判别(deer-flow/.../meeting_aggregator_realtime/nodes/extract.py:388-398):

python
# 摘自 nodes/extract.py:390-398
cache_hit = state.l2_cache_hits.get(mid)
if cache_hit and cache_hit.file_mtime >= meeting_input.file_uploaded_at:
    # L2 cache 命中,跳过 LLM
    extracted_payloads[mid] = dict(cache_hit.payload)
    continue
# cache miss / file_mtime 落后 → LLM 抽取

注意比较是 >=:cache 记录的 file_mtime ≥ 本次输入文件最新 uploaded_at 才算"cache 不旧"。LLM 调用 _do_one_llm_call(deer-flow/.../meeting_aggregator_realtime/nodes/extract.py:101)用 streaming + response_format={"type":"json_object"}——流式是为绕开 jointpilot 公司网关 5min 硬超时墙(kimi + thinking 处理 10KB+ 输入常 4-6 min,非流式必撞墙)。_strip_json_codefence(deer-flow/.../meeting_aggregator_realtime/nodes/extract.py:70-92)做 JSON 抠取加固。单场失败 fail-fastBusinessException(50002)(deer-flow/.../meeting_aggregator_realtime/nodes/extract.py:266-271),而非 partial-success——实时用户触发场景下静默 partial 体验差。

ai_extract_worker(L2 异步 backfill 旁路)

职责:文件上传后 fire-and-forget,提前把单会议 atom 算进 L2,让用户真点聚合时 L2 已命中。

schedule_extract(acdm-backend/app/services/ai_extract_worker.py:66)由 SQLAlchemy after_commit listener 调,loop.create_task(_safe_extract(mid)) 不阻塞文件上传 API。extract_for_meeting(acdm-backend/app/services/ai_extract_worker.py:93)先标 status='pending'(前端 sidebar 立刻显示 ⏳),NC 下载 + markitdown + LLM 抽取,成功写 status='ok'、失败写 status='failed'+error_messagerecover_pending_on_startup(acdm-backend/app/services/ai_extract_worker.py:204)在 lifespan startup 扫 pendingstarted_at < now()-10min 的卡死行重 enqueue。这条旁路与 pipeline 共享 L2 表,write_l2_cache_rows 的 ON CONFLICT 会用 pipeline 结果覆盖 worker 的 pending/failed(acdm-backend/app/services/insights_service.py:560-568)。

Data Flow / 控制流

L1 miss → SSE 全链路

逐步说明(acdm-backend/app/services/insights_service.py:593-697):

  1. compute_l1_cache_key 算 key。
  2. _assemble_meetings_input:JOIN 取每个 (meeting_id, category) 最新文件 → NC download_to_bytes → markitdown(.docx/.pdf/...)或 utf-8 fallback(.md/.txt/...)→ 装 inline_minutes/inline_transcript(至少一个非空,MeetingInputItem.model_validator 也兜底校验)。NC 失败抛 50303/50304/50305 fail-fast。
  3. lookup_l2_cache_hits:按 (project_id, meeting_id IN, extract_version) 取命中行,返 {mid: {file_mtime, payload}}(本步不做 mtime 比对,真比对在 extract 节点)。
  4. client.threads.create + client.runs.create(assistant_id="meeting_aggregator_realtime",stream_mode=["updates","values"],stream_resumable=True),metadata 带 owner_sub/project_id/atom_ids(reattach/校权用)。
  5. 首 chunk yield meta 事件,前端写 sessionStorage 用于 reattach。
  6. _stream_run_and_write_cache 转发 join_stream:updates 事件每个 node key → 一次 progress(total=TOTAL_NODES=9);values 事件存 last_values;error 事件 → SSE error 后 return。
  7. 流结束取 last_values["insight_tree"],空则 SSE pipeline_no_result;否则 write_l2_cache_rows + write_l1_cache,最后 yield result

断线 reattach 状态机

为什么断开不丢结果:Aegra stream_run handler hardcode cancel_on_disconnect=False(acdm-backend/app/services/langgraph_client.py:434-437 注释记录的源码确认),客户端 SSE 关闭后 server-side run 不被 cancel,继续跑完写 cache。配合 stream_resumable=True create,前端拿 sessionStorage 里的 run_id+thread_idPOST /{id}/insights/runs/{run_id}/attach_stream,attach_stream_run(acdm-backend/app/services/insights_service.py:866)先 _verify_thread_ownermetadata.owner_sub == user_sub + project_id 一致(防越权 reattach 别人的 run),再 _stream_run_and_write_cache 续转发。result 时仍按 cache_key 写 L1/L2——INSERT ON CONFLICT 幂等,aggregate_stream 与 attach 同时赶上 result 写两次也无害。仅显式 cancel/cancel endpoint(acdm-backend/app/api/insights_router.py:349),隐式断开绝不调。

Implementation Details

lazy stale 检测(L1 读时顺手做)

L1 cache 不靠主动失效,而是读时 lazy 检测lookup_l1_cache 取 cache row 后,额外两条 SQL:

python
# 摘自 insights_service.py:222-238(简化)
effective_latest = max(
    (m for m in (latest_mtime, latest_l2_mtime) if m is not None),
    default=None,
)
if effective_latest is not None and effective_latest > computed_at and not stale_now:
    await session.execute(
        update(MeetingInsightCache)
        .where(MeetingInsightCache.cache_key == cache_key)
        .values(stale=True)
    )
    await session.commit()
    stale_now = True

两个 stale 触发源:① meeting_files 最新 uploaded_at(原始纪要被重传)② meeting_atom_cache(status='ok')最新 computed_at——异步 worker 或用户 regenerate 重抽了 atom,L1 也得感知(acdm-backend/app/services/insights_service.py:189-219)。检出后服务端标 stale=TRUE(幂等),但仍返旧 payload + stale=true:用户先看旧结果 + banner 提示,自己决定何时点"重新聚合",不强制阻塞。

force 并发保护(advisory lock)

acquire_force_lock(acdm-backend/app/services/insights_service.py:1010-1026)用 SELECT pg_advisory_xact_lock(hashtext(:cache_key)) 把同 cache_key 的并发 force 请求串行化——scope 在事务内,提交/回滚自动释放,第二个 force 请求等第一个跑完,避免两条 pipeline 同时算同一组合白烧 LLM。

双版本号分离(cache 失效粒度)

ALGO_VERSIONEXTRACT_VERSION 分离是有意设计(deer-flow/.../meeting_aggregator_realtime/version.py:1-22):

  • ALGO_VERSION(聚合层算法)进 L1 cache_key hash。bump → L1 全 invalidate(hash 变),L2 仍可用(extract 不重跑,只重算后 6 节点)。
  • EXTRACT_VERSION(LLM extract prompt/parse)进 L2 PK 段。bump → L2 全 invalidate,L1 连带失效(atom 变 → InsightTree 重算)。
  • 当前 ALGO_VERSION=1.0.0EXTRACT_VERSION=2.0.0铁律:acdm-backend insights_service.pyALGO_VERSION(acdm-backend/app/services/insights_service.py:47)必须与 harness version.py 同步 bump,否则 BFF 算的 cache_key 与 pipeline 写的对不上,cache 永远 miss。

速查表

Insights BFF endpoint

端点方法作用返回Source
/{project_id}/insights/aggregatePOSTL1 命中 JSON / miss 走 SSE 9 节点聚合JSON 或 SSEacdm-backend/app/api/insights_router.py:55-122
/{project_id}/insights/cacheGET轻量探测 L1 状态(不跑 pipeline,p95<50ms),前端切按钮文案CachePeekResponseacdm-backend/app/api/insights_router.py:125-167
/{project_id}/insights/runs/{run_id}/attach_streamPOSTreattach 进行中/刚完成的 run 的 SSESSEacdm-backend/app/api/insights_router.py:170-207
/{project_id}/insights/runs/{run_id}/cancelPOST显式取消 run(用户点[取消生成])ApiResponse[bool]acdm-backend/app/api/insights_router.py:349-379
/{project_id}/meetings/{mid}/atom-cacheGET读 L2 单会议 atom cache(ok/pending/failed 三态)AtomCacheResponseacdm-backend/app/api/insights_router.py:210-251
/{project_id}/meetings/{mid}/regenerate-ai-extractPOST强制重抽单会议(转 pending + 入队 worker)RegenerateAiExtractResponseacdm-backend/app/api/insights_router.py:254-346

SSE 事件矩阵

事件时机data 关键字段Source
metaruns.create 后首 chunk(仅 aggregate,attach 无)run_id/thread_id/started_atacdm-backend/app/services/insights_service.py:678-682
progress每个节点 update 事件(共 9 次)node/completed/total:9acdm-backend/app/services/insights_service.py:763-767
resultpipeline 完成写完 cachepayload(InsightTree)/stale:false/latest_file_mtimeacdm-backend/app/services/insights_service.py:808-812
error任一阶段失败(随后关流 fail-fast)error(code)/messageacdm-backend/app/services/insights_service.py:699-722

BusinessException → HTTP / SSE error 映射

code含义HTTPSSE errorSource
42201invalid_meeting_ids(不存在/跨 project)422business_42201acdm-backend/app/services/insights_service.py:145-150
42202missing_source_files(无 minutes 也无 transcript)422business_42202acdm-backend/app/services/insights_service.py:151-156
40301forbidden_run_owner(owner_sub 不匹配)403forbidden_run_owneracdm-backend/app/services/insights_service.py:850-855
40302forbidden_project_mismatch403forbidden_project_mismatchacdm-backend/app/services/insights_service.py:856-862
50002extract LLM 抽取失败(fail-fast)400pipeline_errordeer-flow/.../meeting_aggregator_realtime/nodes/extract.py:266-271
50104thread/run 不存在(LangGraph 24h 清/重启)410run_not_foundacdm-backend/app/services/insights_service.py:840-847
50303/50304/50305NC 下载/解析失败 / 解析后均空400business_503xxacdm-backend/app/services/insights_service.py:375-440

Configuration

Config默认值含义影响Source
ALGO_VERSION1.0.0聚合层算法版本,进 L1 cache_keybump → L1 全失效,L2 仍用acdm-backend/app/services/insights_service.py:47
EXTRACT_VERSION2.0.0extract prompt/parse 版本,进 L2 PKbump → L2 全失效 + L1 连带acdm-backend/app/services/insights_service.py:50
AGGREGATE_TIMEOUT_SECONDS900BFF pipeline 超时(15min)超时 yield pipeline_timeout erroracdm-backend/app/services/insights_service.py:53
TOTAL_NODES9SSE progress 事件 total 字段前端进度条分母acdm-backend/app/services/insights_service.py:54
LANGGRAPH_STREAM_READ_TIMEOUT_SECONDS900LangGraph SDK httpx read timeout太小会硬切长 streamacdm-backend/app/services/langgraph_client.py:54
LANGGRAPH_BASE_URLhttp://deer-flow-langgraph:2024Aegra runtime 地址(docker network)改 runtime 落点acdm-backend/app/services/langgraph_client.py:43-45
model_name(extract 节点)kimi-k2.6抽取 LLM改抽取质量/成本deer-flow/.../meeting_aggregator_realtime/nodes/extract.py:374
thinking_enabled(extract 节点)True抽取开 thinking关闭质量塌方(实测)deer-flow/.../meeting_aggregator_realtime/nodes/extract.py:381
_STALE_PENDING_MIN10worker pending 卡死阈值(分钟)startup recovery 重试窗口acdm-backend/app/services/ai_extract_worker.py:51

Common Pitfalls / 实战 Tips

  • 两端 ALGO_VERSION 必须同步:BFF 算 cache_key 用 acdm-backend 端常量,pipeline 写 L1 也用它;不一致 → cache 永远 miss,每次都重跑(deer-flow/.../meeting_aggregator_realtime/version.py:13-15 明确警告)。
  • stale 不会自动重算:lazy 检测出 stale 后只标记 + 返旧 payload + banner,用户不点"重新聚合"就一直用旧的(acdm-backend/app/services/insights_service.py:226-245)。这是有意"诚实呈现"设计,不是 bug。
  • L2 命中比较是 >= 不是 ==:cache_hit.file_mtime >= meeting_input.file_uploaded_at 才算 cache 不旧;文件没动则复用(deer-flow/.../meeting_aggregator_realtime/nodes/extract.py:391)。
  • 隐式断开 ≠ cancel:切页/刷新不调 cancel endpoint,server 跑完写 cache;只有用户点[取消生成]才真 cancel(acdm-backend/app/api/insights_router.py:360-362)。误把断开当 cancel 会丢已花的 LLM 钱。
  • harness 不碰 DB/NC:想在 pipeline 里加查库逻辑会撞 test_harness_boundary.py;新数据需求要么走 state.meetings_input(BFF 准备),要么走 state.new_atoms_to_cache(BFF 写回),deerflow.* 永远不 import app.*(deer-flow/.../meeting_aggregator_realtime/state.py:12-14)。
  • 流式是为绕网关 5min 墙:extract 节点用 bound.stream(...) 不是为了实时显示,是 kimi+thinking 处理大输入常 4-6min,非流式撞 jointpilot 网关 idle 超时(deer-flow/.../meeting_aggregator_realtime/nodes/extract.py:108-118)。

References

  • acdm-backend/app/services/insights_service.py:68-1026 — BFF 核心:cache_key/L1/L2/SSE/reattach/cancel(本章主源)
  • acdm-backend/app/api/insights_router.py:55-379 — 6 个 BFF endpoint + 异常映射
  • acdm-backend/app/services/langgraph_client.py:84-452 — LangGraph SDK 单例 + Aegra Auth header + join_stream/cancel
  • acdm-backend/app/services/ai_extract_worker.py:66-543 — 文件上传后异步 L2 backfill worker
  • acdm-backend/app/model/models.py:668-765 — L1(meeting_insight_cache)/ L2(meeting_atom_cache)表 ORM
  • deer-flow/backend/packages/harness/deerflow/pipelines/meeting_aggregator_realtime/pipeline.py:55-121 — 9 节点 StateGraph 工厂 + closure 注入
  • deer-flow/backend/packages/harness/deerflow/pipelines/meeting_aggregator_realtime/nodes/extract.py:345-445 — extract_atoms:L2 skip + LLM 抽取 + AtomBatch 组装
  • deer-flow/backend/packages/harness/deerflow/pipelines/meeting_aggregator_realtime/state.py:26-166 — MeetingInputItem / L2CacheHitItem / 9 节点共享 state
  • deer-flow/backend/packages/harness/deerflow/pipelines/meeting_aggregator/nodes.py:29-311 — 复用的后 6 个聚合节点
  • deer-flow/backend/packages/harness/deerflow/pipelines/meeting_aggregator_realtime/version.py:1-22 — ALGO/EXTRACT 双版本号分离设计
PageRelationship
会议域与AI原子抽取本章 L2 cache 的 atom 来自该章抽取;extract_atoms 与 ai_extract_worker 共享 prompt/schema
Aegra运行时与LangGraph本章 pipeline 在该章 Aegra runtime 跑;cancel_on_disconnect=False 是其特性
控制面与引擎的耦合契约本章 BFF/harness 分层、X-Auth-User-* header 注入是该章契约的具体实例
可逆DB操作与存储层本章 L1/L2 cache 表 + Nextcloud 文件下载属该章存储层
项目管理与资源授权本章 require_project_access + owner_sub 二次校权用该章授权层
BA专家报告工作流同属确定性 pipeline,本章 SSE/reattach 范式与 BA 报告 Hybrid 模式可对照
审计SSE与后台任务本章 ai_extract_worker fire-and-forget + startup recovery 属该章后台任务模式

公司内部参考 · 由 claude-wiki-gen 基于源码自动生成的二次分析