Milvus、Chroma、Pinecone实战指南

前言

向量数据库是专门存储和检索高维向量的数据库系统,是RAG、推荐系统、图像搜索等应用的核心组件。本文介绍主流向量数据库的使用方法。


向量数据库概述

为什么需要向量数据库

传统数据库向量数据库
精确匹配查询相似性搜索
SQL/关键词检索ANN近似最近邻
行列结构高维向量存储
毫秒级精确查询毫秒级语义搜索

主流向量数据库对比

数据库类型特点适用场景
Chroma嵌入式轻量、Python原生原型开发、小规模
FAISSMeta开源、高性能研究、单机大规模
Milvus分布式功能全面、可扩展生产环境
Pinecone云服务全托管、易用快速上线
Weaviate开源多模态支持混合搜索
Qdrant开源Rust实现、高性能生产环境

核心原理:索引算法 (Indexing)

向量数据库之所以快,是因为它不进行全量扫描,而是使用 ANN (Approximate Nearest Neighbor, 近似最近邻) 算法。

1. HNSW (Hierarchical Navigable Small World)

目前工业界最主流的索引算法。

  • 原理:构建多层图结构。顶层节点稀疏,用于快速定位大致区域;底层节点密集,用于精确查找。
  • 优点:检索速度极快,召回率高。
  • 缺点:内存消耗大(需要存储图结构)。

2. IVFFlat (Inverted File Index)

  • 原理:使用 K-Means 将向量空间划分为多个聚类(Voronoi Cells)。查询时先找到最近的聚类中心,只在该聚类内搜索。
  • 优点:内存占用低,适合超大规模数据。
  • 缺点:需要预训练聚类中心,召回率略低于 HNSW。

3. PQ (Product Quantization, 乘积量化)

  • 原理:将高维向量切分成多个子向量,对子向量进行聚类压缩。
  • 优点:极大地压缩存储空间(可达 10-100 倍)。

混合搜索 (Hybrid Search)

在生产环境中,单纯的向量搜索(语义搜索)有时会失效(例如搜索特定的产品型号、人名)。混合搜索结合了向量搜索和传统的关键词搜索(BM25)。

# 以 Qdrant 为例的混合搜索逻辑
from qdrant_client import QdrantClient
from qdrant_client.http import models

client = QdrantClient(":memory:")

# 混合搜索查询
search_result = client.query_points(
    collection_name="products",
    prefetch=[
        # 1. 向量搜索 (Dense)
        models.Prefetch(
            query=[0.1, 0.2, 0.3, ...], 
            limit=20
        ),
        # 2. 关键词搜索 (Sparse/BM25)
        models.Prefetch(
            query=models.SparseVector(indices=[1, 5], values=[0.5, 0.3]),
            using="sparse",
            limit=20
        ),
    ],
    # 3. 使用 RRF (Reciprocal Rank Fusion) 合并结果
    query=models.FusionQuery(fusion=models.Fusion.RRF),
)

过滤策略:Pre-filtering vs Post-filtering

当你需要执行类似 查找 2024 年之后发布的、关于 AI 的文档 这样的查询时:

策略流程缺点
Post-filtering先做向量搜索,再过滤元数据可能导致返回结果不足(过滤后剩下的太少)
Pre-filtering先过滤元数据,再在子集中做向量搜索推荐做法,索引必须支持标量过滤

Chroma 快速入门

安装与基础使用

pip install chromadb
import chromadb
from chromadb.utils import embedding_functions

# 创建客户端
client = chromadb.Client()  # 内存模式
# client = chromadb.PersistentClient(path="./chroma_db")  # 持久化

# 创建集合(使用默认Embedding)
collection = client.create_collection(
    name="my_documents",
    metadata={"description": "文档集合"}
)

# 添加文档
collection.add(
    documents=[
        "机器学习是人工智能的一个分支",
        "深度学习使用神经网络进行学习",
        "向量数据库用于存储和检索向量",
        "RAG结合检索和生成提升回答质量"
    ],
    ids=["doc1", "doc2", "doc3", "doc4"],
    metadatas=[
        {"source": "wiki", "topic": "ML"},
        {"source": "wiki", "topic": "DL"},
        {"source": "blog", "topic": "DB"},
        {"source": "paper", "topic": "LLM"}
    ]
)

print(f"集合中有 {collection.count()} 个文档")

查询操作

# 语义搜索
results = collection.query(
    query_texts=["什么是神经网络"],
    n_results=3,
    include=["documents", "distances", "metadatas"]
)

print("搜索结果:")
for i, (doc, distance, metadata) in enumerate(zip(
    results['documents'][0],
    results['distances'][0],
    results['metadatas'][0]
)):
    print(f"{i+1}. [{distance:.4f}] {doc}")
    print(f"   元数据: {metadata}")

