Skip to content

会议域与 AI 原子抽取

本章目标:

  1. 看懂 Calendar / Meeting / MeetingFile 三层数据模型与软删边界,以及"截图建会"的 OCR 识别→候选编辑→批量入库三态流程
  2. 弄清四条上传路径各自的门禁(MIME / 体积 / AI 分类 / 候选格式)与失败码语义
  3. 掌握 ai_extract_worker 事件驱动抽取链路:after_insert → after_commit → fire-and-forget → MeetingAtomCache 状态机 → 服务重启恢复

TL;DR

会议域是 a-cdm 业务后端的核心子系统:Calendar → Meeting → MeetingFile 三层模型(全软删),配一条"日历截图 → 视觉模型 OCR → 候选去重 → 批量事务入库"的导入流水线。文件上传后,SQLAlchemy after_insert 监听器在 after_commit 时机 fire-and-forget 触发 ai_extract_worker,对 minutes/transcript 文件下载→markitdown→kimi 抽取 entities/facts/decisions/participants 四类原子,落进 MeetingAtomCache(三段复合主键 + pending/ok/failed 状态机)。服务重启时 recover_pending_on_startup 扫卡死 10 分钟以上的 pending 行重新入队,保证 fire-and-forget 任务不因进程崩溃而永久丢失。

Overview(为什么"上传即抽取",而不是用时再算)

会议洞察引擎(见 会议洞察引擎)需要把一批会议的纪要聚合成结构化的 InsightTree。最直接的做法是用户点"生成洞察"时现场调 LLM 抽取每场会议的原子。但这有三个硬问题:

  1. :kimi 处理一份 10KB+ 纪要常要 4-6 分钟,N 场会议串行就是几十分钟,用户等不了
  2. 重复算:同一场会议的纪要内容不变,每次聚合都重抽是纯浪费 token
  3. 撞网关墙:jointpilot 公司网关有 5 分钟硬超时,非流式长任务必撞墙(memory jointpilot_5min_wall_weekly_report)

答案是把抽取从"读路径"挪到"写路径":文件上传成功那一刻就异步起抽取任务,结果缓存进 MeetingAtomCache。等用户真要看洞察时,L2 cache 命中即可跳过 LLM extract,聚合只需跑后续 6 个确定性节点(从 ~30s+ 降到 ~5-10s,见 models.py:727)。这就是 ai-extract-as-aggregation-substrate 这个 OpenSpec change 的核心思想——AI 抽取作为聚合的底料(substrate),提前算好。

代价是引入了"事件驱动 + 状态机 + 启动恢复"的复杂度,本章就是讲这套机制怎么落地。

Architecture:数据模型与组件分工

会议域的数据模型是一棵三层软删树。Calendar 挂在 Project 下(可有子日历),Meeting 挂在 Calendar 下,MeetingFile 挂在 Meeting 下。AI 抽取的产物落在独立的 MeetingAtomCache(L2)与 MeetingInsightCache(L1,见洞察引擎章)。

组件职责入口文件Source
Calendar项目下的日历分组,partial unique 索引支持软删后同名重建model/models.pyacdm-backend/app/model/models.py:95-123
Meeting会议实体,scheduled/completed/cancelled 状态 + normal/important 重要度model/models.pyacdm-backend/app/model/models.py:126-153
MeetingFile会议附件,category=minutes/transcript/media/othermodel/models.pyacdm-backend/app/model/models.py:156-184
MeetingAtomCacheL2 单会议原子缓存 + 抽取状态机model/models.pyacdm-backend/app/model/models.py:720-778
MeetingImportService截图建会:analyze(OCR) + detect_conflicts + confirmservices/meeting_import_service.pyacdm-backend/app/services/meeting_import_service.py:109-188
VisionClient日历截图视觉模型封装(vision_calendar role)util/vision_client.pyacdm-backend/app/util/vision_client.py:109-131
FileService两阶段上传(analyze→upload)+ NC 落盘 + 回滚services/file_service.pyacdm-backend/app/services/file_service.py:150-294
ai_extract_listenerMeetingFile after_insert / after_commit 监听器services/ai_extract_listener.pyacdm-backend/app/services/ai_extract_listener.py:31-72
ai_extract_worker单会议原子抽取异步 worker + startup recoveryservices/ai_extract_worker.pyacdm-backend/app/services/ai_extract_worker.py:93-232
meeting_ai_service上传纪要后台抽 1-2 句话回写 descriptionservices/meeting_ai_service.pyacdm-backend/app/services/meeting_ai_service.py:66-228

Components / Subsystems

Calendar / Meeting / MeetingFile 模型

