主题
会议域与 AI 原子抽取
本章目标:
- 看懂 Calendar / Meeting / MeetingFile 三层数据模型与软删边界,以及"截图建会"的 OCR 识别→候选编辑→批量入库三态流程
- 弄清四条上传路径各自的门禁(MIME / 体积 / AI 分类 / 候选格式)与失败码语义
- 掌握
ai_extract_worker事件驱动抽取链路:after_insert → after_commit → fire-and-forget → MeetingAtomCache 状态机 → 服务重启恢复
TL;DR
会议域是 a-cdm 业务后端的核心子系统:Calendar → Meeting → MeetingFile 三层模型(全软删),配一条"日历截图 → 视觉模型 OCR → 候选去重 → 批量事务入库"的导入流水线。文件上传后,SQLAlchemy after_insert 监听器在 after_commit 时机 fire-and-forget 触发 ai_extract_worker,对 minutes/transcript 文件下载→markitdown→kimi 抽取 entities/facts/decisions/participants 四类原子,落进 MeetingAtomCache(三段复合主键 + pending/ok/failed 状态机)。服务重启时 recover_pending_on_startup 扫卡死 10 分钟以上的 pending 行重新入队,保证 fire-and-forget 任务不因进程崩溃而永久丢失。
Overview(为什么"上传即抽取",而不是用时再算)
会议洞察引擎(见 会议洞察引擎)需要把一批会议的纪要聚合成结构化的 InsightTree。最直接的做法是用户点"生成洞察"时现场调 LLM 抽取每场会议的原子。但这有三个硬问题:
- 慢:kimi 处理一份 10KB+ 纪要常要 4-6 分钟,N 场会议串行就是几十分钟,用户等不了
- 重复算:同一场会议的纪要内容不变,每次聚合都重抽是纯浪费 token
- 撞网关墙:jointpilot 公司网关有 5 分钟硬超时,非流式长任务必撞墙(memory
jointpilot_5min_wall_weekly_report)
答案是把抽取从"读路径"挪到"写路径":文件上传成功那一刻就异步起抽取任务,结果缓存进 MeetingAtomCache。等用户真要看洞察时,L2 cache 命中即可跳过 LLM extract,聚合只需跑后续 6 个确定性节点(从 ~30s+ 降到 ~5-10s,见 models.py:727)。这就是 ai-extract-as-aggregation-substrate 这个 OpenSpec change 的核心思想——AI 抽取作为聚合的底料(substrate),提前算好。
代价是引入了"事件驱动 + 状态机 + 启动恢复"的复杂度,本章就是讲这套机制怎么落地。
Architecture:数据模型与组件分工
会议域的数据模型是一棵三层软删树。Calendar 挂在 Project 下(可有子日历),Meeting 挂在 Calendar 下,MeetingFile 挂在 Meeting 下。AI 抽取的产物落在独立的 MeetingAtomCache(L2)与 MeetingInsightCache(L1,见洞察引擎章)。
| 组件 | 职责 | 入口文件 | Source |
|---|---|---|---|
Calendar | 项目下的日历分组,partial unique 索引支持软删后同名重建 | model/models.py | acdm-backend/app/model/models.py:95-123 |
Meeting | 会议实体,scheduled/completed/cancelled 状态 + normal/important 重要度 | model/models.py | acdm-backend/app/model/models.py:126-153 |
MeetingFile | 会议附件,category=minutes/transcript/media/other | model/models.py | acdm-backend/app/model/models.py:156-184 |
MeetingAtomCache | L2 单会议原子缓存 + 抽取状态机 | model/models.py | acdm-backend/app/model/models.py:720-778 |
MeetingImportService | 截图建会:analyze(OCR) + detect_conflicts + confirm | services/meeting_import_service.py | acdm-backend/app/services/meeting_import_service.py:109-188 |
VisionClient | 日历截图视觉模型封装(vision_calendar role) | util/vision_client.py | acdm-backend/app/util/vision_client.py:109-131 |
FileService | 两阶段上传(analyze→upload)+ NC 落盘 + 回滚 | services/file_service.py | acdm-backend/app/services/file_service.py:150-294 |
ai_extract_listener | MeetingFile after_insert / after_commit 监听器 | services/ai_extract_listener.py | acdm-backend/app/services/ai_extract_listener.py:31-72 |
ai_extract_worker | 单会议原子抽取异步 worker + startup recovery | services/ai_extract_worker.py | acdm-backend/app/services/ai_extract_worker.py:93-232 |
meeting_ai_service | 上传纪要后台抽 1-2 句话回写 description | services/meeting_ai_service.py | acdm-backend/app/services/meeting_ai_service.py:66-228 |
Components / Subsystems
Calendar / Meeting / MeetingFile 模型
职责:三层软删树,所有列表 / JOIN 路径默认过滤 deleted_at IS NULL。
Calendar(models.py:95-123)有两个关键设计:parent_calendar_id 自引用支持子日历层级(sub_calendars relationship,models.py:111-113);calendars_project_id_name_active_idx 是 partial unique 索引,postgresql_where=text("deleted_at IS NULL")(models.py:115-123)——这保证活跃日历在同项目内名字唯一,但软删后可以用同名重建(对齐 calendar_service.py:3-14 的可逆软删设计)。
Meeting(models.py:126-153)的 status 是 scheduled/completed/cancelled 枚举,importance 是 normal/important。注意它没有 project_id 列——项目归属是经 calendar_id → Calendar.project_id 间接推导的,这就是 ai_extract_worker._collect_meeting_context 必须 JOIN Calendar 才能拿到 project_id 的原因(ai_extract_worker.py:256)。
MeetingFile(models.py:156-184)的 category 四值枚举是上传门禁与抽取触发的核心维度。storage_path 记录 Nextcloud 实际路径;uploaded_at 是 MeetingAtomCache.file_mtime 比对的基准——重新上传会更新它,使旧缓存判定为 stale。raw_path / raw_status 是 v0.2.1 遗留的 raw 层字段,本实验保留但未接 compiler(models.py:174-181)。
MeetingImportService:截图建会三态流程
职责:把日历软件截图变成批量会议记录,UI 流程是"识别中骨架屏 → 候选表格编辑 → 批量提交"。
关键方法:
analyze_image()(meeting_import_service.py:112-144):调VisionClient.analyze_calendar_imageOCR,逐条用CandidateMeeting校验。单条脏数据不整体中断——校验失败的条目跳过,理由塞进warnings让前端展示(meeting_import_service.py:135-138)。同时算image_sha256(compute_image_sha256,:104-106)供指纹去重。detect_conflicts()(:326-449):给每条候选打 NEW / DUPLICATE / LIKELY_DUPLICATE / TIME_CONFLICT 标签。判定有优先级:严格四字段全等 = DUPLICATE;Levenshtein 标题距离 ≤ 2 且 start/end 都在 ±60 分钟内 = LIKELY_DUPLICATE(_is_likely_duplicate,:85-101);区间开区间重叠 = TIME_CONFLICT。还检测同批次内候选互相冲突(batch_accepted累积,:428-447)。confirm_batch()(:146-188):单事务批量入库;任一候选_combine_to_utc失败或end<=start整体回滚。若带image_sha256则同事务落一条ImportSession(:174-185)供撤销/去重。revert_session()(:259-310):批量软删该 session 导入的会议,复用MeetingService.delete_meeting(级联清 NC 目录);已 reverted 再调直接返already_reverted=true(幂等)。
实现要点:候选时间字段统一按 Asia/Shanghai 解释,入库前 _combine_to_utc 转 UTC(:50-53)。_title_distance 是空间 O(min(|a|,|b|)) 的迭代 DP Levenshtein,大小写敏感(中文场景下英文大小写差异往往是不同会议,:56-77)。
VisionClient:日历截图 OCR
职责:把图片二进制转 base64 data URL,调 vision_calendar role 模型抽 JSON,失败 fail-fast 抛 BusinessException。
关键类:VisionClient 在 vision_client.py:109。__init__ 经 ai_registry.get_chat_client("vision_calendar") 拿 client+model(:127),不再直读 VOLCENGINE_*——registry 启动期已 fail-fast 校验凭据(见 AI 角色注册表与模型矩阵)。
实现要点:
- 三格式 JSON 兜底:
_extract_json_from_text(:74-106)依次尝试纯 JSON →```json ```代码块 → 贪婪{...}块,全失败抛JSONDecodeError - 2 次重试 + 指数退避 1s→2s(
:174-218):JSON 解析失败时刷新 user_content 加纠错提示(:208-215)再重试;限流/超时错误(429/timeout/502/503)也重试,non-retryable 错误直接转_ERR_API_FAILURE(50304) - 月视图约定:看不到具体时间时 system prompt 要求
start_time/end_time设 null(:59),与候选模型的 nullable 字段对齐
ai_extract_listener:事件驱动触发器
职责:在 MeetingFile INSERT 后、且事务真正 commit 后,fire-and-forget 触发抽取。
为什么不在 after_insert 直接 schedule_extract?因为 after_insert 在 flush 阶段触发,此时事务还可能 rollback(ai_extract_listener.py:11-12)。如果直接调 worker,上传失败回滚后会留下一个对不存在文件的抽取任务。
解法是两阶段分发(参考 db_event_recorder 同款 pattern,ai_extract_listener.py:5-12):
_on_meeting_file_after_insert(:31-45):flush 时只把meeting_id暂存到session.info[_PENDING_KEY](set 去重),且只对category IN (minutes, transcript)暂存_on_session_after_commit(:48-65):commit 后才 pop pending 真分发,延迟 importschedule_extract避免循环依赖_on_session_after_rollback(:68-71):rollback 直接丢弃 pending(没真发生 INSERT)
监听器是模块级注册(@event.listens_for,:31/:48/:68),import 即生效,测试也能走到。install_ai_extract_listener()(:74-82)只做日志声明,是个 no-op 锚点。
ai_extract_worker:单会议抽取主流程
职责:对单个会议下载文件 → markitdown → kimi 抽 4 类原子 → 写 MeetingAtomCache。
关键方法:
schedule_extract(meeting_id)(ai_extract_worker.py:66-79):listener 调它,内部loop.create_task(_safe_extract(...))fire-and-forget;无 running loop 时只 warning 不抛_safe_extract(:82-90):吞所有异常只 log,防止 worker 异常打挂 task poolextract_for_meeting(meeting_id)(:93-201):主流程,自管 session;返回ok/skipped/failed/None(None = 数据 race,会议不存在或无 minutes 也无 transcript)recover_pending_on_startup()(:204-232):启动恢复,见下文状态机节
实现要点:
- IO 在 session 外做:Step 3 标记 pending 后关闭 session,Step 4 的 NC 下载 + markitdown 在 session 外执行(
:126-142),避免长 IO 占着数据库连接 - 流式调用绕网关墙:
_llm_extract_once(:387-439)用stream=True边算边吐 token 保活连接,绕开 jointpilot 5min 硬超时墙;response_format={"type":"json_object"}强约束 +_strip_json_codefence(:442-457)鲁棒抠 JSON 双保险 - retry 当前设为单次:
_RETRY_BACKOFFS_SEC = (0.0,)(:42)——2026-05-12 单场稳定性测试期临时改单次测纯成功率,不让 retry 掩盖失败 - prompt 进程内缓存:
_load_system_prompt(:463-469)读prompts/atom_extract_prompt.md后缓存,prompt 运行时不变
Data Flow:从上传到原子缓存
文字说明关键步骤:
- 上传门禁(
file_router.py:96-109):MIME 白名单(octet-stream放行作浏览器兜底)+MAX_UPLOAD_BYTES体积校验,违规抛 40008/40007 - NC 落盘 + 回滚(
file_service.py:207-294):先传主文件 + meta.json,DB 失败时回滚删 NC 文件(50004);AI 分类结构非法时ValidationError→ 删主文件 + 抛 40003(防注入) - after_commit 真分发(
ai_extract_listener.py:51-58):pop pending set,逐个schedule_extract - skip 短路(
ai_extract_worker.py:115-123):_already_up_to_date检status=='ok' AND file_mtime >= latest_mtime,命中返 skipped(0 LLM 调用) - pending 占位(
:126-135):立刻 UPSERT 一行status=pending,前端 sidebar 立刻显示 ⏳(meeting_repo.py:97-107把这个状态映成前端 affordance) - 抽取 + 写回(
:155-201):kimi 抽取成功后 UPSERTstatus=ok+ payload,失败_record_failure写status=failed+ error_message
category=='minutes' 时 FileService.upload_file 还会额外起一个 run_description_suggestion 后台任务(file_service.py:282-286),LLM 抽 1-2 句话回写 meeting.description(SQL WHERE description IS NULL OR ='' 守门防覆盖用户手填,meeting_ai_service.py:193-197)。这与原子抽取是两条独立的 fire-and-forget 链路。
Implementation Details:MeetingAtomCache 状态机与启动恢复
MeetingAtomCache(models.py:720-778)是三段复合主键 (project_id, meeting_id, extract_version)。extract_version 进主键的妙处:bump EXTRACT_VERSION(当前 "2.0.0",insights_service.py:50)时旧 row 自然不命中新查询,无需手工清缓存(models.py:726)。
状态机三态由 status 列(Enum("ok","pending","failed"))驱动:
核心难点:fire-and-forget 的可靠性。schedule_extract 用 loop.create_task 起任务,任务完全活在进程内存里。如果 worker 跑到一半进程崩溃(OOM / 部署重启),那行 MeetingAtomCache 会永远卡在 status='pending'——没人再去推它,前端永远显示 ⏳。
解法是 recover_pending_on_startup(ai_extract_worker.py:204-232),lifespan startup 调一次(main.py:170):
python
# 摘自 ai_extract_worker.py:211-225
threshold = datetime.now(timezone.utc) - timedelta(minutes=_STALE_PENDING_MIN)
stmt = (
select(MeetingAtomCache.meeting_id)
.where(MeetingAtomCache.status == "pending")
.where(MeetingAtomCache.started_at.is_not(None))
.where(MeetingAtomCache.started_at < threshold)
.where(MeetingAtomCache.extract_version == EXTRACT_VERSION)
)
rows = (await session.execute(stmt)).all()
for (mid,) in rows:
schedule_extract(mid) # 重新入队started_at(models.py:761-765)只在写 pending 时设值,正常完成会被 status='ok'(started_at=None)覆盖。所以"status='pending' 且 started_at 早于 10 分钟前"等价于"这行卡死了"。_STALE_PENDING_MIN = 10(:51)给正常抽取留足时间(kimi 4-6 分钟 + 余量),避免把正在跑的任务误判重入队。main.py:169-172 对这次恢复套了 try/except——恢复失败不阻塞服务启动(non-blocking)。
EXTRACT_VERSION 升级后的批量补跑走独立路径:admin 端点 atom_extract_backfill(admin_meeting_router.py:190-247)用 NOT EXISTS 子查询找出"有 active minutes/transcript 文件但没有当前版本 status='ok' 缓存行"的会议,token bucket 限流逐个 schedule_extract,抗 LLM 上游限流。
速查表:四路上传门禁
会议域有四条"把外部内容写进系统"的入口,每条门禁不同。这是排查"为什么上传被拒/没触发抽取"时第一时间扫的表:
| 路径 | 入口 | MIME 门禁 | 体积门禁 | 内容/格式门禁 | 失败码 | Source |
|---|---|---|---|---|---|---|
| 截图识别 | POST /meetings/import-from-image/analyze | png/jpeg/webp 白名单 → 415 | 10MB → 413 | 视觉模型 JSON 解析 2 次重试 | 41501/41301/50301-50304 | acdm-backend/app/api/meeting_import_router.py:58-96 |
| 文件预分析 | POST /files/analyze | 无(仅文本提取) | MAX_UPLOAD_BYTES → 40007;文本提取截断 10MB | 无(stub 规则分类) | 40007 | acdm-backend/app/api/file_router.py:35-54 |
| 文件正式上传 | POST /files/meeting/{id} | ALLOWED_UPLOAD_MIMES 白名单(octet-stream 放行) → 40008 | MAX_UPLOAD_BYTES → 40007 | AI 分类结构 AIClassificationSchema 校验 → 40003(防注入) | 40008/40007/40003/50003/50004 | acdm-backend/app/api/file_router.py:84-115 · acdm-backend/app/services/file_service.py:243-250 |
| 候选批量入库 | POST /meetings/import-from-image/confirm | 无(已是结构化候选) | 无 | _combine_to_utc 日期/时间格式 + end>start | 42201/42202/42203/42204 | acdm-backend/app/services/meeting_import_service.py:487-520 |
注:/files/analyze 与 /files/meeting/{id} 是"两阶段上传"——analyze 先返回 AI 分类建议 + 标准化文件名建议(FileService.analyze_file,file_service.py:44-134),用户确认后再正式上传。analyze_file 的 AI 分类是 AIServiceStub 规则实现(实验阶段未接真 AI,file_service.py:55/:69-91);分类与用户选的 category 不一致时只产 warning 不拦截(file_service.py:95-101)。
MeetingAtomCache.status 四态对前端的映射(meeting_repo.get_by_project_with_filters,meeting_repo.py:81-107):has_ai_extract_status 取该会议最新一行(按 computed_at desc)的 status,无行返 'none'——故意不按 extract_version 过滤,版本 bump 时旧 row 状态仍能映回前端 affordance,等 worker 重抽后用新版本覆盖(meeting_repo.py:84-86)。
Configuration
| Config | 默认 | 含义 | Source |
|---|---|---|---|
EXTRACT_VERSION | "2.0.0" | 抽取算法版本,进 MeetingAtomCache 复合主键;bump 即失效旧缓存 | acdm-backend/app/services/insights_service.py:50 |
_STALE_PENDING_MIN | 10(分钟) | startup recovery 判 pending 卡死的阈值 | acdm-backend/app/services/ai_extract_worker.py:51 |
_RETRY_BACKOFFS_SEC | (0.0,) | LLM 抽取 retry 退避(当前单次,测试期临时) | acdm-backend/app/services/ai_extract_worker.py:42 |
AI_ROLE | "meeting_atom_extract" | 原子抽取专属 ai_registry role(kimi 非流式) | acdm-backend/app/services/ai_extract_worker.py:38 |
_MARKITDOWN_EXTS | docx/pdf/pptx/xlsx/html… | 走 markitdown 解析的扩展名 | acdm-backend/app/services/ai_extract_worker.py:45-47 |
_LIKELY_TITLE_MAX_DISTANCE | 2 | LIKELY_DUPLICATE 的标题 Levenshtein 阈值 | acdm-backend/app/services/meeting_import_service.py:43 |
_LIKELY_TIME_WINDOW_MIN | 60(分钟) | LIKELY_DUPLICATE 的 start/end 时间窗 | acdm-backend/app/services/meeting_import_service.py:44 |
_IMPORT_DEDUPE_WINDOW_HOURS | 24(小时) | 同 image_sha256 指纹去重窗口 | acdm-backend/app/services/meeting_import_service.py:47 |
_MAX_IMAGE_BYTES | 10MB | 截图 OCR 体积上限 | acdm-backend/app/api/meeting_import_router.py:32 |
Common Pitfalls / 实战 Tips
- after_insert 不能直接调 worker:flush 阶段事务还能 rollback,必须经 after_commit 两阶段分发,否则上传失败会留下幽灵抽取任务(
ai_extract_listener.py:11-12) - fire-and-forget 必配 startup recovery:
loop.create_task任务活在进程内存,进程崩溃即丢;靠started_at+ 10min 阈值识别卡死行重入队(ai_extract_worker.py:204-232) - 路由声明顺序硬约束:
POST /files/meeting/counts必须声明在POST /files/meeting/{meeting_id}之前,否则 FastAPI 把 "counts" 当 meeting_id,月视图附件 badge 全消失(file_router.py:57-62,2026-05-07 prod 实测复现) - 抽取依赖 minutes 或 transcript 任一:
_collect_meeting_context两类文件都没有时返 None 跳过(ai_extract_worker.py:267-268),只有 media/other 文件不会触发抽取(listener_TARGET_CATEGORIES只含 minutes/transcript,ai_extract_listener.py:28) - 重新上传同名文件会刷新 file_mtime:
existing_file分支更新uploaded_at(file_service.py:254-261),使旧 MeetingAtomCache 判定 stale,触发重抽——这是预期行为,不是 bug - description 抽取与原子抽取互不干扰:都在 upload_file 后起,但前者 role=
meeting_description、后者 role=meeting_atom_extract,失败各自静默不影响上传主流程(file_service.py:279-286)
References
acdm-backend/app/model/models.py:95-184— Calendar/Meeting/MeetingFile 三层模型 + 软删/partial unique 索引acdm-backend/app/model/models.py:720-778— MeetingAtomCache 复合主键 + 状态机字段acdm-backend/app/services/ai_extract_worker.py:1-543— 单会议原子抽取 worker(主源)+ startup recoveryacdm-backend/app/services/ai_extract_listener.py:1-83— after_insert/after_commit 两阶段触发器acdm-backend/app/services/meeting_import_service.py:1-521— 截图建会 analyze/conflict/confirm/revertacdm-backend/app/util/vision_client.py:1-256— 日历截图视觉模型封装 + 三格式 JSON 兜底acdm-backend/app/services/file_service.py:150-294— 两阶段上传 + NC 落盘回滚 + 原子抽取触发点acdm-backend/app/api/file_router.py:35-159— 四路上传中三路的门禁acdm-backend/app/api/meeting_import_router.py:58-196— 截图导入 API + MIME/体积门禁acdm-backend/app/services/meeting_ai_service.py:66-228— 上传纪要后台抽 description(并行链路)acdm-backend/app/repository/meeting_repo.py:75-119— has_minutes/has_ai_extract_status 前端 affordance 查询acdm-backend/app/main.py:161-172— lifespan 装配 listener + startup recoveryacdm-backend/app/api/admin_meeting_router.py:190-247— EXTRACT_VERSION bump 后批量补跑
Related Pages
| Page | Relationship |
|---|---|
| 会议洞察引擎 | 本章 MeetingAtomCache(L2)是该章 InsightTree 聚合的底料 |
| 可逆DB操作与存储层 | 本章 delete_meeting/update_meeting 包 business_transaction;listener 复用 db_event_recorder pattern |
| AI角色注册表与模型矩阵 | 本章 vision_calendar / meeting_atom_extract / meeting_description role 由该章 registry 提供 client |
| 环境变量凭据与降级开关 | 本章 MAX_UPLOAD_BYTES / ALLOWED_UPLOAD_MIMES 等 settings 在该章详解 |
| 项目管理与资源授权 | 本章上传/导入端点的 require_meeting_access / require_project_access 在该章详解 |
| acdm-backend控制面架构 | 本章 fire-and-forget worker + lifespan startup 钩子是该章控制面的一部分 |