市面上有多种开源或闭源项目可用于 RAG 的搭建,我在一一了解之后写下这篇总结

常见可用于搭建知识库的技术:

技术 存储占用 查询速度 适合数据规模 适用场景 事务支持
FAISS 低(优化存储) 🚀🚀🚀(最快,支持 GPU) 百万 ~ 十亿 高吞吐向量搜索 ❌ 无
Milvus 中等(支持磁盘索引) 🚀🚀(适合分布式) 百万 ~ 十亿 企业级向量数据库 ✅ 有
Elasticsearch (ES) 高(倒排索引开销大) 🚀(适中) 百万 ~ 十亿 关键词 + 向量混合搜索 ✅ 有
pgvector 高(受 PG 事务影响) 🚀(适中) 10 万 ~ 百万 轻量级 RAG 应用 ✅ 有

我整理的大概表格,呈现比较表面,
他只根据这些技术的配置支持说明大概阐明如 FAISS 可以用 GPU 加速、Milvus 可以水平拓宽…
实际上运用中查询速度快慢其实和底层使用的知识库索引方式有着巨大相关性。

常见的底层索引技术:

索引技术 存储占用 查询速度 查询准确性 适合数据规模 适用场景 备注
Flat 低(存储完整向量,无额外结构) 线性扫描,速度慢 完全准确 小规模(通常千级以内) 精确检索、测试原型 无预先构建开销,但数据量增大时性能急剧下降
IVFFlat 中等(存储聚类中心和分簇信息) 快速(先聚类定位,再局部暴力搜索) 较高(略有近似误差) 中到大规模(百万级) 高吞吐检索、RAG应用 需先进行聚类训练,nlist、nprobe 参数需要调优
IVFPQ 低(通过产品量化压缩向量存储) 非常快(计算量大幅降低) 中(量化引入误差) 中到大规模 存储受限、实时推荐、超大数据场景 在牺牲部分精度的前提下,大幅降低存储与计算成本
HNSW 较高(需额外存储图结构信息) 非常快(基于图遍历实现高效导航) 中到大规模 实时查询、在线推荐、低延迟场景 索引构建和调优较复杂,内存占用较高

主要使用内存

Flat:通常将完整向量存储在内存中进行线性扫描检索,适用于小规模数据,在数据量较小时完全驻留在内存中处理较为高效。

HNSW:需要额外存储图结构信息,通常这些图结构和数据都在内存中以实现基于图遍历的高效导航,索引构建和调优较复杂,内存占用较高。

磁盘和内存都使用

IVFFlat:存储聚类中心和分簇信息等数据,在处理中到大规模数据时,数据可能一部分在内存,一部分在磁盘,检索时先在内存中进行聚类定位,再对局部数据进行暴力搜索,可能涉及磁盘数据读取。

IVFPQ:通过产品量化压缩向量存储,以降低存储成本和计算量,在处理中到大规模数据时,数据可能根据实际情况分布在内存和磁盘上,在内存中进行计算和处理时可能需要从磁盘读取数据。

以上即为搭建知识库需要的背景知识

FAISS

我的第一个 RAG 应用就是通过 FAISS 搭建

当时只用于小文件解读,langchain 调用 FAISS 默认的索引是 Flat 即暴力搜索,对于文档内容也是 Python 库暴力读取文字,然后硬性分割,再调用 openai 的 embedding model 去向量化去嵌入,实现过程也比较简单了。

其中也没有涉及到什么调 ORC 或者本地部署的 Llava (现在还有 Janus )等来识别文档(图片),也省去了一部分并发加锁的操作。

FAISS 的索引方式有三种:Flat 暴力搜索,IVFFlat 聚类搜索,HNSW 图搜索

接下来的步骤中,包含文本切割等操作,用于其他技术情况下同理,故重复部分其他技术不再赘述

embedding model

前提操作先把 embedding model 拿到手:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16

from langchain_openai import OpenAIEmbeddings
from logger import logger

def init_embedding(embeddings_name: str, base_url: str, api_key: str,**kwargs) -> OpenAIEmbeddings:
"""Init EMBEDDING"""
embeddings = OpenAIEmbeddings(
model=embeddings_name,
openai_api_base=base_url,
openai_api_key=api_key,
**kwargs
)

logger.debug(f"Init LLM: {embeddings.model}")
return embeddings

主线流程

加载进知识库的文档准备好