# 带过滤条件的查询
results = collection.query(
    query_texts=["人工智能技术"],
    n_results=2,
    where={"topic": "ML"}  # 元数据过滤
)

使用自定义Embedding

from sentence_transformers import SentenceTransformer

# 自定义Embedding函数
class CustomEmbeddingFunction:
    def __init__(self, model_name="BAAI/bge-large-zh-v1.5"):
        self.model = SentenceTransformer(model_name)
    
    def __call__(self, input):
        return self.model.encode(input).tolist()

# 创建使用自定义Embedding的集合
custom_ef = CustomEmbeddingFunction()
collection = client.create_collection(
    name="custom_embedding_collection",
    embedding_function=custom_ef
)

Milvus 生产级部署

Docker部署

# 单机版部署
docker run -d --name milvus \
  -p 19530:19530 \
  -p 9091:9091 \
  -v $(pwd)/milvus/db:/var/lib/milvus/db \
  milvusdb/milvus:latest \
  milvus run standalone

Python SDK使用

pip install pymilvus
from pymilvus import (
    connections, Collection, FieldSchema, 
    CollectionSchema, DataType, utility
)

# 连接Milvus
connections.connect(host="localhost", port="19530")

# 定义Schema
fields = [
    FieldSchema(name="id", dtype=DataType.INT64, is_primary=True, auto_id=True),
    FieldSchema(name="text", dtype=DataType.VARCHAR, max_length=65535),
    FieldSchema(name="embedding", dtype=DataType.FLOAT_VECTOR, dim=768),
    FieldSchema(name="source", dtype=DataType.VARCHAR, max_length=256),
]
schema = CollectionSchema(fields=fields, description="文档集合")

# 创建集合
collection_name = "documents"
if utility.has_collection(collection_name):
    utility.drop_collection(collection_name)

collection = Collection(name=collection_name, schema=schema)

# 创建索引
index_params = {
    "metric_type": "COSINE",
    "index_type": "IVF_FLAT",
    "params": {"nlist": 128}
}
collection.create_index(field_name="embedding", index_params=index_params)

print(f"集合 {collection_name} 创建成功")

数据操作

from sentence_transformers import SentenceTransformer
import numpy as np

# 加载Embedding模型
model = SentenceTransformer("BAAI/bge-large-zh-v1.5")

# 准备数据
texts = [
    "Python是一种流行的编程语言",
    "机器学习需要大量数据训练",
    "向量数据库支持相似性搜索",
    "RAG可以提升LLM的回答质量"
]
embeddings = model.encode(texts)
sources = ["doc", "doc", "blog", "paper"]

# 插入数据
data = [
    texts,
    embeddings.tolist(),
    sources
]
collection.insert([texts, embeddings.tolist(), sources])
collection.flush()

print(f"插入 {collection.num_entities} 条数据")

# 加载集合到内存
collection.load()

# 搜索
query = "如何进行语义搜索"
query_embedding = model.encode([query])

search_params = {"metric_type": "COSINE", "params": {"nprobe": 10}}
results = collection.search(
    data=query_embedding.tolist(),
    anns_field="embedding",
    param=search_params,
    limit=3,
    output_fields=["text", "source"]
)

print(f"\n查询: {query}")
print("搜索结果:")
for hits in results:
    for hit in hits:
        print(f"  [{hit.score:.4f}] {hit.entity.get('text')}")

FAISS 本地高性能检索

基础使用

pip install faiss-cpu  # CPU版本
# pip install faiss-gpu  # GPU版本
import faiss
import numpy as np
from sentence_transformers import SentenceTransformer

# 准备数据
model = SentenceTransformer("BAAI/bge-large-zh-v1.5")
texts = [
    "机器学习基础概念",
    "深度学习神经网络",
    "自然语言处理技术",
    "计算机视觉应用",
    "强化学习算法"
]
embeddings = model.encode(texts).astype('float32')

# 创建索引
dimension = embeddings.shape[1]
index = faiss.IndexFlatIP(dimension)  # 内积索引

# 归一化向量(用于余弦相似度)
faiss.normalize_L2(embeddings)
index.add(embeddings)

print(f"索引中有 {index.ntotal} 个向量")

# 搜索
query = "什么是神经网络"
query_vec = model.encode([query]).astype('float32')
faiss.normalize_L2(query_vec)

k = 3
distances, indices = index.search(query_vec, k)

print(f"\n查询: {query}")
print("搜索结果:")
for i, (idx, dist) in enumerate(zip(indices[0], distances[0])):
    print(f"  {i+1}. [{dist:.4f}] {texts[idx]}")

高效索引类型

