mem0学习笔记5:Mem0 是如何思考的

第四部分 中,我们研究了高层架构。现在,我们要进行"手术"。我们将查看 mem0/memory/main.py 中的代码,以确切了解当你调用 memory.add() 时会发生什么。

这是系统中最复杂的部分。它是"思考"发生的地方。

4 步pipline

当你调用 memory.add(messages, user_id="alice") 时,Mem0 会运行一个精心设计的 4 步管道:

  1. 事实提取 (Fact Extraction):"用户实际上在说什么?"
  2. 上下文检索 (Context Retrieval):"我已经知道这个了吗?"
  3. 行动决策 (Action Decision):"我应该添加、更新还是删除?"
  4. 执行 (Execution):"提交到数据库。"

让我们深入看看每一步的代码。

第 1 步:事实提取

智能判断:User Memory vs Agent Memory

Mem0 会根据上下文自动判断应该提取谁的记忆。

# mem0/memory/main.py (line 431)
is_agent_memory = self._should_use_agent_memory_extraction(messages, metadata)
system_prompt, user_prompt = get_fact_retrieval_messages(parsed_messages, is_agent_memory)

如果检测到 agent_id,或者消息中包含 assistant 角色,Mem0 会切换到"Agent Memory Extraction"模式,提取智能体自己的特征和偏好(而不是用户的)。

提示词设计

Mem0 使用两套不同的提示词:

User Memory Extraction Prompt(针对用户):

USER_MEMORY_EXTRACTION_PROMPT = """You are a Personal Information Organizer...

# [IMPORTANT]: GENERATE FACTS SOLELY BASED ON THE USER'S MESSAGES. 
# [IMPORTANT]: YOU WILL BE PENALIZED IF YOU INCLUDE INFORMATION FROM ASSISTANT OR SYSTEM MESSAGES.

Types of Information to Remember:
1. Store Personal Preferences: likes, dislikes...
2. Maintain Important Personal Details: names, relationships...
3. Track Plans and Intentions: upcoming events, goals...
...

Few shot examples:

User: Hi, my name is John. I am a software engineer.
Assistant: Nice to meet you, John!
Output: {"facts" : ["Name is John", "Is a Software engineer"]}

注意它强调了 只提取用户的信息,不要提取助手的信息

Agent Memory Extraction Prompt(针对智能体):

AGENT_MEMORY_EXTRACTION_PROMPT = """You are an Assistant Information Organizer...

# [IMPORTANT]: GENERATE FACTS SOLELY BASED ON THE ASSISTANT'S MESSAGES.

Types of Information to Remember:
1. Assistant's Preferences: likes, dislikes...
2. Assistant's Capabilities: skills, knowledge areas...
3. Assistant's Personality Traits...
...

User: Me favourite movies are Inception and Interstellar. What are yours?
Assistant: Great choices! Mine are The Dark Knight and The Shawshank Redemption.
Output: {"facts" : ["Favourite movies are Dark Knight and Shawshank Redemption"]}

这使得 Mem0 可以为智能体构建"自我认知"。

LLM 调用

# mem0/memory/main.py (line 434-440)
response = self.llm.generate_response(
    messages=[
        {"role": "system", "content": system_prompt},
        {"role": "user", "content": user_prompt},
    ],
    response_format={"type": "json_object"},  # 强制返回 JSON
)

关键设计response_format={"type": "json_object"} 确保 LLM 返回结构化数据,而不是自然语言。

结果解析

# mem0/memory/main.py (line 442-456)
try:
    response = remove_code_blocks(response)  # 移除 ```json 标记
    if not response.strip():
        new_retrieved_facts = []
    else:
        try:
            new_retrieved_facts = json.loads(response)["facts"]
        except json.JSONDecodeError:
            # 备用方案:从响应中提取 JSON
            extracted_json = extract_json(response)
            new_retrieved_facts = json.loads(extracted_json)["facts"]
except Exception as e:
    logger.error(f"Error in new_retrieved_facts: {e}")
    new_retrieved_facts = []

这段代码体现了 鲁棒性设计

  1. 尝试直接解析 JSON
  2. 如果失败,尝试从文本中提取 JSON
  3. 如果还是失败,返回空列表(而不是崩溃)

第 2 步:上下文检索

对于每个提取出的事实,Mem0 会在存储中搜索相似的记忆。

# mem0/memory/main.py (line 463-482)
retrieved_old_memory = []
new_message_embeddings = {}

search_filters = {}
if filters.get("user_id"):
    search_filters["user_id"] = filters["user_id"]
if filters.get("agent_id"):
    search_filters["agent_id"] = filters["agent_id"]
if filters.get("run_id"):
    search_filters["run_id"] = filters["run_id"]