职责:三层软删树,所有列表 / JOIN 路径默认过滤 deleted_at IS NULL

Calendar(models.py:95-123)有两个关键设计:parent_calendar_id 自引用支持子日历层级(sub_calendars relationship,models.py:111-113);calendars_project_id_name_active_idxpartial unique 索引,postgresql_where=text("deleted_at IS NULL")(models.py:115-123)——这保证活跃日历在同项目内名字唯一,但软删后可以用同名重建(对齐 calendar_service.py:3-14 的可逆软删设计)。

Meeting(models.py:126-153)的 statusscheduled/completed/cancelled 枚举,importancenormal/important。注意它没有 project_id 列——项目归属是经 calendar_id → Calendar.project_id 间接推导的,这就是 ai_extract_worker._collect_meeting_context 必须 JOIN Calendar 才能拿到 project_id 的原因(ai_extract_worker.py:256)。

MeetingFile(models.py:156-184)的 category 四值枚举是上传门禁与抽取触发的核心维度。storage_path 记录 Nextcloud 实际路径;uploaded_atMeetingAtomCache.file_mtime 比对的基准——重新上传会更新它,使旧缓存判定为 stale。raw_path / raw_status 是 v0.2.1 遗留的 raw 层字段,本实验保留但未接 compiler(models.py:174-181)。

MeetingImportService:截图建会三态流程

职责:把日历软件截图变成批量会议记录,UI 流程是"识别中骨架屏 → 候选表格编辑 → 批量提交"。

关键方法:

  • analyze_image()(meeting_import_service.py:112-144):调 VisionClient.analyze_calendar_image OCR,逐条用 CandidateMeeting 校验。单条脏数据不整体中断——校验失败的条目跳过,理由塞进 warnings 让前端展示(meeting_import_service.py:135-138)。同时算 image_sha256(compute_image_sha256,:104-106)供指纹去重。
  • detect_conflicts()(:326-449):给每条候选打 NEW / DUPLICATE / LIKELY_DUPLICATE / TIME_CONFLICT 标签。判定有优先级:严格四字段全等 = DUPLICATE;Levenshtein 标题距离 ≤ 2 且 start/end 都在 ±60 分钟内 = LIKELY_DUPLICATE(_is_likely_duplicate,:85-101);区间开区间重叠 = TIME_CONFLICT。还检测同批次内候选互相冲突(batch_accepted 累积,:428-447)。
  • confirm_batch()(:146-188):单事务批量入库;任一候选 _combine_to_utc 失败或 end<=start 整体回滚。若带 image_sha256 则同事务落一条 ImportSession(:174-185)供撤销/去重。
  • revert_session()(:259-310):批量软删该 session 导入的会议,复用 MeetingService.delete_meeting(级联清 NC 目录);已 reverted 再调直接返 already_reverted=true(幂等)。

实现要点:候选时间字段统一按 Asia/Shanghai 解释,入库前 _combine_to_utc 转 UTC(:50-53)。_title_distance 是空间 O(min(|a|,|b|)) 的迭代 DP Levenshtein,大小写敏感(中文场景下英文大小写差异往往是不同会议,:56-77)。

VisionClient:日历截图 OCR

职责:把图片二进制转 base64 data URL,调 vision_calendar role 模型抽 JSON,失败 fail-fast 抛 BusinessException。

关键类:VisionClientvision_client.py:109__init__ai_registry.get_chat_client("vision_calendar") 拿 client+model(:127),不再直读 VOLCENGINE_*——registry 启动期已 fail-fast 校验凭据(见 AI 角色注册表与模型矩阵)。

实现要点:

  • 三格式 JSON 兜底:_extract_json_from_text(:74-106)依次尝试纯 JSON → ```json ``` 代码块 → 贪婪 {...} 块,全失败抛 JSONDecodeError
  • 2 次重试 + 指数退避 1s→2s(:174-218):JSON 解析失败时刷新 user_content 加纠错提示(:208-215)再重试;限流/超时错误(429/timeout/502/503)也重试,non-retryable 错误直接转 _ERR_API_FAILURE(50304)
  • 月视图约定:看不到具体时间时 system prompt 要求 start_time/end_time 设 null(:59),与候选模型的 nullable 字段对齐

ai_extract_listener:事件驱动触发器

职责:在 MeetingFile INSERT 后、且事务真正 commit 后,fire-and-forget 触发抽取。

为什么不在 after_insert 直接 schedule_extract?因为 after_insert 在 flush 阶段触发,此时事务还可能 rollback(ai_extract_listener.py:11-12)。如果直接调 worker,上传失败回滚后会留下一个对不存在文件的抽取任务。