# IVF索引(适合大规模数据)
nlist = 100  # 聚类中心数量
quantizer = faiss.IndexFlatL2(dimension)
index_ivf = faiss.IndexIVFFlat(quantizer, dimension, nlist)
index_ivf.train(embeddings)  # 需要训练
index_ivf.add(embeddings)
index_ivf.nprobe = 10  # 搜索时探测的聚类数

# HNSW索引(高召回率)
index_hnsw = faiss.IndexHNSWFlat(dimension, 32)  # 32是图的连接数
index_hnsw.add(embeddings)

# PQ索引(压缩存储)
m = 8  # 子向量数量
index_pq = faiss.IndexPQ(dimension, m, 8)
index_pq.train(embeddings)
index_pq.add(embeddings)

保存和加载索引

# 保存索引
faiss.write_index(index, "my_index.faiss")

# 加载索引
loaded_index = faiss.read_index("my_index.faiss")

Qdrant 云原生向量数据库

Docker部署

docker run -p 6333:6333 -p 6334:6334 \
  -v $(pwd)/qdrant_storage:/qdrant/storage \
  qdrant/qdrant

Python使用

pip install qdrant-client
from qdrant_client import QdrantClient
from qdrant_client.models import Distance, VectorParams, PointStruct
from sentence_transformers import SentenceTransformer

# 连接Qdrant
client = QdrantClient(host="localhost", port=6333)

# 创建集合
collection_name = "documents"
client.recreate_collection(
    collection_name=collection_name,
    vectors_config=VectorParams(size=768, distance=Distance.COSINE)
)

# 准备数据
model = SentenceTransformer("BAAI/bge-large-zh-v1.5")
texts = [
    "Python编程语言入门",
    "机器学习算法详解",
    "深度学习实战项目"
]
embeddings = model.encode(texts)

# 插入数据
points = [
    PointStruct(
        id=i,
        vector=embedding.tolist(),
        payload={"text": text, "index": i}
    )
    for i, (text, embedding) in enumerate(zip(texts, embeddings))
]
client.upsert(collection_name=collection_name, points=points)

# 搜索
query = "如何学习编程"
query_vec = model.encode([query])[0]

results = client.search(
    collection_name=collection_name,
    query_vector=query_vec.tolist(),
    limit=3
)

print(f"查询: {query}")
for result in results:
    print(f"  [{result.score:.4f}] {result.payload['text']}")

向量数据库选型指南

需求推荐方案
快速原型Chroma
单机大规模FAISS
生产分布式Milvus / Qdrant
全托管服务Pinecone
混合搜索Weaviate

性能考量

因素说明
数据规模百万级以下Chroma足够,亿级需要Milvus
查询延迟HNSW索引延迟低,IVF索引可调节
内存占用PQ压缩可减少10x存储
更新频率频繁更新选择支持实时插入的数据库

常见问题

Q1: 向量数据库和传统数据库可以结合使用吗?

可以。常见模式是向量数据库存储向量和ID,元数据存储在传统数据库中。

Q2: 如何处理向量更新?

大多数向量数据库支持upsert操作,可以通过ID更新已有向量。

Q3: 索引构建需要多长时间?

取决于数据量和索引类型。IVF需要训练,HNSW构建较慢但查询快。

Q4: 如何评估检索质量?

  • Recall@K:Top-K中包含正确结果的比例
  • MRR:平均倒数排名
  • NDCG:归一化折损累积增益

生产环境考量

在将向量数据库投入生产时,需要关注以下维度:

1. 内存管理

向量索引(尤其是 HNSW)通常驻留在内存中。

  • 估算公式内存 ≈ 向量数量 * 维度 * 4 字节 * 1.5 (索引开销)
  • 优化:使用磁盘索引(如 Milvus 的 DiskANN)或量化技术(PQ/SQ)。

2. 数据一致性

  • 最终一致性:写入后可能需要几秒钟才能被检索到(常见于分布式系统)。
  • 强一致性:写入即刻可见,但会降低写入吞吐量。

3. 多租户隔离

如果你的应用服务于多个客户,必须确保数据隔离。

  • 方案 A:每个客户一个 Collection(适合客户少、数据多)。
  • 方案 B:使用 partition_key 或元数据过滤(适合客户多、数据少)。

总结

数据库特点推荐场景
Chroma简单易用开发测试
FAISS高性能库研究、单机
Milvus功能全面生产环境
Qdrant云原生K8s部署

参考资料

版权声明: 如无特别声明,本文版权归 sshipanoo 所有,转载请注明本文链接。

(采用 CC BY-NC-SA 4.0 许可协议进行授权)

本文标题:LLM应用开发——向量数据库

本文链接:https://www.sshipanoo.com/blog/ai/llm-app/向量数据库/

本文最后一次更新为 天前,文章中的某些内容可能已过时!