Skip to content

持久化与存储层

本章目标:

  1. 讲清楚 DeerFlow 为什么把「应用业务数据」与「LangGraph 执行状态」拆成两套独立的持久化体系,以及 SQLite 为何是默认后端。
  2. 拆解 deerflow.persistence 的引擎生命周期、声明式 Base、各 domain 仓储(run / thread_meta / user / feedback / run_event)与跨方言 JSON 匹配机制。
  3. 说明 runtime/store 异步 Provider 如何镜像 checkpointer 后端,以及一次 run event 写入与读取的完整数据流。

TL;DR

DeerFlow 有两条彼此独立的持久化链路:deerflow.persistence(SQLAlchemy 2.0 async ORM,管理 runs / threads_meta / users / feedback / run_events)与 LangGraph 自管的 checkpointer + store。两者由 config.yaml 的统一 database: 段(memory / sqlite / postgres)驱动,SQLite 模式下共用同一个 deerflow.db 文件并启用 WAL database_config.py:1-12。引擎在 Gateway 启动时初始化、关闭时释放;backend=memory 时引擎为 no-op,仓储回退到内存实现 engine.py:1-9。当前版本通过 Base.metadata.create_all 自动建表,Alembic 目录已就位但尚无版本脚本 engine.py:137-163

Overview

为什么需要这一层抽象?核心原因有三个。

第一,职责隔离。 LangGraph 的 checkpointer 负责图执行状态(消息历史、中间状态、断点恢复),它有自己的 schema 生命周期,由 LangGraph 自身管理,绝不能被 DeerFlow 触碰 migrations/env.py:1-6。而 DeerFlow 还有一批「应用自有数据」——run 元数据、线程归属、用户账号、反馈、run 事件流——这些需要被 SQL 查询、聚合统计、按 user 过滤,LangGraph checkpointer 不提供这种能力。所以 deerflow.persistence 是一套独立的 SQLAlchemy ORM,与 checkpointer 物理上可能共用一个文件/数据库,但逻辑上完全分离 persistence/init.py:1-9

第二,后端可替换且统一配置。 用户只配置一个 database.backend,系统自动处理物理分离:memory 用于开发(重启即丢)、sqlite 用于单机部署、postgres 用于多节点生产 database_config.py:40-57。SQLite 是默认后端,因为它零运维、单文件、随 make dev 即可用;通过在每个连接上启用 journal_mode=WAL + synchronous=NORMAL,SQLite 支持并发读 + 单写者不互相阻塞,使「checkpointer 与应用共用一个文件」在生产单机部署下也安全 engine.py:103-123

第三,跨方言可移植。 仓储代码必须在 SQLite 与 PostgreSQL 上行为一致。JSON 列上的 metadata 过滤在两种方言下语法差异极大(SQLite 用 json_extract/json_type,PG 用 ->>/json_typeof),json_compat.JsonMatch 把这层差异封装成一个可编译表达式,并对 key/value 做注入防护与 64 位整数边界校验 json_compat.py:60-91

Architecture

Source 列表:

  • persistence/engine.py — async 引擎/会话工厂生命周期 engine.py
  • persistence/base.py — 声明式 Base + 通用 to_dict() base.py
  • persistence/json_compat.py — 跨方言 JSON 匹配 json_compat.py
  • persistence/run/thread_meta/user/feedback/models/run_event.py — 各 domain ORM + 仓储
  • runtime/store/async_provider.pyprovider.py_sqlite_utils.py — LangGraph store 工厂
  • app/gateway/deps.py — 启动时装配所有仓储 deps.py:41-97

说明:user_id / thread_id / run_id 是逻辑关联键,代码层面以 String(64/36) 存储而非数据库外键约束(便于跨后端可移植与历史数据兼容),归属过滤在仓储层通过 resolve_user_id 强制执行。

Components / Subsystems

engine — 异步引擎生命周期

