Skip to content

BA 专家报告工作流

本章目标:

  1. 看懂 BA 现状分析报告的 8 步流程定义、step→skill 映射、状态机
  2. 弄清"前端 SSE 驱动 + 后端轻状态管理"这套去后台编排架构,以及 interrupt/confirm/regenerate/retry 的交互契约
  3. 理解 sync_scheduler 孤儿/僵尸/超时三态修复,及 step_8 确认后 P2-P3 内容回写 GitLab Wiki 的逻辑

TL;DR

BA 专家报告把一份 ERP 录屏 + 材料,经 8 个 Skill 步骤(STEP_DEFS,ba_report/constants.py:8)滚动生成一份华为 4A 口径的现状分析报告。架构经历过一次大翻转:原来是后端 LangGraph Pipeline 编排(pipeline.py,已 deprecated 2026-05-17),现在是纯前端 SSE direct-to-LangGraph——前端拿 execute_config 直连 Aegra /runs/stream,后端 BAReportService 只做轻状态机(start/finish/fail/confirm/regenerate/retry)+ 文件回收 + checkpoint 兜底。每步产物经用户确认(confirm_step)才推下一步,可基于反馈重新生成(regenerate_step)。BAReportSyncScheduler 每 10s 用 runs.get() 分层判断,补救用户关页面留下的孤儿步骤。step_8 确认后把 P2-P3 来源内容回写项目 GitLab Wiki(WikiSyncService)。

Overview(为什么是"前端 SSE 驱动"而不是后端编排)

BA 报告要解决一个矛盾:单步分析是 5-10 分钟级的 LLM 长任务,但用户必须逐步介入审阅。如果纯后端编排(早期 pipeline.py 的 8 节点 LangGraph + interrupt_before),会撞上三个问题:

  1. 长任务 SSE 必须经后端中转——后端要把 Aegra 的 SSE 再转发一次,多一跳延迟 + 后端持有大量长连接。
  2. interrupt 恢复链路脆弱——pipeline.pyinterrupt_before 在每个 confirm 节点挂起,用户确认后要 update_state + runs.create(input={}) 从 interrupt 点恢复,这套两步式恢复(langgraph_client.py:204 confirm_ba_report_step)在 Aegra 上易超时。
  3. 用户关页面 = 任务悬空——agent 在 Aegra 侧继续跑完,但没人收尾。

当前方案(ba_report_service.py:8-15 模块 docstring 明确"纯前端 Stream 模式"):前端直接连 Aegra SSE 看 token 流,后端退化成状态登记处 + 兜底修复器。每步生命周期由前端 4 个回调驱动:start(标 running)→ 前端连 SSE → finish(存内容,转 waiting_confirm)/fail(转 error)→ confirm(转 confirmed,返回下一步 config)。后端 sync_scheduler 是安全网:用户关页面时,agent 还在 Aegra 跑,scheduler 检测到 runs.get() 报 success 就替用户补做 finish

Architecture:三方参与者

组件职责入口文件Source
BAReportService8 步轻状态机:create/start/finish/fail/confirm/regenerate/retry,文件回收,checkpoint 兜底acdm-backend/app/services/ba_report_service.pyacdm-backend/app/services/ba_report_service.py:61
ba_report_router20+ REST 端点,require_report_access 授权,创建时 require_project_accessacdm-backend/app/api/ba_report_router.pyacdm-backend/app/api/ba_report_router.py:99-995
STEP_DEFS / STEP_SKILL_MAP8 步定义 + step_id → ba-analysis-pack Skill 目录映射ba_report/constants.pyacdm-backend/app/services/ba_report/constants.py:8-36
build_step_prompt每步生成"直接执行不要澄清"的强约束 promptba_report/prompts.pyacdm-backend/app/services/ba_report/prompts.py:12-37
CheckpointService经 LangGraph SDK 读 Aegra state,文件传 Nextcloud,DB 同步ba_report/checkpoint.pyacdm-backend/app/services/ba_report/checkpoint.py:24
BAReportSyncScheduler与 acdm-backend 同进程的定时监控,孤儿/僵尸/超时三态修复ba_report_sync_scheduler.pyacdm-backend/app/services/ba_report_sync_scheduler.py:35
WikiSyncServicestep_8 后把 P2-P3 来源内容回写项目 GitLab Wikiba_report/wiki.pyacdm-backend/app/services/ba_report/wiki.py:24
useBaReportStream前端 SSE Hook:直连 Aegra /runs/stream,token 增量、reattach、心跳超时use-ba-report-stream.tsdeer-flow/frontend/src/hooks/use-ba-report-stream.ts:140
pipeline.py(已弃用)老的 8 节点 LangGraph + interrupt_before 编排pipelines/ba_report/pipeline.pydeer-flow/backend/packages/harness/deerflow/pipelines/ba_report/pipeline.py:1-33

