已经是最新一篇文章了!
已经是最后一篇文章了!
PostgreSQL+pgvector存储架构设计与优化
在前面的文章中,我们已经搭建了基础的运行环境。本篇将深入介绍如何设计和实现高效的数据库存储架构,包括关系型数据和向量数据的存储优化。
数据库架构设计
整体存储架构
graph TB
subgraph "应用层"
API[FastAPI应用]
Worker[后台任务]
end
subgraph "缓存层"
Redis[(Redis)]
MemCache[内存缓存]
end
subgraph "数据存储层"
PGMain[(PostgreSQL主库)]
PGReplica[(PostgreSQL从库)]
ObjectStore[(MinIO对象存储)]
end
subgraph "数据分类"
StructuredData[结构化数据]
VectorData[向量数据]
BlobData[文件数据]
end
API --> Redis
API --> PGMain
Worker --> PGMain
PGMain --> PGReplica
PGMain --> ObjectStore
StructuredData --> PGMain
VectorData --> PGMain
BlobData --> ObjectStore

数据分层存储策略
存储分层:
热数据: # 频繁访问的数据
- 最近30天的对话记录
- 用户会话信息
- 活跃用户的向量数据
存储: PostgreSQL + Redis缓存
温数据: # 中等频率访问
- 历史文档向量
- 用户历史对话
- 系统配置信息
存储: PostgreSQL主库
冷数据: # 低频访问的数据
- 原始文档文件
- 日志文件
- 备份数据
存储: MinIO对象存储
归档数据: # 长期保存
- 历史审计日志
- 已删除的文档备份
存储: 压缩存储 + 定期清理

数据库表结构设计
核心业务表
1. 用户管理表
-- Core user account table.
-- NOTE: the UNIQUE constraints on username and email already create
-- backing btree indexes, so the separate idx_users_username /
-- idx_users_email indexes of the original were redundant and have
-- been dropped (same for idx_sessions_token below).
CREATE TABLE users (
    id SERIAL PRIMARY KEY,
    username VARCHAR(50) UNIQUE NOT NULL,
    email VARCHAR(100) UNIQUE NOT NULL,
    password_hash VARCHAR(255) NOT NULL,
    full_name VARCHAR(100),
    department VARCHAR(50),
    role VARCHAR(20) DEFAULT 'user' CHECK (role IN ('admin', 'operator', 'user')),
    is_active BOOLEAN DEFAULT true,
    last_login TIMESTAMP,
    created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
    updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
);
-- Fine-grained permission grants per user.
CREATE TABLE user_permissions (
    id SERIAL PRIMARY KEY,
    user_id INTEGER REFERENCES users(id) ON DELETE CASCADE,
    resource VARCHAR(50) NOT NULL, -- resource type: document, conversation, system
    action VARCHAR(20) NOT NULL,   -- action type: read, write, delete, admin
    granted_by INTEGER REFERENCES users(id),
    granted_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
    UNIQUE(user_id, resource, action)
);
-- Login session tracking (token lookup is served by the UNIQUE index).
CREATE TABLE user_sessions (
    id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
    user_id INTEGER REFERENCES users(id) ON DELETE CASCADE,
    session_token VARCHAR(255) UNIQUE NOT NULL,
    expires_at TIMESTAMP NOT NULL,
    created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
    last_accessed TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
    ip_address INET,
    user_agent TEXT
);
-- Remaining non-redundant indexes.
CREATE INDEX idx_users_active ON users(is_active);
CREATE INDEX idx_sessions_user ON user_sessions(user_id);
CREATE INDEX idx_sessions_expires ON user_sessions(expires_at);