engine.py 是整层的入口。init_engine(backend, url, echo, pool_size, sqlite_dir) 根据后端创建 AsyncEngineasync_sessionmaker:memory 直接 no-op 返回(get_session_factory() 返回 None);sqlite 创建目录并注册一个 connect 事件监听器,在每个新连接上执行 PRAGMA journal_mode=WAL / synchronous=NORMAL / foreign_keys=ON(PRAGMA 是 per-connection 的,必须用监听器而非启动时一次性执行);postgres 启用 pool_pre_ping 与连接池 engine.py:57-135。建表走 Base.metadata.create_all;PostgreSQL 若库不存在会自动连 postgres 维护库 CREATE DATABASE 再重建引擎重试 engine.py:149-163。关键模块级符号:init_engine / init_engine_from_config / get_session_factory / get_engine / close_engine,均通过包级 __init__ 暴露 persistence/init.py:11-13

base — 声明式 Base

Base(DeclarativeBase) 是所有 DeerFlow ORM 模型的父类,提供通用 to_dict(exclude=...)(经 sa_inspect 遍历映射列)与 __repr__(),各模型无需自写序列化逻辑 base.py:16-40。LangGraph checkpointer 的表由此 Base 管理 base.py:7

json_compat — 跨方言 JSON 匹配

JsonMatch(ColumnElement)column[key] == value 编译为 SQLite 的 json_type/json_extract 或 PG 的 json_typeof/->>,做类型安全比较(区分 bool/int、NULL/缺键)json_compat.py:60-91。key 必须匹配 [A-Za-z0-9_-]+(防 JSONPath/SQL 注入),value 仅允许 None/bool/int/float/str 且整数限 64 位 json_compat.py:28-58。它被 ThreadMetaRepository.search 用于元数据过滤 thread_meta/sql.py:129-143

run domain — RunRow + RunRepository

RunRow(表 runs)以 run_id 为主键,记录 thread/user 归属、状态机(pending|running|success|error|timeout|interrupted)、模型名、metadata/kwargs JSON、token 用量分桶(lead_agent / subagent / middleware)及便捷字段(first_human_message / last_ai_message)run/model.py:13-49RunRepository 实现 RunStore 接口,每方法开短生命周期 session(后台 worker 可能跑数分钟,不持有跨执行连接);aggregate_tokens_by_thread 用单条 GROUP BY SQL 做模型维度 token 聚合 run/sql.py:1-6 run/sql.py:222-270

thread_meta domain — 归属与访问控制

ThreadMetaRow(表 threads_meta)以 thread_id 为主键,持 user_id 索引、display_name(标题)、statusmetadata_json thread_meta/model.py:13-23ThreadMetaStore 抽象接口规定所有方法接受三态 user_id(AUTO 从 contextvar 解析 / 显式 str / 显式 None 绕过归属过滤,仅迁移与 CLI 用)thread_meta/base.py:7-13check_access 有读宽松/写严格两套语义,严格模式下「已被删除的线程」不能被任何调用者重新命中,堵住删除幂等的跨用户漏洞 thread_meta/sql.py:78-106make_thread_store(sf, store) 工厂:有 session_factory 用 ThreadMetaRepository,否则回退 LangGraph store 包装的 MemoryThreadMetaStore thread_meta/init.py:26-39

user domain — 账号与 OAuth

UserRow(表 users)UUID 主键存为 36 字符串(跨后端可移植),邮箱唯一,system_role(admin|user)用纯字符串避免新角色引发 ALTER TABLE,并用部分唯一索引 idx_users_oauth_identity 保证一对 (provider, oauth_id) 一个账号、留 NULL/NULL 行不受约束以兼容密码账号 user/model.py:22-59。它与其它表共用同一引擎/连接池/schema 初始化路径 user/model.py:1-10

feedback domain — 评分与统计

FeedbackRow(表 feedback)以 (thread_id, run_id, user_id) 唯一约束保证一用户对一 run 只有一条反馈,rating 仅 +1/-1 feedback/model.py:13-32FeedbackRepository.upsert 按唯一键 select-then-update/insert;aggregate_by_runCASE 在数据库侧统计正负计数 feedback/sql.py:125-163 feedback/sql.py:203-217

run_event domain — 事件流