8 步定义与 Skill 映射(constants.py:8-48)是整条流水线的"骨架"——每个 step_id 对应 ba-analysis-pack 下一个 Skill 目录,build_step_prompt 把它拼成给 lead_agent 的 human message:

step_id显示名Skill 目录(STEP_SKILL_MAP)关键产出
step_1基础信息data-preparation确认页 + 定界清单 + KB检索结果 + 关键词清单
step_2业务顶层架构architecture-designL1-L5 层级 + 业务全景图 .mmd
step_3业务场景分析scenario-restorationL4 流程 + KCP + 业务规则 + RACI
step_4业务对象清单object-identification7 类业务对象(MD/AD/BD/VD/CON/MG/RQ)
step_5业务对象与流程关系flow-analysis10 种对象关系 + CRUD 矩阵 + 流向图
step_6现状问题诊断problem-diagnosis8 类断点 + ERP 13 类规则缺失
step_7优化建议improvement-proposal流程/规则/绩效/高阶建议 + 路线图
step_8报告整合report-integration双轮精修定稿 + 附录 A-E + Wiki 回写触发

后端步骤状态机(单步生命周期):

Components / Subsystems

BAReportService:8 步轻状态机

职责:管理 8 个 BAReportStep 行的状态流转,不做 LLM 编排(那是前端连 Aegra 干的)。

关键方法:

  • create_report()(ba_report_service.py:69):建一条 Report(type=ba_analysis)+ 8 个 pending step。注意 knowledge_sync 不是用户确认步骤(constants.py:17 注释:step_8 确认后自动触发)。实际创建走 router 的 create_report(ba_report_router.py:99),前端先在 Gateway 建 thread + 上传文件,再带 thread_id 调本端点,thread_id 存进 report.ai_thread_id
  • start_step()(:312):宽松乐观锁——WHERE status IN ('pending','error') 才更新为 running 并写 started_at。前端连 SSE 前调,started_at 是超时检测基线。rowcount==0 说明已是 running/waiting_confirm,返回 False 让前端不重复连。
  • finish_step()(:547):前端 SSE 流结束后调。幂等:已 waiting_confirm 直接返回不重存版本。核心是从 checkpoint 渐进式轮询取文件(delay 从 100ms 翻倍到 2s 上限,最多 2 分钟,:602-633),上传 Nextcloud,存 chapter_md_content + 章节版本,转 waiting_confirm,最后 asyncio.create_task fire-and-forget 异步生成润色建议(_generate_polish_suggestions,:1631)。
  • confirm_step()(:374):waiting_confirm → confirmed。DB 提交后 best-effort 通知 LangGraph confirm_ba_report_step(失败不回滚,:430-443),再把下一步置 pending 并构建 next_step_config(_build_next_step_config,:468)返回给前端,前端拿到后直接连下一步 SSE——这是去后台编排的关键。
  • regenerate_step()(:872):waiting_confirm → pending,清空内容保留版本历史,把 feedbacks 拼进 human message 前缀("【用户反馈】...请根据以上反馈重新执行"),返回 execute_config 让前端重连 SSE。有 running 的 regeneration 则复用(幂等,:898-904)。
  • retry_step()(:1037):error → pending,retry_count ≤ 3,调 resume_ba_report_from_step
  • cancel_report()(:1013):调 cancel_ba_report_thread 杀 Aegra run,非 confirmed 步骤全标 error。

实现要点:Service 方法不显式 commit 由路由层 get_db 统一提交(模块 docstring 事务约定),但 finish/confirm/regenerate 等关键路径内部仍显式 await self.db.commit() 以保证状态及时落地(因为后续要 best-effort 通知 LangGraph,DB 必须先固化)。

execute_config / next_step_config:控制面→引擎的执行契约