for new_mem in new_retrieved_facts:
    # 计算嵌入向量
    messages_embeddings = self.embedding_model.embed(new_mem, "add")
    new_message_embeddings[new_mem] = messages_embeddings  # 缓存嵌入
    
    # 向量搜索
    existing_memories = self.vector_store.search(
        query=new_mem,
        vectors=messages_embeddings,
        limit=5,  # 只检索前 5 个最相似的记忆
        filters=search_filters,
    )
    
    for mem in existing_memories:
        retrieved_old_memory.append({"id": mem.id, "text": mem.payload.get("data", "")})

关键优化

  1. 嵌入缓存 (new_message_embeddings):避免重复计算相同文本的嵌入。
  2. 用户隔离 (filters):确保只搜索当前用户/会话的记忆。

去重

# mem0/memory/main.py (line 484-487)
unique_data = {}
for item in retrieved_old_memory:
    unique_data[item["id"]] = item
retrieved_old_memory = list(unique_data.values())

因为相同的记忆可能在多个新事实的搜索中出现,我们需要去重。

第 3 步:行动决策("大脑")

这是 Mem0 最聪明的部分。

防幻觉:temp_uuid_mapping

LLM 经常会编造 UUID。如果你直接给它一个像 a3f2b1c9-... 这样的 UUID,它可能会返回一个不存在的 UUID。

Mem0 的解决方案:将 UUID 映射为简单的整数。

# mem0/memory/main.py (line 490-494)
temp_uuid_mapping = {}
for idx, item in enumerate(retrieved_old_memory):
    temp_uuid_mapping[str(idx)] = item["id"]  # 真实 UUID
    retrieved_old_memory[idx]["id"] = str(idx)  # LLM 看到的是 "0", "1", "2"

示例

# 输入给 LLM(隐藏真实 UUID)
[
    {"id": "0", "text": "User lives in Paris"},
    {"id": "1", "text": "User is a software engineer"}
]

# 内部映射
temp_uuid_mapping = {
    "0": "a3f2b1c9-4e5d-6f7a-8b9c-0d1e2f3a4b5c",
    "1": "b4g3c2d0-5f6e-7g8b-9c0d-1e2f3g4c5d6e"
}

LLM 返回 "id": "0" 时,我们用 temp_uuid_mapping["0"] 找回真实 UUID。

Update Memory Prompt

# mem0/memory/main.py (line 497-499)
function_calling_prompt = get_update_memory_messages(
    retrieved_old_memory,  # 旧记忆(ID 已映射为整数)
    new_retrieved_facts,   # 新事实
    self.config.custom_update_memory_prompt  # 可选自定义提示词
)

这个提示词定义了决策逻辑:

DEFAULT_UPDATE_MEMORY_PROMPT = """You are a smart memory manager which controls the memory of a system.
You can perform four operations: (1) ADD, (2) UPDATE, (3) DELETE, and (4) NONE.

Guidelines:
1. **Add**: If the retrieved facts contain new information not present in the memory.
2. **Update**: If the retrieved facts contain information that is already present but different.
   - Example (a): "User likes to play cricket" → "Loves to play cricket with friends" → UPDATE
   - Example (b): "Likes cheese pizza" → "Loves cheese pizza" → NONE (same meaning)
3. **Delete**: If the retrieved facts contradict the information in the memory.
   - Example: "Loves cheese pizza" → "Dislikes cheese pizza" → DELETE
4. **No Change**: If the retrieved facts are already present.

Return JSON:
{
    "memory": [
        {
            "id": "<ID>",
            "text": "<Content>",
            "event": "ADD|UPDATE|DELETE|NONE",
            "old_memory": "<old content>"  // only for UPDATE
        }
    ]
}

LLM 决策

# mem0/memory/main.py (line 502-519)
try:
    response: str = self.llm.generate_response(
        messages=[{"role": "user", "content": function_calling_prompt}],
        response_format={"type": "json_object"},
    )
except Exception as e:
    logger.error(f"Error in new memory actions response: {e}")
    response = ""

try:
    if not response or not response.strip():
        logger.warning("Empty response from LLM, no memories to extract")
        new_memories_with_actions = {}
    else:
        response = remove_code_blocks(response)
        new_memories_with_actions = json.loads(response)
except Exception as e:
    logger.error(f"Invalid JSON response: {e}")
    new_memories_with_actions = {}

第 4 步:执行

根据 LLM 返回的行动列表,执行相应的数据库操作。

# mem0/memory/main.py (line 524-589)
returned_memories = []
for resp in new_memories_with_actions.get("memory", []):
    action_text = resp.get("text")
    if not action_text:
        logger.info("Skipping memory entry because of empty `text` field.")
        continue

    event_type = resp.get("event")
    
    if event_type == "ADD":
        memory_id = self._create_memory(
            data=action_text,
            existing_embeddings=new_message_embeddings,  # 使用缓存的嵌入
            metadata=deepcopy(metadata),
        )
        returned_memories.append({
            "id": memory_id,
            "memory": action_text,
            "event": event_type
        })
    
    elif event_type == "UPDATE":
        self._update_memory(
            memory_id=temp_uuid_mapping[resp.get("id")],  # 映射回真实 UUID
            data=action_text,
            existing_embeddings=new_message_embeddings,
            metadata=deepcopy(metadata),
        )
        returned_memories.append({
            "id": temp_uuid_mapping[resp.get("id")],
            "memory": action_text,
            "event": event_type,
            "previous_memory": resp.get("old_memory"),
        })
    
    elif event_type == "DELETE":
        self._delete_memory(memory_id=temp_uuid_mapping[resp.get("id")])
        returned_memories.append({
            "id": temp_uuid_mapping[resp.get("id")],
            "memory": action_text,
            "event": event_type,
        })
    
    elif event_type == "NONE":
        # 即使内容不需要更新,也可能需要更新 session IDs
        memory_id = temp_uuid_mapping.get(resp.get("id"))
        if memory_id and (metadata.get("agent_id") or metadata.get("run_id")):
            existing_memory = self.vector_store.get(vector_id=memory_id)
            updated_metadata = deepcopy(existing_memory.payload)
            if metadata.get("agent_id"):
                updated_metadata["agent_id"] = metadata["agent_id"]
            if metadata.get("run_id"):
                updated_metadata["run_id"] = metadata["run_id"]
            
            self.vector_store.update(
                vector_id=memory_id,
                vector=None,  # 保持相同的嵌入
                payload=updated_metadata,
            )

注意:即使是 NONE 事件,Mem0 也会智能地更新 agent_idrun_id,以支持跨会话的记忆关联。

技术挑战与解决方案

1. UUID 幻觉

问题:LLM 会编造不存在的 UUID。

解决方案temp_uuid_mapping,将 UUID 替换为整数 ID。

2. 并发与一致性

问题:向量存储和图存储可能同时失败。

解决方案:使用 ThreadPoolExecutor 进行并行写入,但记录错误。在企业级应用中,应使用事务或消息队列。

# (在 _create_memory 中)
with concurrent.futures.ThreadPoolExecutor() as executor:
    future_vector = executor.submit(self._add_to_vector_store, ...)
    future_graph = executor.submit(self._add_to_graph, ...)
    concurrent.futures.wait([future_vector, future_graph])

3. Token 管理

问题:提取 100 条对话的事实会爆掉上下文窗口。

解决方案

  1. 应用层控制:只发送最近的 N 条消息
  2. Mem0 层面:批处理或分块处理

4. "Update" 的本质

向量数据库中,向量通常是不可变的。

Mem0 的 UPDATE 操作实际上是

  1. 删除旧向量
  2. 计算新文本的嵌入
  3. 插入新向量
# _update_memory (简化版)
def _update_memory(self, memory_id, data, existing_embeddings, metadata):
    # 1. 计算新嵌入
    if data in existing_embeddings:
        embeddings = existing_embeddings[data]
    else:
        embeddings = self.embedding_model.embed(data, "update")
    
    # 2. 更新向量存储(实际是 delete + add)
    self.vector_store.update(
        vector_id=memory_id,
        vector=embeddings,
        payload=metadata
    )
    
    # 3. 更新历史数据库
    self.db.update_memory(memory_id, data, metadata)

完整流程图

用户调用: memory.add(messages, user_id="alice")
                    ↓
        ┌───────────────────────────┐
        │   1. 事实提取              │
        │   - 选择提示词(User/Agent)│
        │   - LLM 分析对话            │
        │   - 返回 JSON facts         │
        └───────────┬───────────────┘
                    ↓
        ┌───────────────────────────┐
        │   2. 上下文检索             │
        │   - 计算嵌入向量             │
        │   - 向量搜索相似记忆         │
        │   - UUID → 整数映射         │
        └───────────┬───────────────┘
                    ↓
        ┌───────────────────────────┐
        │   3. 行动决策              │
        │   - LLM 比较新旧事实        │
        │   - 决定 ADD/UPDATE/DELETE │
        │   - 返回行动列表            │
        └───────────┬───────────────┘
                    ↓
        ┌───────────────────────────┐
        │   4. 并行执行              │
        │   - Vector Store 更新      │
        │   - Graph Store 更新       │
        │   - SQLite 历史记录        │
        └───────────────────────────┘

结论

Mem0 不仅仅是 OpenAI 的一个包装器。它是一个 状态管理引擎。通过分析代码,我们看到了它如何解决一致性、去重和降噪等难题——这些是每个 AI 工程师最终都会面临的问题。

从提示词设计(User vs Agent Memory)到防幻觉机制(temp_uuid_mapping),从并行执行到智能决策,Mem0 的每一行代码都体现了对生产级 AI 系统的深刻理解。