在 第二部分 中,我们手动实现了一个带记忆的 ReAct 智能体。但在真实项目中,你不会从零开始写 ReAct 循环。
你会使用像 LangChain 这样的成熟框架。那么,如何将 Mem0 集成进去呢?
LangChain 的 Agent 架构
LangChain 提供了一个强大的 AgentExecutor,它已经实现了完整的 ReAct 循环。你只需要:
- 定义工具 (Tools)
- 提供 LLM
- 运行
executor.run()
from langchain.agents import initialize_agent, Tool
from langchain.llms import OpenAI
tools = [
Tool(name="Search", func=search_function, description="搜索信息"),
Tool(name="Calculator", func=calc_function, description="计算")
]
llm = OpenAI(temperature=0)
agent = initialize_agent(tools, llm, agent="zero-shot-react-description")
result = agent.run("今天纽约的天气是多少度?")
问题:这个智能体没有长期记忆。每次对话都是全新的。
方案:将 Mem0 集成为 Memory 组件
LangChain 有一个 BaseChatMemory 抽象类,用于管理对话记忆。我们可以实现一个 Mem0Memory 类来桥接。
1. 实现 Mem0Memory
from mem0 import Memory
from langchain.memory import BaseChatMemory
from langchain.schema import BaseMessage, HumanMessage, AIMessage
from typing import List, Dict, Any
class Mem0Memory(BaseChatMemory):
"""LangChain Memory wrapper for Mem0"""
def __init__(self, user_id: str, **kwargs):
super().__init__(**kwargs)
self.memory = Memory()
self.user_id = user_id
self.chat_history: List[BaseMessage] = []
@property
def memory_variables(self) -> List[str]:
"""返回记忆变量。"""
return ["history", "mem0_context"]
def load_memory_variables(self, inputs: Dict[str, Any]) -> Dict[str, Any]:
"""从 Mem0 加载相关记忆。"""
query = inputs.get("input", "")
# 从 Mem0 检索相关记忆
memories = self.memory.search(
query=query,
user_id=self.user_id,
limit=5
)
mem0_context = "\n".join([
f"- {m['memory']}" for m in memories
]) if memories else ""
return {
"history": self._get_chat_history(),
"mem0_context": mem0_context
}
def save_context(self, inputs: Dict[str, Any], outputs: Dict[str, str]) -> None:
"""将对话存入 Mem0。"""
input_text = inputs.get("input", "")
output_text = outputs.get("output", "")
# 保存对话历史(用于短期记忆)
self.chat_history.append(HumanMessage(content=input_text))
self.chat_history.append(AIMessage(content=output_text))
# 异步保存到 Mem0(长期记忆)
self.memory.add(
messages=[
{"role": "user", "content": input_text},
{"role": "assistant", "content": output_text}
],
user_id=self.user_id
)
def _get_chat_history(self) -> str:
"""获取格式化的聊天历史"""
return "\n".join([
f"{'Human' if isinstance(msg, HumanMessage) else 'AI'}: {msg.content}"
for msg in self.chat_history[-4:] # 只保留最近 4 轮
])
def clear(self) -> None:
"""清空聊天历史(但不清空 Mem0)"""
self.chat_history = []
2. 在 AgentExecutor 中使用
from langchain.agents import initialize_agent, AgentType
from langchain.chat_models import ChatOpenAI
# 初始化 LLM 和工具
llm = ChatOpenAI(model="gpt-4", temperature=0)
tools = [...] # 你的工具列表
# 使用 Mem0Memory
memory = Mem0Memory(user_id="alice")
# 初始化智能体
agent = initialize_agent(
tools=tools,
llm=llm,
agent=AgentType.ZERO_SHOT_REACT_DESCRIPTION,
memory=memory,
verbose=True
)
# 第一次对话
response = agent.run("帮我查一下 Python 的 GIL 是什么")
# Mem0 会保存这个查询结果
# 10 分钟后...
response = agent.run("刚才那个概念,在多线程场景下会有什么影响?")
# 智能体会从 Mem0 中检索到"GIL"的定义!
高级:自定义 Prompt 注入记忆
LangChain 的标准 Agent Prompt 不会自动使用 mem0_context。我们需要自定义 Prompt。
from langchain.prompts import PromptTemplate
# 自定义 ReAct Prompt
CUSTOM_PROMPT = PromptTemplate(
input_variables=["input", "agent_scratchpad", "mem0_context"],
template="""你是一个 ReAct 智能体。
已知信息(从记忆中检索):
{mem0_context}
工具:
{tools}
问题:{input}
请按照以下格式思考:
Thought: [你的思考]
Action: [工具名称]
Action Input: [工具参数]
Observation: [观察结果]
... (重复 Thought/Action/Observation)
Thought: 我知道最终答案了
Final Answer: [最终答案]
开始!
{agent_scratchpad}
"""
)
# 使用自定义 Prompt
from langchain.agents import create_react_agent
agent = create_react_agent(
llm=llm,
tools=tools,
prompt=CUSTOM_PROMPT
)
executor = AgentExecutor(agent=agent, tools=tools, memory=memory)
性能优化:异步记忆更新
在生产环境中,每次对话都同步写入 Mem0 会增加延迟。我们可以异步处理。
import asyncio
from threading import Thread
class AsyncMem0Memory(Mem0Memory):
def save_context(self, inputs: Dict[str, Any], outputs: Dict[str, str]) -> None:
"""异步保存上下文"""
super().save_context(inputs, outputs)
# 在后台线程中保存到 Mem0
def _async_save():
asyncio.run(self._save_to_mem0(inputs, outputs))
Thread(target=_async_save, daemon=True).start()
async def _save_to_mem0(self, inputs, outputs):
"""实际的保存逻辑"""
# 这里可以添加批处理、去重等优化
pass
推荐方案:使用 LangGraph 的 create_react_agent
LangGraph 提供了一个更现代、更简洁的方式来创建 ReAct 智能体。相比传统的 LangChain AgentExecutor,LangGraph 的API更简洁,且支持流式输出和更灵活的状态管理,人机交互, 支持function call,工具调用更准确。
高级:将 Mem0 集成到Langgraph State 中
更优雅的方式是将 Mem0 直接集成到 LangGraph 的状态管理中。
from langgraph.graph import StateGraph, MessagesState
from typing import TypedDict
class AgentStateWithMemory(MessagesState):
"""扩展的状态,包含 Mem0 上下文"""
mem0_context: str
def retrieve_memory_node(state: AgentStateWithMemory):
"""记忆检索节点"""
last_message = state["messages"][-1].content
memories = mem0.search(query=last_message, user_id=user_id, limit=5)
context = "\n".join([f"- {m['memory']}" for m in memories]) if memories else ""
state["mem0_context"] = context
return state
def agent_node(state: AgentStateWithMemory):
"""智能体节点"""
# 将记忆注入到消息中
messages = state["messages"].copy()
if state.get("mem0_context"):
messages.insert(0, SystemMessage(content=f"已知信息:\n{state['mem0_context']}"))
result = agent_executor.invoke({"messages": messages})
return {"messages": result["messages"]}
# 构建图
workflow = StateGraph(AgentStateWithMemory)
workflow.add_node("retrieve_memory", retrieve_memory_node)
workflow.add_node("agent", agent_node)
workflow.set_entry_point("retrieve_memory")
workflow.add_edge("retrieve_memory", "agent")
app = workflow.compile()
# 运行
result = app.invoke({
"messages": [HumanMessage(content="帮我查一下 ReAct 范式是什么")]
})
最佳实践
1. 用户隔离
永远按 user_id 隔离记忆:
# 错误:所有用户共享记忆
memory = Mem0Memory(user_id="global")
# 正确:每个用户独立记忆
memory = Mem0Memory(user_id=current_user.id)
2. 限制检索数量
不要一次加载太多记忆,会稀释相关性:
# 只检索最相关的 3-5 条
memories = self.memory.search(query, limit=3)
3. 定期清理
对于长期运行的智能体,定期清理过期记忆:
# 删除超过 30 天的记忆
memory.delete_old_memories(days=30)
下一步是什么?
我们已经掌握了如何将 Mem0 集成到生产级框架中。但这一切是如何工作的?Mem0 内部是如何管理记忆的?
在 下一篇文章 中,我们将揭开 Mem0 的引擎盖,深入研究其混合存储架构、工厂模式设计,以及为什么"向量 + 图"的组合如此强大。