主题
持久化与存储层
本章目标:
- 讲清楚 DeerFlow 为什么把「应用业务数据」与「LangGraph 执行状态」拆成两套独立的持久化体系,以及 SQLite 为何是默认后端。
- 拆解
deerflow.persistence的引擎生命周期、声明式 Base、各 domain 仓储(run / thread_meta / user / feedback / run_event)与跨方言 JSON 匹配机制。- 说明
runtime/store异步 Provider 如何镜像 checkpointer 后端,以及一次 run event 写入与读取的完整数据流。
TL;DR
DeerFlow 有两条彼此独立的持久化链路:deerflow.persistence(SQLAlchemy 2.0 async ORM,管理 runs / threads_meta / users / feedback / run_events)与 LangGraph 自管的 checkpointer + store。两者由 config.yaml 的统一 database: 段(memory / sqlite / postgres)驱动,SQLite 模式下共用同一个 deerflow.db 文件并启用 WAL database_config.py:1-12。引擎在 Gateway 启动时初始化、关闭时释放;backend=memory 时引擎为 no-op,仓储回退到内存实现 engine.py:1-9。当前版本通过 Base.metadata.create_all 自动建表,Alembic 目录已就位但尚无版本脚本 engine.py:137-163。
Overview
为什么需要这一层抽象?核心原因有三个。
第一,职责隔离。 LangGraph 的 checkpointer 负责图执行状态(消息历史、中间状态、断点恢复),它有自己的 schema 生命周期,由 LangGraph 自身管理,绝不能被 DeerFlow 触碰 migrations/env.py:1-6。而 DeerFlow 还有一批「应用自有数据」——run 元数据、线程归属、用户账号、反馈、run 事件流——这些需要被 SQL 查询、聚合统计、按 user 过滤,LangGraph checkpointer 不提供这种能力。所以 deerflow.persistence 是一套独立的 SQLAlchemy ORM,与 checkpointer 物理上可能共用一个文件/数据库,但逻辑上完全分离 persistence/init.py:1-9。
第二,后端可替换且统一配置。 用户只配置一个 database.backend,系统自动处理物理分离:memory 用于开发(重启即丢)、sqlite 用于单机部署、postgres 用于多节点生产 database_config.py:40-57。SQLite 是默认后端,因为它零运维、单文件、随 make dev 即可用;通过在每个连接上启用 journal_mode=WAL + synchronous=NORMAL,SQLite 支持并发读 + 单写者不互相阻塞,使「checkpointer 与应用共用一个文件」在生产单机部署下也安全 engine.py:103-123。
第三,跨方言可移植。 仓储代码必须在 SQLite 与 PostgreSQL 上行为一致。JSON 列上的 metadata 过滤在两种方言下语法差异极大(SQLite 用 json_extract/json_type,PG 用 ->>/json_typeof),json_compat.JsonMatch 把这层差异封装成一个可编译表达式,并对 key/value 做注入防护与 64 位整数边界校验 json_compat.py:60-91。
Architecture
Source 列表:
persistence/engine.py— async 引擎/会话工厂生命周期 engine.pypersistence/base.py— 声明式 Base + 通用to_dict()base.pypersistence/json_compat.py— 跨方言 JSON 匹配 json_compat.pypersistence/run/、thread_meta/、user/、feedback/、models/run_event.py— 各 domain ORM + 仓储runtime/store/async_provider.py、provider.py、_sqlite_utils.py— LangGraph store 工厂app/gateway/deps.py— 启动时装配所有仓储 deps.py:41-97
说明:
user_id/thread_id/run_id是逻辑关联键,代码层面以String(64/36)存储而非数据库外键约束(便于跨后端可移植与历史数据兼容),归属过滤在仓储层通过resolve_user_id强制执行。
Components / Subsystems
engine — 异步引擎生命周期
engine.py 是整层的入口。init_engine(backend, url, echo, pool_size, sqlite_dir) 根据后端创建 AsyncEngine 与 async_sessionmaker:memory 直接 no-op 返回(get_session_factory() 返回 None);sqlite 创建目录并注册一个 connect 事件监听器,在每个新连接上执行 PRAGMA journal_mode=WAL / synchronous=NORMAL / foreign_keys=ON(PRAGMA 是 per-connection 的,必须用监听器而非启动时一次性执行);postgres 启用 pool_pre_ping 与连接池 engine.py:57-135。建表走 Base.metadata.create_all;PostgreSQL 若库不存在会自动连 postgres 维护库 CREATE DATABASE 再重建引擎重试 engine.py:149-163。关键模块级符号:init_engine / init_engine_from_config / get_session_factory / get_engine / close_engine,均通过包级 __init__ 暴露 persistence/init.py:11-13。
base — 声明式 Base
Base(DeclarativeBase) 是所有 DeerFlow ORM 模型的父类,提供通用 to_dict(exclude=...)(经 sa_inspect 遍历映射列)与 __repr__(),各模型无需自写序列化逻辑 base.py:16-40。LangGraph checkpointer 的表不由此 Base 管理 base.py:7。
json_compat — 跨方言 JSON 匹配
JsonMatch(ColumnElement) 把 column[key] == value 编译为 SQLite 的 json_type/json_extract 或 PG 的 json_typeof/->>,做类型安全比较(区分 bool/int、NULL/缺键)json_compat.py:60-91。key 必须匹配 [A-Za-z0-9_-]+(防 JSONPath/SQL 注入),value 仅允许 None/bool/int/float/str 且整数限 64 位 json_compat.py:28-58。它被 ThreadMetaRepository.search 用于元数据过滤 thread_meta/sql.py:129-143。
run domain — RunRow + RunRepository
RunRow(表 runs)以 run_id 为主键,记录 thread/user 归属、状态机(pending|running|success|error|timeout|interrupted)、模型名、metadata/kwargs JSON、token 用量分桶(lead_agent / subagent / middleware)及便捷字段(first_human_message / last_ai_message)run/model.py:13-49。RunRepository 实现 RunStore 接口,每方法开短生命周期 session(后台 worker 可能跑数分钟,不持有跨执行连接);aggregate_tokens_by_thread 用单条 GROUP BY SQL 做模型维度 token 聚合 run/sql.py:1-6 run/sql.py:222-270。
thread_meta domain — 归属与访问控制
ThreadMetaRow(表 threads_meta)以 thread_id 为主键,持 user_id 索引、display_name(标题)、status、metadata_json thread_meta/model.py:13-23。ThreadMetaStore 抽象接口规定所有方法接受三态 user_id(AUTO 从 contextvar 解析 / 显式 str / 显式 None 绕过归属过滤,仅迁移与 CLI 用)thread_meta/base.py:7-13。check_access 有读宽松/写严格两套语义,严格模式下「已被删除的线程」不能被任何调用者重新命中,堵住删除幂等的跨用户漏洞 thread_meta/sql.py:78-106。make_thread_store(sf, store) 工厂:有 session_factory 用 ThreadMetaRepository,否则回退 LangGraph store 包装的 MemoryThreadMetaStore thread_meta/init.py:26-39。
user domain — 账号与 OAuth
UserRow(表 users)UUID 主键存为 36 字符串(跨后端可移植),邮箱唯一,system_role(admin|user)用纯字符串避免新角色引发 ALTER TABLE,并用部分唯一索引 idx_users_oauth_identity 保证一对 (provider, oauth_id) 一个账号、留 NULL/NULL 行不受约束以兼容密码账号 user/model.py:22-59。它与其它表共用同一引擎/连接池/schema 初始化路径 user/model.py:1-10。
feedback domain — 评分与统计
FeedbackRow(表 feedback)以 (thread_id, run_id, user_id) 唯一约束保证一用户对一 run 只有一条反馈,rating 仅 +1/-1 feedback/model.py:13-32。FeedbackRepository.upsert 按唯一键 select-then-update/insert;aggregate_by_run 用 CASE 在数据库侧统计正负计数 feedback/sql.py:125-163 feedback/sql.py:203-217。
run_event domain — 事件流
RunEventRow(表 run_events)自增 id,以 seq 在线程内单调递增,UniqueConstraint(thread_id, seq) + 两个复合索引支撑按线程/类别/run 分页;category 为 message|trace|lifecycle models/run_event.py:13-35。其存储实现 DbRunEventStore 不在 entity 目录,而在 runtime/events/store/db.py(模型与存储分置)models/init.py:12-15。
migrations — Alembic 占位
migrations/env.py 配置 Alembic 仅管理 DeerFlow 自有表,显式声明绝不触碰 LangGraph checkpointer 表;启用 render_as_batch=True 以支持 SQLite 的 ALTER TABLE migrations/env.py:47-54。alembic.ini 离线默认 URL 为 sqlite+aiosqlite:///./data/deerflow.db,运行时实际用 DeerFlow config 的引擎 migrations/alembic.ini:1-5。⚠️ versions/ 目录当前仅含 .gitkeep(无版本脚本),建表实际由 engine.py 的 create_all 完成,注释明确 "Production should use Alembic" engine.py:137-138。
runtime/store — LangGraph Store 工厂(与 checkpointer 关系)
runtime/store 不属于 deerflow.persistence,它构造 LangGraph 的 BaseStore(长期记忆 KV),后端镜像 checkpointer 段:memory→InMemoryStore、sqlite→AsyncSqliteStore、postgres→AsyncPostgresStore,确保 store 与 checkpointer 始终用同一持久化技术 async_provider.py:1-16 async_provider.py:36-80。provider.py 提供同步单例/上下文管理器供 CLI 与嵌入式 DeerFlowClient 使用 provider.py:1-19。_sqlite_utils.py 共享 SQLite 连接串解析(:memory:/file: 原样、普通路径解析为绝对路径)与父目录创建,store 与 checkpointer Provider 都复用它 _sqlite_utils.py:10-28。checkpointer 的 async provider 优先用旧版 checkpointer: 段,否则回退统一 database: 段,二者最终都指向同一 deerflow.db 路径 checkpointer/async_provider.py:86-122。
下面是三套持久化组件的后端选择决策(同一份 config 派生三条链路):
Data Flow
下面是一次 run event 的写入与读取全过程(SQLite/db 后端)。
写入侧关键点:_max_seq_for_thread 在 SQLite 用 SELECT max(seq) ... FOR UPDATE 序列化同线程写者(PostgreSQL 改用 pg_advisory_xact_lock,因为聚合结果不可加行锁)db.py:89-109。读取侧 _user_id_from_context 会把 User.id(UUID)强制 str(),否则 aiosqlite 无法绑定 UUID 到 VARCHAR 列会静默回滚 db.py:71-87。
Implementation Details
SQLite WAL 监听器是整层最关键的实现细节——它决定了「单文件共用」在生产单机下是否安全:
python
# engine.py:115-123
@event.listens_for(_engine.sync_engine, "connect")
def _enable_sqlite_wal(dbapi_conn, _record): # noqa: ARG001
cursor = dbapi_conn.cursor()
try:
cursor.execute("PRAGMA journal_mode=WAL;")
cursor.execute("PRAGMA synchronous=NORMAL;")
cursor.execute("PRAGMA foreign_keys=ON;")
finally:
cursor.close()解读:SQLite 的 PRAGMA 是 per-connection 的,不能在启动时执行一次就生效,必须挂在 connect 事件上对每个新连接重新设置。WAL 让读不阻塞写、写不阻塞读;synchronous=NORMAL 是 WAL 的「安全且快」搭档(仅在 WAL checkpoint 边界 fsync,而非每次 commit)。注释还说明不设 busy_timeout——Python sqlite3 驱动默认已有 5 秒 busy timeout,aiosqlite/SQLAlchemy 方言继承该默认,重设是 no-op,写者争锁时会等待而非立即失败 engine.py:103-123。
启动装配侧(Gateway lifespan):引擎必须在 checkpointer 之前初始化,以便 PostgreSQL 自动建库逻辑先跑;sf 为 None(memory)时仓储回退到内存实现 deps.py:62-85。
速查表
| 子模块 | 表名 | ORM 模型 | 仓储/存储 | 职责 | Source |
|---|---|---|---|---|---|
| run | runs | RunRow | RunRepository | run 元数据、状态机、token 聚合 | run/model.py:13-49 |
| thread_meta | threads_meta | ThreadMetaRow | ThreadMetaRepository / MemoryThreadMetaStore | 线程归属、标题、状态、元数据过滤 | thread_meta/sql.py:20-32 |
| user | users | UserRow | (auth 层 repository) | 账号、密码 hash、role、OAuth 绑定 | user/model.py:22-59 |
| feedback | feedback | FeedbackRow | FeedbackRepository | 评分 +1/-1、upsert、聚合统计 | feedback/sql.py:18-28 |
| models/run_event | run_events | RunEventRow | DbRunEventStore (在 runtime/events) | 事件流、单调 seq、分页 | models/run_event.py:13-35 |
| engine | — | — | init/close/get_session_factory | 引擎/会话工厂生命周期 | engine.py:57-199 |
| base | — | Base | — | 通用 to_dict() / __repr__() | base.py:16-40 |
| json_compat | — | JsonMatch | json_match() | 跨方言 JSON 等值过滤 + 注入防护 | json_compat.py:60-91 |
| migrations | — | — | Alembic env(占位) | 仅管 DeerFlow 表,不碰 checkpointer | migrations/env.py:1-6 |
| runtime/store | — | — | make_store / get_store | LangGraph 长期记忆 KV(镜像 checkpointer) | async_provider.py:36-80 |
Configuration
database: 段(DatabaseConfig)同时驱动 checkpointer 与应用持久化:
| 字段 | 默认值 | 作用 | Source |
|---|---|---|---|
backend | memory | memory/sqlite/postgres,统一后端选择 | database_config.py:41-44 |
sqlite_dir | .deer-flow/data | SQLite 文件目录,checkpointer 与 app 共用 {dir}/deerflow.db | database_config.py:45-48 |
postgres_url | "" | PG 连接 URL($VAR 引用 .env),+asyncpg 后缀自动追加 | database_config.py:49-57 |
echo_sql | False | 打印所有 SQL(调试用) | database_config.py:58-61 |
pool_size | 5 | 应用 ORM 引擎连接池大小(仅 postgres) | database_config.py:62-65 |
app_sqlalchemy_url(派生) | — | sqlite→sqlite+aiosqlite:///...;postgres→postgresql+asyncpg://... | database_config.py:92-102 |
旧版独立 checkpointer: 段(CheckpointerConfig)仍被支持且优先级高于 database:,connection_string 对 sqlite 可省(默认 store.db)、对 postgres 必填 checkpointer_config.py:10-26。
Common Pitfalls/Tips
- memory 后端无持久化:
backend=memory时引擎 no-op,get_session_factory()返回None,run/feedback 仓储回退到MemoryRunStore,重启即丢线程列表 engine.py:76-78 deps.py:77-81。 - JSON 序列化中文:引擎注入
_json_serializer(ensure_ascii=False),JSON 列里的中文不会被转义成\uXXXXengine.py:19-21。 - metadata 过滤 key 受限:
search的 metadata key 必须匹配[A-Za-z0-9_-]+,全部被拒会抛InvalidMetadataFilterError(Gateway 返回 400)thread_meta/sql.py:129-143。 - trace 内容会被截断:
run_events的 trace 类内容超过max_trace_content(默认 10240 字节,见 RunEventsConfig)会按字节截断并标记content_truncateddb.py:49-57。 - 生产慎用自动建表:当前靠
create_all,新增列不会自动迁移已有库;versions/仅占位,需正式 Alembic 脚本才能安全演进 schema engine.py:137-138。 - 归属过滤默认开启:仓储方法的
user_id默认AUTO,会从请求 contextvar 解析;后台 worker 无 contextvar 时为软读(不过滤/不打戳),迁移脚本须显式传user_id=None绕过 thread_meta/base.py:7-13。
References
- persistence/engine.py — 引擎生命周期与 WAL 监听器
- persistence/base.py — 声明式 Base
- persistence/json_compat.py — 跨方言 JSON 匹配
- persistence/run/sql.py — RunRepository 与 token 聚合
- persistence/thread_meta/sql.py — 归属与访问控制
- persistence/feedback/sql.py — 反馈 upsert 与聚合
- persistence/migrations/env.py — Alembic 环境
- runtime/store/async_provider.py — LangGraph store 工厂
- runtime/events/store/db.py — DbRunEventStore
- config/database_config.py — 统一数据库配置
Related Pages
| 章节 | 关系 |
|---|---|
| 15-Runtime运行时与StreamBridge.md | Runtime 层产生 run / run_event,本章是其落库后端;RunManager 以 RunStore 为持久化支撑 |
| 23-长期记忆系统.md | 长期记忆走 LangGraph store,本章 runtime/store 工厂为其提供 sqlite/postgres 后端 |
| 04-配置系统与AppConfig.md | database: / checkpointer: 段由 AppConfig 解析,本章消费这些配置初始化引擎 |
| 11-ThreadState与状态管理.md | ThreadState 的执行态由 LangGraph checkpointer 持久化,与本章应用数据物理共用 SQLite 文件但 schema 分离 |