2. 文档管理表
-- Document metadata table: one row per ingested document.
CREATE TABLE documents (
id SERIAL PRIMARY KEY,
title VARCHAR(255) NOT NULL,
content TEXT,
file_path VARCHAR(500),
file_size BIGINT,
file_type VARCHAR(20),
mime_type VARCHAR(100),
source VARCHAR(100), -- origin: upload, web_crawl, api_sync, manual
category VARCHAR(50),
tags TEXT[],
language VARCHAR(10) DEFAULT 'zh',
status VARCHAR(20) DEFAULT 'active' CHECK (status IN ('active', 'processing', 'error', 'deleted')),
embedding_model VARCHAR(50),
chunk_strategy VARCHAR(20) DEFAULT 'recursive',
metadata JSONB,
created_by INTEGER REFERENCES users(id),
updated_by INTEGER REFERENCES users(id),
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
deleted_at TIMESTAMP -- soft-delete timestamp; queries filter on status
);
-- Document chunk table: one row per text chunk, with its embedding.
CREATE TABLE document_chunks (
id SERIAL PRIMARY KEY,
document_id INTEGER REFERENCES documents(id) ON DELETE CASCADE,
chunk_index INTEGER NOT NULL,
content TEXT NOT NULL,
content_length INTEGER,
overlap_tokens INTEGER DEFAULT 0,
metadata JSONB,
embedding vector(1536), -- OpenAI text-embedding-ada-002 dimensionality
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
UNIQUE(document_id, chunk_index)
);
-- Document version history (one row per saved revision).
CREATE TABLE document_versions (
id SERIAL PRIMARY KEY,
document_id INTEGER REFERENCES documents(id) ON DELETE CASCADE,
version_number INTEGER NOT NULL,
title VARCHAR(255),
content TEXT,
change_summary TEXT,
created_by INTEGER REFERENCES users(id),
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
UNIQUE(document_id, version_number)
);
-- Document access audit trail (reads, searches, downloads).
CREATE TABLE document_access_logs (
id SERIAL PRIMARY KEY,
document_id INTEGER REFERENCES documents(id) ON DELETE CASCADE,
user_id INTEGER REFERENCES users(id),
access_type VARCHAR(20), -- read, search, download
query_text TEXT,
relevance_score FLOAT,
access_time TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
ip_address INET,
user_agent TEXT
);
-- Full-text indexes.
-- NOTE(review): the text-search config is 'english' but the default
-- document language is 'zh'; English stemming does nothing useful for
-- Chinese text. Confirm whether 'simple' or a zhparser/pg_jieba config
-- is intended — the config used here must match the one used in queries
-- or the index will never be hit.
CREATE INDEX idx_documents_title ON documents USING gin(to_tsvector('english', title));
CREATE INDEX idx_documents_content ON documents USING gin(to_tsvector('english', content));
CREATE INDEX idx_documents_category ON documents(category);
CREATE INDEX idx_documents_tags ON documents USING gin(tags);
CREATE INDEX idx_documents_status ON documents(status);
CREATE INDEX idx_documents_created ON documents(created_at);
CREATE INDEX idx_documents_updated ON documents(updated_at);
-- HNSW index for cosine-distance nearest-neighbour search on embeddings.
CREATE INDEX idx_chunks_embedding ON document_chunks USING hnsw (embedding vector_cosine_ops);
CREATE INDEX idx_chunks_document ON document_chunks(document_id);
CREATE INDEX idx_chunks_length ON document_chunks(content_length);
-- Version and access-log lookup indexes.
CREATE INDEX idx_versions_document ON document_versions(document_id, version_number);
CREATE INDEX idx_access_logs_document ON document_access_logs(document_id);
CREATE INDEX idx_access_logs_user ON document_access_logs(user_id);
CREATE INDEX idx_access_logs_time ON document_access_logs(access_time);

3. 对话管理表
-- Conversation table: one row per chat session and its model settings.
CREATE TABLE conversations (
id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
user_id INTEGER REFERENCES users(id) ON DELETE CASCADE,
title VARCHAR(200),
system_prompt TEXT,
model_name VARCHAR(50) DEFAULT 'gpt-3.5-turbo',
temperature FLOAT DEFAULT 0.7,
max_tokens INTEGER DEFAULT 2000,
context_window INTEGER DEFAULT 4000,
rag_enabled BOOLEAN DEFAULT true,
rag_settings JSONB,
status VARCHAR(20) DEFAULT 'active' CHECK (status IN ('active', 'archived', 'deleted')),
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
last_message_at TIMESTAMP
);
-- Message table: every user/assistant/system turn within a conversation.
CREATE TABLE messages (
id SERIAL PRIMARY KEY,
conversation_id UUID REFERENCES conversations(id) ON DELETE CASCADE,
role VARCHAR(10) NOT NULL CHECK (role IN ('user', 'assistant', 'system')),
content TEXT NOT NULL,
tokens_count INTEGER,
model_name VARCHAR(50),
finish_reason VARCHAR(20),
retrieved_documents JSONB, -- documents retrieved by RAG for this turn
tool_calls JSONB, -- MCP tool-call payloads
metadata JSONB,
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
);
-- User feedback on conversations/messages (1-5 stars plus free text).
CREATE TABLE conversation_ratings (
id SERIAL PRIMARY KEY,
conversation_id UUID REFERENCES conversations(id) ON DELETE CASCADE,
message_id INTEGER REFERENCES messages(id) ON DELETE CASCADE,
user_id INTEGER REFERENCES users(id),
rating INTEGER CHECK (rating BETWEEN 1 AND 5),
feedback TEXT,
improvement_suggestions TEXT,
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
);
-- Indexes for common access paths (per-user listing, recency sorts).
CREATE INDEX idx_conversations_user ON conversations(user_id);
CREATE INDEX idx_conversations_status ON conversations(status);
CREATE INDEX idx_conversations_updated ON conversations(updated_at);
CREATE INDEX idx_conversations_last_message ON conversations(last_message_at);
CREATE INDEX idx_messages_conversation ON messages(conversation_id);
CREATE INDEX idx_messages_created ON messages(created_at);
CREATE INDEX idx_messages_role ON messages(role);
CREATE INDEX idx_ratings_conversation ON conversation_ratings(conversation_id);
CREATE INDEX idx_ratings_user ON conversation_ratings(user_id);

