主题
长期记忆系统
本章目标:
- 讲清 DeerFlow 为什么要用「异步去抖 + 按用户隔离」的长期记忆,而不是每轮对话同步调 LLM 写盘
- 拆解记忆全链路:
MemoryMiddleware过滤消息 → 去抖队列批处理 → updater 用 LLM 抽取 facts/context → 原子写盘 → 下一轮注入<memory>标签- 给出
memory.json数据结构、关键配置项与跨线程user_id捕获等真实易踩坑点的速查矩阵
TL;DR
MemoryMiddleware.after_agent 过滤出「用户输入 + 终态 AI 回复」,在请求上下文还活着时用 get_effective_user_id() 抓取 user_id,连同消息入队 memory_middleware.py:99-108。队列按 (thread_id, user_id, agent_name) 去重并用 threading.Timer 去抖(默认 30 秒)queue.py:127-149。到点后 MemoryUpdater 用同步 model.invoke() 让 LLM 输出更新 JSON,空白归一去重 facts 后通过 temp 文件 + replace() 原子写盘 updater.py:347-367。存储按 (user_id, agent_name) 分文件隔离并缓存 storage.py:84-103。下一轮按 token 预算注入 <memory> 标签到系统提示 prompt.py:585-588。
Overview
长期记忆系统要解决一个 super agent 的核心矛盾:对话是一次性的,但用户的偏好、项目背景、纠错经验是跨会话长期有效的。如果不持久化这些信息,agent 每次都从零开始,无法做个性化响应,也会重复犯同样的错。
为什么不在每轮对话结束时同步调一次 LLM 写记忆?有三个硬约束:
- 延迟敏感:记忆抽取本身要再跑一次 LLM 推理,如果同步阻塞在对话主路径上,用户每轮都要多等几秒。系统因此把记忆更新做成「fire-and-forget」的后台任务 memory_middleware.py:101-110。
- 写放大与抖动:用户连续追问时,每个回合都触发一次完整记忆抽取既浪费 token 又会让记忆文件高频抖动。去抖队列把 30 秒窗口内同一
(thread_id, user_id, agent_name)的多次入队合并成最后一次 queue.py:143-144。 - 多用户隔离:Gateway 是多用户服务,A 的偏好绝不能泄漏给 B。记忆按
user_id落到独立文件{base_dir}/users/{user_id}/memory.json,缓存 key 也带上user_idstorage.py:155-157。
去抖让事情变复杂的地方在于:threading.Timer 在另一个线程上触发,而 user_id 是存在 ContextVar 里的请求级状态,不会跨裸线程传播。所以系统必须在入队那一刻、请求上下文还活着时就把 user_id 抓出来塞进 ConversationContext 对象,让它「随数据走」而不是「随上下文走」memory_middleware.py:96-99。
Architecture
整个系统由「采集 → 去抖 → 抽取 → 存储 → 注入」五段组成,跨越中间件链、后台线程与文件系统三个执行域。
Source 列表:
- memory_middleware.py —
MemoryMiddleware,采集与入队 - message_processing.py — 消息过滤、纠错/正反馈信号检测
- queue.py — 去抖队列与单例
- updater.py — LLM 抽取、去重、原子写编排
- storage.py —
FileMemoryStorage,按用户隔离 + 缓存 - prompt.py — 更新提示词模板与注入格式化
- summarization_hook.py — 摘要前抢救待删消息
- memory_config.py —
MemoryConfig配置模型
Components / Subsystems
MemoryMiddleware — 采集与入队
职责:作为中间件链第 13 环,在 after_agent 阶段把本轮对话过滤后入队。它先从 runtime.context 或 LangGraph configurable 取 thread_id,过滤消息,检测纠错/正反馈信号,然后在请求上下文还活着时抓 user_id 入队。
关键类:MemoryMiddleware(AgentMiddleware) memory_middleware.py:28-38。after_agent 在没有 thread_id、没有消息、或缺少「至少一条 user + 一条 ai」时直接返回不入队 memory_middleware.py:68-91。消息过滤逻辑在 filter_messages_for_memory:只保留 human 输入和不带 tool_calls 的终态 ai 回复,并剥离 <uploaded_files> 块——纯上传消息会跳过,且其后的 ai 回复也一并丢弃 message_processing.py:56-85。纠错/正反馈检测扫描最近 6 条用户消息,匹配中英文模式(如 不对 / that's wrong / 完全正确)message_processing.py:88-109。
MemoryUpdateQueue — 去抖批处理队列
职责:收集 ConversationContext,用 threading.Timer 实现去抖,窗口内同 key 合并,到点后串行调 MemoryUpdater。
关键类:MemoryUpdateQueue queue.py:28-257,全局单例由 get_memory_queue() 提供 queue.py:265-275。去重身份是 _queue_key = (thread_id, user_id, agent_name) queue.py:43-50。_enqueue_locked 在追加新 context 前先把队列里同 key 的旧 context 过滤掉,但会把旧 context 的 correction_detected / reinforcement_detected OR 合并进来,避免去重时丢失信号 queue.py:127-144。add() 走 debounce_seconds 延迟,add_nowait() 用 0 秒立即排程(摘要前抢救场景用)queue.py:90-115。_process_queue 若发现已有 worker 在跑(_processing),会重新排 0 秒 timer 而非丢弃,保住立即 flush 语义 queue.py:171-184。
MemoryUpdater — LLM 抽取与原子写编排
职责:加载当前记忆 → 拼更新提示词 → 用同步 model.invoke() 调 LLM → 解析 JSON → 应用更新(去重 / 移除 / 上限裁剪)→ 原子写盘。
关键类:MemoryUpdater updater.py:276-586。它刻意走同步 LLM 调用路径(_do_update_memory_sync)而非异步,以避免触碰 lead agent 共享的 langchain 全局缓存 AsyncClient 连接池造成跨事件循环复用 bug(issue #2615)updater.py:396-412。_apply_updates 对 newFacts 做两道过滤:置信度低于 fact_confidence_threshold 的丢弃,内容经 casefold() + strip() 归一后与既有 facts 比对去重 updater.py:546-575;facts 超过 max_facts 时按置信度降序保留 top-N updater.py:577-584。_strip_upload_mentions_from_memory 用正则把「上传文件事件」从 summary 和 facts 中清掉,因为上传文件是会话级临时资源,写进长期记忆会让 agent 在后续会话里找不存在的文件 updater.py:244-264。
FileMemoryStorage — 按用户隔离 + 缓存
职责:把记忆按 (user_id, agent_name) 落到独立 JSON 文件,提供 mtime 失效的内存缓存,save 走 temp 文件 + 原子 rename。
关键类:FileMemoryStorage(MemoryStorage) storage.py:62-190。_get_memory_file_path 决定落盘位置:有 user_id 时走 {base_dir}/users/{user_id}/memory.json(或带 agent_name 的 per-agent 路径);配置里给了绝对 storage_path 则所有用户共享一份(退出隔离)storage.py:84-103。缓存 key 是 (user_id, agent_name) 元组,load() 用文件 st_mtime 判断缓存是否失效 storage.py:119-143。存储实例本身也是单例,由 get_memory_storage() 经反射从 config.storage_class 加载,失败回落到 FileMemoryStorage storage.py:196-231。
Data Flow
Implementation Details
跨线程 user_id 捕获
threading.Timer 在独立线程触发回调,而 user_id 存在 ContextVar(_current_user)里,裸线程不会传播 ContextVar。解决方案是在中间件里、请求上下文还活着时就把它抓出来当成普通数据塞进队列:
python
# memory_middleware.py:96-108
# Capture user_id at enqueue time while the request context is still alive.
# threading.Timer fires on a different thread where ContextVar values are not
# propagated, so we must store user_id explicitly in ConversationContext.
user_id = get_effective_user_id()
queue = get_memory_queue()
queue.add(
thread_id=thread_id,
messages=filtered_messages,
agent_name=self._agent_name,
user_id=user_id,
correction_detected=correction_detected,
reinforcement_detected=reinforcement_detected,
)get_effective_user_id() 从 _current_user ContextVar 取值,取不到则回落到常量 DEFAULT_USER_ID = "default"(无鉴权模式)user_context.py:100-109。抓到的 user_id 存进 ConversationContext 字段 queue.py:16-25,一路传到 updater.update_memory(..., user_id=context.user_id) queue.py:193-200。
去抖 Timer 的 cancel/重排
python
# queue.py:153-164
def _schedule_timer(self, delay_seconds: float) -> None:
# Cancel existing timer if any
if self._timer is not None:
self._timer.cancel()
self._timer = threading.Timer(
delay_seconds,
self._process_queue,
)
self._timer.daemon = True
self._timer.start()每次 add() 都在 self._lock 保护下重新排 timer:先 cancel() 旧 timer 再起新的,所以 30 秒窗口内的连续入队会不断推迟处理,直到「静默 30 秒」才真正触发 LLM 抽取。Timer 设为 daemon,进程退出时不阻塞——这是「best-effort 记忆」的取舍,极端情况下未处理的队列会丢 queue.py:159-164。
原子写盘
python
# storage.py:170-176
memory_data = {**memory_data, "lastUpdated": utc_now_iso_z()}
temp_path = file_path.with_suffix(f".{uuid.uuid4().hex}.tmp")
with open(temp_path, "w", encoding="utf-8") as f:
json.dump(memory_data, f, indent=2, ensure_ascii=False)
temp_path.replace(file_path)先写带随机 uuid 后缀的临时文件,再用 Path.replace()(原子 rename)覆盖目标文件,保证读者永远看不到半截 JSON。写前对入参做浅拷贝再加 lastUpdated,避免副作用污染调用方的 dict,也避免文件写成功前缓存引用被提前更新 storage.py:160-186。配套地,updater._finalize_update 在 in-place 修改前对 current_memory 做 copy.deepcopy,这样 save 失败也不会污染仍被缓存的原对象 updater.py:362-367。
速查表
memory.json 顶层结构由 create_empty_memory() 定义 storage.py:24-40。
| 区段 | 字段 | 类型 | 含义 | Source |
|---|---|---|---|---|
| 顶层 | version | str | 结构版本,固定 "1.0" | storage.py:27 |
| 顶层 | lastUpdated | str | 每次 save 时刷新的 ISO-8601(Z 后缀)时间戳 | storage.py:170 |
user | workContext | {summary, updatedAt} | 职业角色 / 公司 / 主项目 / 技术栈(1-3 句) | storage.py:30 |
user | personalContext | {summary, updatedAt} | 语言能力 / 沟通偏好 / 兴趣(1-2 句) | storage.py:31 |
user | topOfMind | {summary, updatedAt} | 当前 3-5 个并行关注点(更新最频繁) | storage.py:32 |
history | recentMonths | {summary, updatedAt} | 近 1-3 个月活动详述 | storage.py:35 |
history | earlierContext | {summary, updatedAt} | 3-12 个月前的历史模式 | storage.py:36 |
history | longTermBackground | {summary, updatedAt} | 长期不变的基础背景 | storage.py:37 |
facts[] | id | str | fact_ + uuid 前 8 位 | updater.py:561 |
facts[] | content | str | 事实正文(strip 后存储) | updater.py:562 |
facts[] | category | str | preference/knowledge/context/behavior/goal/correction | updater.py:563 |
facts[] | confidence | float | 0-1 置信度,低于阈值不入库 | updater.py:550-551 |
facts[] | createdAt | str | 创建时间戳 | updater.py:565 |
facts[] | source | str | 来源 thread_id,无则 "unknown",手工创建为 "manual" | updater.py:566 |
facts[] | sourceError | str(可选) | 仅 correction 类且显式错误时写入,注入时显示为 (avoid: ...) | updater.py:568-572 |
注入侧:format_memory_for_injection 把 facts 按 confidence 降序排列,逐行累加 token 直到达到 max_injection_tokens(默认 2000),并非固定截取 top 15;能放进预算的 fact 行越多越好 prompt.py:256-301。
Configuration
配置项定义在 MemoryConfig(Pydantic 模型),通过 config.yaml 的 memory 段加载。
| 配置项 | 默认 | 约束 | 作用 | Source |
|---|---|---|---|---|
enabled | True | bool | 记忆机制总开关,关闭则中间件直接 return | memory_config.py:9-12 |
storage_path | "" | str | 空=按用户隔离;绝对路径=所有用户共享一份(退出隔离);相对路径基于 base_dir | memory_config.py:13-26 |
storage_class | ...FileMemoryStorage | str | 存储 provider 类路径,经反射加载 | memory_config.py:27-30 |
debounce_seconds | 30 | 1-300 | 去抖等待秒数,窗口内重复入队会重排 timer | memory_config.py:31-36 |
model_name | None | str/None | 抽取用模型,None=用默认模型 | memory_config.py:37-40 |
max_facts | 100 | 10-500 | facts 上限,超出按 confidence 降序裁剪 | memory_config.py:41-46 |
fact_confidence_threshold | 0.7 | 0-1 | 新 fact 最低置信度,低于则丢弃 | memory_config.py:47-52 |
injection_enabled | True | bool | 是否把记忆注入系统提示 | memory_config.py:53-56 |
max_injection_tokens | 2000 | 100-8000 | 注入 token 预算,facts 逐行累加直到达上限 | memory_config.py:57-62 |
Common Pitfalls / Tips
- Timer 线程拿不到 ContextVar:
threading.Timer回调在独立线程执行,_current_userContextVar 不会自动跨裸线程传播。所以必须在MemoryMiddleware.after_agent(请求上下文还活着时)用get_effective_user_id()抓user_id塞进ConversationContext,而不能在 updater 里再调一次——那时拿到的会是"default"错桶 memory_middleware.py:96-99。 - 去抖会推迟而非丢弃:30 秒内连续对话不会逐轮写记忆,每次入队都
cancel旧 timer 重排,只有「静默 30 秒」后才真正抽取。期望「立刻看到记忆更新」时会困惑,需用flush()/add_nowait()强制 queue.py:153-164。 - daemon Timer 进程退出会丢队列:Timer 设了
daemon=True,进程在去抖窗口内退出时未处理的记忆更新直接丢失,这是 best-effort 的明确取舍 queue.py:231-233。 - 绝对 storage_path 会打通所有用户:
config.memory.storage_path设成绝对路径会让所有用户共享同一份memory.json,等于关闭按用户隔离,多用户部署慎用 storage.py:90-93。 - 上传文件不进长期记忆:
<uploaded_files>块在过滤阶段被剥离,纯上传消息整轮跳过;即便混进 summary/facts,_strip_upload_mentions_from_memory也会用正则二次清除,否则 agent 下次会去找早已不存在的会话级文件 updater.py:244-264。 - fact 去重靠空白归一:去重 key 是
content.strip().casefold(),大小写和首尾空白不同但实质相同的 fact 会被判为重复并跳过 append,不会重复入库 updater.py:267-273。 - 摘要会抢救待删消息:
SummarizationMiddleware删旧消息前触发memory_flush_hook,用add_nowait()(0 秒延迟)把即将被摘要丢掉的对话立即送进记忆队列,避免长对话被压缩后丢失记忆信息 summarization_hook.py:12-34。该 hook 仅在memory.enabled时注册进摘要中间件 agent.py:95-97。
References
- memory_middleware.py —
MemoryMiddleware.after_agent采集与入队 - queue.py — 去抖队列、
_queue_key去重、Timer 排程 - updater.py — LLM 抽取、fact 去重、原子写编排
- storage.py —
FileMemoryStorage按用户隔离与缓存 - prompt.py — 更新提示词与
format_memory_for_injection - message_processing.py — 消息过滤与纠错/正反馈检测
- summarization_hook.py — 摘要前记忆抢救
- memory_config.py —
MemoryConfig配置模型 - user_context.py —
get_effective_user_id/DEFAULT_USER_ID - lead_agent/prompt.py —
<memory>标签注入
Related Pages
| 章节 | 关系 |
|---|---|
| 12-中间件链机制 | MemoryMiddleware 是中间件链第 13 环,本章是其职责详解 |
| 14-鉴权-CSRF与授权 | get_effective_user_id() / DEFAULT_USER_ID 来自鉴权层,决定记忆隔离桶 |
| 33-上下文工程-摘要与循环检测 | SummarizationMiddleware 删消息前触发 memory_flush_hook 抢救记忆 |
| 16-持久化与存储层 | FileMemoryStorage 与 Paths 路径体系协同实现按用户隔离落盘 |