1
2
3
4
5
6
7
8
9
10
11
12
def update_vdb(index_file_path:str, mapping_file_path:str, directory_path:str, embeddings:OpenAIEmbeddings):
vector_store=vector_store_init(index_file_path,embeddings) # 初始化向量知识库
# 遍历目录下的所有文件
for root, _, files in os.walk(directory_path):
for file in files:
logger.info(f"Processing file: {file}")
file_path = os.path.join(root, file)
documents = load_and_split_documents(file_path,embeddings)
uuids = [str(uuid4()) for _ in range(len(documents))]
vector_store.add_documents(documents=documents, ids=uuids)
# 保存更新后的索引和映射
save_faiss_index(vector_store, index_file_path, mapping_file_path)

初始化向量数据库

初始化向量数据库

1
2
3
4
5
6
7
8
9
10
def vector_store_init(index_file_path:str,embeddings:OpenAIEmbeddings):
index=index_init(index_file_path,embeddings)# 初始化索引
vector_store = FAISS(
embedding_function=embeddings,
index=index,
docstore=InMemoryDocstore({}),
index_to_docstore_id={}
)
return vector_store

初始化索引

默认模式 默认为Flat 暴力搜索 (精确查找)

1
2
3
def index_init(index_file_path:str,embeddings:OpenAIEmbeddings):
index = faiss.IndexFlatL2(len(embeddings.embed_query(index_file_path)))
return index

IVFFlat 聚类搜索 (近似查找)

1
2
3
4
5
6
7
8
9
10
def index_init(index_file_path: str, embeddings: OpenAIEmbeddings):
sample_vector = embeddings.embed_query(index_file_path)
d = len(sample_vector)
# 创建量化器
quantizer = faiss.IndexFlatL2(d)
# 设置聚类中心的数量
nlist = 100
# 创建 IndexIVFFlat 索引
index = faiss.IndexIVFFlat(quantizer, d, nlist, faiss.METRIC_L2)
return index

HNSW 图搜索 (近似查找)

1
2
3
4
5
6
def index_init(index_file_path: str, embeddings: OpenAIEmbeddings):
sample_vector = embeddings.embed_query(index_file_path)
d = len(sample_vector)
# 创建 HNSW 索引
hnsw = faiss.IndexHNSWFlat(d, 32) # 32 是每个节点的最大连接数,可以根据需要调整
return hnsw

文本切割

硬性分割 fast:
直接根据字符串长度切割(最不好的,最快的)

百分位数 percentile:
默认的分割方式是基于百分位数。在这个方法中,
会计算所有句子之间的差异,然后将大于X百分位数的任何差异进行分割。

标准差 standard_deviation:
在这个方法中,任何大于X个标准差的差异都会被分割。

四分位距 interquartile:
在这个方法中,使用四分位距来分割文本块。

梯度 gradient:
在这个方法中,使用距离的梯度以及百分位数方法来分割文本块。
当文本块彼此高度相关或特定于某个领域(例如法律或医学)时,
此方法很有用。其理念是在梯度数组上应用异常检测,以便使分布更宽,
并更容易在高度语义化的数据中识别边界。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
from langchain_core.documents.base import Document
from langchain_experimental.text_splitter import SemanticChunker
from langchain_text_splitters import RecursiveCharacterTextSplitter
from .embedding import init_embedding


def load_and_split_documents(file_path,embeddings,text_spliter_way:str ="fast"):
content = extract_text_from_file(file_path) #从文件中提取文本
if text_spliter_way not in ["fast","percentile","interquartile","standard_deviation","gradient"]:
return ValueError

'''分割方法判别'''
if text_spliter_way=="fast": #硬性分割
text_splitter = RecursiveCharacterTextSplitter(
chunk_size=500,
chunk_overlap=50
)
texts = text_splitter.split_text(content)
documents = [Document(page_content=t,metadata={"source": file_path}) for t in texts]
return documents

else: #其他分割方法
text_splitter = SemanticChunker(embeddings,breakpoint_threshold_type=text_spliter_way)
docs = text_splitter.create_documents([content])
return docs

if __name__ == "__main__":
file_path = "。。。"
from env import OPENAI_API_KEY,OPENAI_EMBEDDING_MODEL,OPENAI_BASE_URL
embeddings = init_embedding(。。。)
documents = load_and_split_documents(file_path, embeddings,"percentile")
for doc in documents:
print(doc.page_content)

持久化

虽然说 FAISS 本身是基于内存的,但是为了方便,还是需要持久化

把 FAISS 索引和映射文件保存到指定的路径

mapping 文件保存了文档的索引到文档存储的映射关系,用于在加载时恢复索引。

