Milvus、Chroma、Pinecone实战指南

前言

向量数据库是专门存储和检索高维向量的数据库系统,是RAG、推荐系统、图像搜索等应用的核心组件。本文介绍主流向量数据库的使用方法。


向量数据库概述

为什么需要向量数据库

传统数据库 向量数据库
精确匹配查询 相似性搜索
SQL/关键词检索 ANN近似最近邻
行列结构 高维向量存储
毫秒级精确查询 毫秒级语义搜索

主流向量数据库对比

数据库 类型 特点 适用场景
Chroma 嵌入式 轻量、Python原生 原型开发、小规模
FAISS Meta开源、高性能 研究、单机大规模
Milvus 分布式 功能全面、可扩展 生产环境
Pinecone 云服务 全托管、易用 快速上线
Weaviate 开源 多模态支持 混合搜索
Qdrant 开源 Rust实现、高性能 生产环境

核心原理:索引算法 (Indexing)

向量数据库之所以快,是因为它不进行全量扫描,而是使用 ANN (Approximate Nearest Neighbor, 近似最近邻) 算法。

1. HNSW (Hierarchical Navigable Small World)

目前工业界最主流的索引算法。

  • 原理:构建多层图结构。顶层节点稀疏,用于快速定位大致区域;底层节点密集,用于精确查找。
  • 优点:检索速度极快,召回率高。
  • 缺点:内存消耗大(需要存储图结构)。

2. IVFFlat (Inverted File Index)

  • 原理:使用 K-Means 将向量空间划分为多个聚类(Voronoi Cells)。查询时先找到最近的聚类中心,只在该聚类内搜索。
  • 优点:内存占用低,适合超大规模数据。
  • 缺点:需要预训练聚类中心,召回率略低于 HNSW。

3. PQ (Product Quantization, 乘积量化)

  • 原理:将高维向量切分成多个子向量,对子向量进行聚类压缩。
  • 优点:极大地压缩存储空间(可达 10-100 倍)。

在生产环境中,单纯的向量搜索(语义搜索)有时会失效(例如搜索特定的产品型号、人名)。混合搜索结合了向量搜索和传统的关键词搜索(BM25)。

# 以 Qdrant 为例的混合搜索逻辑
from qdrant_client import QdrantClient
from qdrant_client.http import models

client = QdrantClient(":memory:")

# 混合搜索查询
search_result = client.query_points(
    collection_name="products",
    prefetch=[
        # 1. 向量搜索 (Dense)
        models.Prefetch(
            query=[0.1, 0.2, 0.3, ...], 
            limit=20
        ),
        # 2. 关键词搜索 (Sparse/BM25)
        models.Prefetch(
            query=models.SparseVector(indices=[1, 5], values=[0.5, 0.3]),
            using="sparse",
            limit=20
        ),
    ],
    # 3. 使用 RRF (Reciprocal Rank Fusion) 合并结果
    query=models.FusionQuery(fusion=models.Fusion.RRF),
)

过滤策略:Pre-filtering vs Post-filtering

当你需要执行类似 查找 2024 年之后发布的、关于 AI 的文档 这样的查询时:

策略 流程 缺点
Post-filtering 先做向量搜索,再过滤元数据 可能导致返回结果不足(过滤后剩下的太少)
Pre-filtering 先过滤元数据,再在子集中做向量搜索 推荐做法,索引必须支持标量过滤

Chroma 快速入门

安装与基础使用

pip install chromadb
import chromadb
from chromadb.utils import embedding_functions

# 创建客户端
client = chromadb.Client()  # 内存模式
# client = chromadb.PersistentClient(path="./chroma_db")  # 持久化

# 创建集合(使用默认Embedding)
collection = client.create_collection(
    name="my_documents",
    metadata={"description": "文档集合"}
)

# 添加文档
collection.add(
    documents=[
        "机器学习是人工智能的一个分支",
        "深度学习使用神经网络进行学习",
        "向量数据库用于存储和检索向量",
        "RAG结合检索和生成提升回答质量"
    ],
    ids=["doc1", "doc2", "doc3", "doc4"],
    metadatas=[
        {"source": "wiki", "topic": "ML"},
        {"source": "wiki", "topic": "DL"},
        {"source": "blog", "topic": "DB"},
        {"source": "paper", "topic": "LLM"}
    ]
)

print(f"集合中有 {collection.count()} 个文档")

查询操作

# 语义搜索
results = collection.query(
    query_texts=["什么是神经网络"],
    n_results=3,
    include=["documents", "distances", "metadatas"]
)

print("搜索结果:")
for i, (doc, distance, metadata) in enumerate(zip(
    results['documents'][0],
    results['distances'][0],
    results['metadatas'][0]
)):
    print(f"{i+1}. [{distance:.4f}] {doc}")
    print(f"   元数据: {metadata}")

