主题
BA 专家报告工作流
本章目标:
- 看懂 BA 现状分析报告的 8 步流程定义、step→skill 映射、状态机
- 弄清"前端 SSE 驱动 + 后端轻状态管理"这套去后台编排架构,以及 interrupt/confirm/regenerate/retry 的交互契约
- 理解 sync_scheduler 孤儿/僵尸/超时三态修复,及 step_8 确认后 P2-P3 内容回写 GitLab Wiki 的逻辑
TL;DR
BA 专家报告把一份 ERP 录屏 + 材料,经 8 个 Skill 步骤(STEP_DEFS,ba_report/constants.py:8)滚动生成一份华为 4A 口径的现状分析报告。架构经历过一次大翻转:原来是后端 LangGraph Pipeline 编排(pipeline.py,已 deprecated 2026-05-17),现在是纯前端 SSE direct-to-LangGraph——前端拿 execute_config 直连 Aegra /runs/stream,后端 BAReportService 只做轻状态机(start/finish/fail/confirm/regenerate/retry)+ 文件回收 + checkpoint 兜底。每步产物经用户确认(confirm_step)才推下一步,可基于反馈重新生成(regenerate_step)。BAReportSyncScheduler 每 10s 用 runs.get() 分层判断,补救用户关页面留下的孤儿步骤。step_8 确认后把 P2-P3 来源内容回写项目 GitLab Wiki(WikiSyncService)。
Overview(为什么是"前端 SSE 驱动"而不是后端编排)
BA 报告要解决一个矛盾:单步分析是 5-10 分钟级的 LLM 长任务,但用户必须逐步介入审阅。如果纯后端编排(早期 pipeline.py 的 8 节点 LangGraph + interrupt_before),会撞上三个问题:
- 长任务 SSE 必须经后端中转——后端要把 Aegra 的 SSE 再转发一次,多一跳延迟 + 后端持有大量长连接。
- interrupt 恢复链路脆弱——
pipeline.py用interrupt_before在每个 confirm 节点挂起,用户确认后要update_state+runs.create(input={})从 interrupt 点恢复,这套两步式恢复(langgraph_client.py:204confirm_ba_report_step)在 Aegra 上易超时。 - 用户关页面 = 任务悬空——agent 在 Aegra 侧继续跑完,但没人收尾。
当前方案(ba_report_service.py:8-15 模块 docstring 明确"纯前端 Stream 模式"):前端直接连 Aegra SSE 看 token 流,后端退化成状态登记处 + 兜底修复器。每步生命周期由前端 4 个回调驱动:start(标 running)→ 前端连 SSE → finish(存内容,转 waiting_confirm)/fail(转 error)→ confirm(转 confirmed,返回下一步 config)。后端 sync_scheduler 是安全网:用户关页面时,agent 还在 Aegra 跑,scheduler 检测到 runs.get() 报 success 就替用户补做 finish。
Architecture:三方参与者
| 组件 | 职责 | 入口文件 | Source |
|---|---|---|---|
BAReportService | 8 步轻状态机:create/start/finish/fail/confirm/regenerate/retry,文件回收,checkpoint 兜底 | acdm-backend/app/services/ba_report_service.py | acdm-backend/app/services/ba_report_service.py:61 |
ba_report_router | 20+ REST 端点,require_report_access 授权,创建时 require_project_access | acdm-backend/app/api/ba_report_router.py | acdm-backend/app/api/ba_report_router.py:99-995 |
STEP_DEFS / STEP_SKILL_MAP | 8 步定义 + step_id → ba-analysis-pack Skill 目录映射 | ba_report/constants.py | acdm-backend/app/services/ba_report/constants.py:8-36 |
build_step_prompt | 每步生成"直接执行不要澄清"的强约束 prompt | ba_report/prompts.py | acdm-backend/app/services/ba_report/prompts.py:12-37 |
CheckpointService | 经 LangGraph SDK 读 Aegra state,文件传 Nextcloud,DB 同步 | ba_report/checkpoint.py | acdm-backend/app/services/ba_report/checkpoint.py:24 |
BAReportSyncScheduler | 与 acdm-backend 同进程的定时监控,孤儿/僵尸/超时三态修复 | ba_report_sync_scheduler.py | acdm-backend/app/services/ba_report_sync_scheduler.py:35 |
WikiSyncService | step_8 后把 P2-P3 来源内容回写项目 GitLab Wiki | ba_report/wiki.py | acdm-backend/app/services/ba_report/wiki.py:24 |
useBaReportStream | 前端 SSE Hook:直连 Aegra /runs/stream,token 增量、reattach、心跳超时 | use-ba-report-stream.ts | deer-flow/frontend/src/hooks/use-ba-report-stream.ts:140 |
pipeline.py(已弃用) | 老的 8 节点 LangGraph + interrupt_before 编排 | pipelines/ba_report/pipeline.py | deer-flow/backend/packages/harness/deerflow/pipelines/ba_report/pipeline.py:1-33 |
8 步定义与 Skill 映射(constants.py:8-48)是整条流水线的"骨架"——每个 step_id 对应 ba-analysis-pack 下一个 Skill 目录,build_step_prompt 把它拼成给 lead_agent 的 human message:
| step_id | 显示名 | Skill 目录(STEP_SKILL_MAP) | 关键产出 |
|---|---|---|---|
| step_1 | 基础信息 | data-preparation | 确认页 + 定界清单 + KB检索结果 + 关键词清单 |
| step_2 | 业务顶层架构 | architecture-design | L1-L5 层级 + 业务全景图 .mmd |
| step_3 | 业务场景分析 | scenario-restoration | L4 流程 + KCP + 业务规则 + RACI |
| step_4 | 业务对象清单 | object-identification | 7 类业务对象(MD/AD/BD/VD/CON/MG/RQ) |
| step_5 | 业务对象与流程关系 | flow-analysis | 10 种对象关系 + CRUD 矩阵 + 流向图 |
| step_6 | 现状问题诊断 | problem-diagnosis | 8 类断点 + ERP 13 类规则缺失 |
| step_7 | 优化建议 | improvement-proposal | 流程/规则/绩效/高阶建议 + 路线图 |
| step_8 | 报告整合 | report-integration | 双轮精修定稿 + 附录 A-E + Wiki 回写触发 |
后端步骤状态机(单步生命周期):
Components / Subsystems
BAReportService:8 步轻状态机
职责:管理 8 个 BAReportStep 行的状态流转,不做 LLM 编排(那是前端连 Aegra 干的)。
关键方法:
create_report()(ba_report_service.py:69):建一条Report(type=ba_analysis)+ 8 个 pending step。注意knowledge_sync不是用户确认步骤(constants.py:17注释:step_8 确认后自动触发)。实际创建走 router 的create_report(ba_report_router.py:99),前端先在 Gateway 建 thread + 上传文件,再带thread_id调本端点,thread_id 存进report.ai_thread_id。start_step()(:312):宽松乐观锁——WHERE status IN ('pending','error')才更新为 running 并写started_at。前端连 SSE 前调,started_at是超时检测基线。rowcount==0说明已是 running/waiting_confirm,返回 False 让前端不重复连。finish_step()(:547):前端 SSE 流结束后调。幂等:已 waiting_confirm 直接返回不重存版本。核心是从 checkpoint 渐进式轮询取文件(delay从 100ms 翻倍到 2s 上限,最多 2 分钟,:602-633),上传 Nextcloud,存chapter_md_content+ 章节版本,转 waiting_confirm,最后asyncio.create_taskfire-and-forget 异步生成润色建议(_generate_polish_suggestions,:1631)。confirm_step()(:374):waiting_confirm → confirmed。DB 提交后 best-effort 通知 LangGraphconfirm_ba_report_step(失败不回滚,:430-443),再把下一步置 pending 并构建next_step_config(_build_next_step_config,:468)返回给前端,前端拿到后直接连下一步 SSE——这是去后台编排的关键。regenerate_step()(:872):waiting_confirm → pending,清空内容保留版本历史,把feedbacks拼进 human message 前缀("【用户反馈】...请根据以上反馈重新执行"),返回execute_config让前端重连 SSE。有 running 的 regeneration 则复用(幂等,:898-904)。retry_step()(:1037):error → pending,retry_count≤ 3,调resume_ba_report_from_step。cancel_report()(:1013):调cancel_ba_report_thread杀 Aegra run,非 confirmed 步骤全标 error。
实现要点:Service 方法不显式 commit 由路由层 get_db 统一提交(模块 docstring 事务约定),但 finish/confirm/regenerate 等关键路径内部仍显式 await self.db.commit() 以保证状态及时落地(因为后续要 best-effort 通知 LangGraph,DB 必须先固化)。
execute_config / next_step_config:控制面→引擎的执行契约
_build_next_step_config()(ba_report_service.py:468)和 router 的 get_execute_config(ba_report_router.py:331)产出同构的执行配置——这是 acdm-backend 喂给前端、前端再喂给 Aegra 的"启动指令包":
python
# 摘自 ba_report_service.py:511-543
{
"thread_id": thread_id, # 必须真实 ai_thread_id,不能 fallback report.id
"assistant_id": "lead_agent",
"request": {
"input": {"messages": [{"type": "human", "content": build_step_prompt(...)}]},
"config": {"recursion_limit": 1000},
"stream_mode": ["messages-tuple", "values", "custom"],
"stream_resumable": True, "on_disconnect": "continue",
},
"context": {"agent_name": "ba-expert", "model_name": "kimi-k2.6", ...},
"auth_headers": {"X-Auth-User-Id": user_sub, "X-Auth-User-Role": user_role},
}stream_resumable: True + on_disconnect: "continue" 是关键:前端断线 agent 不停,可 reattach(use-ba-report-stream.ts:770 reattach);ai_thread_id 缺失则返回 None,前端必须等报告加载完才能连(:487-492)。
CheckpointService:状态兜底层
职责:当前端没收尾(关页面/网络断)时,从 Aegra checkpoint 把成果同步回 acdm DB。
关键方法:
fetch_state_from_checkpoint()(checkpoint.py:32):用 LangGraph SDKthreads.get_state,定时任务用系统账号_auth_headers(is_admin=True)。Thread 不存在(被清理)是预期场景,用 DEBUG 不报警。sync_step_from_checkpoint()(:141):五重幂等——confirmed 跳过、waiting_confirm 且已有 nc:// 路径跳过、按 status/content/files/suggestions 四个维度判needs_sync、status 回归保护(waiting_confirm 不被降级回 running,:273-277)、suggestions 写前二次查 DB 防并发。upload_files_to_nextcloud()(:72):checkpoint 里是 inline content,上传到{project_nc_path}/BA报告/{report_id}/{step_id}/,DB 存nc://路径而非内容。
WikiSyncService:知识库回写
职责:step_8 报告整合确认后,把高置信度内容沉淀进项目知识库。
sync_to_wiki()(wiki.py:30)按 source_level 过滤——只回写 P1-P5,P6(LLM 推断)不写(:62-65,实际 deer-flow 侧 knowledge_sync.py:55 进一步收紧到只写 P2-P3)。回写路径 {project.gitlab_wiki_path}/BA报告/{清洗后场景名}.md,按步骤显示名追加 ## 章节,章节级幂等:if section_header in full_content: skip(:97-99)。GitLab commit 失败不抛异常,塞进 errors[] 返回(回写是 best-effort,不阻塞报告完成)。
这条回写由 POST /ba-reports/{id}/sync-wiki(ba_report_router.py:977)触发。注意 deer-flow 侧的 knowledge_sync_node(knowledge_sync.py:73)是这条 API 的旧调用方,已随 pipeline.py deprecated(knowledge_sync.py:1-5);当前活跃路径下回写由前端/编排在 step_8 确认后调用该 API。
Data Flow:一个步骤的完整生命周期
每步关键步骤拆解:
- start:前端
POST /start,start_step乐观锁把 pending/error→running 并写started_at(超时基线)。 - stream:前端
streamLangGraph直连 Aegra/api/langgraph/threads/{tid}/runs/stream(经 proxy 鉴权),消费messages-tuple(token 增量,use-ba-report-stream.ts:452)、values(完整 messages 快照)、custom(task 进度)、end/error。 - save-run-id:从 SSE 响应头
Content-Location正则提 run_id 回存(use-ba-report-stream.ts:400-412),scheduler 用它精确runs.get()。 - finish:
event:end后前端从langGraphMessages提write_file文件名,POST /finish。后端渐进轮询 checkpoint 取文件、传 NC、存版本、转 waiting_confirm,fire-and-forget 生成润色建议。 - confirm/regenerate/retry:见状态机。confirm 返回
next_step_config让前端无缝连下一步,实现"前端驱动的链式 8 步"。
Implementation Details
sync_scheduler:runs.get() 分层判断(v1.6 不再盲拉 checkpoint)
老版本 scheduler 每轮盲拉 checkpoint(重)。v1.6(ba_report_sync_scheduler.py:4-11 docstring)改成两层判断——先用轻量 runs.get()(7 字段)判 agent 死活,只有 success 才拉 checkpoint:
python
# 摘自 ba_report_sync_scheduler.py:151-199(精简)
if run_status == "success":
state = await service._fetch_state_from_checkpoint(...)
await service.handle_step_completed_from_checkpoint(...) # 孤儿补 finish
elif run_status in ("error","timeout","interrupted","cancelled"):
# error_code VARCHAR(8),缩写 AE_ERR/AE_TMO/AE_INT/AE_CAN
update(...).values(status="error", error_code=_code_map[run_status])
else: # running/pending 或无 run 记录
if step.started_at < timeout_threshold:
await self._mark_step_timeout(db, step) # 含 ≤3 次重试保护三态对应:
- 孤儿(orphan):用户关页面,agent 在 Aegra 已跑完(
runs.get()=success),scheduler 替用户补finish_step的工作(handle_step_completed_from_checkpoint,ba_report_service.py:709)。 - 僵尸/失败:Aegra 报 error/timeout/interrupted/cancelled,标 error(error_code 缩写因列宽 VARCHAR(8))。
- 超时:Aegra 还 running 但超
TIMEOUT_THRESHOLD_MINUTES(默认 15min),_mark_step_timeout标 error,timeout_retry_count≤MAX_TIMEOUT_RETRIES(3)否则TMO_PERM永久失败。
scheduler 在 FastAPI lifespan 里 asyncio.create_task(run_sync_scheduler(), name="ba_report_sync_scheduler") 启动(acdm-backend/app/main.py:191),与 acdm-backend 同进程,每 SYNC_INTERVAL_SECONDS(默认 10s)一轮。
BAReportService 还有一组人工触发的修复端点(管理员):detect_timeout_steps/fix_timeout_steps(ba_report_service.py:1259-1380,running 超 600s 标 error)和 fix_stuck_steps(:1382,running 无 started_at→重置 pending、pending 超 1 小时→标 error),都用 expected_status 乐观锁防竞态。
已废弃的 pipeline.py:理解架构翻转
pipelines/ba_report/pipeline.py:1-5 和 nodes/knowledge_sync.py:1-5 都标了 .. deprecated:: 2026-05-17,import 即 warnings.warn(DeprecationWarning)。它原本是 8 节点 LangGraph(data-preparation → confirm-data-preparation → ...),用 interrupt_before=[confirm-*](pipeline.py:232-241)在每个确认节点挂起,should_continue 条件边按 user_action(continue/regenerate/retry)决定走向。读这段代码能理解为什么现在 confirm_ba_report_step(langgraph_client.py:204)还保留 update_state(user_action=continue) + runs.wait() 两步式——它是为兼容这套老 interrupt 模型留的 best-effort 通知,DB 才是真相源(确认失败不回滚 DB)。
Configuration
| Config | 默认值 | 含义 | Source |
|---|---|---|---|
BA_REPORT_SYNC_INTERVAL | 10(秒) | scheduler 轮询间隔 | acdm-backend/app/services/ba_report_sync_scheduler.py:27 |
BA_REPORT_TIMEOUT_MINUTES | 15(分) | running 超此标 error | acdm-backend/app/services/ba_report_sync_scheduler.py:30 |
BA_REPORT_MONITOR_CHECK_MINUTES | 2(分) | running 超此才纳入监控 | acdm-backend/app/services/ba_report_sync_scheduler.py:31 |
BA_REPORT_MAX_TIMEOUT_RETRIES | 3 | 超时重试上限,超则 TMO_PERM | acdm-backend/app/services/ba_report_sync_scheduler.py:32 |
STEP_TIMEOUT_SECONDS | 600(秒) | 人工修复端点超时阈值 | acdm-backend/app/services/ba_report/constants.py:51 |
MAX_CONTENT_LENGTH | 100KB | update_content 编辑内容上限 | acdm-backend/app/services/ba_report/constants.py:54 |
recursion_limit | 1000 | 喂 Aegra 的 agent 递归上限 | acdm-backend/app/services/ba_report_service.py:524 |
ACDM_BACKEND_URL | http://localhost:8002 | (已弃用 knowledge_sync_node 回调 acdm URL) | deer-flow/backend/packages/harness/deerflow/pipelines/ba_report/nodes/knowledge_sync.py:70 |
| 重试上限 | 3 | retry_step retry_count 上限 | acdm-backend/app/services/ba_report_service.py:1044 |
Common Pitfalls / 实战 Tips
ai_thread_id不能用report.id兜底(ba_report_service.py:485-492注释明确):report.id 是 BA 报告 ID,不是 LangGraph thread。缺失时返回 None,前端必须等报告加载完才能连 SSE。- error_code 列只有 VARCHAR(8):scheduler 把 Aegra 状态缩写成
AE_ERR/AE_TMO/AE_INT/AE_CAN/TMO_PERM/UNRECOV(ba_report_sync_scheduler.py:177-178),写完整词会被截断。 - confirm 通知 LangGraph 是 best-effort:
confirm_step里 DB 已 commit 后才通知,通知失败只 log error 不回滚(ba_report_service.py:438-443)——DB 状态是真相源,LangGraph 同步是尽力而为。 - finish 幂等靠
for_update行锁 + 状态检查:已 waiting_confirm 直接返回不重存版本(:576-585);scheduler 的handle_step_completed_from_checkpoint也检查status in (waiting_confirm,confirmed)跳过(:718),前端和 scheduler 双路径不会重复 finish。 - 润色建议是 fire-and-forget:
finish_step末尾asyncio.create_task(_generate_polish_suggestions)不阻塞返回,LLM 失败/JSON 截断都不影响 finish(:672-675、:1631);_repair_truncated_json(:1592)还会尝试补全截断的 JSON。建议生成有二次幂等检查(LLM 期间可能并发已写)。 - status 同步不回归:
sync_step_from_checkpoint显式拦截 waiting_confirm → running 的降级(checkpoint.py:273-277),防 checkpoint 滞后把已完成步骤打回。 - 读 deprecated 代码要看顶部 docstring:
pipeline.py/knowledge_sync.py仍在仓库里但 2026-05-17 起不在活跃路径,改它们不会影响线上行为。
References
acdm-backend/app/services/ba_report_service.py:61-1001— BAReportService 8 步状态机(本章主源:create/start/finish/confirm/regenerate/retry)acdm-backend/app/api/ba_report_router.py:99-995— BA 报告 20+ REST 端点 + execute-configacdm-backend/app/services/ba_report/constants.py:8-54— STEP_DEFS / STEP_SKILL_MAP / 超时常量acdm-backend/app/services/ba_report/prompts.py:12-366— 8 步"直接执行不澄清"强约束 promptacdm-backend/app/services/ba_report/checkpoint.py:24-352— checkpoint 读取 + Nextcloud 上传 + 五重幂等同步acdm-backend/app/services/ba_report/wiki.py:24-148— P1-P5 章节级幂等回写 GitLab Wikiacdm-backend/app/services/ba_report_sync_scheduler.py:35-303— runs.get() 分层孤儿/僵尸/超时三态修复acdm-backend/app/services/langgraph_client.py:204-472— confirm/resume/cancel 对 Aegra 的两步式 best-effort 通知deer-flow/frontend/src/hooks/use-ba-report-stream.ts:140-918— 前端 SSE Hook(token 增量/reattach/心跳超时)deer-flow/backend/packages/harness/deerflow/pipelines/ba_report/pipeline.py:1-243— 已弃用的 8 节点 LangGraph 编排(架构翻转参考)
Related Pages
| Page | Relationship |
|---|---|
| Aegra 运行时与 LangGraph | 本章 lead_agent ba-expert 跑在该章 Aegra 运行时,checkpoint state 由其管理 |
| 控制面与引擎的耦合契约 | 本章 execute_config / two-step confirm 是该章控制面→引擎契约的具体实例 |
| acdm-backend 控制面架构 | 本章 BAReportService/Repository 分层遵循该章控制面分层规范 |
| SKILL 技能系统 | 本章 8 步映射到 ba-analysis-pack 的 8 个 Skill 目录(STEP_SKILL_MAP) |
| 知识库与 Wiki 协编 | 本章 step_8 后 P2-P3 回写复用该章 GitLabClient + project.gitlab_wiki_path |
| 审计 SSE 与后台任务 | 本章 sync_scheduler 与该章后台任务同进程 lifespan 启动 |
| ERP 录屏分析与 PRD 流水线 | 本章 recording_result_id 输入来自该章录屏分析产物 |
| AI 消息流与 SSE 基础设施 | 本章前端 SSE Hook 复用该章 sse-parser / ai-task substrate |