Skip to content

长期记忆系统

本章目标:

  • 讲清 DeerFlow 为什么要用「异步去抖 + 按用户隔离」的长期记忆,而不是每轮对话同步调 LLM 写盘
  • 拆解记忆全链路:MemoryMiddleware 过滤消息 → 去抖队列批处理 → updater 用 LLM 抽取 facts/context → 原子写盘 → 下一轮注入 <memory> 标签
  • 给出 memory.json 数据结构、关键配置项与跨线程 user_id 捕获等真实易踩坑点的速查矩阵

TL;DR

MemoryMiddleware.after_agent 过滤出「用户输入 + 终态 AI 回复」,在请求上下文还活着时用 get_effective_user_id() 抓取 user_id,连同消息入队 memory_middleware.py:99-108。队列按 (thread_id, user_id, agent_name) 去重并用 threading.Timer 去抖(默认 30 秒)queue.py:127-149。到点后 MemoryUpdater 用同步 model.invoke() 让 LLM 输出更新 JSON,空白归一去重 facts 后通过 temp 文件 + replace() 原子写盘 updater.py:347-367。存储按 (user_id, agent_name) 分文件隔离并缓存 storage.py:84-103。下一轮按 token 预算注入 <memory> 标签到系统提示 prompt.py:585-588

Overview

长期记忆系统要解决一个 super agent 的核心矛盾:对话是一次性的,但用户的偏好、项目背景、纠错经验是跨会话长期有效的。如果不持久化这些信息,agent 每次都从零开始,无法做个性化响应,也会重复犯同样的错。

为什么不在每轮对话结束时同步调一次 LLM 写记忆?有三个硬约束:

  1. 延迟敏感:记忆抽取本身要再跑一次 LLM 推理,如果同步阻塞在对话主路径上,用户每轮都要多等几秒。系统因此把记忆更新做成「fire-and-forget」的后台任务 memory_middleware.py:101-110
  2. 写放大与抖动:用户连续追问时,每个回合都触发一次完整记忆抽取既浪费 token 又会让记忆文件高频抖动。去抖队列把 30 秒窗口内同一 (thread_id, user_id, agent_name) 的多次入队合并成最后一次 queue.py:143-144
  3. 多用户隔离:Gateway 是多用户服务,A 的偏好绝不能泄漏给 B。记忆按 user_id 落到独立文件 {base_dir}/users/{user_id}/memory.json,缓存 key 也带上 user_id storage.py:155-157

去抖让事情变复杂的地方在于:threading.Timer 在另一个线程上触发,而 user_id 是存在 ContextVar 里的请求级状态,不会跨裸线程传播。所以系统必须在入队那一刻、请求上下文还活着时就把 user_id 抓出来塞进 ConversationContext 对象,让它「随数据走」而不是「随上下文走」memory_middleware.py:96-99

Architecture

整个系统由「采集 → 去抖 → 抽取 → 存储 → 注入」五段组成,跨越中间件链、后台线程与文件系统三个执行域。

Source 列表:

Components / Subsystems

MemoryMiddleware — 采集与入队

职责:作为中间件链第 13 环,在 after_agent 阶段把本轮对话过滤后入队。它先从 runtime.context 或 LangGraph configurablethread_id,过滤消息,检测纠错/正反馈信号,然后在请求上下文还活着时抓 user_id 入队。