4. 系统管理表
-- Key/value system configuration.
-- NOTE(review): config_key already carries a UNIQUE constraint, so the
-- idx_config_key index below duplicates the implicit unique index.
CREATE TABLE system_config (
id SERIAL PRIMARY KEY,
config_key VARCHAR(50) UNIQUE NOT NULL,
config_value TEXT,
config_type VARCHAR(20) DEFAULT 'string', -- string, integer, float, boolean, json
description TEXT,
is_public BOOLEAN DEFAULT false, -- whether visible to non-admin users
updated_by INTEGER REFERENCES users(id),
updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
);
-- Background task queue with retry bookkeeping.
CREATE TABLE background_tasks (
id SERIAL PRIMARY KEY,
task_id VARCHAR(50) UNIQUE NOT NULL,
task_type VARCHAR(50) NOT NULL, -- document_process, model_train, data_sync
status VARCHAR(20) DEFAULT 'pending' CHECK (status IN ('pending', 'running', 'completed', 'failed', 'cancelled')),
priority INTEGER DEFAULT 0,
payload JSONB,
result JSONB,
error_message TEXT,
progress_percent INTEGER DEFAULT 0,
created_by INTEGER REFERENCES users(id),
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
started_at TIMESTAMP,
completed_at TIMESTAMP,
retry_count INTEGER DEFAULT 0,
max_retries INTEGER DEFAULT 3
);
-- Audit trail: who did what, with before/after values for updates.
CREATE TABLE audit_logs (
id SERIAL PRIMARY KEY,
user_id INTEGER REFERENCES users(id),
action VARCHAR(50) NOT NULL, -- login, logout, create, update, delete
resource_type VARCHAR(50), -- document, conversation, user, system
resource_id VARCHAR(50),
old_values JSONB,
new_values JSONB,
ip_address INET,
user_agent TEXT,
success BOOLEAN DEFAULT true,
error_message TEXT,
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
);
-- Indexes
CREATE INDEX idx_config_key ON system_config(config_key);
CREATE INDEX idx_config_public ON system_config(is_public);
CREATE INDEX idx_tasks_status ON background_tasks(status);
CREATE INDEX idx_tasks_type ON background_tasks(task_type);
CREATE INDEX idx_tasks_priority ON background_tasks(priority DESC);
CREATE INDEX idx_tasks_created ON background_tasks(created_at);
CREATE INDEX idx_audit_user ON audit_logs(user_id);
CREATE INDEX idx_audit_action ON audit_logs(action);
CREATE INDEX idx_audit_resource ON audit_logs(resource_type, resource_id);
CREATE INDEX idx_audit_time ON audit_logs(created_at);

数据库函数和触发器
1. 自动更新时间戳
-- Trigger function: keep updated_at current on every UPDATE.
CREATE OR REPLACE FUNCTION update_updated_at_column()
RETURNS TRIGGER AS $$
BEGIN
NEW.updated_at = CURRENT_TIMESTAMP;
RETURN NEW;
END;
$$ language 'plpgsql';
-- Attach the trigger to each table that carries an updated_at column.
-- NOTE(review): EXECUTE PROCEDURE is the legacy spelling; PostgreSQL 11+
-- prefers EXECUTE FUNCTION — consider migrating when convenient.
CREATE TRIGGER update_users_updated_at BEFORE UPDATE ON users
FOR EACH ROW EXECUTE PROCEDURE update_updated_at_column();
CREATE TRIGGER update_documents_updated_at BEFORE UPDATE ON documents
FOR EACH ROW EXECUTE PROCEDURE update_updated_at_column();
CREATE TRIGGER update_conversations_updated_at BEFORE UPDATE ON conversations
FOR EACH ROW EXECUTE PROCEDURE update_updated_at_column();