_build_next_step_config()(ba_report_service.py:468)和 router 的 get_execute_config(ba_report_router.py:331)产出同构的执行配置——这是 acdm-backend 喂给前端、前端再喂给 Aegra 的"启动指令包":

python
# 摘自 ba_report_service.py:511-543
{
  "thread_id": thread_id,            # 必须真实 ai_thread_id,不能 fallback report.id
  "assistant_id": "lead_agent",
  "request": {
    "input": {"messages": [{"type": "human", "content": build_step_prompt(...)}]},
    "config": {"recursion_limit": 1000},
    "stream_mode": ["messages-tuple", "values", "custom"],
    "stream_resumable": True, "on_disconnect": "continue",
  },
  "context": {"agent_name": "ba-expert", "model_name": "kimi-k2.6", ...},
  "auth_headers": {"X-Auth-User-Id": user_sub, "X-Auth-User-Role": user_role},
}

stream_resumable: True + on_disconnect: "continue" 是关键:前端断线 agent 不停,可 reattach(use-ba-report-stream.ts:770 reattach);ai_thread_id 缺失则返回 None,前端必须等报告加载完才能连(:487-492)。

CheckpointService:状态兜底层

职责:当前端没收尾(关页面/网络断)时,从 Aegra checkpoint 把成果同步回 acdm DB。

关键方法:

  • fetch_state_from_checkpoint()(checkpoint.py:32):用 LangGraph SDK threads.get_state,定时任务用系统账号 _auth_headers(is_admin=True)。Thread 不存在(被清理)是预期场景,用 DEBUG 不报警。
  • sync_step_from_checkpoint()(:141):五重幂等——confirmed 跳过、waiting_confirm 且已有 nc:// 路径跳过、按 status/content/files/suggestions 四个维度判 needs_sync、status 回归保护(waiting_confirm 不被降级回 running,:273-277)、suggestions 写前二次查 DB 防并发。
  • upload_files_to_nextcloud()(:72):checkpoint 里是 inline content,上传到 {project_nc_path}/BA报告/{report_id}/{step_id}/,DB 存 nc:// 路径而非内容。

WikiSyncService:知识库回写

职责:step_8 报告整合确认后,把高置信度内容沉淀进项目知识库。

sync_to_wiki()(wiki.py:30)按 source_level 过滤——只回写 P1-P5,P6(LLM 推断)不写(:62-65,实际 deer-flow 侧 knowledge_sync.py:55 进一步收紧到只写 P2-P3)。回写路径 {project.gitlab_wiki_path}/BA报告/{清洗后场景名}.md,按步骤显示名追加 ## 章节,章节级幂等:if section_header in full_content: skip(:97-99)。GitLab commit 失败不抛异常,塞进 errors[] 返回(回写是 best-effort,不阻塞报告完成)。

这条回写由 POST /ba-reports/{id}/sync-wiki(ba_report_router.py:977)触发。注意 deer-flow 侧的 knowledge_sync_node(knowledge_sync.py:73)是这条 API 的旧调用方,已随 pipeline.py deprecated(knowledge_sync.py:1-5);当前活跃路径下回写由前端/编排在 step_8 确认后调用该 API。

Data Flow:一个步骤的完整生命周期

每步关键步骤拆解:

  1. start:前端 POST /start,start_step 乐观锁把 pending/error→running 并写 started_at(超时基线)。
  2. stream:前端 streamLangGraph 直连 Aegra /api/langgraph/threads/{tid}/runs/stream(经 proxy 鉴权),消费 messages-tuple(token 增量,use-ba-report-stream.ts:452)、values(完整 messages 快照)、custom(task 进度)、end/error
  3. save-run-id:从 SSE 响应头 Content-Location 正则提 run_id 回存(use-ba-report-stream.ts:400-412),scheduler 用它精确 runs.get()
  4. finish:event:end 后前端从 langGraphMessageswrite_file 文件名,POST /finish。后端渐进轮询 checkpoint 取文件、传 NC、存版本、转 waiting_confirm,fire-and-forget 生成润色建议。
  5. confirm/regenerate/retry:见状态机。confirm 返回 next_step_config 让前端无缝连下一步,实现"前端驱动的链式 8 步"。

Implementation Details

sync_scheduler:runs.get() 分层判断(v1.6 不再盲拉 checkpoint)

老版本 scheduler 每轮盲拉 checkpoint(重)。v1.6(ba_report_sync_scheduler.py:4-11 docstring)改成两层判断——先用轻量 runs.get()(7 字段)判 agent 死活,只有 success 才拉 checkpoint:

python
# 摘自 ba_report_sync_scheduler.py:151-199(精简)
if run_status == "success":
    state = await service._fetch_state_from_checkpoint(...)
    await service.handle_step_completed_from_checkpoint(...)  # 孤儿补 finish
elif run_status in ("error","timeout","interrupted","cancelled"):
    # error_code VARCHAR(8),缩写 AE_ERR/AE_TMO/AE_INT/AE_CAN
    update(...).values(status="error", error_code=_code_map[run_status])
else:  # running/pending 或无 run 记录
    if step.started_at < timeout_threshold:
        await self._mark_step_timeout(db, step)  # 含 ≤3 次重试保护

三态对应:

  • 孤儿(orphan):用户关页面,agent 在 Aegra 已跑完(runs.get()=success),scheduler 替用户补 finish_step 的工作(handle_step_completed_from_checkpoint,ba_report_service.py:709)。
  • 僵尸/失败:Aegra 报 error/timeout/interrupted/cancelled,标 error(error_code 缩写因列宽 VARCHAR(8))。
  • 超时:Aegra 还 running 但超 TIMEOUT_THRESHOLD_MINUTES(默认 15min),_mark_step_timeout 标 error,timeout_retry_countMAX_TIMEOUT_RETRIES(3)否则 TMO_PERM 永久失败。

scheduler 在 FastAPI lifespan 里 asyncio.create_task(run_sync_scheduler(), name="ba_report_sync_scheduler") 启动(acdm-backend/app/main.py:191),与 acdm-backend 同进程,每 SYNC_INTERVAL_SECONDS(默认 10s)一轮。

BAReportService 还有一组人工触发的修复端点(管理员):detect_timeout_steps/fix_timeout_steps(ba_report_service.py:1259-1380,running 超 600s 标 error)和 fix_stuck_steps(:1382,running 无 started_at→重置 pending、pending 超 1 小时→标 error),都用 expected_status 乐观锁防竞态。

已废弃的 pipeline.py:理解架构翻转

pipelines/ba_report/pipeline.py:1-5nodes/knowledge_sync.py:1-5 都标了 .. deprecated:: 2026-05-17,import 即 warnings.warn(DeprecationWarning)。它原本是 8 节点 LangGraph(data-preparation → confirm-data-preparation → ...),用 interrupt_before=[confirm-*](pipeline.py:232-241)在每个确认节点挂起,should_continue 条件边按 user_action(continue/regenerate/retry)决定走向。读这段代码能理解为什么现在 confirm_ba_report_step(langgraph_client.py:204)还保留 update_state(user_action=continue) + runs.wait() 两步式——它是为兼容这套老 interrupt 模型留的 best-effort 通知,DB 才是真相源(确认失败不回滚 DB)。

Configuration

Config默认值含义Source
BA_REPORT_SYNC_INTERVAL10(秒)scheduler 轮询间隔acdm-backend/app/services/ba_report_sync_scheduler.py:27
BA_REPORT_TIMEOUT_MINUTES15(分)running 超此标 erroracdm-backend/app/services/ba_report_sync_scheduler.py:30
BA_REPORT_MONITOR_CHECK_MINUTES2(分)running 超此才纳入监控acdm-backend/app/services/ba_report_sync_scheduler.py:31
BA_REPORT_MAX_TIMEOUT_RETRIES3超时重试上限,超则 TMO_PERMacdm-backend/app/services/ba_report_sync_scheduler.py:32
STEP_TIMEOUT_SECONDS600(秒)人工修复端点超时阈值acdm-backend/app/services/ba_report/constants.py:51
MAX_CONTENT_LENGTH100KBupdate_content 编辑内容上限acdm-backend/app/services/ba_report/constants.py:54
recursion_limit1000喂 Aegra 的 agent 递归上限acdm-backend/app/services/ba_report_service.py:524
ACDM_BACKEND_URLhttp://localhost:8002(已弃用 knowledge_sync_node 回调 acdm URL)deer-flow/backend/packages/harness/deerflow/pipelines/ba_report/nodes/knowledge_sync.py:70
重试上限3retry_step retry_count 上限acdm-backend/app/services/ba_report_service.py:1044