关键类:MemoryMiddleware(AgentMiddleware) memory_middleware.py:28-38after_agent 在没有 thread_id、没有消息、或缺少「至少一条 user + 一条 ai」时直接返回不入队 memory_middleware.py:68-91。消息过滤逻辑在 filter_messages_for_memory:只保留 human 输入和不带 tool_calls 的终态 ai 回复,并剥离 <uploaded_files> 块——纯上传消息会跳过,且其后的 ai 回复也一并丢弃 message_processing.py:56-85。纠错/正反馈检测扫描最近 6 条用户消息,匹配中英文模式(如 不对 / that's wrong / 完全正确)message_processing.py:88-109

MemoryUpdateQueue — 去抖批处理队列

职责:收集 ConversationContext,用 threading.Timer 实现去抖,窗口内同 key 合并,到点后串行调 MemoryUpdater

关键类:MemoryUpdateQueue queue.py:28-257,全局单例由 get_memory_queue() 提供 queue.py:265-275。去重身份是 _queue_key = (thread_id, user_id, agent_name) queue.py:43-50_enqueue_locked 在追加新 context 前先把队列里同 key 的旧 context 过滤掉,但会把旧 context 的 correction_detected / reinforcement_detected OR 合并进来,避免去重时丢失信号 queue.py:127-144add()debounce_seconds 延迟,add_nowait() 用 0 秒立即排程(摘要前抢救场景用)queue.py:90-115_process_queue 若发现已有 worker 在跑(_processing),会重新排 0 秒 timer 而非丢弃,保住立即 flush 语义 queue.py:171-184

MemoryUpdater — LLM 抽取与原子写编排

职责:加载当前记忆 → 拼更新提示词 → 用同步 model.invoke() 调 LLM → 解析 JSON → 应用更新(去重 / 移除 / 上限裁剪)→ 原子写盘。

关键类:MemoryUpdater updater.py:276-586。它刻意走同步 LLM 调用路径(_do_update_memory_sync)而非异步,以避免触碰 lead agent 共享的 langchain 全局缓存 AsyncClient 连接池造成跨事件循环复用 bug(issue #2615)updater.py:396-412_apply_updatesnewFacts 做两道过滤:置信度低于 fact_confidence_threshold 的丢弃,内容经 casefold() + strip() 归一后与既有 facts 比对去重 updater.py:546-575;facts 超过 max_facts 时按置信度降序保留 top-N updater.py:577-584_strip_upload_mentions_from_memory 用正则把「上传文件事件」从 summary 和 facts 中清掉,因为上传文件是会话级临时资源,写进长期记忆会让 agent 在后续会话里找不存在的文件 updater.py:244-264

FileMemoryStorage — 按用户隔离 + 缓存

职责:把记忆按 (user_id, agent_name) 落到独立 JSON 文件,提供 mtime 失效的内存缓存,save 走 temp 文件 + 原子 rename。

关键类:FileMemoryStorage(MemoryStorage) storage.py:62-190_get_memory_file_path 决定落盘位置:有 user_id 时走 {base_dir}/users/{user_id}/memory.json(或带 agent_name 的 per-agent 路径);配置里给了绝对 storage_path 则所有用户共享一份(退出隔离)storage.py:84-103。缓存 key 是 (user_id, agent_name) 元组,load() 用文件 st_mtime 判断缓存是否失效 storage.py:119-143。存储实例本身也是单例,由 get_memory_storage() 经反射从 config.storage_class 加载,失败回落到 FileMemoryStorage storage.py:196-231

Data Flow

Implementation Details

跨线程 user_id 捕获

threading.Timer 在独立线程触发回调,而 user_id 存在 ContextVar(_current_user)里,裸线程不会传播 ContextVar。解决方案是在中间件里、请求上下文还活着时就把它抓出来当成普通数据塞进队列:

python
# memory_middleware.py:96-108
# Capture user_id at enqueue time while the request context is still alive.
# threading.Timer fires on a different thread where ContextVar values are not
# propagated, so we must store user_id explicitly in ConversationContext.
user_id = get_effective_user_id()
queue = get_memory_queue()
queue.add(
    thread_id=thread_id,
    messages=filtered_messages,
    agent_name=self._agent_name,
    user_id=user_id,
    correction_detected=correction_detected,
    reinforcement_detected=reinforcement_detected,
)

get_effective_user_id()_current_user ContextVar 取值,取不到则回落到常量 DEFAULT_USER_ID = "default"(无鉴权模式)user_context.py:100-109。抓到的 user_id 存进 ConversationContext 字段 queue.py:16-25,一路传到 updater.update_memory(..., user_id=context.user_id) queue.py:193-200

去抖 Timer 的 cancel/重排

python
# queue.py:153-164
def _schedule_timer(self, delay_seconds: float) -> None:
    # Cancel existing timer if any
    if self._timer is not None:
        self._timer.cancel()

    self._timer = threading.Timer(
        delay_seconds,
        self._process_queue,
    )
    self._timer.daemon = True
    self._timer.start()

每次 add() 都在 self._lock 保护下重新排 timer:先 cancel() 旧 timer 再起新的,所以 30 秒窗口内的连续入队会不断推迟处理,直到「静默 30 秒」才真正触发 LLM 抽取。Timer 设为 daemon,进程退出时不阻塞——这是「best-effort 记忆」的取舍,极端情况下未处理的队列会丢 queue.py:159-164

原子写盘

python
# storage.py:170-176
memory_data = {**memory_data, "lastUpdated": utc_now_iso_z()}

temp_path = file_path.with_suffix(f".{uuid.uuid4().hex}.tmp")
with open(temp_path, "w", encoding="utf-8") as f:
    json.dump(memory_data, f, indent=2, ensure_ascii=False)

temp_path.replace(file_path)

先写带随机 uuid 后缀的临时文件,再用 Path.replace()(原子 rename)覆盖目标文件,保证读者永远看不到半截 JSON。写前对入参做浅拷贝再加 lastUpdated,避免副作用污染调用方的 dict,也避免文件写成功前缓存引用被提前更新 storage.py:160-186。配套地,updater._finalize_update 在 in-place 修改前对 current_memorycopy.deepcopy,这样 save 失败也不会污染仍被缓存的原对象 updater.py:362-367

速查表

memory.json 顶层结构由 create_empty_memory() 定义 storage.py:24-40

区段字段类型含义Source
顶层versionstr结构版本,固定 "1.0"storage.py:27
顶层lastUpdatedstr每次 save 时刷新的 ISO-8601(Z 后缀)时间戳storage.py:170
userworkContext{summary, updatedAt}职业角色 / 公司 / 主项目 / 技术栈(1-3 句)storage.py:30
userpersonalContext{summary, updatedAt}语言能力 / 沟通偏好 / 兴趣(1-2 句)storage.py:31
usertopOfMind{summary, updatedAt}当前 3-5 个并行关注点(更新最频繁)storage.py:32
historyrecentMonths{summary, updatedAt}近 1-3 个月活动详述storage.py:35
historyearlierContext{summary, updatedAt}3-12 个月前的历史模式storage.py:36
historylongTermBackground{summary, updatedAt}长期不变的基础背景storage.py:37
facts[]idstrfact_ + uuid 前 8 位updater.py:561
facts[]contentstr事实正文(strip 后存储)updater.py:562
facts[]categorystrpreference/knowledge/context/behavior/goal/correctionupdater.py:563
facts[]confidencefloat0-1 置信度,低于阈值不入库updater.py:550-551
facts[]createdAtstr创建时间戳updater.py:565
facts[]sourcestr来源 thread_id,无则 "unknown",手工创建为 "manual"updater.py:566
facts[]sourceErrorstr(可选)correction 类且显式错误时写入,注入时显示为 (avoid: ...)updater.py:568-572

注入侧:format_memory_for_injection 把 facts 按 confidence 降序排列,逐行累加 token 直到达到 max_injection_tokens(默认 2000),并非固定截取 top 15;能放进预算的 fact 行越多越好 prompt.py:256-301

Configuration

配置项定义在 MemoryConfig(Pydantic 模型),通过 config.yamlmemory 段加载。

配置项默认约束作用Source
enabledTruebool记忆机制总开关,关闭则中间件直接 returnmemory_config.py:9-12
storage_path""str空=按用户隔离;绝对路径=所有用户共享一份(退出隔离);相对路径基于 base_dirmemory_config.py:13-26
storage_class...FileMemoryStoragestr存储 provider 类路径,经反射加载memory_config.py:27-30
debounce_seconds301-300去抖等待秒数,窗口内重复入队会重排 timermemory_config.py:31-36
model_nameNonestr/None抽取用模型,None=用默认模型memory_config.py:37-40
max_facts10010-500facts 上限,超出按 confidence 降序裁剪memory_config.py:41-46
fact_confidence_threshold0.70-1新 fact 最低置信度,低于则丢弃memory_config.py:47-52
injection_enabledTruebool是否把记忆注入系统提示memory_config.py:53-56
max_injection_tokens2000100-8000注入 token 预算,facts 逐行累加直到达上限memory_config.py:57-62

Common Pitfalls / Tips

  • Timer 线程拿不到 ContextVar:threading.Timer 回调在独立线程执行,_current_user ContextVar 不会自动跨裸线程传播。所以必须在 MemoryMiddleware.after_agent(请求上下文还活着时)用 get_effective_user_id()user_id 塞进 ConversationContext,而不能在 updater 里再调一次——那时拿到的会是 "default" 错桶 memory_middleware.py:96-99
  • 去抖会推迟而非丢弃:30 秒内连续对话不会逐轮写记忆,每次入队都 cancel 旧 timer 重排,只有「静默 30 秒」后才真正抽取。期望「立刻看到记忆更新」时会困惑,需用 flush() / add_nowait() 强制 queue.py:153-164
  • daemon Timer 进程退出会丢队列:Timer 设了 daemon=True,进程在去抖窗口内退出时未处理的记忆更新直接丢失,这是 best-effort 的明确取舍 queue.py:231-233
  • 绝对 storage_path 会打通所有用户:config.memory.storage_path 设成绝对路径会让所有用户共享同一份 memory.json,等于关闭按用户隔离,多用户部署慎用 storage.py:90-93
  • 上传文件不进长期记忆:<uploaded_files> 块在过滤阶段被剥离,纯上传消息整轮跳过;即便混进 summary/facts,_strip_upload_mentions_from_memory 也会用正则二次清除,否则 agent 下次会去找早已不存在的会话级文件 updater.py:244-264
  • fact 去重靠空白归一:去重 key 是 content.strip().casefold(),大小写和首尾空白不同但实质相同的 fact 会被判为重复并跳过 append,不会重复入库 updater.py:267-273
  • 摘要会抢救待删消息:SummarizationMiddleware 删旧消息前触发 memory_flush_hook,用 add_nowait()(0 秒延迟)把即将被摘要丢掉的对话立即送进记忆队列,避免长对话被压缩后丢失记忆信息 summarization_hook.py:12-34。该 hook 仅在 memory.enabled 时注册进摘要中间件 agent.py:95-97

References

章节关系
12-中间件链机制MemoryMiddleware 是中间件链第 13 环,本章是其职责详解
14-鉴权-CSRF与授权get_effective_user_id() / DEFAULT_USER_ID 来自鉴权层,决定记忆隔离桶
33-上下文工程-摘要与循环检测SummarizationMiddleware 删消息前触发 memory_flush_hook 抢救记忆
16-持久化与存储层FileMemoryStoragePaths 路径体系协同实现按用户隔离落盘

公司内部参考 · 由 claude-wiki-gen 基于源码自动生成的二次分析