解法是两阶段分发(参考 db_event_recorder 同款 pattern,ai_extract_listener.py:5-12):

  • _on_meeting_file_after_insert(:31-45):flush 时只把 meeting_id 暂存到 session.info[_PENDING_KEY](set 去重),且只对 category IN (minutes, transcript) 暂存
  • _on_session_after_commit(:48-65):commit 后才 pop pending 真分发,延迟 import schedule_extract 避免循环依赖
  • _on_session_after_rollback(:68-71):rollback 直接丢弃 pending(没真发生 INSERT)

监听器是模块级注册(@event.listens_for,:31/:48/:68),import 即生效,测试也能走到。install_ai_extract_listener()(:74-82)只做日志声明,是个 no-op 锚点。

ai_extract_worker:单会议抽取主流程

职责:对单个会议下载文件 → markitdown → kimi 抽 4 类原子 → 写 MeetingAtomCache。

关键方法:

  • schedule_extract(meeting_id)(ai_extract_worker.py:66-79):listener 调它,内部 loop.create_task(_safe_extract(...)) fire-and-forget;无 running loop 时只 warning 不抛
  • _safe_extract(:82-90):吞所有异常只 log,防止 worker 异常打挂 task pool
  • extract_for_meeting(meeting_id)(:93-201):主流程,自管 session;返回 ok/skipped/failed/None(None = 数据 race,会议不存在或无 minutes 也无 transcript)
  • recover_pending_on_startup()(:204-232):启动恢复,见下文状态机节

实现要点:

  • IO 在 session 外做:Step 3 标记 pending 后关闭 session,Step 4 的 NC 下载 + markitdown 在 session 外执行(:126-142),避免长 IO 占着数据库连接
  • 流式调用绕网关墙:_llm_extract_once(:387-439)用 stream=True 边算边吐 token 保活连接,绕开 jointpilot 5min 硬超时墙;response_format={"type":"json_object"} 强约束 + _strip_json_codefence(:442-457)鲁棒抠 JSON 双保险
  • retry 当前设为单次:_RETRY_BACKOFFS_SEC = (0.0,)(:42)——2026-05-12 单场稳定性测试期临时改单次测纯成功率,不让 retry 掩盖失败
  • prompt 进程内缓存:_load_system_prompt(:463-469)读 prompts/atom_extract_prompt.md 后缓存,prompt 运行时不变

Data Flow:从上传到原子缓存

文字说明关键步骤:

  1. 上传门禁(file_router.py:96-109):MIME 白名单(octet-stream 放行作浏览器兜底)+ MAX_UPLOAD_BYTES 体积校验,违规抛 40008/40007
  2. NC 落盘 + 回滚(file_service.py:207-294):先传主文件 + meta.json,DB 失败时回滚删 NC 文件(50004);AI 分类结构非法时 ValidationError → 删主文件 + 抛 40003(防注入)
  3. after_commit 真分发(ai_extract_listener.py:51-58):pop pending set,逐个 schedule_extract
  4. skip 短路(ai_extract_worker.py:115-123):_already_up_to_datestatus=='ok' AND file_mtime >= latest_mtime,命中返 skipped(0 LLM 调用)
  5. pending 占位(:126-135):立刻 UPSERT 一行 status=pending,前端 sidebar 立刻显示 ⏳(meeting_repo.py:97-107 把这个状态映成前端 affordance)
  6. 抽取 + 写回(:155-201):kimi 抽取成功后 UPSERT status=ok + payload,失败 _record_failurestatus=failed + error_message

category=='minutes'FileService.upload_file 还会额外起一个 run_description_suggestion 后台任务(file_service.py:282-286),LLM 抽 1-2 句话回写 meeting.description(SQL WHERE description IS NULL OR ='' 守门防覆盖用户手填,meeting_ai_service.py:193-197)。这与原子抽取是两条独立的 fire-and-forget 链路。

Implementation Details:MeetingAtomCache 状态机与启动恢复

MeetingAtomCache(models.py:720-778)是三段复合主键 (project_id, meeting_id, extract_version)extract_version 进主键的妙处:bump EXTRACT_VERSION(当前 "2.0.0",insights_service.py:50)时旧 row 自然不命中新查询,无需手工清缓存(models.py:726)。

状态机三态由 status 列(Enum("ok","pending","failed"))驱动:

核心难点:fire-and-forget 的可靠性schedule_extractloop.create_task 起任务,任务完全活在进程内存里。如果 worker 跑到一半进程崩溃(OOM / 部署重启),那行 MeetingAtomCache 会永远卡在 status='pending'——没人再去推它,前端永远显示 ⏳。