1
2
3
4
5
6
7
8
def save_faiss_index(vector_store, index_file_path, mapping_file_path):
faiss.write_index(vector_store.index, index_file_path)
with open(mapping_file_path, 'wb') as f:
pickle.dump({
'docstore': vector_store.docstore,
'index_to_docstore_id': vector_store.index_to_docstore_id
}, f)

加载

使用的时候,把他重新加载到内存

1
2
3
4
5
6
7
8
9
10
11
12
13
14

def load_faiss_index(index_file_path, mapping_file_path, embeddings):
index = faiss.read_index(index_file_path)
with open(mapping_file_path, 'rb') as f:
mapping = pickle.load(f)
docstore = mapping['docstore']
index_to_docstore_id = mapping['index_to_docstore_id']
vector_store = FAISS(
embedding_function=embeddings,
index=index,
docstore=docstore,
index_to_docstore_id=index_to_docstore_id
)
return vector_store

查询

1
2
3
4
5
#此处可做问题生成多个问题 达到广覆盖

results = vector_store.search(req.message, search_type="similarity", k=1)

#此处可调用重排模型 如bge-rerank-v2-m3

Milvus

Milvus 是一个开源的向量数据库,专为大规模相似性搜索和 AI 应用设计

与 FAISS 相比,Milvus 提供了更完整的数据库功能(更像一个 DB,FAISS 有点像插件),包括持久化、高可用性、水平扩展和事务支持。它不仅仅是一个向量索引库,而是一个功能齐全的向量数据库系统。

支持的索引类型:

  • HNSW
  • IVF_FLAT
  • IVF_PQ
  • FLAT

安装 Milvus

Milvus 可以通过 Docker 或 Kubernetes 部署:

1
2
3
# 使用 Docker Compose 安装 Milvus Standalone
wget https://github.com/milvus-io/milvus/releases/download/v2.3.3/milvus-standalone-docker-compose.yml -O docker-compose.yml
docker-compose up -d

通过 Python SDK 连接 Milvus

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
from pymilvus import connections, Collection, utility
from logger import logger

def connect_to_milvus(host="localhost", port="19530"):
"""连接到 Milvus 服务器"""
try:
connections.connect(
alias="default",
host=host,
port=port
)
logger.info(f"成功连接到 Milvus 服务器 {host}:{port}")
return True
except Exception as e:
logger.error(f"连接 Milvus 失败: {e}")
return False

embedding model

与 FAISS 类似,我们也需要初始化 embedding 模型:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
from langchain_openai import OpenAIEmbeddings
from logger import logger

def init_embedding(embeddings_name: str, base_url: str, api_key: str, **kwargs) -> OpenAIEmbeddings:
"""初始化 EMBEDDING"""
embeddings = OpenAIEmbeddings(
model=embeddings_name,
openai_api_base=base_url,
openai_api_key=api_key,
**kwargs
)

logger.debug(f"初始化 LLM: {embeddings.model}")
return embeddings

创建集合

在 Milvus 中,我们需要创建一个集合来存储向量数据:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
from pymilvus import CollectionSchema, FieldSchema, DataType

def create_collection(collection_name, dimension):
"""创建 Milvus 集合"""
if utility.has_collection(collection_name):
logger.info(f"集合 {collection_name} 已存在")
return Collection(collection_name)

# 定义字段
fields = [
FieldSchema(name="id", dtype=DataType.VARCHAR, is_primary=True, max_length=100),
FieldSchema(name="text", dtype=DataType.VARCHAR, max_length=20000),
FieldSchema(name="source", dtype=DataType.VARCHAR, max_length=1000),
FieldSchema(name="embedding", dtype=DataType.FLOAT_VECTOR, dim=dimension)
]

# 创建集合模式
schema = CollectionSchema(fields=fields, description=f"文档嵌入集合 {collection_name}")

# 创建集合
collection = Collection(name=collection_name, schema=schema)
logger.info(f"成功创建集合 {collection_name}")

# 创建索引
index_params = {
"index_type": "HNSW",
"metric_type": "L2",
"params": {"M": 8, "efConstruction": 64}
}
collection.create_index(field_name="embedding", index_params=index_params)
logger.info(f"已为 {collection_name} 创建 HNSW 索引")

return collection

主线流程

加载文档并更新向量数据库:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
from uuid import uuid4
import os

def update_milvus_db(collection_name, directory_path, embeddings):
"""更新 Milvus 向量数据库"""
# 确保连接到 Milvus
if not connections.has_connection("default"):
connect_to_milvus()

# 获取 embedding 维度
sample_text = "测试文本"
sample_vector = embeddings.embed_query(sample_text)
dimension = len(sample_vector)