2. 向量相似度搜索函数
-- Vector similarity search over chunks of active documents.
-- Returns up to max_results chunks whose cosine similarity to
-- query_embedding exceeds similarity_threshold, best match first.
-- similarity = 1 - cosine_distance, where <=> is pgvector's
-- cosine-distance operator.
CREATE OR REPLACE FUNCTION search_similar_documents(
query_embedding vector(1536),
similarity_threshold float DEFAULT 0.7,
max_results integer DEFAULT 10,
filter_category text DEFAULT NULL
)
RETURNS TABLE(
document_id integer,
chunk_id integer,
similarity_score float,
content text,
document_title text,
category text
) AS $$
BEGIN
RETURN QUERY
SELECT
d.id as document_id,
dc.id as chunk_id,
(1 - (dc.embedding <=> query_embedding)) as similarity_score,
dc.content,
d.title as document_title,
d.category
FROM document_chunks dc
JOIN documents d ON dc.document_id = d.id
WHERE
d.status = 'active'
AND (filter_category IS NULL OR d.category = filter_category)
AND (1 - (dc.embedding <=> query_embedding)) > similarity_threshold
-- ordering by the raw <=> distance lets the HNSW index drive the scan
ORDER BY dc.embedding <=> query_embedding
LIMIT max_results;
END;
$$ LANGUAGE plpgsql;
-- Hybrid retrieval: combines full-text keyword rank and vector similarity
-- with a FULL OUTER JOIN so a chunk matching only one signal still scores
-- (the missing side contributes 0 via COALESCE).
-- NOTE(review): ts_rank is not bounded to [0,1] while vector similarity is,
-- so keyword_weight/vector_weight mix different scales — confirm whether
-- keyword scores should be normalized first.
-- NOTE(review): the 'english' text-search config mismatches the default
-- 'zh' document language (see the documents table DDL).
CREATE OR REPLACE FUNCTION hybrid_search(
query_text text,
query_embedding vector(1536),
similarity_threshold float DEFAULT 0.7,
max_results integer DEFAULT 10,
keyword_weight float DEFAULT 0.3,
vector_weight float DEFAULT 0.7
)
RETURNS TABLE(
document_id integer,
chunk_id integer,
hybrid_score float,
content text,
document_title text,
category text
) AS $$
BEGIN
RETURN QUERY
-- keyword_scores: chunks matching the full-text query, ranked by ts_rank
WITH keyword_scores AS (
SELECT
d.id as document_id,
dc.id as chunk_id,
ts_rank(to_tsvector('english', dc.content), plainto_tsquery('english', query_text)) as keyword_score,
dc.content,
d.title as document_title,
d.category
FROM document_chunks dc
JOIN documents d ON dc.document_id = d.id
WHERE
d.status = 'active'
AND to_tsvector('english', dc.content) @@ plainto_tsquery('english', query_text)
),
-- vector_scores: chunks whose cosine similarity clears the threshold
vector_scores AS (
SELECT
d.id as document_id,
dc.id as chunk_id,
(1 - (dc.embedding <=> query_embedding)) as vector_score,
dc.content,
d.title as document_title,
d.category
FROM document_chunks dc
JOIN documents d ON dc.document_id = d.id
WHERE
d.status = 'active'
AND (1 - (dc.embedding <=> query_embedding)) > similarity_threshold
)
-- Merge both result sets into a single weighted score per chunk.
SELECT
COALESCE(ks.document_id, vs.document_id) as document_id,
COALESCE(ks.chunk_id, vs.chunk_id) as chunk_id,
(COALESCE(ks.keyword_score, 0) * keyword_weight +
COALESCE(vs.vector_score, 0) * vector_weight) as hybrid_score,
COALESCE(ks.content, vs.content) as content,
COALESCE(ks.document_title, vs.document_title) as document_title,
COALESCE(ks.category, vs.category) as category
FROM keyword_scores ks
FULL OUTER JOIN vector_scores vs ON ks.chunk_id = vs.chunk_id
ORDER BY hybrid_score DESC
LIMIT max_results;
END;
$$ LANGUAGE plpgsql;

数据库连接管理
连接池配置
# database/connection.py
import asyncio
import logging
from contextlib import asynccontextmanager
from typing import AsyncGenerator, List, Optional

import asyncpg
from asyncpg import Pool
from pydantic import BaseSettings
logger = logging.getLogger(__name__)
class DatabaseConfig(BaseSettings):
    """Database connection settings, overridable via DB_* environment variables.

    NOTE(review): `BaseSettings` moved to the separate `pydantic-settings`
    package in pydantic v2 — confirm which pydantic major version the
    project pins.
    """
    host: str = "localhost"
    port: int = 5432
    database: str = "rag_enterprise"
    username: str = "rag_user"
    # WARNING: hard-coded fallback credential; rely on the DB_PASSWORD
    # environment variable in any real deployment.
    password: str = "rag_password_2026"

    # Connection-pool sizing
    min_pool_size: int = 5
    max_pool_size: int = 20
    max_queries_per_connection: int = 50000
    max_inactive_connection_lifetime: int = 300

    # Timeouts (seconds)
    command_timeout: int = 30
    query_timeout: int = 60

    class Config:
        env_prefix = "DB_"