# 带过滤条件的查询
results = collection.query(
    query_texts=["人工智能技术"],
    n_results=2,
    where={"topic": "ML"}  # 元数据过滤
)

使用自定义Embedding

from sentence_transformers import SentenceTransformer

# 自定义Embedding函数
class CustomEmbeddingFunction:
    def __init__(self, model_name="BAAI/bge-large-zh-v1.5"):
        self.model = SentenceTransformer(model_name)
    
    def __call__(self, input):
        return self.model.encode(input).tolist()

# 创建使用自定义Embedding的集合
custom_ef = CustomEmbeddingFunction()
collection = client.create_collection(
    name="custom_embedding_collection",
    embedding_function=custom_ef
)

Milvus 生产级部署

Docker部署

# 单机版部署
docker run -d --name milvus \
  -p 19530:19530 \
  -p 9091:9091 \
  -v $(pwd)/milvus/db:/var/lib/milvus/db \
  milvusdb/milvus:latest \
  milvus run standalone

Python SDK使用

pip install pymilvus
from pymilvus import (
    connections, Collection, FieldSchema, 
    CollectionSchema, DataType, utility
)

# 连接Milvus
connections.connect(host="localhost", port="19530")

# 定义Schema
fields = [
    FieldSchema(name="id", dtype=DataType.INT64, is_primary=True, auto_id=True),
    FieldSchema(name="text", dtype=DataType.VARCHAR, max_length=65535),
    FieldSchema(name="embedding", dtype=DataType.FLOAT_VECTOR, dim=768),
    FieldSchema(name="source", dtype=DataType.VARCHAR, max_length=256),
]
schema = CollectionSchema(fields=fields, description="文档集合")

# 创建集合
collection_name = "documents"
if utility.has_collection(collection_name):
    utility.drop_collection(collection_name)

collection = Collection(name=collection_name, schema=schema)

# 创建索引
index_params = {
    "metric_type": "COSINE",
    "index_type": "IVF_FLAT",
    "params": {"nlist": 128}
}
collection.create_index(field_name="embedding", index_params=index_params)

print(f"集合 {collection_name} 创建成功")

数据操作

from sentence_transformers import SentenceTransformer
import numpy as np

# 加载Embedding模型
model = SentenceTransformer("BAAI/bge-large-zh-v1.5")

# 准备数据
texts = [
    "Python是一种流行的编程语言",
    "机器学习需要大量数据训练",
    "向量数据库支持相似性搜索",
    "RAG可以提升LLM的回答质量"
]
embeddings = model.encode(texts)
sources = ["doc", "doc", "blog", "paper"]

# 插入数据
data = [
    texts,
    embeddings.tolist(),
    sources
]
collection.insert([texts, embeddings.tolist(), sources])
collection.flush()

print(f"插入 {collection.num_entities} 条数据")

# 加载集合到内存
collection.load()

# 搜索
query = "如何进行语义搜索"
query_embedding = model.encode([query])

search_params = {"metric_type": "COSINE", "params": {"nprobe": 10}}
results = collection.search(
    data=query_embedding.tolist(),
    anns_field="embedding",
    param=search_params,
    limit=3,
    output_fields=["text", "source"]
)

print(f"\n查询: {query}")
print("搜索结果:")
for hits in results:
    for hit in hits:
        print(f"  [{hit.score:.4f}] {hit.entity.get('text')}")

FAISS 本地高性能检索

基础使用

pip install faiss-cpu  # CPU版本
# pip install faiss-gpu  # GPU版本
import faiss
import numpy as np
from sentence_transformers import SentenceTransformer

# 准备数据
model = SentenceTransformer("BAAI/bge-large-zh-v1.5")
texts = [
    "机器学习基础概念",
    "深度学习神经网络",
    "自然语言处理技术",
    "计算机视觉应用",
    "强化学习算法"
]
embeddings = model.encode(texts).astype('float32')

# 创建索引
dimension = embeddings.shape[1]
index = faiss.IndexFlatIP(dimension)  # 内积索引

# 归一化向量(用于余弦相似度)
faiss.normalize_L2(embeddings)
index.add(embeddings)

print(f"索引中有 {index.ntotal} 个向量")

# 搜索
query = "什么是神经网络"
query_vec = model.encode([query]).astype('float32')
faiss.normalize_L2(query_vec)

k = 3
distances, indices = index.search(query_vec, k)

print(f"\n查询: {query}")
print("搜索结果:")
for i, (idx, dist) in enumerate(zip(indices[0], distances[0])):
    print(f"  {i+1}. [{dist:.4f}] {texts[idx]}")

高效索引类型