# 初始化集合
collection = create_collection(collection_name, dimension)

# 遍历目录下的所有文件
for root, _, files in os.walk(directory_path):
for file in files:
logger.info(f"处理文件: {file}")
file_path = os.path.join(root, file)
documents = load_and_split_documents(file_path, embeddings)

# 准备数据
ids = [str(uuid4()) for _ in range(len(documents))]
texts = [doc.page_content for doc in documents]
sources = [doc.metadata.get("source", file_path) for doc in documents]
embeddings_list = embeddings.embed_documents(texts)

# 插入数据
entities = [
ids,
texts,
sources,
embeddings_list
]

collection.insert(entities)
logger.info(f"已将 {len(documents)} 个文档从 {file_path} 添加到 Milvus")

# 加载集合以确保数据可用
collection.load()
logger.info(f"集合 {collection_name} 已加载并准备查询")

return collection

不同索引类型的实现

Milvus 支持多种索引类型,我们可以根据需求选择:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
def create_index(collection, index_type="HNSW"):
"""为集合创建不同类型的索引"""
if index_type == "FLAT":
index_params = {
"index_type": "FLAT",
"metric_type": "L2",
"params": {}
}
elif index_type == "IVF_FLAT":
index_params = {
"index_type": "IVF_FLAT",
"metric_type": "L2",
"params": {"nlist": 1024}
}
elif index_type == "IVF_PQ":
index_params = {
"index_type": "IVF_PQ",
"metric_type": "L2",
"params": {"nlist": 1024, "m": 8, "nbits": 8}
}
elif index_type == "HNSW":
index_params = {
"index_type": "HNSW",
"metric_type": "L2",
"params": {"M": 8, "efConstruction": 64}
}
else:
raise ValueError(f"不支持的索引类型: {index_type}")

collection.create_index(field_name="embedding", index_params=index_params)
logger.info(f"已为集合创建 {index_type} 索引")

文本切割

文本切割方法与 FAISS 部分相同:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
from langchain_core.documents.base import Document
from langchain_experimental.text_splitter import SemanticChunker
from langchain_text_splitters import RecursiveCharacterTextSplitter

def load_and_split_documents(file_path, embeddings, text_spliter_way:str ="fast"):
content = extract_text_from_file(file_path) #从文件中提取文本
if text_spliter_way not in ["fast","percentile","interquartile","standard_deviation","gradient"]:
return ValueError

'''分割方法判别'''
if text_spliter_way=="fast": #硬性分割
text_splitter = RecursiveCharacterTextSplitter(
chunk_size=500,
chunk_overlap=50
)
texts = text_splitter.split_text(content)
documents = [Document(page_content=t,metadata={"source": file_path}) for t in texts]
return documents

else: #其他分割方法
text_splitter = SemanticChunker(embeddings,breakpoint_threshold_type=text_spliter_way)
docs = text_splitter.create_documents([content])
return docs

持久化与数据管理

与 FAISS 不同,Milvus 自身提供了数据持久化功能:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
def manage_milvus_collection(collection_name, operation="release"):
"""管理 Milvus 集合的加载和释放"""
if not utility.has_collection(collection_name):
logger.warning(f"集合 {collection_name} 不存在")
return False

collection = Collection(collection_name)

if operation == "load":
collection.load()
logger.info(f"集合 {collection_name} 已加载到内存")
return True
elif operation == "release":
collection.release()
logger.info(f"集合 {collection_name} 已从内存释放")
return True
elif operation == "drop":
utility.drop_collection(collection_name)
logger.info(f"集合 {collection_name} 已删除")
return True
else:
logger.error(f"不支持的操作: {operation}")
return False

查询

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
def search_in_milvus(collection_name, query_text, embeddings, top_k=5, search_params=None):
"""在 Milvus 中搜索最相似的文档"""
if not utility.has_collection(collection_name):
logger.error(f"集合 {collection_name} 不存在")
return []

collection = Collection(collection_name)

# 确保集合已加载
if not collection.is_loaded():
collection.load()

# 将查询文本转换为向量
query_vector = embeddings.embed_query(query_text)

# 设置默认搜索参数
if search_params is None:
search_params = {"ef": 64} # 对于 HNSW 索引

# 执行搜索
search_results = collection.search(
data=[query_vector],
anns_field="embedding",
param=search_params,
limit=top_k,
output_fields=["id", "text", "source"]
)

# 处理结果
results = []
for hits in search_results:
for hit in hits:
results.append({
"id": hit.id,
"text": hit.entity.get("text"),
"source": hit.entity.get("source"),
"score": hit.score
})