Common Pitfalls / 实战 Tips

  • ai_thread_id 不能用 report.id 兜底(ba_report_service.py:485-492 注释明确):report.id 是 BA 报告 ID,不是 LangGraph thread。缺失时返回 None,前端必须等报告加载完才能连 SSE。
  • error_code 列只有 VARCHAR(8):scheduler 把 Aegra 状态缩写成 AE_ERR/AE_TMO/AE_INT/AE_CAN/TMO_PERM/UNRECOV(ba_report_sync_scheduler.py:177-178),写完整词会被截断。
  • confirm 通知 LangGraph 是 best-effort:confirm_step 里 DB 已 commit 后才通知,通知失败只 log error 不回滚(ba_report_service.py:438-443)——DB 状态是真相源,LangGraph 同步是尽力而为。
  • finish 幂等靠 for_update 行锁 + 状态检查:已 waiting_confirm 直接返回不重存版本(:576-585);scheduler 的 handle_step_completed_from_checkpoint 也检查 status in (waiting_confirm,confirmed) 跳过(:718),前端和 scheduler 双路径不会重复 finish。
  • 润色建议是 fire-and-forget:finish_step 末尾 asyncio.create_task(_generate_polish_suggestions) 不阻塞返回,LLM 失败/JSON 截断都不影响 finish(:672-675:1631);_repair_truncated_json(:1592)还会尝试补全截断的 JSON。建议生成有二次幂等检查(LLM 期间可能并发已写)。
  • status 同步不回归:sync_step_from_checkpoint 显式拦截 waiting_confirm → running 的降级(checkpoint.py:273-277),防 checkpoint 滞后把已完成步骤打回。
  • 读 deprecated 代码要看顶部 docstring:pipeline.py/knowledge_sync.py 仍在仓库里但 2026-05-17 起不在活跃路径,改它们不会影响线上行为。

References

  • acdm-backend/app/services/ba_report_service.py:61-1001 — BAReportService 8 步状态机(本章主源:create/start/finish/confirm/regenerate/retry)
  • acdm-backend/app/api/ba_report_router.py:99-995 — BA 报告 20+ REST 端点 + execute-config
  • acdm-backend/app/services/ba_report/constants.py:8-54 — STEP_DEFS / STEP_SKILL_MAP / 超时常量
  • acdm-backend/app/services/ba_report/prompts.py:12-366 — 8 步"直接执行不澄清"强约束 prompt
  • acdm-backend/app/services/ba_report/checkpoint.py:24-352 — checkpoint 读取 + Nextcloud 上传 + 五重幂等同步
  • acdm-backend/app/services/ba_report/wiki.py:24-148 — P1-P5 章节级幂等回写 GitLab Wiki
  • acdm-backend/app/services/ba_report_sync_scheduler.py:35-303 — runs.get() 分层孤儿/僵尸/超时三态修复
  • acdm-backend/app/services/langgraph_client.py:204-472 — confirm/resume/cancel 对 Aegra 的两步式 best-effort 通知
  • deer-flow/frontend/src/hooks/use-ba-report-stream.ts:140-918 — 前端 SSE Hook(token 增量/reattach/心跳超时)
  • deer-flow/backend/packages/harness/deerflow/pipelines/ba_report/pipeline.py:1-243 — 已弃用的 8 节点 LangGraph 编排(架构翻转参考)
PageRelationship
Aegra 运行时与 LangGraph本章 lead_agent ba-expert 跑在该章 Aegra 运行时,checkpoint state 由其管理
控制面与引擎的耦合契约本章 execute_config / two-step confirm 是该章控制面→引擎契约的具体实例
acdm-backend 控制面架构本章 BAReportService/Repository 分层遵循该章控制面分层规范
SKILL 技能系统本章 8 步映射到 ba-analysis-pack 的 8 个 Skill 目录(STEP_SKILL_MAP)
知识库与 Wiki 协编本章 step_8 后 P2-P3 回写复用该章 GitLabClient + project.gitlab_wiki_path
审计 SSE 与后台任务本章 sync_scheduler 与该章后台任务同进程 lifespan 启动
ERP 录屏分析与 PRD 流水线本章 recording_result_id 输入来自该章录屏分析产物
AI 消息流与 SSE 基础设施本章前端 SSE Hook 复用该章 sse-parser / ai-task substrate

公司内部参考 · 由 claude-wiki-gen 基于源码自动生成的二次分析