# IVF索引(适合大规模数据)
nlist = 100  # 聚类中心数量
quantizer = faiss.IndexFlatL2(dimension)
index_ivf = faiss.IndexIVFFlat(quantizer, dimension, nlist)
index_ivf.train(embeddings)  # 需要训练
index_ivf.add(embeddings)
index_ivf.nprobe = 10  # 搜索时探测的聚类数

# HNSW索引(高召回率)
index_hnsw = faiss.IndexHNSWFlat(dimension, 32)  # 32是图的连接数
index_hnsw.add(embeddings)

# PQ索引(压缩存储)
m = 8  # 子向量数量
index_pq = faiss.IndexPQ(dimension, m, 8)
index_pq.train(embeddings)
index_pq.add(embeddings)

保存和加载索引

# 保存索引
faiss.write_index(index, "my_index.faiss")

# 加载索引
loaded_index = faiss.read_index("my_index.faiss")

Qdrant 云原生向量数据库

Docker部署

docker run -p 6333:6333 -p 6334:6334 \
  -v $(pwd)/qdrant_storage:/qdrant/storage \
  qdrant/qdrant

Python使用

pip install qdrant-client
from qdrant_client import QdrantClient
from qdrant_client.models import Distance, VectorParams, PointStruct
from sentence_transformers import SentenceTransformer

# 连接Qdrant
client = QdrantClient(host="localhost", port=6333)

# 创建集合
collection_name = "documents"
client.recreate_collection(
    collection_name=collection_name,
    vectors_config=VectorParams(size=768, distance=Distance.COSINE)
)

# 准备数据
model = SentenceTransformer("BAAI/bge-large-zh-v1.5")
texts = [
    "Python编程语言入门",
    "机器学习算法详解",
    "深度学习实战项目"
]
embeddings = model.encode(texts)

# 插入数据
points = [
    PointStruct(
        id=i,
        vector=embedding.tolist(),
        payload={"text": text, "index": i}
    )
    for i, (text, embedding) in enumerate(zip(texts, embeddings))
]
client.upsert(collection_name=collection_name, points=points)

# 搜索
query = "如何学习编程"
query_vec = model.encode([query])[0]

results = client.search(
    collection_name=collection_name,
    query_vector=query_vec.tolist(),
    limit=3
)

print(f"查询: {query}")
for result in results:
    print(f"  [{result.score:.4f}] {result.payload['text']}")

向量数据库选型指南

需求 推荐方案
快速原型 Chroma
单机大规模 FAISS
生产分布式 Milvus / Qdrant
全托管服务 Pinecone
混合搜索 Weaviate

性能考量

因素 说明
数据规模 百万级以下Chroma足够,亿级需要Milvus
查询延迟 HNSW索引延迟低,IVF索引可调节
内存占用 PQ压缩可减少10x存储
更新频率 频繁更新选择支持实时插入的数据库

常见问题

Q1: 向量数据库和传统数据库可以结合使用吗?

可以。常见模式是向量数据库存储向量和ID,元数据存储在传统数据库中。

Q2: 如何处理向量更新?

大多数向量数据库支持upsert操作,可以通过ID更新已有向量。

Q3: 索引构建需要多长时间?

取决于数据量和索引类型。IVF需要训练,HNSW构建较慢但查询快。

Q4: 如何评估检索质量?

  • Recall@K:Top-K中包含正确结果的比例
  • MRR:平均倒数排名
  • NDCG:归一化折损累积增益

生产环境考量

在将向量数据库投入生产时,需要关注以下维度:

1. 内存管理

向量索引(尤其是 HNSW)通常驻留在内存中。

  • 估算公式内存 ≈ 向量数量 * 维度 * 4 字节 * 1.5 (索引开销)
  • 优化:使用磁盘索引(如 Milvus 的 DiskANN)或量化技术(PQ/SQ)。

2. 数据一致性

  • 最终一致性:写入后可能需要几秒钟才能被检索到(常见于分布式系统)。
  • 强一致性:写入即刻可见,但会降低写入吞吐量。

3. 多租户隔离

如果你的应用服务于多个客户,必须确保数据隔离。

  • 方案 A:每个客户一个 Collection(适合客户少、数据多)。
  • 方案 B:使用 partition_key 或元数据过滤(适合客户多、数据少)。

总结

数据库 特点 推荐场景
Chroma 简单易用 开发测试
FAISS 高性能库 研究、单机
Milvus 功能全面 生产环境
Qdrant 云原生 K8s部署

参考资料

版权声明: 如无特别声明,本文版权归 sshipanoo 所有,转载请注明本文链接。

(采用 CC BY-NC-SA 4.0 许可协议进行授权)

本文标题:《 LLM应用开发——向量数据库 》

本文链接:http://localhost:3015/ai/%E5%90%91%E9%87%8F%E6%95%B0%E6%8D%AE%E5%BA%93.html

本文最后一次更新为 天前,文章中的某些内容可能已过时!