解法是 recover_pending_on_startup(ai_extract_worker.py:204-232),lifespan startup 调一次(main.py:170):

python
# 摘自 ai_extract_worker.py:211-225
threshold = datetime.now(timezone.utc) - timedelta(minutes=_STALE_PENDING_MIN)
stmt = (
    select(MeetingAtomCache.meeting_id)
    .where(MeetingAtomCache.status == "pending")
    .where(MeetingAtomCache.started_at.is_not(None))
    .where(MeetingAtomCache.started_at < threshold)
    .where(MeetingAtomCache.extract_version == EXTRACT_VERSION)
)
rows = (await session.execute(stmt)).all()
for (mid,) in rows:
    schedule_extract(mid)  # 重新入队

started_at(models.py:761-765)只在写 pending 时设值,正常完成会被 status='ok'(started_at=None)覆盖。所以"status='pending'started_at 早于 10 分钟前"等价于"这行卡死了"。_STALE_PENDING_MIN = 10(:51)给正常抽取留足时间(kimi 4-6 分钟 + 余量),避免把正在跑的任务误判重入队。main.py:169-172 对这次恢复套了 try/except——恢复失败不阻塞服务启动(non-blocking)。

EXTRACT_VERSION 升级后的批量补跑走独立路径:admin 端点 atom_extract_backfill(admin_meeting_router.py:190-247)用 NOT EXISTS 子查询找出"有 active minutes/transcript 文件但没有当前版本 status='ok' 缓存行"的会议,token bucket 限流逐个 schedule_extract,抗 LLM 上游限流。

速查表:四路上传门禁

会议域有四条"把外部内容写进系统"的入口,每条门禁不同。这是排查"为什么上传被拒/没触发抽取"时第一时间扫的表:

路径入口MIME 门禁体积门禁内容/格式门禁失败码Source
截图识别POST /meetings/import-from-image/analyzepng/jpeg/webp 白名单 → 41510MB → 413视觉模型 JSON 解析 2 次重试41501/41301/50301-50304acdm-backend/app/api/meeting_import_router.py:58-96
文件预分析POST /files/analyze无(仅文本提取)MAX_UPLOAD_BYTES → 40007;文本提取截断 10MB无(stub 规则分类)40007acdm-backend/app/api/file_router.py:35-54
文件正式上传POST /files/meeting/{id}ALLOWED_UPLOAD_MIMES 白名单(octet-stream 放行) → 40008MAX_UPLOAD_BYTES → 40007AI 分类结构 AIClassificationSchema 校验 → 40003(防注入)40008/40007/40003/50003/50004acdm-backend/app/api/file_router.py:84-115 · acdm-backend/app/services/file_service.py:243-250
候选批量入库POST /meetings/import-from-image/confirm无(已是结构化候选)_combine_to_utc 日期/时间格式 + end>start42201/42202/42203/42204acdm-backend/app/services/meeting_import_service.py:487-520

注:/files/analyze/files/meeting/{id} 是"两阶段上传"——analyze 先返回 AI 分类建议 + 标准化文件名建议(FileService.analyze_file,file_service.py:44-134),用户确认后再正式上传。analyze_file 的 AI 分类是 AIServiceStub 规则实现(实验阶段未接真 AI,file_service.py:55/:69-91);分类与用户选的 category 不一致时只产 warning 不拦截(file_service.py:95-101)。

MeetingAtomCache.status 四态对前端的映射(meeting_repo.get_by_project_with_filters,meeting_repo.py:81-107):has_ai_extract_status 取该会议最新一行(按 computed_at desc)的 status,无行返 'none'——故意不按 extract_version 过滤,版本 bump 时旧 row 状态仍能映回前端 affordance,等 worker 重抽后用新版本覆盖(meeting_repo.py:84-86)。

Configuration

Config默认含义Source
EXTRACT_VERSION"2.0.0"抽取算法版本,进 MeetingAtomCache 复合主键;bump 即失效旧缓存acdm-backend/app/services/insights_service.py:50
_STALE_PENDING_MIN10(分钟)startup recovery 判 pending 卡死的阈值acdm-backend/app/services/ai_extract_worker.py:51
_RETRY_BACKOFFS_SEC(0.0,)LLM 抽取 retry 退避(当前单次,测试期临时)acdm-backend/app/services/ai_extract_worker.py:42
AI_ROLE"meeting_atom_extract"原子抽取专属 ai_registry role(kimi 非流式)acdm-backend/app/services/ai_extract_worker.py:38
_MARKITDOWN_EXTSdocx/pdf/pptx/xlsx/html…走 markitdown 解析的扩展名acdm-backend/app/services/ai_extract_worker.py:45-47
_LIKELY_TITLE_MAX_DISTANCE2LIKELY_DUPLICATE 的标题 Levenshtein 阈值acdm-backend/app/services/meeting_import_service.py:43
_LIKELY_TIME_WINDOW_MIN60(分钟)LIKELY_DUPLICATE 的 start/end 时间窗acdm-backend/app/services/meeting_import_service.py:44
_IMPORT_DEDUPE_WINDOW_HOURS24(小时)同 image_sha256 指纹去重窗口acdm-backend/app/services/meeting_import_service.py:47
_MAX_IMAGE_BYTES10MB截图 OCR 体积上限acdm-backend/app/api/meeting_import_router.py:32