class DatabaseManager:
    """Owns the asyncpg connection pool and provides common query helpers."""

    def __init__(self, config: DatabaseConfig):
        self.config = config
        self.pool: Optional[Pool] = None

    async def create_pool(self) -> Pool:
        """Create the connection pool (idempotent: returns an existing pool)."""
        if self.pool:
            return self.pool
        try:
            self.pool = await asyncpg.create_pool(
                host=self.config.host,
                port=self.config.port,
                database=self.config.database,
                user=self.config.username,
                password=self.config.password,
                min_size=self.config.min_pool_size,
                max_size=self.config.max_pool_size,
                max_queries=self.config.max_queries_per_connection,
                max_inactive_connection_lifetime=self.config.max_inactive_connection_lifetime,
                command_timeout=self.config.command_timeout,
                init=self._init_connection,
            )
            logger.info(f"数据库连接池创建成功,大小: {self.config.min_pool_size}-{self.config.max_pool_size}")
            return self.pool
        except Exception as e:
            logger.error(f"创建数据库连接池失败: {e}")
            raise

    async def _init_connection(self, conn):
        """Per-connection setup run by the pool for every new connection."""
        await conn.execute("SET application_name = 'rag_enterprise'")
        await conn.execute("SET timezone = 'Asia/Shanghai'")

    async def close_pool(self):
        """Close the pool and drop the reference so it can be recreated later."""
        if self.pool:
            await self.pool.close()
            self.pool = None
            logger.info("数据库连接池已关闭")

    @asynccontextmanager
    async def get_connection(self) -> AsyncGenerator[asyncpg.Connection, None]:
        """Acquire a pooled connection, lazily creating the pool on first use."""
        if not self.pool:
            await self.create_pool()
        async with self.pool.acquire() as connection:
            try:
                yield connection
            except Exception as e:
                logger.error(f"数据库操作错误: {e}")
                raise

    @asynccontextmanager
    async def get_transaction(self) -> AsyncGenerator[asyncpg.Connection, None]:
        """Acquire a connection inside a transaction (commit on exit, rollback on error)."""
        async with self.get_connection() as conn:
            async with conn.transaction():
                yield conn

    async def execute_query(
        self,
        query: str,
        *args,
        timeout: Optional[int] = None
    ) -> List[asyncpg.Record]:
        """Run a SELECT and return all rows.

        BUG FIX: the original annotated the return type as `str`, but
        asyncpg's `Connection.fetch` returns a list of Records.
        """
        timeout = timeout or self.config.query_timeout
        async with self.get_connection() as conn:
            return await conn.fetch(query, *args, timeout=timeout)

    async def execute_one(
        self,
        query: str,
        *args,
        timeout: Optional[int] = None
    ) -> Optional[asyncpg.Record]:
        """Run a SELECT and return the first row, or None if there is no match."""
        timeout = timeout or self.config.query_timeout
        async with self.get_connection() as conn:
            return await conn.fetchrow(query, *args, timeout=timeout)

    async def execute_command(
        self,
        command: str,
        *args,
        timeout: Optional[int] = None
    ) -> str:
        """Run a data-modifying command (INSERT/UPDATE/DELETE); returns the status tag."""
        timeout = timeout or self.config.command_timeout
        async with self.get_connection() as conn:
            return await conn.execute(command, *args, timeout=timeout)
# Module-level singletons: one config (reads DB_* env vars) and one pool
# manager shared across the application.
db_config = DatabaseConfig()
db_manager = DatabaseManager(db_config)

数据访问层(DAL)
# database/models.py
from datetime import datetime
from typing import List, Optional, Dict, Any
from dataclasses import dataclass
from enum import Enum
import asyncpg
import numpy as np
class UserRole(str, Enum):
    """User role enumeration; values match the users.role CHECK constraint."""
    ADMIN = "admin"
    OPERATOR = "operator"
    USER = "user"
class DocumentStatus(str, Enum):
    """Document lifecycle states; values match the documents.status CHECK constraint."""
    ACTIVE = "active"
    PROCESSING = "processing"
    ERROR = "error"
    DELETED = "deleted"
@dataclass
class User:
    """In-memory representation of a row in the `users` table.

    Timestamps are populated from the database; they stay None for
    not-yet-persisted instances.
    """
    id: Optional[int] = None
    username: str = ""
    email: str = ""
    full_name: Optional[str] = None
    department: Optional[str] = None
    role: UserRole = UserRole.USER
    is_active: bool = True
    created_at: Optional[datetime] = None
    updated_at: Optional[datetime] = None