RunEventRow(表 run_events)自增 id,以 seq 在线程内单调递增,UniqueConstraint(thread_id, seq) + 两个复合索引支撑按线程/类别/run 分页;categorymessage|trace|lifecycle models/run_event.py:13-35。其存储实现 DbRunEventStore 不在 entity 目录,而在 runtime/events/store/db.py(模型与存储分置)models/init.py:12-15

migrations — Alembic 占位

migrations/env.py 配置 Alembic 仅管理 DeerFlow 自有表,显式声明绝不触碰 LangGraph checkpointer 表;启用 render_as_batch=True 以支持 SQLite 的 ALTER TABLE migrations/env.py:47-54alembic.ini 离线默认 URL 为 sqlite+aiosqlite:///./data/deerflow.db,运行时实际用 DeerFlow config 的引擎 migrations/alembic.ini:1-5。⚠️ versions/ 目录当前仅含 .gitkeep(无版本脚本),建表实际由 engine.pycreate_all 完成,注释明确 "Production should use Alembic" engine.py:137-138

runtime/store — LangGraph Store 工厂(与 checkpointer 关系)

runtime/store 不属于 deerflow.persistence,它构造 LangGraph 的 BaseStore(长期记忆 KV),后端镜像 checkpointer 段:memory→InMemoryStoresqlite→AsyncSqliteStorepostgres→AsyncPostgresStore,确保 store 与 checkpointer 始终用同一持久化技术 async_provider.py:1-16 async_provider.py:36-80provider.py 提供同步单例/上下文管理器供 CLI 与嵌入式 DeerFlowClient 使用 provider.py:1-19_sqlite_utils.py 共享 SQLite 连接串解析(:memory:/file: 原样、普通路径解析为绝对路径)与父目录创建,store 与 checkpointer Provider 都复用它 _sqlite_utils.py:10-28。checkpointer 的 async provider 优先用旧版 checkpointer: 段,否则回退统一 database: 段,二者最终都指向同一 deerflow.db 路径 checkpointer/async_provider.py:86-122

下面是三套持久化组件的后端选择决策(同一份 config 派生三条链路):

Data Flow

下面是一次 run event 的写入与读取全过程(SQLite/db 后端)。

写入侧关键点:_max_seq_for_thread 在 SQLite 用 SELECT max(seq) ... FOR UPDATE 序列化同线程写者(PostgreSQL 改用 pg_advisory_xact_lock,因为聚合结果不可加行锁)db.py:89-109。读取侧 _user_id_from_context 会把 User.id(UUID)强制 str(),否则 aiosqlite 无法绑定 UUID 到 VARCHAR 列会静默回滚 db.py:71-87

Implementation Details

SQLite WAL 监听器是整层最关键的实现细节——它决定了「单文件共用」在生产单机下是否安全:

python
# engine.py:115-123
@event.listens_for(_engine.sync_engine, "connect")
def _enable_sqlite_wal(dbapi_conn, _record):  # noqa: ARG001
    cursor = dbapi_conn.cursor()
    try:
        cursor.execute("PRAGMA journal_mode=WAL;")
        cursor.execute("PRAGMA synchronous=NORMAL;")
        cursor.execute("PRAGMA foreign_keys=ON;")
    finally:
        cursor.close()

解读:SQLite 的 PRAGMA 是 per-connection 的,不能在启动时执行一次就生效,必须挂在 connect 事件上对每个新连接重新设置。WAL 让读不阻塞写、写不阻塞读;synchronous=NORMAL 是 WAL 的「安全且快」搭档(仅在 WAL checkpoint 边界 fsync,而非每次 commit)。注释还说明不设 busy_timeout——Python sqlite3 驱动默认已有 5 秒 busy timeout,aiosqlite/SQLAlchemy 方言继承该默认,重设是 no-op,写者争锁时会等待而非立即失败 engine.py:103-123

启动装配侧(Gateway lifespan):引擎必须在 checkpointer 之前初始化,以便 PostgreSQL 自动建库逻辑先跑;sfNone(memory)时仓储回退到内存实现 deps.py:62-85

速查表

