在 第四部分 中,我们研究了高层架构。现在,我们要进行"手术"。我们将查看 mem0/memory/main.py 中的代码,以确切了解当你调用 memory.add() 时会发生什么。
这是系统中最复杂的部分。它是"思考"发生的地方。
4 步pipline
当你调用 memory.add(messages, user_id="alice") 时,Mem0 会运行一个精心设计的 4 步管道:
- 事实提取 (Fact Extraction):"用户实际上在说什么?"
- 上下文检索 (Context Retrieval):"我已经知道这个了吗?"
- 行动决策 (Action Decision):"我应该添加、更新还是删除?"
- 执行 (Execution):"提交到数据库。"
让我们深入看看每一步的代码。
第 1 步:事实提取
智能判断:User Memory vs Agent Memory
Mem0 会根据上下文自动判断应该提取谁的记忆。
# mem0/memory/main.py (line 431)
is_agent_memory = self._should_use_agent_memory_extraction(messages, metadata)
system_prompt, user_prompt = get_fact_retrieval_messages(parsed_messages, is_agent_memory)
如果检测到 agent_id,或者消息中包含 assistant 角色,Mem0 会切换到"Agent Memory Extraction"模式,提取智能体自己的特征和偏好(而不是用户的)。
提示词设计
Mem0 使用两套不同的提示词:
User Memory Extraction Prompt(针对用户):
USER_MEMORY_EXTRACTION_PROMPT = """You are a Personal Information Organizer...
# [IMPORTANT]: GENERATE FACTS SOLELY BASED ON THE USER'S MESSAGES.
# [IMPORTANT]: YOU WILL BE PENALIZED IF YOU INCLUDE INFORMATION FROM ASSISTANT OR SYSTEM MESSAGES.
Types of Information to Remember:
1. Store Personal Preferences: likes, dislikes...
2. Maintain Important Personal Details: names, relationships...
3. Track Plans and Intentions: upcoming events, goals...
...
Few shot examples:
User: Hi, my name is John. I am a software engineer.
Assistant: Nice to meet you, John!
Output: {"facts" : ["Name is John", "Is a Software engineer"]}
注意它强调了 只提取用户的信息,不要提取助手的信息。
Agent Memory Extraction Prompt(针对智能体):
AGENT_MEMORY_EXTRACTION_PROMPT = """You are an Assistant Information Organizer...
# [IMPORTANT]: GENERATE FACTS SOLELY BASED ON THE ASSISTANT'S MESSAGES.
Types of Information to Remember:
1. Assistant's Preferences: likes, dislikes...
2. Assistant's Capabilities: skills, knowledge areas...
3. Assistant's Personality Traits...
...
User: Me favourite movies are Inception and Interstellar. What are yours?
Assistant: Great choices! Mine are The Dark Knight and The Shawshank Redemption.
Output: {"facts" : ["Favourite movies are Dark Knight and Shawshank Redemption"]}
这使得 Mem0 可以为智能体构建"自我认知"。
LLM 调用
# mem0/memory/main.py (line 434-440)
response = self.llm.generate_response(
messages=[
{"role": "system", "content": system_prompt},
{"role": "user", "content": user_prompt},
],
response_format={"type": "json_object"}, # 强制返回 JSON
)
关键设计:response_format={"type": "json_object"} 确保 LLM 返回结构化数据,而不是自然语言。
结果解析
# mem0/memory/main.py (line 442-456)
try:
response = remove_code_blocks(response) # 移除 ```json 标记
if not response.strip():
new_retrieved_facts = []
else:
try:
new_retrieved_facts = json.loads(response)["facts"]
except json.JSONDecodeError:
# 备用方案:从响应中提取 JSON
extracted_json = extract_json(response)
new_retrieved_facts = json.loads(extracted_json)["facts"]
except Exception as e:
logger.error(f"Error in new_retrieved_facts: {e}")
new_retrieved_facts = []
这段代码体现了 鲁棒性设计:
- 尝试直接解析 JSON
- 如果失败,尝试从文本中提取 JSON
- 如果还是失败,返回空列表(而不是崩溃)
第 2 步:上下文检索
对于每个提取出的事实,Mem0 会在存储中搜索相似的记忆。
# mem0/memory/main.py (line 463-482)
retrieved_old_memory = []
new_message_embeddings = {}
search_filters = {}
if filters.get("user_id"):
search_filters["user_id"] = filters["user_id"]
if filters.get("agent_id"):
search_filters["agent_id"] = filters["agent_id"]
if filters.get("run_id"):
search_filters["run_id"] = filters["run_id"]
for new_mem in new_retrieved_facts:
# 计算嵌入向量
messages_embeddings = self.embedding_model.embed(new_mem, "add")
new_message_embeddings[new_mem] = messages_embeddings # 缓存嵌入
# 向量搜索
existing_memories = self.vector_store.search(
query=new_mem,
vectors=messages_embeddings,
limit=5, # 只检索前 5 个最相似的记忆
filters=search_filters,
)
for mem in existing_memories:
retrieved_old_memory.append({"id": mem.id, "text": mem.payload.get("data", "")})
关键优化:
- 嵌入缓存 (
new_message_embeddings):避免重复计算相同文本的嵌入。 - 用户隔离 (
filters):确保只搜索当前用户/会话的记忆。
去重
# mem0/memory/main.py (line 484-487)
unique_data = {}
for item in retrieved_old_memory:
unique_data[item["id"]] = item
retrieved_old_memory = list(unique_data.values())
因为相同的记忆可能在多个新事实的搜索中出现,我们需要去重。
第 3 步:行动决策("大脑")
这是 Mem0 最聪明的部分。
防幻觉:temp_uuid_mapping
LLM 经常会编造 UUID。如果你直接给它一个像 a3f2b1c9-... 这样的 UUID,它可能会返回一个不存在的 UUID。
Mem0 的解决方案:将 UUID 映射为简单的整数。
# mem0/memory/main.py (line 490-494)
temp_uuid_mapping = {}
for idx, item in enumerate(retrieved_old_memory):
temp_uuid_mapping[str(idx)] = item["id"] # 真实 UUID
retrieved_old_memory[idx]["id"] = str(idx) # LLM 看到的是 "0", "1", "2"
示例:
# 输入给 LLM(隐藏真实 UUID)
[
{"id": "0", "text": "User lives in Paris"},
{"id": "1", "text": "User is a software engineer"}
]
# 内部映射
temp_uuid_mapping = {
"0": "a3f2b1c9-4e5d-6f7a-8b9c-0d1e2f3a4b5c",
"1": "b4g3c2d0-5f6e-7g8b-9c0d-1e2f3g4c5d6e"
}
LLM 返回 "id": "0" 时,我们用 temp_uuid_mapping["0"] 找回真实 UUID。
Update Memory Prompt
# mem0/memory/main.py (line 497-499)
function_calling_prompt = get_update_memory_messages(
retrieved_old_memory, # 旧记忆(ID 已映射为整数)
new_retrieved_facts, # 新事实
self.config.custom_update_memory_prompt # 可选自定义提示词
)
这个提示词定义了决策逻辑:
DEFAULT_UPDATE_MEMORY_PROMPT = """You are a smart memory manager which controls the memory of a system.
You can perform four operations: (1) ADD, (2) UPDATE, (3) DELETE, and (4) NONE.
Guidelines:
1. **Add**: If the retrieved facts contain new information not present in the memory.
2. **Update**: If the retrieved facts contain information that is already present but different.
- Example (a): "User likes to play cricket" → "Loves to play cricket with friends" → UPDATE
- Example (b): "Likes cheese pizza" → "Loves cheese pizza" → NONE (same meaning)
3. **Delete**: If the retrieved facts contradict the information in the memory.
- Example: "Loves cheese pizza" → "Dislikes cheese pizza" → DELETE
4. **No Change**: If the retrieved facts are already present.
Return JSON:
{
"memory": [
{
"id": "<ID>",
"text": "<Content>",
"event": "ADD|UPDATE|DELETE|NONE",
"old_memory": "<old content>" // only for UPDATE
}
]
}
LLM 决策
# mem0/memory/main.py (line 502-519)
try:
response: str = self.llm.generate_response(
messages=[{"role": "user", "content": function_calling_prompt}],
response_format={"type": "json_object"},
)
except Exception as e:
logger.error(f"Error in new memory actions response: {e}")
response = ""
try:
if not response or not response.strip():
logger.warning("Empty response from LLM, no memories to extract")
new_memories_with_actions = {}
else:
response = remove_code_blocks(response)
new_memories_with_actions = json.loads(response)
except Exception as e:
logger.error(f"Invalid JSON response: {e}")
new_memories_with_actions = {}
第 4 步:执行
根据 LLM 返回的行动列表,执行相应的数据库操作。
# mem0/memory/main.py (line 524-589)
returned_memories = []
for resp in new_memories_with_actions.get("memory", []):
action_text = resp.get("text")
if not action_text:
logger.info("Skipping memory entry because of empty `text` field.")
continue
event_type = resp.get("event")
if event_type == "ADD":
memory_id = self._create_memory(
data=action_text,
existing_embeddings=new_message_embeddings, # 使用缓存的嵌入
metadata=deepcopy(metadata),
)
returned_memories.append({
"id": memory_id,
"memory": action_text,
"event": event_type
})
elif event_type == "UPDATE":
self._update_memory(
memory_id=temp_uuid_mapping[resp.get("id")], # 映射回真实 UUID
data=action_text,
existing_embeddings=new_message_embeddings,
metadata=deepcopy(metadata),
)
returned_memories.append({
"id": temp_uuid_mapping[resp.get("id")],
"memory": action_text,
"event": event_type,
"previous_memory": resp.get("old_memory"),
})
elif event_type == "DELETE":
self._delete_memory(memory_id=temp_uuid_mapping[resp.get("id")])
returned_memories.append({
"id": temp_uuid_mapping[resp.get("id")],
"memory": action_text,
"event": event_type,
})
elif event_type == "NONE":
# 即使内容不需要更新,也可能需要更新 session IDs
memory_id = temp_uuid_mapping.get(resp.get("id"))
if memory_id and (metadata.get("agent_id") or metadata.get("run_id")):
existing_memory = self.vector_store.get(vector_id=memory_id)
updated_metadata = deepcopy(existing_memory.payload)
if metadata.get("agent_id"):
updated_metadata["agent_id"] = metadata["agent_id"]
if metadata.get("run_id"):
updated_metadata["run_id"] = metadata["run_id"]
self.vector_store.update(
vector_id=memory_id,
vector=None, # 保持相同的嵌入
payload=updated_metadata,
)
注意:即使是 NONE 事件,Mem0 也会智能地更新 agent_id 和 run_id,以支持跨会话的记忆关联。
技术挑战与解决方案
1. UUID 幻觉
问题:LLM 会编造不存在的 UUID。
解决方案:temp_uuid_mapping,将 UUID 替换为整数 ID。
2. 并发与一致性
问题:向量存储和图存储可能同时失败。
解决方案:使用 ThreadPoolExecutor 进行并行写入,但记录错误。在企业级应用中,应使用事务或消息队列。
# (在 _create_memory 中)
with concurrent.futures.ThreadPoolExecutor() as executor:
future_vector = executor.submit(self._add_to_vector_store, ...)
future_graph = executor.submit(self._add_to_graph, ...)
concurrent.futures.wait([future_vector, future_graph])
3. Token 管理
问题:提取 100 条对话的事实会爆掉上下文窗口。
解决方案:
- 应用层控制:只发送最近的 N 条消息
- Mem0 层面:批处理或分块处理
4. "Update" 的本质
向量数据库中,向量通常是不可变的。
Mem0 的 UPDATE 操作实际上是:
- 删除旧向量
- 计算新文本的嵌入
- 插入新向量
# _update_memory (简化版)
def _update_memory(self, memory_id, data, existing_embeddings, metadata):
# 1. 计算新嵌入
if data in existing_embeddings:
embeddings = existing_embeddings[data]
else:
embeddings = self.embedding_model.embed(data, "update")
# 2. 更新向量存储(实际是 delete + add)
self.vector_store.update(
vector_id=memory_id,
vector=embeddings,
payload=metadata
)
# 3. 更新历史数据库
self.db.update_memory(memory_id, data, metadata)
完整流程图
用户调用: memory.add(messages, user_id="alice")
↓
┌───────────────────────────┐
│ 1. 事实提取 │
│ - 选择提示词(User/Agent)│
│ - LLM 分析对话 │
│ - 返回 JSON facts │
└───────────┬───────────────┘
↓
┌───────────────────────────┐
│ 2. 上下文检索 │
│ - 计算嵌入向量 │
│ - 向量搜索相似记忆 │
│ - UUID → 整数映射 │
└───────────┬───────────────┘
↓
┌───────────────────────────┐
│ 3. 行动决策 │
│ - LLM 比较新旧事实 │
│ - 决定 ADD/UPDATE/DELETE │
│ - 返回行动列表 │
└───────────┬───────────────┘
↓
┌───────────────────────────┐
│ 4. 并行执行 │
│ - Vector Store 更新 │
│ - Graph Store 更新 │
│ - SQLite 历史记录 │
└───────────────────────────┘
结论
Mem0 不仅仅是 OpenAI 的一个包装器。它是一个 状态管理引擎。通过分析代码,我们看到了它如何解决一致性、去重和降噪等难题——这些是每个 AI 工程师最终都会面临的问题。
从提示词设计(User vs Agent Memory)到防幻觉机制(temp_uuid_mapping),从并行执行到智能决策,Mem0 的每一行代码都体现了对生产级 AI 系统的深刻理解。