@dataclass
class Document:
    """In-memory representation of a row in the `documents` table."""
    id: Optional[int] = None
    title: str = ""
    content: Optional[str] = None
    file_path: Optional[str] = None
    file_size: Optional[int] = None
    file_type: Optional[str] = None
    source: str = "upload"  # upload, web_crawl, api_sync, manual
    category: Optional[str] = None
    # BUG FIX: was annotated `List[str] = None`; the default really is
    # None, so the annotation must be Optional. (A bare mutable default
    # would be rejected by @dataclass anyway.)
    tags: Optional[List[str]] = None
    language: str = "zh"
    status: DocumentStatus = DocumentStatus.ACTIVE
    embedding_model: Optional[str] = None
    metadata: Optional[Dict[str, Any]] = None
    created_by: Optional[int] = None
    created_at: Optional[datetime] = None
    updated_at: Optional[datetime] = None
@dataclass
class DocumentChunk:
    """In-memory representation of a row in the `document_chunks` table.

    `embedding` holds the chunk's vector as a numpy array (converted to a
    plain list before insertion into the pgvector column).
    """
    id: Optional[int] = None
    document_id: int = 0
    chunk_index: int = 0
    content: str = ""
    content_length: int = 0
    overlap_tokens: int = 0
    metadata: Optional[Dict[str, Any]] = None
    embedding: Optional[np.ndarray] = None
    created_at: Optional[datetime] = None
class UserRepository:
    """Data-access layer for the `users` table."""

    def __init__(self, db_manager):
        self.db = db_manager

    async def create_user(self, user: User) -> int:
        """Insert a user row and return its new id.

        NOTE(review): password_hash is inserted as an empty string; the
        caller is expected to hash and store the password separately —
        confirm that flow exists before login is possible.
        """
        query = """
        INSERT INTO users (username, email, password_hash, full_name, department, role)
        VALUES ($1, $2, $3, $4, $5, $6)
        RETURNING id
        """
        async with self.db.get_connection() as conn:
            row = await conn.fetchrow(
                query,
                user.username,
                user.email,
                "",  # password_hash placeholder; must be set by the caller
                user.full_name,
                user.department,
                user.role,
            )
            return row['id']

    async def get_user_by_id(self, user_id: int) -> Optional[User]:
        """Fetch an active user by primary key, or None."""
        query = """
        SELECT id, username, email, full_name, department, role, is_active, created_at, updated_at
        FROM users WHERE id = $1 AND is_active = true
        """
        row = await self.db.execute_one(query, user_id)
        if row:
            return User(**dict(row))
        return None

    async def get_user_by_username(self, username: str) -> Optional[User]:
        """Fetch an active user by username, or None."""
        query = """
        SELECT id, username, email, full_name, department, role, is_active, created_at, updated_at
        FROM users WHERE username = $1 AND is_active = true
        """
        row = await self.db.execute_one(query, username)
        if row:
            return User(**dict(row))
        return None

    async def list_users(
        self,
        offset: int = 0,
        limit: int = 50,
        role_filter: Optional[UserRole] = None
    ) -> List[User]:
        """List active users, newest first, optionally filtered by role.

        BUG FIX: the original always used $2/$3 for LIMIT/OFFSET even when
        no role filter was supplied, leaving placeholder $1 unbound and
        making the unfiltered call fail. Placeholder numbers are now
        derived from the actual parameter count.
        """
        query = """
        SELECT id, username, email, full_name, department, role, is_active, created_at, updated_at
        FROM users WHERE is_active = true
        """
        params: list = []
        if role_filter:
            params.append(role_filter)
            query += f" AND role = ${len(params)}"
        params.append(limit)
        query += f" ORDER BY created_at DESC LIMIT ${len(params)}"
        params.append(offset)
        query += f" OFFSET ${len(params)}"
        rows = await self.db.execute_query(query, *params)
        return [User(**dict(row)) for row in rows]