Common Pitfalls / 实战 Tips

  • after_insert 不能直接调 worker:flush 阶段事务还能 rollback,必须经 after_commit 两阶段分发,否则上传失败会留下幽灵抽取任务(ai_extract_listener.py:11-12)
  • fire-and-forget 必配 startup recovery:loop.create_task 任务活在进程内存,进程崩溃即丢;靠 started_at + 10min 阈值识别卡死行重入队(ai_extract_worker.py:204-232)
  • 路由声明顺序硬约束:POST /files/meeting/counts 必须声明在 POST /files/meeting/{meeting_id} 之前,否则 FastAPI 把 "counts" 当 meeting_id,月视图附件 badge 全消失(file_router.py:57-62,2026-05-07 prod 实测复现)
  • 抽取依赖 minutes 或 transcript 任一:_collect_meeting_context 两类文件都没有时返 None 跳过(ai_extract_worker.py:267-268),只有 media/other 文件不会触发抽取(listener _TARGET_CATEGORIES 只含 minutes/transcript,ai_extract_listener.py:28)
  • 重新上传同名文件会刷新 file_mtime:existing_file 分支更新 uploaded_at(file_service.py:254-261),使旧 MeetingAtomCache 判定 stale,触发重抽——这是预期行为,不是 bug
  • description 抽取与原子抽取互不干扰:都在 upload_file 后起,但前者 role=meeting_description、后者 role=meeting_atom_extract,失败各自静默不影响上传主流程(file_service.py:279-286)

References

  • acdm-backend/app/model/models.py:95-184 — Calendar/Meeting/MeetingFile 三层模型 + 软删/partial unique 索引
  • acdm-backend/app/model/models.py:720-778 — MeetingAtomCache 复合主键 + 状态机字段
  • acdm-backend/app/services/ai_extract_worker.py:1-543 — 单会议原子抽取 worker(主源)+ startup recovery
  • acdm-backend/app/services/ai_extract_listener.py:1-83 — after_insert/after_commit 两阶段触发器
  • acdm-backend/app/services/meeting_import_service.py:1-521 — 截图建会 analyze/conflict/confirm/revert
  • acdm-backend/app/util/vision_client.py:1-256 — 日历截图视觉模型封装 + 三格式 JSON 兜底
  • acdm-backend/app/services/file_service.py:150-294 — 两阶段上传 + NC 落盘回滚 + 原子抽取触发点
  • acdm-backend/app/api/file_router.py:35-159 — 四路上传中三路的门禁
  • acdm-backend/app/api/meeting_import_router.py:58-196 — 截图导入 API + MIME/体积门禁
  • acdm-backend/app/services/meeting_ai_service.py:66-228 — 上传纪要后台抽 description(并行链路)
  • acdm-backend/app/repository/meeting_repo.py:75-119 — has_minutes/has_ai_extract_status 前端 affordance 查询
  • acdm-backend/app/main.py:161-172 — lifespan 装配 listener + startup recovery
  • acdm-backend/app/api/admin_meeting_router.py:190-247 — EXTRACT_VERSION bump 后批量补跑
PageRelationship
会议洞察引擎本章 MeetingAtomCache(L2)是该章 InsightTree 聚合的底料
可逆DB操作与存储层本章 delete_meeting/update_meeting 包 business_transaction;listener 复用 db_event_recorder pattern
AI角色注册表与模型矩阵本章 vision_calendar / meeting_atom_extract / meeting_description role 由该章 registry 提供 client
环境变量凭据与降级开关本章 MAX_UPLOAD_BYTES / ALLOWED_UPLOAD_MIMES 等 settings 在该章详解
项目管理与资源授权本章上传/导入端点的 require_meeting_access / require_project_access 在该章详解
acdm-backend控制面架构本章 fire-and-forget worker + lifespan startup 钩子是该章控制面的一部分

公司内部参考 · 由 claude-wiki-gen 基于源码自动生成的二次分析