主题
审计、SSE 与后台任务
本章目标:
- 看懂 acdm-backend 的审计三表(
api_audit_log/login_history/admin_audit_log)如何由 HTTP middleware 与 admin 路由 fail-safe 写入- 弄清 admin_* 跨用户治理路由为什么要绕开 Aegra SQL 硬过滤直连
aegraDB,以及降级开关怎么用- 理解
SSEConnectionManager的 PG 持久化 + 断线回放设计,和 lifespan 里 background_executor / sync_scheduler 两条异步基础设施的角色
TL;DR
acdm-backend 的可观测与治理层由三块构成:① 审计——audit_log HTTP middleware 在每个请求末端把变更类(POST/PUT/PATCH/DELETE)或 /admin/* 请求落 api_audit_log,登录落 login_history,admin 升降级与同事务的状态变更一起落 admin_audit_log;审计写入全程 fail-safe(失败只 warn 不破坏业务),且受 ACDM_AUDIT_LOGGING_ENABLED 总开关控制。② 跨用户治理——admin_thread_router 等 admin_* 路由挂 require_admin,其中跨用户读 thread 必须绕开 Aegra HTTP API 的 user_id SQL 硬过滤,直连 aegra DB(asyncpg 读 thread/runs 表 + 官方 SDK 反序列化 checkpoint),并带独立降级开关 ACDM_ADMIN_THREAD_ACCESS_ENABLED。③ 异步基础设施——SSEConnectionManager 用 sse_event_history 表持久化事件,支持多连接 + Last-Event-ID 断线回放;FastAPI lifespan 拉起 background_executor(已弃用)与 ba_report_sync_scheduler(每 10s 巡检卡死的 BA 步骤)两条常驻协程。
Overview(为什么需要这三块)
这一章的三块功能看似无关,实际回答的是同一类问题:生产环境里"看不见的事"如何变得可观测、可治理、可恢复。
审计 解决"谁在什么时候改了什么"。没有审计层时,排查越权操作、定位误删源头、复盘安全事件都只能靠 docker 日志——而 CD 每分钟自动部署、容器重启即丢历史日志(acdm-backend/app/main.py:97)。所以审计必须落库。但审计有个硬约束:它不能反过来破坏业务。审计写库失败、审计开关关闭,都不该让用户的正常请求 500。这就是为什么 AuditService 的所有写入方法内部自己 try/except + warn,且有 ACDM_AUDIT_LOGGING_ENABLED 一键熔断(acdm-backend/app/services/audit_service.py:1-9)。
跨用户治理 解决一个 Aegra 切换带来的具体难题。系统 2026-04-25 切到 Aegra 0.9.4 做 Agent Runtime,Aegra 在 thread 表 schema 强绑 user_id,API 层 14 处 SQL 硬过滤 WHERE user_id = identity,auth handler 返回 None 都无法 bypass(acdm-backend/app/api/admin_thread_router.py:1-9)。这意味着 admin 想看任意用户的对话做合规审计,走 Aegra SDK 根本拿不到数据。唯一出路是绕开 Aegra HTTP API,直连它背后的 aegra DB。
异步基础设施 解决两类长耗时操作的可恢复性:BA 报告分析动辄几分钟,前端不能干等也不能断线就丢进度——需要 SSE 实时推 + 断线回放;Agent 跑在 Aegra 进程里,会出现三种失联态——跑完但前端关页面没收到(孤儿)、卡死、超时——需要一个常驻 scheduler 定期巡检兜底。
Architecture
| 子系统 | 职责 | 入口 | Source |
|---|---|---|---|
audit_log middleware | 每请求末端记日志 + 触发审计落库 | audit_log() | acdm-backend/app/main.py:270-295 |
AuditService | 写/查 login_history + api_audit_log,内部 fail-safe | AuditService | acdm-backend/app/services/audit_service.py:34 |
AuditLogService | 写 admin_audit_log,与状态变更同事务 | AuditLogService.record | acdm-backend/app/services/audit_log_service.py:20-50 |
audit_router | /admin/logins /admin/apis 只读查询 | router | acdm-backend/app/api/audit_router.py:21-66 |
admin_user_router | 用户升降级/启停 + /admin/audit 查询 | router | acdm-backend/app/api/admin_user_router.py:40-247 |
admin_thread_router | 跨用户读 thread(绕 Aegra SQL filter) | router | acdm-backend/app/api/admin_thread_router.py:43-336 |
SSEConnectionManager | SSE 多连接 + PG 持久化 + 断线回放 | sse_manager 单例 | acdm-backend/app/services/sse_manager.py:33-238 |
BackgroundTaskExecutor | BA 任务队列消费(已弃用) | background_executor 单例 | acdm-backend/app/services/background_executor.py:65-1003 |
BAReportSyncScheduler | 每 10s 巡检卡死 BA 步骤 | run_sync_scheduler | acdm-backend/app/services/ba_report_sync_scheduler.py:274-303 |
middleware 注册顺序(FastAPI 中 add_middleware / @app.middleware 后注册的先执行)从外到内是:CORS → SlowAPI → reversible_actor_context → mcp_service_auth → audit_log,所以 audit_log 最贴近业务路由,能拿到最终响应状态码与真实耗时(acdm-backend/app/main.py:222-295)。
Components / Subsystems
审计三表与 fail-safe 写入
职责:把"谁、何时、对哪个路径、做了什么、结果如何"持久化,且永不反噬业务。
三张表的分工(都刻意不加外键到 acdm_users,Keycloak 删账户后仍可追溯):
| 表 | 写入时机 | 写入者 | 关键设计 | Source |
|---|---|---|---|---|
login_history | 登录成功 / 登出 | AuditService.record_login / record_logout | user_email 冗余快照登录当刻值 | acdm-backend/app/model/models.py:516-538 |
api_audit_log | 变更类请求 或 /admin/* 完成后 | audit_log middleware → AuditService.record_api | 只记写操作 + admin GET,防膨胀 | acdm-backend/app/model/models.py:643-665 |
admin_audit_log | admin 升/降/启/停用户 | AuditLogService.record(与状态变更同事务) | actor_email/target_email 冗余存事件当刻值 | acdm-backend/app/model/models.py:329-351 |
记录范围判定 _should_record_api_audit(acdm-backend/app/main.py:33-39):先按前缀跳过 /health /auth /docs /openapi /redoc /mcp;然后 /admin/*(含 GET)一律记;其余只记 POST/PUT/PATCH/DELETE。GET 业务读接口不记是有意为之——否则审计表会被海量读请求撑爆(acdm-backend/app/model/models.py:646-647)。
fail-safe 的三层防线(审计绝不破坏业务):
- middleware 层:
_maybe_record_api_audit整体被try/except包,异常只logger.warning(acdm-backend/app/main.py:285-293)。 - service 层:
record_api/record_login/record_logout各自内部 try/except,失败返 None(acdm-backend/app/services/audit_service.py:106-124)。 - 开关层:
_enabled()读ACDM_AUDIT_LOGGING_ENABLED,设为false/0/no时所有写入方法直接 return,但查询方法仍可用(历史数据照查)(acdm-backend/app/services/audit_service.py:25-31)。
middleware 不在请求依赖链里,所以它自管 session 生命周期——用 async_session_maker() 开独立 session,自己 commit(acdm-backend/app/main.py:66-85)。
admin_audit_log 的同事务保证:AuditLogService.record 只 flush 不 commit,commit 留给调用方(acdm-backend/app/services/audit_log_service.py:31-50)。在 admin_user_router 里,admin_svc.promote() 改 is_admin 和 audit_svc.record() 写审计共用同一个 get_db session,路由末尾统一 session.commit()——要么状态变更+审计都落,要么都回滚,杜绝"权限改了但没留审计"的脏状态(acdm-backend/app/api/admin_user_router.py:59-82)。_actor_as_user 这里有个细节:它造一个不加进 session 的轻 User 实例仅作值载体传给审计,避免 identity map 冲突(acdm-backend/app/api/admin_user_router.py:250-263)。
跨用户治理路由(admin_*)
职责:给 admin 提供跨用户的合规可见性与可逆操作,全部 require_admin 守卫。
所有 admin_* 路由统一挂 /admin prefix + auth_dep,router 内部再 Depends(require_admin) 做二次保险(生产 Caddy forward_auth 已前置校验)(acdm-backend/app/main.py:491-545)。清单见下方速查表。
最有技术含量的是 admin_thread_router。它要解决的矛盾是:Aegra 0.9.4 在 SQL 层硬过滤 user_id,admin 用 Aegra SDK 拿不到别人的 thread。解法分两类数据源:
- thread 列表 / metadata / runs:
asyncpg直连aegraDB,裸 SQL 读thread/runs表(jsonb 字段直读)(acdm-backend/app/api/admin_thread_router.py:94-140)。 - 完整 thread state(messages/artifacts/todos):这些在
checkpoints+checkpoint_blobs里是 LangGraph 自家 msgpack 格式。SQL 直读 bytea blob 自己解析 = fork LangGraph 内部 ABI,所以走官方AsyncPostgresSaver.aget()反序列化channel_values(acdm-backend/app/api/admin_thread_router.py:168-235)。
_aegra_db_url() 优先读 AEGRA_DATABASE_URL,否则从 acdm 自己的 DATABASE_URL 衍生:去掉 +asyncpg driver 标记、库名替换为 aegra(生产假设两个 DB 同 platform-db 实例、同 credential)(acdm-backend/app/api/admin_thread_router.py:46-65)。
_load_thread_state 是 fail-soft 设计的典范:LangGraph SDK 是外部依赖,checkpoint schema 随版本变。它把"langgraph 没装"、"读 checkpoint 失败"、"channel_values 类型异常"、"序列化失败"分别返回 (None,None,None,"<原因>"),让端点不 500——admin 仍能看到 metadata + runs 用于审计,前端按 messages_warning 显示红色降级提示(acdm-backend/app/api/admin_thread_router.py:168-235、257-336)。
SSEConnectionManager
职责:把 BA 报告这类长耗时任务的进度实时推给前端,支持多前端同看一任务、断线后回放未收到的事件。
关键类:SSEConnectionManager 在 acdm-backend/app/services/sse_manager.py:33,模块级单例 sse_manager(:238)。
为什么不用 Redis:sse_event_history 表注释明确写"PostgreSQL 模式下替代 Redis 存储"(acdm-backend/app/model/ba_report_models.py:107-114)。事件持久化进 PG,断线重连靠 event_id 单调递增序号回放,代价是 push 时一次 SELECT max(event_id) + INSERT + commit(acdm-backend/app/services/sse_manager.py:106-122)。
关键方法:
connect(task_id):每连接建一个asyncio.Queue(maxsize=100),同一 task 多连接放进_connections[task_id]列表,_lock保护(acdm-backend/app/services/sse_manager.py:49-68)。push(db, task_id, event, data):取下一个event_id→ 存 PG → 推所有活跃 queue;queue 满则丢该事件 + warn,不阻塞(acdm-backend/app/services/sse_manager.py:89-139)。get_history(db, task_id, last_event_id):回放event_id > last_event_id的事件,断线重连用(acdm-backend/app/services/sse_manager.py:141-172)。clear_expired(db, ttl=3600):删created_at早于 cutoff 的事件,防表无限膨胀(acdm-backend/app/services/sse_manager.py:190-208)。format_sse(event):拼id:/event:/data:(JSON,ensure_ascii=False)三行 + 空行的标准 SSE 报文(acdm-backend/app/services/sse_manager.py:210-234)。
消费端在 ba_report_router.py 的 /ba-reports/{report_id}/stream:查当前活跃 task,无活跃 task 返回 event: no_task 空流;有则 connect 拿 queue,若带 Last-Event-ID header 先回放历史,再 asyncio.wait_for(queue.get(), timeout=30) 阻塞等新事件——30s 超时发 : heartbeat 保活,收到 completed/error 事件即结束流,finally 里 disconnect(acdm-backend/app/api/ba_report_router.py:243-328)。
lifespan 异步基础设施
FastAPI _lifespan 在 yield 前拉起两条常驻协程,yield 后逐一优雅关闭(acdm-backend/app/main.py:186-211):
1. background_executor(已弃用,2026-05-17)——文件头部 DeprecationWarning 明确标注"系统已切 SSE direct-to-LangGraph 模式,本模块不在活跃执行路径"(acdm-backend/app/services/background_executor.py:1-16、36-42)。它原是 BA 报告的 Hybrid 后台任务执行器:单例 + asyncio.Queue 任务队列 + 消费者协程 + MAX_CONCURRENT_TASKS=3 并发控制 + 启动时 _recover_pending_tasks 恢复未完成任务(acdm-backend/app/services/background_executor.py:65-126、830-871)。start()/stop() 仍被 lifespan 调,但 _execute_task 等核心方法均标 unreachable。理解它的价值在于:它是被 SSE direct-to-LangGraph 取代的前一代架构,新架构改由 ba_report_sync_scheduler 兜底。
2. ba_report_sync_scheduler(活跃)——asyncio.create_task(run_sync_scheduler(), name="ba_report_sync_scheduler"),挂 app.state.scheduler_task,关闭时 cancel + await(acdm-backend/app/main.py:191-211)。它每 SYNC_INTERVAL_SECONDS(默认 10s)循环一次,巡检 status="running" 且 started_at 超过 MONITOR_CHECK_MINUTES(默认 2min)的 BA 步骤(acdm-backend/app/services/ba_report_sync_scheduler.py:26-32、274-303)。
它的核心是 runs.get() 分层判断(v1.6,替代盲拉 checkpoint):
python
# 摘自 acdm-backend/app/services/ba_report_sync_scheduler.py:151-199
if run_status == "success":
# Agent 已跑完 → 拉 checkpoint 取成果(孤儿:用户关页面但 agent 跑完了)
state = await service._fetch_state_from_checkpoint(...)
if state: await service.handle_step_completed_from_checkpoint(...)
if run_status in ("error", "timeout", "interrupted", "cancelled"):
# 标 error;error_code 列只 VARCHAR(8),需缩写 AE_ERR/AE_TMO/AE_INT/AE_CAN
...
# status=running/pending 或无 run → 检查超时(TIMEOUT_THRESHOLD_MINUTES,默认 15)
if step.started_at and step.started_at < timeout_threshold:
await self._mark_step_timeout(db, step)设计要点:① 第一层只调轻量 runs.get()(7 字段)判断 agent 存活/成功/失败,仅 status=success 时才拉重的 checkpoint 取成果——避免每轮盲拉(acdm-backend/app/services/ba_report_sync_scheduler.py:1-11、59-72)。② 幂等:已同步步骤跳过。③ 超时有重试计数保护,_mark_step_timeout 用 step.ai_metadata.timeout_retry_count,达 MAX_TIMEOUT_RETRIES(默认 3)标 TMO_PERM 永久失败,否则 TIMEOUT + 计数 +1(acdm-backend/app/services/ba_report_sync_scheduler.py:229-271)。④ 单步异常 db.rollback() 后继续下一步,不让一个坏步骤拖垮整轮(acdm-backend/app/services/ba_report_sync_scheduler.py:209-214)。
速查表:admin_* 治理端点 + 审计写入点
admin_ 路由全集*(全部 /admin prefix,均 require_admin 守卫):
| 端点 | 方法 | 作用 | 降级开关 | Source |
|---|---|---|---|---|
/admin/logins | GET | 查登录历史 | — | acdm-backend/app/api/audit_router.py:24-40 |
/admin/apis | GET | 查 API 变更流水 | — | acdm-backend/app/api/audit_router.py:43-66 |
/admin/users | GET | 列用户(分页) | — | acdm-backend/app/api/admin_user_router.py:43-56 |
/admin/users/{sub}/promote | POST | 升级 admin(写审计) | — | acdm-backend/app/api/admin_user_router.py:59-82 |
/admin/users/{sub}/demote | POST | 降级(LastAdmin 保护) | — | acdm-backend/app/api/admin_user_router.py:85-110 |
/admin/users/{sub}/disable | POST | 禁用(Self 保护) | — | acdm-backend/app/api/admin_user_router.py:113-148 |
/admin/users/{sub}/enable | POST | 启用(幂等) | — | acdm-backend/app/api/admin_user_router.py:151-180 |
/admin/users/by-subs | POST | 批量反查 email(≤100) | — | acdm-backend/app/api/admin_user_router.py:183-202 |
/admin/audit | GET | 查升降级审计 | — | acdm-backend/app/api/admin_user_router.py:205-247 |
/admin/threads/all | GET | 跨用户列 thread | ACDM_ADMIN_THREAD_ACCESS_ENABLED | acdm-backend/app/api/admin_thread_router.py:77-140 |
/admin/threads/{id}/state | GET | 跨用户读 thread 全态 | ACDM_ADMIN_THREAD_ACCESS_ENABLED | acdm-backend/app/api/admin_thread_router.py:257-336 |
/admin/storage-events 等 | GET/POST | 列/详/可逆 revert | router 内三角色判断 | acdm-backend/app/api/admin_storage_events_router.py:167-346 |
/admin/trash/{projects,calendars} | GET/DELETE | 回收站列 + 永久删 | — | acdm-backend/app/api/admin_trash_router.py:76-145 |
审计写入点矩阵:
| 事件 | 触发位置 | 落表 | fail-safe | 受总开关 |
|---|---|---|---|---|
| 任意写请求 / admin GET | audit_log middleware | api_audit_log | ✓(warn 不抛) | ✓ |
| 登录成功 | /auth/callback | login_history | ✓ | ✓ |
| 登出 | /auth/logout | login_history(填 logged_out_at) | ✓ | ✓ |
| admin 升/降/启/停 | admin_user_router(同事务) | admin_audit_log | ✗(事务一致优先) | ✗ |
admin_audit_log刻意不走 fail-safe:它要的是"权限变更与审计原子一致",审计失败就该让整个事务回滚,而不是悄悄放过权限变更。这与api_audit_log的取舍正好相反。
Configuration
| Config | 默认 | 含义 | 影响 | Source |
|---|---|---|---|---|
ACDM_AUDIT_LOGGING_ENABLED | true | 审计写入总开关 | false/0/no 时停所有审计写,查询仍可用 | acdm-backend/app/services/audit_service.py:25-31 |
ACDM_ADMIN_THREAD_ACCESS_ENABLED | true | 跨用户 thread 访问开关 | false 时 /admin/threads/* 返 503 | acdm-backend/app/api/admin_thread_router.py:68-92 |
AEGRA_DATABASE_URL | (从 DATABASE_URL 衍生) | aegra DB 连接串 | 显式设则用,否则同实例换库名 | acdm-backend/app/api/admin_thread_router.py:46-65 |
BA_REPORT_SYNC_INTERVAL | 10 | scheduler 巡检间隔(秒) | 调大降负载、慢恢复 | acdm-backend/app/services/ba_report_sync_scheduler.py:27 |
BA_REPORT_TIMEOUT_MINUTES | 15 | 步骤超时阈值(分钟) | 超时标 error | acdm-backend/app/services/ba_report_sync_scheduler.py:30 |
BA_REPORT_MONITOR_CHECK_MINUTES | 2 | running 多久后开始巡检 | 防误判刚启动的步骤 | acdm-backend/app/services/ba_report_sync_scheduler.py:31 |
BA_REPORT_MAX_TIMEOUT_RETRIES | 3 | 超时最大重试 | 达上限标 TMO_PERM | acdm-backend/app/services/ba_report_sync_scheduler.py:32 |
SSE_MAX_HISTORY / SSE_HISTORY_TTL | 100 / 3600 | SSE 历史保留量/过期秒 | clear_expired 清理依据 | acdm-backend/app/services/sse_manager.py:27-30 |
安全相关:
ACDM_ADMIN_THREAD_ACCESS_ENABLED=false是合规应急开关——一旦发现跨用户读 thread 被滥用或 aegra DB 直连出问题,可立刻熔断而不下线整个 admin 区。改 .env 后需up -d --force-recreate(restart不重读 .env)。
Common Pitfalls / 实战 Tips
- 审计写不出来别慌:
record_*失败只 warn 返 None,业务请求照常 201/200。排查时 greprecord_api failed/api_audit middleware failed(acdm-backend/app/services/audit_service.py:123、acdm-backend/app/main.py:293)。 - admin 查不到别人对话先查开关:
/admin/threads/*返 503 多半是ACDM_ADMIN_THREAD_ACCESS_ENABLED=false,不是 bug(acdm-backend/app/api/admin_thread_router.py:88-92)。 messages_warning非 null 不是端点挂了:checkpoint 反序列化是 fail-soft,metadata+runs 仍可信,只是 messages/artifacts/todos 拿不到——LangGraph 升级后最常见(acdm-backend/app/api/admin_thread_router.py:184-187)。- error_code 列只有 VARCHAR(8):scheduler 写错误码必须缩写(
AE_ERR/AE_TMO/TMO_PERM/UNRECOV),写全称会被截断(acdm-backend/app/services/ba_report_sync_scheduler.py:177-178)。 - background_executor 已弃用:看到它的日志
[BackgroundExecutor]别按它排查 BA 流程,活跃路径是 SSE direct-to-LangGraph + sync_scheduler(acdm-backend/app/services/background_executor.py:1-16)。 - SSE queue 满会丢事件:
queue.put_nowait满则 warn 丢弃不阻塞 push;前端靠Last-Event-ID重连补回(acdm-backend/app/services/sse_manager.py:132-136)。
References
acdm-backend/app/main.py:29-85—_should_record_api_audit+_maybe_record_api_audit(审计判定与独立 session 写入)acdm-backend/app/main.py:222-295— middleware 注册顺序 +audit_logmiddleware 本体acdm-backend/app/main.py:140-211—_lifespan:拉起/关闭 background_executor + sync_scheduleracdm-backend/app/services/audit_service.py:25-218— fail-safe 写入 + 总开关 + 游标分页查询acdm-backend/app/services/audit_log_service.py:20-50— admin 升降级审计(同事务,只 flush)acdm-backend/app/api/admin_thread_router.py:46-336— 跨用户 thread:aegra DB 衍生连接 + fail-soft checkpoint 反序列化acdm-backend/app/api/admin_user_router.py:59-263— 升降级/启停 +/admin/audit+_actor_as_user值载体技巧acdm-backend/app/services/sse_manager.py:33-238— SSE 多连接 + PG 持久化 + 回放acdm-backend/app/api/ba_report_router.py:243-328— SSE 消费端(Last-Event-ID 回放 + 心跳)acdm-backend/app/services/ba_report_sync_scheduler.py:38-303— runs.get() 分层巡检 + 超时重试保护acdm-backend/app/model/models.py:329-665— 三张审计表 ORM 定义
Related Pages
| Page | Relationship |
|---|---|
| 鉴权与授权双层体系 | 本章 require_admin / JWT 解析复用该章鉴权设施 |
| acdm-backend 控制面架构 | 本章 middleware 链与 lifespan 是该章控制面的一部分 |
| Aegra 运行时与 LangGraph | 本章跨用户治理需绕开该章 Aegra 的 user_id SQL 硬过滤 |
| BA 专家报告工作流 | 本章 SSE / scheduler 服务于该章 BA 报告任务 |
| 可逆 DB 操作与存储层 | 本章 /admin/storage-events revert 基于该章可逆事件 |
| 项目管理与资源授权 | 本章 /admin/trash 永久删除该章软删的项目/日历 |