子模块表名ORM 模型仓储/存储职责Source
runrunsRunRowRunRepositoryrun 元数据、状态机、token 聚合run/model.py:13-49
thread_metathreads_metaThreadMetaRowThreadMetaRepository / MemoryThreadMetaStore线程归属、标题、状态、元数据过滤thread_meta/sql.py:20-32
userusersUserRow(auth 层 repository)账号、密码 hash、role、OAuth 绑定user/model.py:22-59
feedbackfeedbackFeedbackRowFeedbackRepository评分 +1/-1、upsert、聚合统计feedback/sql.py:18-28
models/run_eventrun_eventsRunEventRowDbRunEventStore (在 runtime/events)事件流、单调 seq、分页models/run_event.py:13-35
engineinit/close/get_session_factory引擎/会话工厂生命周期engine.py:57-199
baseBase通用 to_dict() / __repr__()base.py:16-40
json_compatJsonMatchjson_match()跨方言 JSON 等值过滤 + 注入防护json_compat.py:60-91
migrationsAlembic env(占位)仅管 DeerFlow 表,不碰 checkpointermigrations/env.py:1-6
runtime/storemake_store / get_storeLangGraph 长期记忆 KV(镜像 checkpointer)async_provider.py:36-80

Configuration

database: 段(DatabaseConfig)同时驱动 checkpointer 与应用持久化:

字段默认值作用Source
backendmemorymemory/sqlite/postgres,统一后端选择database_config.py:41-44
sqlite_dir.deer-flow/dataSQLite 文件目录,checkpointer 与 app 共用 {dir}/deerflow.dbdatabase_config.py:45-48
postgres_url""PG 连接 URL($VAR 引用 .env),+asyncpg 后缀自动追加database_config.py:49-57
echo_sqlFalse打印所有 SQL(调试用)database_config.py:58-61
pool_size5应用 ORM 引擎连接池大小(仅 postgres)database_config.py:62-65
app_sqlalchemy_url(派生)sqlite→sqlite+aiosqlite:///...;postgres→postgresql+asyncpg://...database_config.py:92-102

旧版独立 checkpointer: 段(CheckpointerConfig)仍被支持且优先级高于 database:,connection_string 对 sqlite 可省(默认 store.db)、对 postgres 必填 checkpointer_config.py:10-26

Common Pitfalls/Tips

  • memory 后端无持久化:backend=memory 时引擎 no-op,get_session_factory() 返回 None,run/feedback 仓储回退到 MemoryRunStore,重启即丢线程列表 engine.py:76-78 deps.py:77-81
  • JSON 序列化中文:引擎注入 _json_serializer(ensure_ascii=False),JSON 列里的中文不会被转义成 \uXXXX engine.py:19-21
  • metadata 过滤 key 受限:search 的 metadata key 必须匹配 [A-Za-z0-9_-]+,全部被拒会抛 InvalidMetadataFilterError(Gateway 返回 400)thread_meta/sql.py:129-143
  • trace 内容会被截断:run_events 的 trace 类内容超过 max_trace_content(默认 10240 字节,见 RunEventsConfig)会按字节截断并标记 content_truncated db.py:49-57
  • 生产慎用自动建表:当前靠 create_all,新增列不会自动迁移已有库;versions/ 仅占位,需正式 Alembic 脚本才能安全演进 schema engine.py:137-138
  • 归属过滤默认开启:仓储方法的 user_id 默认 AUTO,会从请求 contextvar 解析;后台 worker 无 contextvar 时为软读(不过滤/不打戳),迁移脚本须显式传 user_id=None 绕过 thread_meta/base.py:7-13

References

章节关系
15-Runtime运行时与StreamBridge.mdRuntime 层产生 run / run_event,本章是其落库后端;RunManagerRunStore 为持久化支撑
23-长期记忆系统.md长期记忆走 LangGraph store,本章 runtime/store 工厂为其提供 sqlite/postgres 后端
04-配置系统与AppConfig.mddatabase: / checkpointer: 段由 AppConfig 解析,本章消费这些配置初始化引擎
11-ThreadState与状态管理.mdThreadState 的执行态由 LangGraph checkpointer 持久化,与本章应用数据物理共用 SQLite 文件但 schema 分离

公司内部参考 · 由 claude-wiki-gen 基于源码自动生成的二次分析