class DocumentRepository:
    """Data-access layer for documents, chunks, and retrieval queries."""

    def __init__(self, db_manager):
        self.db = db_manager

    async def create_document(self, document: Document) -> int:
        """Insert a document row and return its new id."""
        query = """
        INSERT INTO documents (
            title, content, file_path, file_size, file_type, source,
            category, tags, language, embedding_model, metadata, created_by
        )
        VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12)
        RETURNING id
        """
        async with self.db.get_connection() as conn:
            row = await conn.fetchrow(
                query,
                document.title,
                document.content,
                document.file_path,
                document.file_size,
                document.file_type,
                document.source,
                document.category,
                document.tags or [],
                document.language,
                document.embedding_model,
                document.metadata,
                document.created_by,
            )
            return row['id']

    async def get_document_by_id(self, doc_id: int) -> Optional[Document]:
        """Fetch a document by id, excluding soft-deleted rows."""
        query = """
        SELECT * FROM documents WHERE id = $1 AND status != 'deleted'
        """
        row = await self.db.execute_one(query, doc_id)
        if row:
            doc_dict = dict(row)
            return Document(**doc_dict)
        return None

    async def search_documents(
        self,
        query_text: Optional[str] = None,
        category: Optional[str] = None,
        tags: Optional[List[str]] = None,
        offset: int = 0,
        limit: int = 20
    ) -> List[Document]:
        """Filtered document search. The SQL is assembled dynamically but
        every user-supplied value is passed as a bound parameter.

        NOTE(review): `title || ' ' || content` yields NULL when content is
        NULL, so such rows can never match full-text search — confirm
        whether COALESCE is wanted. The 'english' text-search config also
        mismatches the default 'zh' document language.
        """
        conditions = ["status = 'active'"]
        params = []
        param_count = 0
        if query_text:
            param_count += 1
            conditions.append(
                f"to_tsvector('english', title || ' ' || content) @@ plainto_tsquery('english', ${param_count})"
            )
            params.append(query_text)
        if category:
            param_count += 1
            conditions.append(f"category = ${param_count}")
            params.append(category)
        if tags:
            param_count += 1
            conditions.append(f"tags && ${param_count}")
            params.append(tags)
        param_count += 1
        limit_param = f"${param_count}"
        params.append(limit)
        param_count += 1
        offset_param = f"${param_count}"
        params.append(offset)
        query = f"""
        SELECT * FROM documents
        WHERE {' AND '.join(conditions)}
        ORDER BY updated_at DESC
        LIMIT {limit_param} OFFSET {offset_param}
        """
        rows = await self.db.execute_query(query, *params)
        return [Document(**dict(row)) for row in rows]

    async def create_document_chunk(self, chunk: DocumentChunk) -> int:
        """Insert one chunk (with optional embedding) and return its id."""
        query = """
        INSERT INTO document_chunks (
            document_id, chunk_index, content, content_length,
            overlap_tokens, metadata, embedding
        )
        VALUES ($1, $2, $3, $4, $5, $6, $7)
        RETURNING id
        """
        # pgvector accepts a plain Python list for vector columns.
        embedding_vec = None
        if chunk.embedding is not None:
            embedding_vec = chunk.embedding.tolist()
        async with self.db.get_connection() as conn:
            row = await conn.fetchrow(
                query,
                chunk.document_id,
                chunk.chunk_index,
                chunk.content,
                chunk.content_length,
                chunk.overlap_tokens,
                chunk.metadata,
                embedding_vec,
            )
            return row['id']

    async def vector_search(
        self,
        query_embedding: np.ndarray,
        similarity_threshold: float = 0.7,
        max_results: int = 10,
        category_filter: Optional[str] = None
    ) -> List[Dict[str, Any]]:
        """Delegate cosine-similarity search to the SQL function search_similar_documents."""
        query = """
        SELECT * FROM search_similar_documents($1, $2, $3, $4)
        """
        embedding_list = query_embedding.tolist()
        rows = await self.db.execute_query(
            query,
            embedding_list,
            similarity_threshold,
            max_results,
            category_filter,
        )
        return [dict(row) for row in rows]

    async def hybrid_search(
        self,
        query_text: str,
        query_embedding: np.ndarray,
        similarity_threshold: float = 0.7,
        max_results: int = 10,
        keyword_weight: float = 0.3,
        vector_weight: float = 0.7
    ) -> List[Dict[str, Any]]:
        """Delegate combined keyword + vector retrieval to the SQL function hybrid_search."""
        query = """
        SELECT * FROM hybrid_search($1, $2, $3, $4, $5, $6)
        """
        embedding_list = query_embedding.tolist()
        rows = await self.db.execute_query(
            query,
            query_text,
            embedding_list,
            similarity_threshold,
            max_results,
            keyword_weight,
            vector_weight,
        )
        return [dict(row) for row in rows]

