主题
会议洞察引擎
本章目标:
- 看懂跨会议聚合的 BFF endpoint(
/projects/{id}/insights/aggregate)如何用 L1/L2 双层 cache 把"30 秒 + 多次 LLM"压成"50ms 命中"或"只重算聚合层"- 弄清 acdm-backend
insights_service与 deer-flowmeeting_aggregator_realtime9 节点 pipeline 之间"BFF 准备数据 → harness 纯算 → BFF 写回 cache"的契约- 掌握 SSE 流式聚合 + 断线 reattach(create + join_stream +
cancel_on_disconnect=False)的设计,知道为什么切页/刷新不会丢结果
TL;DR
会议洞察引擎回答"用户勾 N 场会议,想看跨会议聚合出的实体/事实/决策/时间轴/冲突树(InsightTree)"。它由两层组成:acdm-backend 的 BFF(insights_service.py + insights_router.py)和 deer-flow 的 9 节点 LangGraph pipeline(meeting_aggregator_realtime)。BFF 用两层 cache 抗成本:L1(meeting_insight_cache,整棵 InsightTree,cache_key = sha256(sorted atom_ids + ALGO_VERSION + project_id))命中直接 200 JSON;L2(meeting_atom_cache,单会议 atom,PK = (project_id, meeting_id, extract_version))命中让 extract 节点 skip LLM。L1 miss 走 SSE:BFF 先下载 NC 文件 + markitdown 转 markdown 塞 meetings_input、预查 L2 塞 l2_cache_hits,再 runs.create + join_stream 转发 9 节点进度事件,pipeline 跑完 BFF 把 InsightTree 写 L1、新抽 atom 写 L2。SSE 用 cancel_on_disconnect=False + stream_resumable,客户端断开后 server 仍跑完,前端凭 sessionStorage 的 run_id 调 attach_stream 续看。
Overview(为什么要"双层 cache + BFF/harness 分层")
用户在工作台勾 5 场会议点"聚合",期望看到跨会议的实体合并、决策演进时间轴、事实冲突标记。直觉做法:每次请求都把这 5 场会议的纪要喂给 LLM 抽原子(atom),再跑关联/归并算法生成 InsightTree。这条直觉路径有三个致命问题:
- 慢且贵:每场会议一次 LLM 抽取(
kimi-k2.6+ thinking,10KB 输入约 90-180 秒),5 场串行就是 7-15 分钟,且每次都重抽——哪怕纪要一个字没改。 - 同一组合反复算:用户切 tab 又切回来、刷新页面,同样的 atom_ids 组合被反复跑整条 pipeline。
- harness 不能碰业务 DB:deer-flow 的 harness 层有铁律——
test_harness_boundary.py强制deerflow.*不许import app.*,所以 pipeline 里既不能开 SQLAlchemy session,也不能拿 Nextcloud 凭据下载文件。
答案是双层 cache + BFF/harness 职责切割:
- L1 cache(
meeting_insight_cache):缓存整棵 InsightTree。同一组 atom_ids(排序无关)+ 同 ALGO_VERSION + 同 project 命中 → 0 次 LLM,直接 200 JSON,p95 目标 < 50ms。 - L2 cache(
meeting_atom_cache):缓存单会议抽取出的 atom。L1 miss 但某些会议的 atom 没变(file_mtime 没动)→ 这些会议 skip LLM,只重跑后 6 个纯算节点(从 30s+ 降到 5-10s)。 - BFF 干脏活:acdm-backend 的
insights_service负责 NC 下载、markitdown 转换、L1/L2 读写、调 LangGraph;pipeline 只做纯计算,通过 state 字段拿输入(meetings_input/l2_cache_hits)、上报输出(insight_tree/new_atoms_to_cache),一行 DB / 一个 NC 请求都不碰。这正好复用了weekly_report验证过的 inline-contents 跨容器范式。
Architecture
引擎横跨两个容器:acdm-backend(8002,BFF + cache)和 deer-flow-langgraph(2024,Aegra runtime 跑 pipeline)。BFF 经 docker network 直调 LangGraph(绕 Caddy 公网 forward_auth,用 X-Auth-User-* header 注入身份)。
| 组件 | 职责 | 入口文件 | Source |
|---|---|---|---|
insights_router | BFF endpoint:校权 + 入参校验 + L1 命中走 JSON / miss 走 SSE | acdm-backend/app/api/insights_router.py:36 | acdm-backend/app/api/insights_router.py:55-379 |
insights_service | cache_key 计算、L1/L2 读写、pre/post-pipeline、SSE 转发、reattach/cancel | acdm-backend/app/services/insights_service.py:1 | acdm-backend/app/services/insights_service.py:68-1026 |
langgraph_client | LangGraph SDK 单例 + Aegra Auth header + join_stream/cancel 封装 | acdm-backend/app/services/langgraph_client.py:84 | acdm-backend/app/services/langgraph_client.py:84-453 |
meeting_aggregator_realtime | 9 节点 StateGraph 工厂(3 前置抽取 + 6 复用聚合) | deer-flow/.../meeting_aggregator_realtime/pipeline.py:55 | deer-flow/backend/packages/harness/deerflow/pipelines/meeting_aggregator_realtime/pipeline.py:55-121 |
meeting_aggregator(被复用) | 后 6 节点实现(load_atoms/relate/reduce/timeline/conflicts/finalize) | deer-flow/.../meeting_aggregator/nodes.py:1 | deer-flow/backend/packages/harness/deerflow/pipelines/meeting_aggregator/nodes.py:29-311 |
ai_extract_worker | 异步 L2 backfill:文件上传后 fire-and-forget 单会议抽取 | acdm-backend/app/services/ai_extract_worker.py:66 | acdm-backend/app/services/ai_extract_worker.py:66-201 |
MeetingInsightCache / MeetingAtomCache | L1 / L2 cache 表 ORM | acdm-backend/app/model/models.py:668 | acdm-backend/app/model/models.py:668-765 |
Components / Subsystems
insights_router(BFF endpoint)
职责:HTTP 入口,根据 cache 状态动态切 Content-Type。
挂载前缀 /projects(acdm-backend/app/main.py:414-419),对外经 Caddy 走 /api/acdm/*。主 endpoint POST /{project_id}/insights/aggregate(acdm-backend/app/api/insights_router.py:55)依赖 require_project_access 校成员权(双层授权见第 20 章),get_current_user 拿 SSO 身份。
关键分支逻辑(acdm-backend/app/api/insights_router.py:74-122):
validate_atom_idsfail-fast——任一 meeting 不属本 project 或既无 minutes 又无 transcript 文件 → 422(acdm-backend/app/api/insights_router.py:75-80)。force=false时lookup_l1_cache,命中 → 直接返ApiResponse[AggregateResponse](application/json),含stale标志(acdm-backend/app/api/insights_router.py:89-101)。force=true或 L1 miss → 返StreamingResponse(text/event-stream),并设X-Accel-Buffering: no防代理缓冲(acdm-backend/app/api/insights_router.py:104-122)。
辅助 endpoint 见下方速查表。_map_business_exc(acdm-backend/app/api/insights_router.py:39-52)把 BusinessException code 翻 HTTP status:42201/42202→422,40301/40302→403,50104→410。
insights_service(BFF 核心)
职责:cache_key 计算、L1/L2 读写、pre/post-pipeline、SSE 转发、reattach/cancel。
关键函数:
compute_l1_cache_key(acdm-backend/app/services/insights_service.py:68):sha256("|".join(sorted(atom_ids)) + "::" + algo_version + "::" + project_id)。sorted让勾选顺序无关命中;project_id进 key 做硬隔离 + 防 hash 碰撞。lookup_l1_cache(acdm-backend/app/services/insights_service.py:164):一次取 cache row,再 JOINmeeting_files取 MAX(uploaded_at) + JOINmeeting_atom_cache(status='ok')取 MAX(computed_at),取二者最大值与computed_at比 → 检出 stale 即服务端UPDATE stale=TRUE(幂等)并返stale=true,仍返旧 payload(由用户决定何时重生成,不强制阻塞)。write_l1_cache(acdm-backend/app/services/insights_service.py:253):INSERT ... ON CONFLICT(cache_key) DO UPDATE,force 重生成时覆盖旧 row + 重置stale=False+ 刷computed_at。validate_atom_ids(acdm-backend/app/services/insights_service.py:90):一条 SQL JOINmeetings+calendars+meeting_files(category IN minutes/transcript),分出invalid_ids(不存在/跨 project)与missing_minutes_ids(没源文件),分别抛 42201/42202。
meeting_aggregator_realtime(9 节点 pipeline)
职责:N 场会议 → InsightTree 的纯计算 DAG。
DAG 线性串(deer-flow/.../meeting_aggregator_realtime/pipeline.py:110-119):
fetch_files → convert_md → extract_atoms → load_atoms → relate
→ reduce_per_kind → build_timeline → mark_conflicts → finalize → END前 3 个节点(本 change 新增)+ 后 6 个节点(import 复用 meeting_aggregator,非 fork):
| 节点 | 职责 | 来源 | Source |
|---|---|---|---|
fetch_files | 把 meetings_input[i].inline_minutes/inline_transcript 装进 raw_text_per_meeting | 新增 | deer-flow/.../meeting_aggregator_realtime/nodes/fetch.py:27-61 |
convert_md | 逐场逐类规范化 markdown(压 3+ 连续空行) | 新增 | deer-flow/.../meeting_aggregator_realtime/nodes/convert.py:34-42 |
extract_atoms | L2 命中 skip / miss 跑 LLM 抽 4 类 atom + 组装 AtomBatch | 新增 | deer-flow/.../meeting_aggregator_realtime/nodes/extract.py:345-445 |
load_atoms | closure 实例化 InlineAtomSource(state.extracted_atoms) + 入口校权 | 复用 | deer-flow/.../meeting_aggregator/nodes.py:29-74 |
relate | 算 atom-pair relevance(0.6 entity + 0.2 time + 0.2 embedding,> 0.4 进 candidate) | 复用 | deer-flow/.../meeting_aggregator/nodes.py:102-126 |
reduce_per_kind | 按 kind 跑 6 类聚合 + entity fingerprint 同义判别 + risk 升级 | 复用 | deer-flow/.../meeting_aggregator/nodes.py:129-152 |
build_timeline | 把 evolution_chain 跨 kind 拍平按时间排序 | 复用 | deer-flow/.../meeting_aggregator/nodes.py:155-200 |
mark_conflicts | 顶层汇总 fact 冲突(列 versions 不仲裁,诚实呈现) | 复用 | deer-flow/.../meeting_aggregator/nodes.py:203-242 |
finalize | 组装完整 InsightTree(scope/atoms/aggregated/timeline/conflicts/stats) | 复用 | deer-flow/.../meeting_aggregator/nodes.py:245-311 |
依赖注入巧妙处:make_pipeline 时还不知道有哪些 atom,所以 load_atoms_node 写成 closure——run 时才读 state.extracted_atoms 实例化 InlineAtomSource(deer-flow/.../meeting_aggregator_realtime/pipeline.py:75-89)。project_auth 默认 InMemoryProjectAuthService(allow_all=True)(真权限校验已在 BFF require_project_access 前置,这里只作日志兜底);embedding_service 默认 ZeroEmbeddingService(真 embedding 是 follow-up)。
extract_atoms 节点(L2 cache hit skip 的关键)
职责:逐场会议决策"用 cache 还是跑 LLM",并组装 AtomBatch。
核心判别(deer-flow/.../meeting_aggregator_realtime/nodes/extract.py:388-398):
python
# 摘自 nodes/extract.py:390-398
cache_hit = state.l2_cache_hits.get(mid)
if cache_hit and cache_hit.file_mtime >= meeting_input.file_uploaded_at:
# L2 cache 命中,跳过 LLM
extracted_payloads[mid] = dict(cache_hit.payload)
continue
# cache miss / file_mtime 落后 → LLM 抽取注意比较是 >=:cache 记录的 file_mtime ≥ 本次输入文件最新 uploaded_at 才算"cache 不旧"。LLM 调用 _do_one_llm_call(deer-flow/.../meeting_aggregator_realtime/nodes/extract.py:101)用 streaming + response_format={"type":"json_object"}——流式是为绕开 jointpilot 公司网关 5min 硬超时墙(kimi + thinking 处理 10KB+ 输入常 4-6 min,非流式必撞墙)。_strip_json_codefence(deer-flow/.../meeting_aggregator_realtime/nodes/extract.py:70-92)做 JSON 抠取加固。单场失败 fail-fast 抛 BusinessException(50002)(deer-flow/.../meeting_aggregator_realtime/nodes/extract.py:266-271),而非 partial-success——实时用户触发场景下静默 partial 体验差。
ai_extract_worker(L2 异步 backfill 旁路)
职责:文件上传后 fire-and-forget,提前把单会议 atom 算进 L2,让用户真点聚合时 L2 已命中。
schedule_extract(acdm-backend/app/services/ai_extract_worker.py:66)由 SQLAlchemy after_commit listener 调,loop.create_task(_safe_extract(mid)) 不阻塞文件上传 API。extract_for_meeting(acdm-backend/app/services/ai_extract_worker.py:93)先标 status='pending'(前端 sidebar 立刻显示 ⏳),NC 下载 + markitdown + LLM 抽取,成功写 status='ok'、失败写 status='failed'+error_message。recover_pending_on_startup(acdm-backend/app/services/ai_extract_worker.py:204)在 lifespan startup 扫 pending 且 started_at < now()-10min 的卡死行重 enqueue。这条旁路与 pipeline 共享 L2 表,write_l2_cache_rows 的 ON CONFLICT 会用 pipeline 结果覆盖 worker 的 pending/failed(acdm-backend/app/services/insights_service.py:560-568)。
Data Flow / 控制流
L1 miss → SSE 全链路
逐步说明(acdm-backend/app/services/insights_service.py:593-697):
compute_l1_cache_key算 key。_assemble_meetings_input:JOIN 取每个 (meeting_id, category) 最新文件 → NCdownload_to_bytes→ markitdown(.docx/.pdf/...)或 utf-8 fallback(.md/.txt/...)→ 装inline_minutes/inline_transcript(至少一个非空,MeetingInputItem.model_validator也兜底校验)。NC 失败抛 50303/50304/50305 fail-fast。lookup_l2_cache_hits:按 (project_id, meeting_id IN, extract_version) 取命中行,返{mid: {file_mtime, payload}}(本步不做 mtime 比对,真比对在 extract 节点)。client.threads.create+client.runs.create(assistant_id="meeting_aggregator_realtime",stream_mode=["updates","values"],stream_resumable=True),metadata 带owner_sub/project_id/atom_ids(reattach/校权用)。- 首 chunk yield
meta事件,前端写 sessionStorage 用于 reattach。 _stream_run_and_write_cache转发 join_stream:updates事件每个 node key → 一次progress(total=TOTAL_NODES=9);values事件存last_values;error事件 → SSE error 后 return。- 流结束取
last_values["insight_tree"],空则 SSEpipeline_no_result;否则write_l2_cache_rows+write_l1_cache,最后 yieldresult。
断线 reattach 状态机
为什么断开不丢结果:Aegra stream_run handler hardcode cancel_on_disconnect=False(acdm-backend/app/services/langgraph_client.py:434-437 注释记录的源码确认),客户端 SSE 关闭后 server-side run 不被 cancel,继续跑完写 cache。配合 stream_resumable=True create,前端拿 sessionStorage 里的 run_id+thread_id 调 POST /{id}/insights/runs/{run_id}/attach_stream,attach_stream_run(acdm-backend/app/services/insights_service.py:866)先 _verify_thread_owner 校 metadata.owner_sub == user_sub + project_id 一致(防越权 reattach 别人的 run),再 _stream_run_and_write_cache 续转发。result 时仍按 cache_key 写 L1/L2——INSERT ON CONFLICT 幂等,aggregate_stream 与 attach 同时赶上 result 写两次也无害。仅显式 cancel 走 /cancel endpoint(acdm-backend/app/api/insights_router.py:349),隐式断开绝不调。
Implementation Details
lazy stale 检测(L1 读时顺手做)
L1 cache 不靠主动失效,而是读时 lazy 检测。lookup_l1_cache 取 cache row 后,额外两条 SQL:
python
# 摘自 insights_service.py:222-238(简化)
effective_latest = max(
(m for m in (latest_mtime, latest_l2_mtime) if m is not None),
default=None,
)
if effective_latest is not None and effective_latest > computed_at and not stale_now:
await session.execute(
update(MeetingInsightCache)
.where(MeetingInsightCache.cache_key == cache_key)
.values(stale=True)
)
await session.commit()
stale_now = True两个 stale 触发源:① meeting_files 最新 uploaded_at(原始纪要被重传)② meeting_atom_cache(status='ok')最新 computed_at——异步 worker 或用户 regenerate 重抽了 atom,L1 也得感知(acdm-backend/app/services/insights_service.py:189-219)。检出后服务端标 stale=TRUE(幂等),但仍返旧 payload + stale=true:用户先看旧结果 + banner 提示,自己决定何时点"重新聚合",不强制阻塞。
force 并发保护(advisory lock)
acquire_force_lock(acdm-backend/app/services/insights_service.py:1010-1026)用 SELECT pg_advisory_xact_lock(hashtext(:cache_key)) 把同 cache_key 的并发 force 请求串行化——scope 在事务内,提交/回滚自动释放,第二个 force 请求等第一个跑完,避免两条 pipeline 同时算同一组合白烧 LLM。
双版本号分离(cache 失效粒度)
ALGO_VERSION 与 EXTRACT_VERSION 分离是有意设计(deer-flow/.../meeting_aggregator_realtime/version.py:1-22):
ALGO_VERSION(聚合层算法)进 L1 cache_key hash。bump → L1 全 invalidate(hash 变),L2 仍可用(extract 不重跑,只重算后 6 节点)。EXTRACT_VERSION(LLM extract prompt/parse)进 L2 PK 段。bump → L2 全 invalidate,L1 连带失效(atom 变 → InsightTree 重算)。- 当前
ALGO_VERSION=1.0.0、EXTRACT_VERSION=2.0.0。铁律:acdm-backendinsights_service.py的ALGO_VERSION(acdm-backend/app/services/insights_service.py:47)必须与 harnessversion.py同步 bump,否则 BFF 算的 cache_key 与 pipeline 写的对不上,cache 永远 miss。
速查表
Insights BFF endpoint
| 端点 | 方法 | 作用 | 返回 | Source |
|---|---|---|---|---|
/{project_id}/insights/aggregate | POST | L1 命中 JSON / miss 走 SSE 9 节点聚合 | JSON 或 SSE | acdm-backend/app/api/insights_router.py:55-122 |
/{project_id}/insights/cache | GET | 轻量探测 L1 状态(不跑 pipeline,p95<50ms),前端切按钮文案 | CachePeekResponse | acdm-backend/app/api/insights_router.py:125-167 |
/{project_id}/insights/runs/{run_id}/attach_stream | POST | reattach 进行中/刚完成的 run 的 SSE | SSE | acdm-backend/app/api/insights_router.py:170-207 |
/{project_id}/insights/runs/{run_id}/cancel | POST | 显式取消 run(用户点[取消生成]) | ApiResponse[bool] | acdm-backend/app/api/insights_router.py:349-379 |
/{project_id}/meetings/{mid}/atom-cache | GET | 读 L2 单会议 atom cache(ok/pending/failed 三态) | AtomCacheResponse | acdm-backend/app/api/insights_router.py:210-251 |
/{project_id}/meetings/{mid}/regenerate-ai-extract | POST | 强制重抽单会议(转 pending + 入队 worker) | RegenerateAiExtractResponse | acdm-backend/app/api/insights_router.py:254-346 |
SSE 事件矩阵
| 事件 | 时机 | data 关键字段 | Source |
|---|---|---|---|
meta | runs.create 后首 chunk(仅 aggregate,attach 无) | run_id/thread_id/started_at | acdm-backend/app/services/insights_service.py:678-682 |
progress | 每个节点 update 事件(共 9 次) | node/completed/total:9 | acdm-backend/app/services/insights_service.py:763-767 |
result | pipeline 完成写完 cache | payload(InsightTree)/stale:false/latest_file_mtime | acdm-backend/app/services/insights_service.py:808-812 |
error | 任一阶段失败(随后关流 fail-fast) | error(code)/message | acdm-backend/app/services/insights_service.py:699-722 |
BusinessException → HTTP / SSE error 映射
| code | 含义 | HTTP | SSE error | Source |
|---|---|---|---|---|
| 42201 | invalid_meeting_ids(不存在/跨 project) | 422 | business_42201 | acdm-backend/app/services/insights_service.py:145-150 |
| 42202 | missing_source_files(无 minutes 也无 transcript) | 422 | business_42202 | acdm-backend/app/services/insights_service.py:151-156 |
| 40301 | forbidden_run_owner(owner_sub 不匹配) | 403 | forbidden_run_owner | acdm-backend/app/services/insights_service.py:850-855 |
| 40302 | forbidden_project_mismatch | 403 | forbidden_project_mismatch | acdm-backend/app/services/insights_service.py:856-862 |
| 50002 | extract LLM 抽取失败(fail-fast) | 400 | pipeline_error | deer-flow/.../meeting_aggregator_realtime/nodes/extract.py:266-271 |
| 50104 | thread/run 不存在(LangGraph 24h 清/重启) | 410 | run_not_found | acdm-backend/app/services/insights_service.py:840-847 |
| 50303/50304/50305 | NC 下载/解析失败 / 解析后均空 | 400 | business_503xx | acdm-backend/app/services/insights_service.py:375-440 |
Configuration
| Config | 默认值 | 含义 | 影响 | Source |
|---|---|---|---|---|
ALGO_VERSION | 1.0.0 | 聚合层算法版本,进 L1 cache_key | bump → L1 全失效,L2 仍用 | acdm-backend/app/services/insights_service.py:47 |
EXTRACT_VERSION | 2.0.0 | extract prompt/parse 版本,进 L2 PK | bump → L2 全失效 + L1 连带 | acdm-backend/app/services/insights_service.py:50 |
AGGREGATE_TIMEOUT_SECONDS | 900 | BFF pipeline 超时(15min) | 超时 yield pipeline_timeout error | acdm-backend/app/services/insights_service.py:53 |
TOTAL_NODES | 9 | SSE progress 事件 total 字段 | 前端进度条分母 | acdm-backend/app/services/insights_service.py:54 |
LANGGRAPH_STREAM_READ_TIMEOUT_SECONDS | 900 | LangGraph SDK httpx read timeout | 太小会硬切长 stream | acdm-backend/app/services/langgraph_client.py:54 |
LANGGRAPH_BASE_URL | http://deer-flow-langgraph:2024 | Aegra runtime 地址(docker network) | 改 runtime 落点 | acdm-backend/app/services/langgraph_client.py:43-45 |
model_name(extract 节点) | kimi-k2.6 | 抽取 LLM | 改抽取质量/成本 | deer-flow/.../meeting_aggregator_realtime/nodes/extract.py:374 |
thinking_enabled(extract 节点) | True | 抽取开 thinking | 关闭质量塌方(实测) | deer-flow/.../meeting_aggregator_realtime/nodes/extract.py:381 |
_STALE_PENDING_MIN | 10 | worker pending 卡死阈值(分钟) | startup recovery 重试窗口 | acdm-backend/app/services/ai_extract_worker.py:51 |
Common Pitfalls / 实战 Tips
- 两端
ALGO_VERSION必须同步:BFF 算 cache_key 用 acdm-backend 端常量,pipeline 写 L1 也用它;不一致 → cache 永远 miss,每次都重跑(deer-flow/.../meeting_aggregator_realtime/version.py:13-15明确警告)。 - stale 不会自动重算:lazy 检测出 stale 后只标记 + 返旧 payload + banner,用户不点"重新聚合"就一直用旧的(
acdm-backend/app/services/insights_service.py:226-245)。这是有意"诚实呈现"设计,不是 bug。 - L2 命中比较是
>=不是==:cache_hit.file_mtime >= meeting_input.file_uploaded_at才算 cache 不旧;文件没动则复用(deer-flow/.../meeting_aggregator_realtime/nodes/extract.py:391)。 - 隐式断开 ≠ cancel:切页/刷新不调 cancel endpoint,server 跑完写 cache;只有用户点[取消生成]才真 cancel(
acdm-backend/app/api/insights_router.py:360-362)。误把断开当 cancel 会丢已花的 LLM 钱。 - harness 不碰 DB/NC:想在 pipeline 里加查库逻辑会撞
test_harness_boundary.py;新数据需求要么走state.meetings_input(BFF 准备),要么走state.new_atoms_to_cache(BFF 写回),deerflow.*永远不import app.*(deer-flow/.../meeting_aggregator_realtime/state.py:12-14)。 - 流式是为绕网关 5min 墙:extract 节点用
bound.stream(...)不是为了实时显示,是 kimi+thinking 处理大输入常 4-6min,非流式撞 jointpilot 网关 idle 超时(deer-flow/.../meeting_aggregator_realtime/nodes/extract.py:108-118)。
References
acdm-backend/app/services/insights_service.py:68-1026— BFF 核心:cache_key/L1/L2/SSE/reattach/cancel(本章主源)acdm-backend/app/api/insights_router.py:55-379— 6 个 BFF endpoint + 异常映射acdm-backend/app/services/langgraph_client.py:84-452— LangGraph SDK 单例 + Aegra Auth header + join_stream/cancelacdm-backend/app/services/ai_extract_worker.py:66-543— 文件上传后异步 L2 backfill workeracdm-backend/app/model/models.py:668-765— L1(meeting_insight_cache)/ L2(meeting_atom_cache)表 ORMdeer-flow/backend/packages/harness/deerflow/pipelines/meeting_aggregator_realtime/pipeline.py:55-121— 9 节点 StateGraph 工厂 + closure 注入deer-flow/backend/packages/harness/deerflow/pipelines/meeting_aggregator_realtime/nodes/extract.py:345-445— extract_atoms:L2 skip + LLM 抽取 + AtomBatch 组装deer-flow/backend/packages/harness/deerflow/pipelines/meeting_aggregator_realtime/state.py:26-166— MeetingInputItem / L2CacheHitItem / 9 节点共享 statedeer-flow/backend/packages/harness/deerflow/pipelines/meeting_aggregator/nodes.py:29-311— 复用的后 6 个聚合节点deer-flow/backend/packages/harness/deerflow/pipelines/meeting_aggregator_realtime/version.py:1-22— ALGO/EXTRACT 双版本号分离设计
Related Pages
| Page | Relationship |
|---|---|
| 会议域与AI原子抽取 | 本章 L2 cache 的 atom 来自该章抽取;extract_atoms 与 ai_extract_worker 共享 prompt/schema |
| Aegra运行时与LangGraph | 本章 pipeline 在该章 Aegra runtime 跑;cancel_on_disconnect=False 是其特性 |
| 控制面与引擎的耦合契约 | 本章 BFF/harness 分层、X-Auth-User-* header 注入是该章契约的具体实例 |
| 可逆DB操作与存储层 | 本章 L1/L2 cache 表 + Nextcloud 文件下载属该章存储层 |
| 项目管理与资源授权 | 本章 require_project_access + owner_sub 二次校权用该章授权层 |
| BA专家报告工作流 | 同属确定性 pipeline,本章 SSE/reattach 范式与 BA 报告 Hybrid 模式可对照 |
| 审计SSE与后台任务 | 本章 ai_extract_worker fire-and-forget + startup recovery 属该章后台任务模式 |