Skip to content

审计、SSE 与后台任务

本章目标:

  1. 看懂 acdm-backend 的审计三表(api_audit_log / login_history / admin_audit_log)如何由 HTTP middleware 与 admin 路由 fail-safe 写入
  2. 弄清 admin_* 跨用户治理路由为什么要绕开 Aegra SQL 硬过滤直连 aegra DB,以及降级开关怎么用
  3. 理解 SSEConnectionManager 的 PG 持久化 + 断线回放设计,和 lifespan 里 background_executor / sync_scheduler 两条异步基础设施的角色

TL;DR

acdm-backend 的可观测与治理层由三块构成:① 审计——audit_log HTTP middleware 在每个请求末端把变更类(POST/PUT/PATCH/DELETE)或 /admin/* 请求落 api_audit_log,登录落 login_history,admin 升降级与同事务的状态变更一起落 admin_audit_log;审计写入全程 fail-safe(失败只 warn 不破坏业务),且受 ACDM_AUDIT_LOGGING_ENABLED 总开关控制。② 跨用户治理——admin_thread_router 等 admin_* 路由挂 require_admin,其中跨用户读 thread 必须绕开 Aegra HTTP API 的 user_id SQL 硬过滤,直连 aegra DB(asyncpg 读 thread/runs 表 + 官方 SDK 反序列化 checkpoint),并带独立降级开关 ACDM_ADMIN_THREAD_ACCESS_ENABLED。③ 异步基础设施——SSEConnectionManagersse_event_history 表持久化事件,支持多连接 + Last-Event-ID 断线回放;FastAPI lifespan 拉起 background_executor(已弃用)与 ba_report_sync_scheduler(每 10s 巡检卡死的 BA 步骤)两条常驻协程。

Overview(为什么需要这三块)

这一章的三块功能看似无关,实际回答的是同一类问题:生产环境里"看不见的事"如何变得可观测、可治理、可恢复

审计 解决"谁在什么时候改了什么"。没有审计层时,排查越权操作、定位误删源头、复盘安全事件都只能靠 docker 日志——而 CD 每分钟自动部署、容器重启即丢历史日志(acdm-backend/app/main.py:97)。所以审计必须落库。但审计有个硬约束:它不能反过来破坏业务。审计写库失败、审计开关关闭,都不该让用户的正常请求 500。这就是为什么 AuditService 的所有写入方法内部自己 try/except + warn,且有 ACDM_AUDIT_LOGGING_ENABLED 一键熔断(acdm-backend/app/services/audit_service.py:1-9)。

跨用户治理 解决一个 Aegra 切换带来的具体难题。系统 2026-04-25 切到 Aegra 0.9.4 做 Agent Runtime,Aegra 在 thread 表 schema 强绑 user_id,API 层 14 处 SQL 硬过滤 WHERE user_id = identity,auth handler 返回 None 都无法 bypass(acdm-backend/app/api/admin_thread_router.py:1-9)。这意味着 admin 想看任意用户的对话做合规审计,走 Aegra SDK 根本拿不到数据。唯一出路是绕开 Aegra HTTP API,直连它背后的 aegra DB。

异步基础设施 解决两类长耗时操作的可恢复性:BA 报告分析动辄几分钟,前端不能干等也不能断线就丢进度——需要 SSE 实时推 + 断线回放;Agent 跑在 Aegra 进程里,会出现三种失联态——跑完但前端关页面没收到(孤儿)、卡死、超时——需要一个常驻 scheduler 定期巡检兜底。

Architecture

子系统职责入口Source
audit_log middleware每请求末端记日志 + 触发审计落库audit_log()acdm-backend/app/main.py:270-295
AuditService写/查 login_history + api_audit_log,内部 fail-safeAuditServiceacdm-backend/app/services/audit_service.py:34
AuditLogServiceadmin_audit_log,与状态变更同事务AuditLogService.recordacdm-backend/app/services/audit_log_service.py:20-50
audit_router/admin/logins /admin/apis 只读查询routeracdm-backend/app/api/audit_router.py:21-66
admin_user_router用户升降级/启停 + /admin/audit 查询routeracdm-backend/app/api/admin_user_router.py:40-247
admin_thread_router跨用户读 thread(绕 Aegra SQL filter)routeracdm-backend/app/api/admin_thread_router.py:43-336
SSEConnectionManagerSSE 多连接 + PG 持久化 + 断线回放sse_manager 单例acdm-backend/app/services/sse_manager.py:33-238
BackgroundTaskExecutorBA 任务队列消费(已弃用)background_executor 单例acdm-backend/app/services/background_executor.py:65-1003
BAReportSyncScheduler每 10s 巡检卡死 BA 步骤run_sync_scheduleracdm-backend/app/services/ba_report_sync_scheduler.py:274-303

middleware 注册顺序(FastAPI 中 add_middleware / @app.middleware 后注册的先执行)从外到内是:CORS → SlowAPI → reversible_actor_contextmcp_service_authaudit_log,所以 audit_log 最贴近业务路由,能拿到最终响应状态码与真实耗时(acdm-backend/app/main.py:222-295)。

Components / Subsystems

审计三表与 fail-safe 写入

职责:把"谁、何时、对哪个路径、做了什么、结果如何"持久化,且永不反噬业务。

三张表的分工(都刻意不加外键到 acdm_users,Keycloak 删账户后仍可追溯):

写入时机写入者关键设计Source
login_history登录成功 / 登出AuditService.record_login / record_logoutuser_email 冗余快照登录当刻值acdm-backend/app/model/models.py:516-538
api_audit_log变更类请求 或 /admin/* 完成后audit_log middleware → AuditService.record_api只记写操作 + admin GET,防膨胀acdm-backend/app/model/models.py:643-665
admin_audit_logadmin 升/降/启/停用户AuditLogService.record(与状态变更同事务)actor_email/target_email 冗余存事件当刻值acdm-backend/app/model/models.py:329-351

记录范围判定 _should_record_api_audit(acdm-backend/app/main.py:33-39):先按前缀跳过 /health /auth /docs /openapi /redoc /mcp;然后 /admin/*(含 GET)一律记;其余只记 POST/PUT/PATCH/DELETE。GET 业务读接口不记是有意为之——否则审计表会被海量读请求撑爆(acdm-backend/app/model/models.py:646-647)。

fail-safe 的三层防线(审计绝不破坏业务):

  1. middleware 层:_maybe_record_api_audit 整体被 try/except 包,异常只 logger.warning(acdm-backend/app/main.py:285-293)。
  2. service 层:record_api / record_login / record_logout 各自内部 try/except,失败返 None(acdm-backend/app/services/audit_service.py:106-124)。
  3. 开关层:_enabled()ACDM_AUDIT_LOGGING_ENABLED,设为 false/0/no所有写入方法直接 return,但查询方法仍可用(历史数据照查)(acdm-backend/app/services/audit_service.py:25-31)。

middleware 不在请求依赖链里,所以它自管 session 生命周期——用 async_session_maker() 开独立 session,自己 commit(acdm-backend/app/main.py:66-85)。

admin_audit_log 的同事务保证:AuditLogService.recordflushcommit,commit 留给调用方(acdm-backend/app/services/audit_log_service.py:31-50)。在 admin_user_router 里,admin_svc.promote()is_adminaudit_svc.record() 写审计共用同一个 get_db session,路由末尾统一 session.commit()——要么状态变更+审计都落,要么都回滚,杜绝"权限改了但没留审计"的脏状态(acdm-backend/app/api/admin_user_router.py:59-82)。_actor_as_user 这里有个细节:它造一个不加进 session 的轻 User 实例仅作值载体传给审计,避免 identity map 冲突(acdm-backend/app/api/admin_user_router.py:250-263)。

跨用户治理路由(admin_*)

职责:给 admin 提供跨用户的合规可见性与可逆操作,全部 require_admin 守卫。

所有 admin_* 路由统一挂 /admin prefix + auth_dep,router 内部再 Depends(require_admin) 做二次保险(生产 Caddy forward_auth 已前置校验)(acdm-backend/app/main.py:491-545)。清单见下方速查表。

最有技术含量的是 admin_thread_router。它要解决的矛盾是:Aegra 0.9.4 在 SQL 层硬过滤 user_id,admin 用 Aegra SDK 拿不到别人的 thread。解法分两类数据源:

  • thread 列表 / metadata / runs:asyncpg 直连 aegra DB,裸 SQL 读 thread / runs 表(jsonb 字段直读)(acdm-backend/app/api/admin_thread_router.py:94-140)。
  • 完整 thread state(messages/artifacts/todos):这些在 checkpoints + checkpoint_blobs 里是 LangGraph 自家 msgpack 格式。SQL 直读 bytea blob 自己解析 = fork LangGraph 内部 ABI,所以走官方 AsyncPostgresSaver.aget() 反序列化 channel_values(acdm-backend/app/api/admin_thread_router.py:168-235)。

_aegra_db_url() 优先读 AEGRA_DATABASE_URL,否则从 acdm 自己的 DATABASE_URL 衍生:去掉 +asyncpg driver 标记、库名替换为 aegra(生产假设两个 DB 同 platform-db 实例、同 credential)(acdm-backend/app/api/admin_thread_router.py:46-65)。

_load_thread_state 是 fail-soft 设计的典范:LangGraph SDK 是外部依赖,checkpoint schema 随版本变。它把"langgraph 没装"、"读 checkpoint 失败"、"channel_values 类型异常"、"序列化失败"分别返回 (None,None,None,"<原因>"),让端点不 500——admin 仍能看到 metadata + runs 用于审计,前端按 messages_warning 显示红色降级提示(acdm-backend/app/api/admin_thread_router.py:168-235257-336)。

SSEConnectionManager

职责:把 BA 报告这类长耗时任务的进度实时推给前端,支持多前端同看一任务、断线后回放未收到的事件。

关键类:SSEConnectionManageracdm-backend/app/services/sse_manager.py:33,模块级单例 sse_manager(:238)。

为什么不用 Redis:sse_event_history 表注释明确写"PostgreSQL 模式下替代 Redis 存储"(acdm-backend/app/model/ba_report_models.py:107-114)。事件持久化进 PG,断线重连靠 event_id 单调递增序号回放,代价是 push 时一次 SELECT max(event_id) + INSERT + commit(acdm-backend/app/services/sse_manager.py:106-122)。

关键方法:

  • connect(task_id):每连接建一个 asyncio.Queue(maxsize=100),同一 task 多连接放进 _connections[task_id] 列表,_lock 保护(acdm-backend/app/services/sse_manager.py:49-68)。
  • push(db, task_id, event, data):取下一个 event_id → 存 PG → 推所有活跃 queue;queue 满则丢该事件 + warn,不阻塞(acdm-backend/app/services/sse_manager.py:89-139)。
  • get_history(db, task_id, last_event_id):回放 event_id > last_event_id 的事件,断线重连用(acdm-backend/app/services/sse_manager.py:141-172)。
  • clear_expired(db, ttl=3600):删 created_at 早于 cutoff 的事件,防表无限膨胀(acdm-backend/app/services/sse_manager.py:190-208)。
  • format_sse(event):拼 id: / event: / data:(JSON,ensure_ascii=False)三行 + 空行的标准 SSE 报文(acdm-backend/app/services/sse_manager.py:210-234)。

消费端ba_report_router.py/ba-reports/{report_id}/stream:查当前活跃 task,无活跃 task 返回 event: no_task 空流;有则 connect 拿 queue,若带 Last-Event-ID header 先回放历史,再 asyncio.wait_for(queue.get(), timeout=30) 阻塞等新事件——30s 超时发 : heartbeat 保活,收到 completed/error 事件即结束流,finallydisconnect(acdm-backend/app/api/ba_report_router.py:243-328)。

lifespan 异步基础设施

FastAPI _lifespan 在 yield 前拉起两条常驻协程,yield 后逐一优雅关闭(acdm-backend/app/main.py:186-211):

1. background_executor(已弃用,2026-05-17)——文件头部 DeprecationWarning 明确标注"系统已切 SSE direct-to-LangGraph 模式,本模块不在活跃执行路径"(acdm-backend/app/services/background_executor.py:1-1636-42)。它原是 BA 报告的 Hybrid 后台任务执行器:单例 + asyncio.Queue 任务队列 + 消费者协程 + MAX_CONCURRENT_TASKS=3 并发控制 + 启动时 _recover_pending_tasks 恢复未完成任务(acdm-backend/app/services/background_executor.py:65-126830-871)。start()/stop() 仍被 lifespan 调,但 _execute_task 等核心方法均标 unreachable。理解它的价值在于:它是被 SSE direct-to-LangGraph 取代的前一代架构,新架构改由 ba_report_sync_scheduler 兜底。

2. ba_report_sync_scheduler(活跃)——asyncio.create_task(run_sync_scheduler(), name="ba_report_sync_scheduler"),挂 app.state.scheduler_task,关闭时 cancel + await(acdm-backend/app/main.py:191-211)。它每 SYNC_INTERVAL_SECONDS(默认 10s)循环一次,巡检 status="running"started_at 超过 MONITOR_CHECK_MINUTES(默认 2min)的 BA 步骤(acdm-backend/app/services/ba_report_sync_scheduler.py:26-32274-303)。

它的核心是 runs.get() 分层判断(v1.6,替代盲拉 checkpoint):

python
# 摘自 acdm-backend/app/services/ba_report_sync_scheduler.py:151-199
if run_status == "success":
    # Agent 已跑完 → 拉 checkpoint 取成果(孤儿:用户关页面但 agent 跑完了)
    state = await service._fetch_state_from_checkpoint(...)
    if state: await service.handle_step_completed_from_checkpoint(...)
if run_status in ("error", "timeout", "interrupted", "cancelled"):
    # 标 error;error_code 列只 VARCHAR(8),需缩写 AE_ERR/AE_TMO/AE_INT/AE_CAN
    ...
# status=running/pending 或无 run → 检查超时(TIMEOUT_THRESHOLD_MINUTES,默认 15)
if step.started_at and step.started_at < timeout_threshold:
    await self._mark_step_timeout(db, step)

设计要点:① 第一层只调轻量 runs.get()(7 字段)判断 agent 存活/成功/失败,status=success 时才拉重的 checkpoint 取成果——避免每轮盲拉(acdm-backend/app/services/ba_report_sync_scheduler.py:1-1159-72)。② 幂等:已同步步骤跳过。③ 超时有重试计数保护,_mark_step_timeoutstep.ai_metadata.timeout_retry_count,达 MAX_TIMEOUT_RETRIES(默认 3)标 TMO_PERM 永久失败,否则 TIMEOUT + 计数 +1(acdm-backend/app/services/ba_report_sync_scheduler.py:229-271)。④ 单步异常 db.rollback() 后继续下一步,不让一个坏步骤拖垮整轮(acdm-backend/app/services/ba_report_sync_scheduler.py:209-214)。

速查表:admin_* 治理端点 + 审计写入点

admin_ 路由全集*(全部 /admin prefix,均 require_admin 守卫):

端点方法作用降级开关Source
/admin/loginsGET查登录历史acdm-backend/app/api/audit_router.py:24-40
/admin/apisGET查 API 变更流水acdm-backend/app/api/audit_router.py:43-66
/admin/usersGET列用户(分页)acdm-backend/app/api/admin_user_router.py:43-56
/admin/users/{sub}/promotePOST升级 admin(写审计)acdm-backend/app/api/admin_user_router.py:59-82
/admin/users/{sub}/demotePOST降级(LastAdmin 保护)acdm-backend/app/api/admin_user_router.py:85-110
/admin/users/{sub}/disablePOST禁用(Self 保护)acdm-backend/app/api/admin_user_router.py:113-148
/admin/users/{sub}/enablePOST启用(幂等)acdm-backend/app/api/admin_user_router.py:151-180
/admin/users/by-subsPOST批量反查 email(≤100)acdm-backend/app/api/admin_user_router.py:183-202
/admin/auditGET查升降级审计acdm-backend/app/api/admin_user_router.py:205-247
/admin/threads/allGET跨用户列 threadACDM_ADMIN_THREAD_ACCESS_ENABLEDacdm-backend/app/api/admin_thread_router.py:77-140
/admin/threads/{id}/stateGET跨用户读 thread 全态ACDM_ADMIN_THREAD_ACCESS_ENABLEDacdm-backend/app/api/admin_thread_router.py:257-336
/admin/storage-eventsGET/POST列/详/可逆 revertrouter 内三角色判断acdm-backend/app/api/admin_storage_events_router.py:167-346
/admin/trash/{projects,calendars}GET/DELETE回收站列 + 永久删acdm-backend/app/api/admin_trash_router.py:76-145

审计写入点矩阵:

事件触发位置落表fail-safe受总开关
任意写请求 / admin GETaudit_log middlewareapi_audit_log✓(warn 不抛)
登录成功/auth/callbacklogin_history
登出/auth/logoutlogin_history(填 logged_out_at)
admin 升/降/启/停admin_user_router(同事务)admin_audit_log✗(事务一致优先)

admin_audit_log 刻意走 fail-safe:它要的是"权限变更与审计原子一致",审计失败就该让整个事务回滚,而不是悄悄放过权限变更。这与 api_audit_log 的取舍正好相反。

Configuration

Config默认含义影响Source
ACDM_AUDIT_LOGGING_ENABLEDtrue审计写入总开关false/0/no 时停所有审计写,查询仍可用acdm-backend/app/services/audit_service.py:25-31
ACDM_ADMIN_THREAD_ACCESS_ENABLEDtrue跨用户 thread 访问开关false/admin/threads/* 返 503acdm-backend/app/api/admin_thread_router.py:68-92
AEGRA_DATABASE_URL(从 DATABASE_URL 衍生)aegra DB 连接串显式设则用,否则同实例换库名acdm-backend/app/api/admin_thread_router.py:46-65
BA_REPORT_SYNC_INTERVAL10scheduler 巡检间隔(秒)调大降负载、慢恢复acdm-backend/app/services/ba_report_sync_scheduler.py:27
BA_REPORT_TIMEOUT_MINUTES15步骤超时阈值(分钟)超时标 erroracdm-backend/app/services/ba_report_sync_scheduler.py:30
BA_REPORT_MONITOR_CHECK_MINUTES2running 多久后开始巡检防误判刚启动的步骤acdm-backend/app/services/ba_report_sync_scheduler.py:31
BA_REPORT_MAX_TIMEOUT_RETRIES3超时最大重试达上限标 TMO_PERMacdm-backend/app/services/ba_report_sync_scheduler.py:32
SSE_MAX_HISTORY / SSE_HISTORY_TTL100 / 3600SSE 历史保留量/过期秒clear_expired 清理依据acdm-backend/app/services/sse_manager.py:27-30

安全相关:ACDM_ADMIN_THREAD_ACCESS_ENABLED=false 是合规应急开关——一旦发现跨用户读 thread 被滥用或 aegra DB 直连出问题,可立刻熔断而不下线整个 admin 区。改 .env 后需 up -d --force-recreate(restart 不重读 .env)。

Common Pitfalls / 实战 Tips

  • 审计写不出来别慌:record_* 失败只 warn 返 None,业务请求照常 201/200。排查时 grep record_api failed / api_audit middleware failed(acdm-backend/app/services/audit_service.py:123acdm-backend/app/main.py:293)。
  • admin 查不到别人对话先查开关:/admin/threads/* 返 503 多半是 ACDM_ADMIN_THREAD_ACCESS_ENABLED=false,不是 bug(acdm-backend/app/api/admin_thread_router.py:88-92)。
  • messages_warning 非 null 不是端点挂了:checkpoint 反序列化是 fail-soft,metadata+runs 仍可信,只是 messages/artifacts/todos 拿不到——LangGraph 升级后最常见(acdm-backend/app/api/admin_thread_router.py:184-187)。
  • error_code 列只有 VARCHAR(8):scheduler 写错误码必须缩写(AE_ERR/AE_TMO/TMO_PERM/UNRECOV),写全称会被截断(acdm-backend/app/services/ba_report_sync_scheduler.py:177-178)。
  • background_executor 已弃用:看到它的日志 [BackgroundExecutor] 别按它排查 BA 流程,活跃路径是 SSE direct-to-LangGraph + sync_scheduler(acdm-backend/app/services/background_executor.py:1-16)。
  • SSE queue 满会丢事件:queue.put_nowait 满则 warn 丢弃不阻塞 push;前端靠 Last-Event-ID 重连补回(acdm-backend/app/services/sse_manager.py:132-136)。

References

  • acdm-backend/app/main.py:29-85_should_record_api_audit + _maybe_record_api_audit(审计判定与独立 session 写入)
  • acdm-backend/app/main.py:222-295 — middleware 注册顺序 + audit_log middleware 本体
  • acdm-backend/app/main.py:140-211_lifespan:拉起/关闭 background_executor + sync_scheduler
  • acdm-backend/app/services/audit_service.py:25-218 — fail-safe 写入 + 总开关 + 游标分页查询
  • acdm-backend/app/services/audit_log_service.py:20-50 — admin 升降级审计(同事务,只 flush)
  • acdm-backend/app/api/admin_thread_router.py:46-336 — 跨用户 thread:aegra DB 衍生连接 + fail-soft checkpoint 反序列化
  • acdm-backend/app/api/admin_user_router.py:59-263 — 升降级/启停 + /admin/audit + _actor_as_user 值载体技巧
  • acdm-backend/app/services/sse_manager.py:33-238 — SSE 多连接 + PG 持久化 + 回放
  • acdm-backend/app/api/ba_report_router.py:243-328 — SSE 消费端(Last-Event-ID 回放 + 心跳)
  • acdm-backend/app/services/ba_report_sync_scheduler.py:38-303 — runs.get() 分层巡检 + 超时重试保护
  • acdm-backend/app/model/models.py:329-665 — 三张审计表 ORM 定义
PageRelationship
鉴权与授权双层体系本章 require_admin / JWT 解析复用该章鉴权设施
acdm-backend 控制面架构本章 middleware 链与 lifespan 是该章控制面的一部分
Aegra 运行时与 LangGraph本章跨用户治理需绕开该章 Aegra 的 user_id SQL 硬过滤
BA 专家报告工作流本章 SSE / scheduler 服务于该章 BA 报告任务
可逆 DB 操作与存储层本章 /admin/storage-events revert 基于该章可逆事件
项目管理与资源授权本章 /admin/trash 永久删除该章软删的项目/日历

公司内部参考 · 由 claude-wiki-gen 基于源码自动生成的二次分析