性能优化策略
1. 索引优化
-- Inspect the query plan of the similarity-search helper.
EXPLAIN (ANALYZE, BUFFERS)
SELECT * FROM search_similar_documents(
    '[0.1, 0.2, ...]'::vector,
    0.7,
    10,
    'technical'
);
-- Rebuild the HNSW index with explicit tuning parameters
-- (m = per-node graph connectivity, ef_construction = build-time accuracy).
DROP INDEX IF EXISTS idx_chunks_embedding;
CREATE INDEX idx_chunks_embedding ON document_chunks
USING hnsw (embedding vector_cosine_ops)
WITH (m = 16, ef_construction = 64);
-- Partial index covering only active documents' chunks.
-- BUG FIX: PostgreSQL does not allow subqueries in a partial-index
-- predicate — the original `WHERE document_id IN (SELECT ...)` form
-- fails with "cannot use subquery in index predicate". Denormalize the
-- active flag onto document_chunks and filter on the local column
-- (keep it in sync from the application or a trigger on documents).
ALTER TABLE document_chunks ADD COLUMN IF NOT EXISTS is_active BOOLEAN DEFAULT true;
CREATE INDEX idx_active_documents_embedding ON document_chunks
USING hnsw (embedding vector_cosine_ops)
WHERE is_active;
-- Composite index for the common filter + recency-sort pattern.
CREATE INDEX idx_documents_category_status_updated ON documents(category, status, updated_at DESC);
CREATE INDEX idx_chunks_document_length ON document_chunks(document_id, content_length);

2. 查询优化
-- Materialized view caching 30-day document popularity.
-- BUG FIX: the access-log recency filter now lives in the JOIN condition
-- rather than in WHERE — filtering dal.access_time in WHERE silently
-- turned the LEFT JOIN into an INNER JOIN and dropped every active
-- document with no access in the last 30 days.
CREATE MATERIALIZED VIEW popular_documents AS
SELECT
    d.id,
    d.title,
    d.category,
    COUNT(dal.id) AS access_count,
    AVG(dal.relevance_score) AS avg_relevance
FROM documents d
LEFT JOIN document_access_logs dal
    ON d.id = dal.document_id
    AND dal.access_time >= CURRENT_DATE - INTERVAL '30 days'
WHERE d.status = 'active'
GROUP BY d.id, d.title, d.category
ORDER BY access_count DESC;
-- A unique index is required for REFRESH ... CONCURRENTLY.
CREATE UNIQUE INDEX idx_popular_documents_id ON popular_documents(id);
-- Helper invoked by the scheduled job below.
CREATE OR REPLACE FUNCTION refresh_popular_documents()
RETURNS void AS $$
BEGIN
    REFRESH MATERIALIZED VIEW CONCURRENTLY popular_documents;
END;
$$ LANGUAGE plpgsql;
-- Nightly refresh at 02:00 via pg_cron:
SELECT cron.schedule('refresh-popular-docs', '0 2 * * *', 'SELECT refresh_popular_documents();');

3. 数据分区
-- Range-partitioned variant of the messages table, keyed on created_at.
CREATE TABLE messages_partitioned (
id SERIAL,
conversation_id UUID NOT NULL,
role VARCHAR(10) NOT NULL,
content TEXT NOT NULL,
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
) PARTITION BY RANGE (created_at);
-- Quarterly partitions (ranges are half-open: FROM inclusive, TO exclusive).
CREATE TABLE messages_2026_q1 PARTITION OF messages_partitioned
FOR VALUES FROM ('2026-01-01') TO ('2026-04-01');
CREATE TABLE messages_2026_q2 PARTITION OF messages_partitioned
FOR VALUES FROM ('2026-04-01') TO ('2026-07-01');
-- Helper: create one monthly partition of table_name starting at start_date.
-- format()'s %I quotes identifiers and %L quotes literals, so the dynamic
-- DDL is injection-safe as long as callers are trusted.
CREATE OR REPLACE FUNCTION create_monthly_partition(table_name text, start_date date)
RETURNS void AS $$
DECLARE
partition_name text;
end_date date;
BEGIN
partition_name := table_name || '_' || to_char(start_date, 'YYYY_MM');
end_date := start_date + INTERVAL '1 month';
EXECUTE format('CREATE TABLE %I PARTITION OF %I FOR VALUES FROM (%L) TO (%L)',
partition_name, table_name, start_date, end_date);
-- Each new partition also gets its own created_at index.
EXECUTE format('CREATE INDEX %I ON %I (created_at)',
partition_name || '_created_at_idx', partition_name);
END;
$$ LANGUAGE plpgsql;
在本篇文章中,我们详细设计了企业级RAG应用的数据库架构,包括完整的表结构、索引策略、查询优化和性能调优。在下一篇文章中,我们将介绍如何部署和配置VLLM推理引擎,以及模型微调的具体实现。
下一篇预告:模型服务与推理 - 深入介绍VLLM部署配置、模型微调流水线、推理优化和GPU资源管理。
版权声明: 如无特别声明,本文版权归 sshipanoo 所有,转载请注明本文链接。
(采用 CC BY-NC-SA 4.0 许可协议进行授权)
本文标题:企业级RAG应用系列(3):数据库与向量存储
本文链接:https://www.sshipanoo.com/blog/ai/企业级RAG应用系列-03-数据库设计/
本文最后一次更新距今已有一段时间,文章中的某些内容可能已过时!