在前两篇文章中,我们已经达成了共识:“Milvus + BGE-base + BGE-Reranker” 是目前构建中文 RAG 系统的高性价比黄金组合。今天,我们将使用 LangChain 框架将这三个组件串联起来,构建一个真正具备“漏斗式检索”能力的 RAG pipline。
准备工作
在开始之前,请确保你已经安装了 Docker 并启动了 Milvus 实例。同时,你需要安装以下 Python 库:
pip install langchain langchain-community langchain-huggingface pymilvus sentence-transformers
注:我们将使用最新的 langchain-huggingface 库来加载模型,这是 LangChain 官方推荐的现代化方式。
核心架构设计
我们的代码将遵循以下数据流:
- 加载与切分 (Load & Split): 将长文档切分成 500 token 左右的小块。
- 向量化 (Embed): 使用
BGE-base-zh生成向量,并开启归一化。 - 存储 (Store): 存入 Milvus Collection。
- 粗排 (Retrieve): 召回 Top 10 相关文档。
- 精排 (Rerank): 使用
BGE-Reranker对 Top 10 进行打分,筛选出 Top 3。 - 生成 (Generate): 将 Top 3 喂给 LLM 生成答案。
代码实战:Step-by-Step
步骤 1:初始化 Embedding 模型
这是极其关键的一步。正如上一篇所说,BGE 推荐使用点积距离,且需要对向量进行归一化。
import torch
from langchain_huggingface import HuggingFaceEmbeddings
# 检测是否有 GPU
device = "cuda" if torch.cuda.is_available() else "cpu"
# 加载 BGE Embedding (双塔模型)
print(f"Loading Embedding Model on {device}...")
embeddings = HuggingFaceEmbeddings(
model_name="BAAI/bge-base-zh-v1.5",
model_kwargs={'device': device},
# 【关键点】开启归一化,这对 BGE 的检索效果至关重要
encode_kwargs={'normalize_embeddings': True}
)
步骤 2:连接 Milvus 并处理数据
我们将模拟一些关于“RAG 技术”的文档数据,切分并存入 Milvus。
from langchain_milvus import Milvus
from langchain_core.documents import Document
from langchain_text_splitters import RecursiveCharacterTextSplitter
# 1. 准备模拟数据
raw_texts = [
"RAG (检索增强生成) 是一种结合了检索系统和生成模型的技术。",
"Milvus 是一个开源的向量数据库,支持百亿级向量存储。",
"BGE 模型由智源研究院发布,在 C-MTEB 榜单上表现优异。",
"Cross-Encoder (Reranker) 相比 Bi-Encoder 精度更高,但计算速度较慢。",
"在 RAG 中,通常使用 Embedding 做粗排,Reranker 做精排。",
"LangChain 是一个强大的框架,用于构建 LLM 应用。",
"今天天气不错,适合去公园散步。", # 干扰项
"苹果公司发布了最新的 iPhone。", # 干扰项
]
documents = [Document(page_content=text) for text in raw_texts]
# 2. 文本切分 (Chunking)
# 生产环境中,chunk_size 通常设为 300-500,overlap 设为 50-100
text_splitter = RecursiveCharacterTextSplitter(chunk_size=100, chunk_overlap=10)
split_docs = text_splitter.split_documents(documents)
# 3. 存入 Milvus
print("Indexing data into Milvus...")
vector_store = Milvus(
embedding_function=embeddings,
connection_args={"host": "localhost", "port": "19530"},
collection_name="rag_advanced_demo",
auto_id=True,
drop_old=True # 仅供演示,生产环境请慎用
)
vector_store.add_documents(split_docs)
print("Data indexed successfully.")
步骤 3:配置 Reranker (精排机制)
这是区分“玩具 Demo”和“生产级代码”的分水岭。我们使用 ContextualCompressionRetriever 来实现重排序逻辑。
from langchain.retrievers import ContextualCompressionRetriever
from langchain.retrievers.document_compressors import CrossEncoderReranker
from langchain_community.cross_encoders import HuggingFaceCrossEncoder
# 1. 加载 Reranker 模型 (单塔模型)
print("Loading Reranker Model...")
reranker_model = HuggingFaceCrossEncoder(
model_name="BAAI/bge-reranker-base",
model_kwargs={'device': device}
)
# 2. 定义压缩器 (Compressor)
# 这里所谓的“压缩”,实际上是指“筛选 Top N”
compressor = CrossEncoderReranker(model=reranker_model, top_n=3)
# 3. 定义基础检索器 (Base Retriever) - 负责粗排
# 从 Milvus 召回 Top 10
base_retriever = vector_store.as_retriever(
search_type="similarity",
search_kwargs={"k": 10}
)
# 4. 组合成精排检索器
# 流程:Query -> Milvus(Top10) -> Reranker(打分) -> Top3
compression_retriever = ContextualCompressionRetriever(
base_compressor=compressor,
base_retriever=base_retriever
)
步骤 4:构建 RAG 链并运行
最后,我们将精排后的结果喂给 LLM。
from langchain_core.prompts import ChatPromptTemplate
from langchain_core.runnables import RunnablePassthrough
from langchain_core.output_parsers import StrOutputParser
from langchain_openai import ChatOpenAI
# 这里使用 OpenAI 协议,你可以替换为本地的 vLLM 或 Ollama 地址
llm = ChatOpenAI(
model="gpt-3.5-turbo",
temperature=0,
openai_api_key="sk-xxxx", # 替换你的 Key
base_url="https://api.openai.com/v1"
)
# 定义 Prompt
template = """你是一个智能助手。请根据以下上下文回答问题。如果不清楚,请说不知道。
上下文:
{context}
问题:{question}
"""
prompt = ChatPromptTemplate.from_template(template)
def format_docs(docs):
return "\n\n".join([d.page_content for d in docs])
# 构建 Chain
rag_chain = (
{"context": compression_retriever | format_docs, "question": RunnablePassthrough()}
| prompt
| llm
| StrOutputParser()
)
# --- 测试运行 ---
query = "RAG 系统中 Embedding 和 Reranker 有什么区别?"
print(f"\nUser Query: {query}")
print("-" * 50)
# 1. 查看精排结果
print("Retrieved Docs (After Reranking):")
retrieved_docs = compression_retriever.invoke(query)
for i, doc in enumerate(retrieved_docs):
# Reranker 的分数通常保存在 metadata 或 state 中,LangChain 处理方式略有不同
# 这里我们只打印内容验证相关性
print(f"[{i+1}] {doc.page_content}")
print("-" * 50)
# 2. 生成最终回答
try:
print("LLM Response:")
response = rag_chain.invoke(query)
print(response)
except Exception as e:
print(f"LLM 调用需配置 API Key: {e}")
代码解读:为什么这么写?
normalize_embeddings=True:- BGE 的预训练使用了对比学习,其 loss function 基于 Softmax 归一化。因此,在推理时开启归一化,能让点积距离(Inner Product)完美等价于余弦相似度,显著提升召回准确率。
- 归一化是什么: 把向量长度变为 1 的数学操作。消除句子长短的影响,只看语义是否匹配。归一化就像是一个“自动音量平衡器”。它把你的声音调小,把我的声音调大,让我们两个人的最大音量都限制在 100% (也就是 1)。
- 为了什么: 消除文本长度、词频等干扰,让模型专注于语义(方向)。
- 工程意义: 归一化后,使用简单的点积(IP) 就能算出精准的余弦相似度,极大提升了检索性能。我们想要 余弦相似度 的准确性(只看语义)。我们想要 点积 的计算速度(Milvus 算点积极快)。归一化 让我们实现了“既要又要"。
k=10vstop_n=3:- 这是一个典型的“漏斗”设计。Milvus 负责快速把范围缩小到 10 个(包含可能的噪音),Reranker 负责慢工出细活,把真正高分的 3 个挑出来。如果直接用 Milvus 搜 Top 3,很可能前 3 名里并没有最佳答案(因为向量距离有时候不如交叉编码精准)。
ContextualCompressionRetriever:- LangChain 的命名比较抽象,初学者容易晕。你只需要记住:在 LangChain 里,凡是涉及到**“对检索回来的结果进行后处理(过滤、重排、重写)”**的操作,都统称为 Contextual Compression。
总结
至此,你已经拥有了一套代码结构清晰、符合工业界最佳实践的 RAG 系统基座。
但是,这套系统是“静态”的。它假设所有的知识都是平等的。但在现实世界中,昨天发布的新闻一定比 5 年前的旧闻更重要。
如果用户问“最新的 iPhone 是什么?”,而你的数据库里既有 iPhone 6 的文档,又有 iPhone 15 的文档,Milvus 很可能会根据语义相似度(都包含“iPhone”、“发布”等词)把它们都搜出来。
如何让向量数据库具备“时间观念”?如何实现时间衰减(Time Decay)?
下一篇,我们将探讨这个 Milvus 官方文档里没有告诉你的高级技巧。