return results

将 Milvus 与 LangChain 集成

可以创建一个自定义的检索器类,将 Milvus 与 LangChain 集成:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
from langchain_core.vectorstores import VectorStoreRetriever
from langchain_core.documents import Document

class MilvusRetriever(VectorStoreRetriever):
def __init__(self, collection_name, embeddings, top_k=5):
self.collection_name = collection_name
self.embeddings = embeddings
self.top_k = top_k

def get_relevant_documents(self, query):
results = search_in_milvus(
self.collection_name,
query,
self.embeddings,
self.top_k
)

documents = []
for result in results:
doc = Document(
page_content=result["text"],
metadata={"source": result["source"], "score": result["score"]}
)
documents.append(doc)

return documents

async def aget_relevant_documents(self, query):
return self.get_relevant_documents(query)

这样,我们就可以像使用 FAISS 一样使用 Milvus 向量数据库进行 RAG 应用的构建。
Milvus 特别适合需要处理大规模向量数据并需要持久化、高可用和事务支持的企业级应用。

Postgresql 的 Pgvector

我在操作这个之前呢,其实有想过原来的表的那些索引还有分表,能不能在 pgvector 上复用。
比如说我垂直分表,是根据我的用户ID的索引去区分不同用户数据,来构建各自隔离的知识库。

但实际操作中,发现 pgvector 仅仅算是一个插件,他只是可以把表当做向量来使用,但是不能复用原来的索引。
想实现这种功能还需要在这张表(或另一张表)里增加一个向量字段(例如 embedding vector(768)),并在其上建立向量索引,然后才能做 ORDER BY embedding <-> ‘[query_vec]’ 这样的相似度排序查询。

支持的索引类型:

  • HNSW
  • IVF_FLAT
  • ANN
  • 暴力搜索

比如说这是我的表结构:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
class historySchema(BaseSchema):
__tablename__ = "history"
hid = Column(Integer, primary_key=True, autoincrement=True)
uid = Column(Integer, ForeignKey(UserSchema.uid, ondelete="CASCADE"), nullable=False)
sid = Column(Integer, ForeignKey(ChatSessionSchema.sid, ondelete="CASCADE"), nullable=False)
create_at = Column(DateTime, default=datetime.now())
is_deleted = Column(Boolean, nullable=False, default=False)
ip = Column(String(255), nullable=False)
user_api_key = Column(String(255), nullable=False)
user_base_url = Column(String(255), nullable=False, default="")
llm_model = Column(String(255), nullable=False, default="")
usermessage = Column(String(4096), nullable=False)
botmessage = Column(String(4096), nullable=False)

想用 pgvector,需要向量列 + 向量索引
pgvector 是 PostgreSQL 里的一个扩展,可以让你在表中存储一个向量类型的字段,并在其上建立向量索引(如 ivfflat、hnsw 等),然后使用 ORDER BY column <-> ‘[query_vec]’ 进行相似度排序查询。

想让你的表支持向量检索,需要在 history 表中 增加一个存储嵌入向量的列,例如:

1
2
3
-- PostgreSQL层面
ALTER TABLE history
ADD COLUMN embedding vector(768);

然后在 SQLAlchemy 的模型里对应加上:

1
2
3
4
5
6
7
8
from sqlalchemy.dialects.postgresql import VECTOR

class historySchema(BaseSchema):
__tablename__ = "history"
# ... 原有字段

# 新增向量列
embedding = Column(VECTOR(768)) # 例如 768 维度

接着,在 PostgreSQL 里为这个向量列建立向量索引:

1
2
3
CREATE INDEX ON history
USING ivfflat (embedding vector_l2_ops)
WITH (lists = 100);

这样就可以执行类似:

1
2
3
4
5
SELECT *
FROM history
WHERE uid = :user_id
ORDER BY embedding <-> '[query_vector]'
LIMIT 10;

来拿到同一个用户下与 query_vector 最相似的消息记录,从而实现“针对某个用户的语义检索”。

综合以上,你会发现你需要在 pgvector 上重新建立索引,然后才能使用。
并不能对原来的表和数据进行复用。

但好处是可以跟操作数据库一样,对向量进行操作。

小总结

FAISS、Milvus、Pgvector 都是基于向量数据库的实现,
FAISS 功能比较纯粹,就是向量检索,
Milvus 功能比较全面,支持向量检索、持久化、高可用、水平扩展和事务支持。
Pgvector 是 PostgreSQL 的扩展,让操作数据库的语法可以操作向量。