PostgreSQL+pgvector存储架构设计与优化

在前面的文章中,我们已经搭建了基础的运行环境。本篇将深入介绍如何设计和实现高效的数据库存储架构,包括关系型数据和向量数据的存储优化。

数据库架构设计

整体存储架构

graph TB
    subgraph "应用层"
        API[FastAPI应用]
        Worker[后台任务]
    end
    
    subgraph "缓存层"
        Redis[(Redis)]
        MemCache[内存缓存]
    end
    
    subgraph "数据存储层"
        PGMain[(PostgreSQL主库)]
        PGReplica[(PostgreSQL从库)]
        ObjectStore[(MinIO对象存储)]
    end
    
    subgraph "数据分类"
        StructuredData[结构化数据]
        VectorData[向量数据]
        BlobData[文件数据]
    end
    
    API --> Redis
    API --> PGMain
    Worker --> PGMain
    
    PGMain --> PGReplica
    PGMain --> ObjectStore
    
    StructuredData --> PGMain
    VectorData --> PGMain
    BlobData --> ObjectStore

数据分层存储策略

存储分层:
  热数据: # 频繁访问的数据
    - 最近30天的对话记录
    - 用户会话信息
    - 活跃用户的向量数据
    存储: PostgreSQL + Redis缓存
    
  温数据: # 中等频率访问
    - 历史文档向量
    - 用户历史对话
    - 系统配置信息
    存储: PostgreSQL主库
    
  冷数据: # 低频访问的数据
    - 原始文档文件
    - 日志文件
    - 备份数据
    存储: MinIO对象存储
    
  归档数据: # 长期保存
    - 历史审计日志
    - 已删除的文档备份
    存储: 压缩存储 + 定期清理

数据库表结构设计

核心业务表

1. 用户管理表

-- Base user account table.
-- NOTE(review): TIMESTAMP columns here are timezone-naive; consider
-- TIMESTAMPTZ for new schemas so stored instants are unambiguous.
CREATE TABLE users (
    id SERIAL PRIMARY KEY,
    username VARCHAR(50) UNIQUE NOT NULL,
    email VARCHAR(100) UNIQUE NOT NULL,
    password_hash VARCHAR(255) NOT NULL,
    full_name VARCHAR(100),
    department VARCHAR(50),
    role VARCHAR(20) DEFAULT 'user' CHECK (role IN ('admin', 'operator', 'user')),
    is_active BOOLEAN DEFAULT true,
    last_login TIMESTAMP,
    created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
    updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
);

-- Per-user fine-grained permission grants.
CREATE TABLE user_permissions (
    id SERIAL PRIMARY KEY,
    user_id INTEGER REFERENCES users(id) ON DELETE CASCADE,
    resource VARCHAR(50) NOT NULL,  -- resource type: document, conversation, system
    action VARCHAR(20) NOT NULL,    -- action type: read, write, delete, admin
    granted_by INTEGER REFERENCES users(id),
    granted_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
    UNIQUE(user_id, resource, action)
);

-- Server-side session store.
CREATE TABLE user_sessions (
    id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
    user_id INTEGER REFERENCES users(id) ON DELETE CASCADE,
    session_token VARCHAR(255) UNIQUE NOT NULL,
    expires_at TIMESTAMP NOT NULL,
    created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
    last_accessed TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
    ip_address INET,
    user_agent TEXT
);

-- Indexes.
-- users.username, users.email and user_sessions.session_token are already
-- backed by the unique B-tree indexes their UNIQUE constraints create, so
-- the duplicate explicit indexes were removed (they only added write cost).
CREATE INDEX idx_users_active ON users(is_active);  -- low cardinality; useful mainly combined with other predicates
CREATE INDEX idx_sessions_user ON user_sessions(user_id);
CREATE INDEX idx_sessions_expires ON user_sessions(expires_at);  -- supports expired-session sweeps

2. 文档管理表

-- Document metadata: one row per logical document.
CREATE TABLE documents (
    id SERIAL PRIMARY KEY,
    title VARCHAR(255) NOT NULL,
    content TEXT,
    file_path VARCHAR(500),
    file_size BIGINT,
    file_type VARCHAR(20),
    mime_type VARCHAR(100),
    source VARCHAR(100),  -- origin: upload, web_crawl, api_sync, manual
    category VARCHAR(50),
    tags TEXT[],
    language VARCHAR(10) DEFAULT 'zh',
    status VARCHAR(20) DEFAULT 'active' CHECK (status IN ('active', 'processing', 'error', 'deleted')),
    embedding_model VARCHAR(50),
    chunk_strategy VARCHAR(20) DEFAULT 'recursive',
    metadata JSONB,
    created_by INTEGER REFERENCES users(id),
    updated_by INTEGER REFERENCES users(id),
    created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
    updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
    deleted_at TIMESTAMP  -- soft-delete timestamp; pairs with status = 'deleted'
);

-- Document chunks (vector storage): one row per embedded slice of a document.
CREATE TABLE document_chunks (
    id SERIAL PRIMARY KEY,
    document_id INTEGER REFERENCES documents(id) ON DELETE CASCADE,
    chunk_index INTEGER NOT NULL,
    content TEXT NOT NULL,
    content_length INTEGER,
    overlap_tokens INTEGER DEFAULT 0,
    metadata JSONB,
    embedding vector(1536),  -- OpenAI text-embedding-ada-002 dimension; must match documents.embedding_model
    created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
    UNIQUE(document_id, chunk_index)
);

-- Immutable per-version snapshots of a document.
CREATE TABLE document_versions (
    id SERIAL PRIMARY KEY,
    document_id INTEGER REFERENCES documents(id) ON DELETE CASCADE,
    version_number INTEGER NOT NULL,
    title VARCHAR(255),
    content TEXT,
    change_summary TEXT,
    created_by INTEGER REFERENCES users(id),
    created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
    UNIQUE(document_id, version_number)
);

-- Access trail for document reads, searches and downloads.
CREATE TABLE document_access_logs (
    id SERIAL PRIMARY KEY,
    document_id INTEGER REFERENCES documents(id) ON DELETE CASCADE,
    user_id INTEGER REFERENCES users(id),
    access_type VARCHAR(20), -- read, search, download
    query_text TEXT,
    relevance_score FLOAT,
    access_time TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
    ip_address INET,
    user_agent TEXT
);

-- Indexes.
-- NOTE(review): the GIN full-text indexes use the 'english' text-search
-- configuration while documents default to language 'zh'; 'english' does
-- not tokenize Chinese text meaningfully — confirm against the corpus and
-- consider zhparser or the 'simple' configuration for mixed content.
CREATE INDEX idx_documents_title ON documents USING gin(to_tsvector('english', title));
CREATE INDEX idx_documents_content ON documents USING gin(to_tsvector('english', content));
CREATE INDEX idx_documents_category ON documents(category);
CREATE INDEX idx_documents_tags ON documents USING gin(tags);
CREATE INDEX idx_documents_status ON documents(status);
CREATE INDEX idx_documents_created ON documents(created_at);
CREATE INDEX idx_documents_updated ON documents(updated_at);

-- Vector similarity index (pgvector HNSW, cosine distance operator class)
CREATE INDEX idx_chunks_embedding ON document_chunks USING hnsw (embedding vector_cosine_ops);
CREATE INDEX idx_chunks_document ON document_chunks(document_id);
CREATE INDEX idx_chunks_length ON document_chunks(content_length);

-- Version and access-log lookups
CREATE INDEX idx_versions_document ON document_versions(document_id, version_number);
CREATE INDEX idx_access_logs_document ON document_access_logs(document_id);
CREATE INDEX idx_access_logs_user ON document_access_logs(user_id);
CREATE INDEX idx_access_logs_time ON document_access_logs(access_time);

3. 对话管理表

-- Conversation sessions: one row per chat session with its model settings.
CREATE TABLE conversations (
    id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
    user_id INTEGER REFERENCES users(id) ON DELETE CASCADE,
    title VARCHAR(200),
    system_prompt TEXT,
    model_name VARCHAR(50) DEFAULT 'gpt-3.5-turbo',
    temperature FLOAT DEFAULT 0.7,
    max_tokens INTEGER DEFAULT 2000,
    context_window INTEGER DEFAULT 4000,
    rag_enabled BOOLEAN DEFAULT true,
    rag_settings JSONB,
    status VARCHAR(20) DEFAULT 'active' CHECK (status IN ('active', 'archived', 'deleted')),
    created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
    updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
    last_message_at TIMESTAMP
);

-- Message log: one row per turn within a conversation.
CREATE TABLE messages (
    id SERIAL PRIMARY KEY,
    conversation_id UUID REFERENCES conversations(id) ON DELETE CASCADE,
    role VARCHAR(10) NOT NULL CHECK (role IN ('user', 'assistant', 'system')),
    content TEXT NOT NULL,
    tokens_count INTEGER,
    model_name VARCHAR(50),
    finish_reason VARCHAR(20),
    retrieved_documents JSONB,  -- documents returned by RAG retrieval for this turn
    tool_calls JSONB,           -- MCP tool-call payloads
    metadata JSONB,
    created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
);

-- User feedback on a message/conversation (1-5 rating plus free text).
CREATE TABLE conversation_ratings (
    id SERIAL PRIMARY KEY,
    conversation_id UUID REFERENCES conversations(id) ON DELETE CASCADE,
    message_id INTEGER REFERENCES messages(id) ON DELETE CASCADE,
    user_id INTEGER REFERENCES users(id),
    rating INTEGER CHECK (rating BETWEEN 1 AND 5),
    feedback TEXT,
    improvement_suggestions TEXT,
    created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
);

-- Indexes supporting the common list/sort patterns
CREATE INDEX idx_conversations_user ON conversations(user_id);
CREATE INDEX idx_conversations_status ON conversations(status);
CREATE INDEX idx_conversations_updated ON conversations(updated_at);
CREATE INDEX idx_conversations_last_message ON conversations(last_message_at);

CREATE INDEX idx_messages_conversation ON messages(conversation_id);
CREATE INDEX idx_messages_created ON messages(created_at);
CREATE INDEX idx_messages_role ON messages(role);

CREATE INDEX idx_ratings_conversation ON conversation_ratings(conversation_id);
CREATE INDEX idx_ratings_user ON conversation_ratings(user_id);

4. 系统管理表

-- Key/value application configuration.
CREATE TABLE system_config (
    id SERIAL PRIMARY KEY,
    config_key VARCHAR(50) UNIQUE NOT NULL,
    config_value TEXT,
    config_type VARCHAR(20) DEFAULT 'string',  -- string, integer, float, boolean, json
    description TEXT,
    is_public BOOLEAN DEFAULT false,  -- whether ordinary users may read this key
    updated_by INTEGER REFERENCES users(id),
    updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
);

-- Background task queue with retry bookkeeping.
CREATE TABLE background_tasks (
    id SERIAL PRIMARY KEY,
    task_id VARCHAR(50) UNIQUE NOT NULL,
    task_type VARCHAR(50) NOT NULL,  -- document_process, model_train, data_sync
    status VARCHAR(20) DEFAULT 'pending' CHECK (status IN ('pending', 'running', 'completed', 'failed', 'cancelled')),
    priority INTEGER DEFAULT 0,
    payload JSONB,
    result JSONB,
    error_message TEXT,
    progress_percent INTEGER DEFAULT 0,
    created_by INTEGER REFERENCES users(id),
    created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
    started_at TIMESTAMP,
    completed_at TIMESTAMP,
    retry_count INTEGER DEFAULT 0,
    max_retries INTEGER DEFAULT 3
);

-- Append-only audit trail of user and system actions.
CREATE TABLE audit_logs (
    id SERIAL PRIMARY KEY,
    user_id INTEGER REFERENCES users(id),
    action VARCHAR(50) NOT NULL,  -- login, logout, create, update, delete
    resource_type VARCHAR(50),    -- document, conversation, user, system
    resource_id VARCHAR(50),
    old_values JSONB,
    new_values JSONB,
    ip_address INET,
    user_agent TEXT,
    success BOOLEAN DEFAULT true,
    error_message TEXT,
    created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
);

-- Indexes
-- NOTE(review): config_key already has a unique index from its UNIQUE
-- constraint, so idx_config_key below is redundant — candidate for removal.
CREATE INDEX idx_config_key ON system_config(config_key);
CREATE INDEX idx_config_public ON system_config(is_public);

CREATE INDEX idx_tasks_status ON background_tasks(status);
CREATE INDEX idx_tasks_type ON background_tasks(task_type);
CREATE INDEX idx_tasks_priority ON background_tasks(priority DESC);
CREATE INDEX idx_tasks_created ON background_tasks(created_at);

CREATE INDEX idx_audit_user ON audit_logs(user_id);
CREATE INDEX idx_audit_action ON audit_logs(action);
CREATE INDEX idx_audit_resource ON audit_logs(resource_type, resource_id);
CREATE INDEX idx_audit_time ON audit_logs(created_at);

数据库函数和触发器

1. 自动更新时间戳

-- Trigger function: refresh updated_at on every row UPDATE.
CREATE OR REPLACE FUNCTION update_updated_at_column()
RETURNS TRIGGER AS $$
BEGIN
    NEW.updated_at = CURRENT_TIMESTAMP;
    RETURN NEW;
END;
$$ LANGUAGE plpgsql;

-- Attach the trigger to every table carrying an updated_at column.
-- EXECUTE FUNCTION replaces the deprecated EXECUTE PROCEDURE spelling
-- (PostgreSQL 11+; the pgvector HNSW index used elsewhere already implies
-- a modern server version).
CREATE TRIGGER update_users_updated_at BEFORE UPDATE ON users
    FOR EACH ROW EXECUTE FUNCTION update_updated_at_column();

CREATE TRIGGER update_documents_updated_at BEFORE UPDATE ON documents
    FOR EACH ROW EXECUTE FUNCTION update_updated_at_column();

CREATE TRIGGER update_conversations_updated_at BEFORE UPDATE ON conversations
    FOR EACH ROW EXECUTE FUNCTION update_updated_at_column();

2. 向量相似度搜索函数

-- Vector similarity search over chunks of active documents.
-- Returns up to max_results chunks whose cosine similarity to
-- query_embedding exceeds similarity_threshold, most similar first;
-- filter_category optionally restricts results to one category.
CREATE OR REPLACE FUNCTION search_similar_documents(
    query_embedding vector(1536),
    similarity_threshold float DEFAULT 0.7,
    max_results integer DEFAULT 10,
    filter_category text DEFAULT NULL
)
RETURNS TABLE(
    document_id integer,
    chunk_id integer,
    similarity_score float,
    content text,
    document_title text,
    category text
) AS $$
BEGIN
    RETURN QUERY
    SELECT 
        d.id as document_id,
        dc.id as chunk_id,
        -- <=> is pgvector's cosine *distance*; similarity = 1 - distance
        (1 - (dc.embedding <=> query_embedding)) as similarity_score,
        dc.content,
        d.title as document_title,
        d.category
    FROM document_chunks dc
    JOIN documents d ON dc.document_id = d.id
    WHERE 
        d.status = 'active'
        AND (filter_category IS NULL OR d.category = filter_category)
        AND (1 - (dc.embedding <=> query_embedding)) > similarity_threshold
    -- Ordering by raw distance with a LIMIT lets the HNSW index drive the
    -- scan; the WHERE predicates are applied as a post-filter on candidates.
    ORDER BY dc.embedding <=> query_embedding
    LIMIT max_results;
END;
$$ LANGUAGE plpgsql;

-- Hybrid retrieval: weighted combination of keyword rank (ts_rank) and
-- vector cosine similarity. A FULL OUTER JOIN keeps chunks found by only
-- one of the two strategies.
-- NOTE(review): ts_rank and cosine similarity live on different scales;
-- consider normalizing both to [0, 1] before weighting. The 'english'
-- text-search configuration will not tokenize Chinese content — confirm
-- against the corpus (documents default to language 'zh').
CREATE OR REPLACE FUNCTION hybrid_search(
    query_text text,
    query_embedding vector(1536),
    similarity_threshold float DEFAULT 0.7,
    max_results integer DEFAULT 10,
    keyword_weight float DEFAULT 0.3,
    vector_weight float DEFAULT 0.7
)
RETURNS TABLE(
    document_id integer,
    chunk_id integer,
    hybrid_score float,
    content text,
    document_title text,
    category text
) AS $$
BEGIN
    RETURN QUERY
    WITH keyword_scores AS (
        -- full-text matches with their rank
        SELECT 
            d.id as document_id,
            dc.id as chunk_id,
            ts_rank(to_tsvector('english', dc.content), plainto_tsquery('english', query_text)) as keyword_score,
            dc.content,
            d.title as document_title,
            d.category
        FROM document_chunks dc
        JOIN documents d ON dc.document_id = d.id
        WHERE 
            d.status = 'active'
            AND to_tsvector('english', dc.content) @@ plainto_tsquery('english', query_text)
    ),
    vector_scores AS (
        -- chunks above the similarity threshold
        -- NOTE(review): a bare threshold predicate (no ORDER BY <=> / LIMIT)
        -- cannot use the HNSW index, so this scans all chunks — acceptable
        -- for small corpora, revisit for large ones.
        SELECT 
            d.id as document_id,
            dc.id as chunk_id,
            (1 - (dc.embedding <=> query_embedding)) as vector_score,
            dc.content,
            d.title as document_title,
            d.category
        FROM document_chunks dc
        JOIN documents d ON dc.document_id = d.id
        WHERE 
            d.status = 'active'
            AND (1 - (dc.embedding <=> query_embedding)) > similarity_threshold
    ),
    combined AS (
        SELECT 
            COALESCE(ks.document_id, vs.document_id) as document_id,
            COALESCE(ks.chunk_id, vs.chunk_id) as chunk_id,
            (COALESCE(ks.keyword_score, 0) * keyword_weight + 
             COALESCE(vs.vector_score, 0) * vector_weight) as hybrid_score,
            COALESCE(ks.content, vs.content) as content,
            COALESCE(ks.document_title, vs.document_title) as document_title,
            COALESCE(ks.category, vs.category) as category
        FROM keyword_scores ks
        FULL OUTER JOIN vector_scores vs ON ks.chunk_id = vs.chunk_id
    )
    -- Qualifying the sort column fixes a PL/pgSQL runtime error in the
    -- original: the bare name hybrid_score was ambiguous between the SELECT
    -- alias and the function's OUT parameter of the same name.
    SELECT c.document_id, c.chunk_id, c.hybrid_score,
           c.content, c.document_title, c.category
    FROM combined c
    ORDER BY c.hybrid_score DESC
    LIMIT max_results;
END;
$$ LANGUAGE plpgsql;

数据库连接管理

连接池配置

# database/connection.py
import asyncio
import logging
from typing import AsyncGenerator, Optional
from contextlib import asynccontextmanager

import asyncpg
from asyncpg import Pool
from pydantic import BaseSettings

logger = logging.getLogger(__name__)

class DatabaseConfig(BaseSettings):
    """Database connection settings.

    Every field can be overridden through an environment variable with the
    ``DB_`` prefix (e.g. ``DB_HOST``, ``DB_PASSWORD``) via pydantic
    ``BaseSettings`` — see the inner ``Config`` class.
    """
    host: str = "localhost"
    port: int = 5432
    database: str = "rag_enterprise"
    username: str = "rag_user"
    # NOTE(review): hard-coded default credential committed in source; rely
    # on the DB_PASSWORD environment variable in real deployments and
    # consider removing this default entirely.
    password: str = "rag_password_2026"
    
    # Connection-pool sizing and connection recycling
    min_pool_size: int = 5
    max_pool_size: int = 20
    max_queries_per_connection: int = 50000
    max_inactive_connection_lifetime: int = 300
    
    # Timeouts in seconds: command_timeout is handed to the pool;
    # query_timeout is applied per call by DatabaseManager.
    command_timeout: int = 30
    query_timeout: int = 60
    
    class Config:
        env_prefix = "DB_"

class DatabaseManager:
    """Own a shared asyncpg connection pool and expose query helpers."""
    
    def __init__(self, config: DatabaseConfig):
        self.config = config
        self.pool: Optional[Pool] = None
        
    async def create_pool(self) -> Pool:
        """Create the connection pool, or return the one already created.

        NOTE(review): not guarded against concurrent first calls — two
        coroutines racing here can each build a pool; add an asyncio.Lock
        if first use can be concurrent.
        """
        if self.pool:
            return self.pool
            
        try:
            self.pool = await asyncpg.create_pool(
                host=self.config.host,
                port=self.config.port,
                database=self.config.database,
                user=self.config.username,
                password=self.config.password,
                min_size=self.config.min_pool_size,
                max_size=self.config.max_pool_size,
                max_queries=self.config.max_queries_per_connection,
                max_inactive_connection_lifetime=self.config.max_inactive_connection_lifetime,
                command_timeout=self.config.command_timeout,
                init=self._init_connection
            )
            
            logger.info(f"数据库连接池创建成功,大小: {self.config.min_pool_size}-{self.config.max_pool_size}")
            return self.pool
            
        except Exception as e:
            logger.error(f"创建数据库连接池失败: {e}")
            raise
    
    async def _init_connection(self, conn):
        """Per-connection setup run by the pool for every new connection.

        NOTE(review): no pgvector/JSONB codecs are registered here, so
        vector and jsonb parameters must be passed as text by callers —
        confirm against the repository layer.
        """
        await conn.execute("SET application_name = 'rag_enterprise'")
        await conn.execute("SET timezone = 'Asia/Shanghai'")
        
    async def close_pool(self):
        """Close the pool and drop the reference so it can be re-created."""
        if self.pool:
            await self.pool.close()
            self.pool = None
            logger.info("数据库连接池已关闭")
    
    @asynccontextmanager
    async def get_connection(self) -> AsyncGenerator[asyncpg.Connection, None]:
        """Yield a pooled connection, lazily creating the pool on first use."""
        if not self.pool:
            await self.create_pool()
            
        async with self.pool.acquire() as connection:
            try:
                yield connection
            except Exception as e:
                logger.error(f"数据库操作错误: {e}")
                raise
    
    @asynccontextmanager
    async def get_transaction(self) -> AsyncGenerator[asyncpg.Connection, None]:
        """Yield a pooled connection wrapped in a transaction."""
        async with self.get_connection() as conn:
            async with conn.transaction():
                yield conn
    
    async def execute_query(
        self, 
        query: str, 
        *args, 
        timeout: Optional[int] = None
    ) -> "list[asyncpg.Record]":
        """Run a SELECT and return all rows.

        Fixed annotation: the original declared ``-> str`` but
        ``Connection.fetch`` returns a list of Records.
        """
        timeout = timeout or self.config.query_timeout
        
        async with self.get_connection() as conn:
            return await conn.fetch(query, *args, timeout=timeout)
    
    async def execute_one(
        self, 
        query: str, 
        *args, 
        timeout: Optional[int] = None
    ) -> Optional[asyncpg.Record]:
        """Run a SELECT and return the first row, or None."""
        timeout = timeout or self.config.query_timeout
        
        async with self.get_connection() as conn:
            return await conn.fetchrow(query, *args, timeout=timeout)
    
    async def execute_command(
        self, 
        command: str, 
        *args, 
        timeout: Optional[int] = None
    ) -> str:
        """Run a data-modifying command (INSERT/UPDATE/DELETE).

        Returns the status tag reported by the server, e.g. 'INSERT 0 1'.
        """
        timeout = timeout or self.config.command_timeout
        
        async with self.get_connection() as conn:
            return await conn.execute(command, *args, timeout=timeout)

# Module-level singletons shared by the application
db_config = DatabaseConfig()
db_manager = DatabaseManager(db_config)

数据访问层(DAL)

# database/models.py
from datetime import datetime
from typing import List, Optional, Dict, Any
from dataclasses import dataclass
from enum import Enum
import asyncpg
import numpy as np

class UserRole(str, Enum):
    """User roles; values match the CHECK constraint on users.role."""
    ADMIN = "admin"
    OPERATOR = "operator"
    USER = "user"

class DocumentStatus(str, Enum):
    """Document lifecycle states; values match the CHECK on documents.status."""
    ACTIVE = "active"
    PROCESSING = "processing"
    ERROR = "error"
    DELETED = "deleted"

@dataclass
class User:
    """In-memory mirror of a users row (password_hash deliberately omitted)."""
    id: Optional[int] = None
    username: str = ""
    email: str = ""
    full_name: Optional[str] = None
    department: Optional[str] = None
    role: UserRole = UserRole.USER
    is_active: bool = True
    created_at: Optional[datetime] = None
    updated_at: Optional[datetime] = None

@dataclass
class Document:
    """In-memory mirror of the documents columns the app reads/writes."""
    id: Optional[int] = None
    title: str = ""
    content: Optional[str] = None
    file_path: Optional[str] = None
    file_size: Optional[int] = None
    file_type: Optional[str] = None
    source: str = "upload"
    category: Optional[str] = None
    # Fixed annotation: the default is None, so the type must be Optional.
    tags: Optional[List[str]] = None
    language: str = "zh"
    status: DocumentStatus = DocumentStatus.ACTIVE
    embedding_model: Optional[str] = None
    metadata: Optional[Dict[str, Any]] = None
    created_by: Optional[int] = None
    created_at: Optional[datetime] = None
    updated_at: Optional[datetime] = None

@dataclass
class DocumentChunk:
    """One embedded chunk of a document."""
    id: Optional[int] = None
    document_id: int = 0
    chunk_index: int = 0
    content: str = ""
    content_length: int = 0
    overlap_tokens: int = 0
    metadata: Optional[Dict[str, Any]] = None
    embedding: Optional[np.ndarray] = None  # assumed shape (1536,) to match the DB column — confirm
    created_at: Optional[datetime] = None

class UserRepository:
    """Data-access helpers for the users table."""
    
    def __init__(self, db_manager):
        self.db = db_manager
    
    async def create_user(self, user: User, password_hash: str = "") -> int:
        """Insert a new user and return the generated id.

        password_hash must already be hashed by the caller; it defaults to
        "" to stay backward compatible with the previous signature, which
        always inserted an empty hash.
        """
        query = """
        INSERT INTO users (username, email, password_hash, full_name, department, role)
        VALUES ($1, $2, $3, $4, $5, $6)
        RETURNING id
        """
        
        async with self.db.get_connection() as conn:
            row = await conn.fetchrow(
                query, 
                user.username, 
                user.email, 
                password_hash,
                user.full_name,
                user.department,
                user.role
            )
            return row['id']
    
    async def get_user_by_id(self, user_id: int) -> Optional[User]:
        """Fetch an active user by primary key, or None."""
        query = """
        SELECT id, username, email, full_name, department, role, is_active, created_at, updated_at
        FROM users WHERE id = $1 AND is_active = true
        """
        
        row = await self.db.execute_one(query, user_id)
        if row:
            return User(**dict(row))
        return None
    
    async def get_user_by_username(self, username: str) -> Optional[User]:
        """Fetch an active user by username, or None."""
        query = """
        SELECT id, username, email, full_name, department, role, is_active, created_at, updated_at
        FROM users WHERE username = $1 AND is_active = true
        """
        
        row = await self.db.execute_one(query, username)
        if row:
            return User(**dict(row))
        return None
    
    async def list_users(
        self, 
        offset: int = 0, 
        limit: int = 50,
        role_filter: Optional[UserRole] = None
    ) -> List[User]:
        """List active users, newest first, optionally filtered by role.

        Fixed a placeholder-numbering bug: the original always used $2/$3
        for LIMIT/OFFSET, which broke whenever role_filter was absent (the
        query referenced $2/$3 while only two parameters were bound).
        """
        query = """
        SELECT id, username, email, full_name, department, role, is_active, created_at, updated_at
        FROM users WHERE is_active = true
        """
        
        params: List[Any] = []
        if role_filter:
            params.append(role_filter)
            query += f" AND role = ${len(params)}"
        
        # Placeholder numbers are derived from the params list so they stay
        # consistent whether or not the role filter is present.
        params.append(limit)
        query += f" ORDER BY created_at DESC LIMIT ${len(params)}"
        params.append(offset)
        query += f" OFFSET ${len(params)}"
        
        rows = await self.db.execute_query(query, *params)
        return [User(**dict(row)) for row in rows]

class DocumentRepository:
    """Data-access helpers for documents and their embedded chunks."""

    # Columns that map 1:1 onto the Document dataclass. Selecting them
    # explicitly (instead of SELECT *) prevents Document(**dict(row)) from
    # raising TypeError on table columns the dataclass does not declare
    # (mime_type, chunk_strategy, updated_by, deleted_at).
    _DOCUMENT_COLUMNS = (
        "id, title, content, file_path, file_size, file_type, source, "
        "category, tags, language, status, embedding_model, metadata, "
        "created_by, created_at, updated_at"
    )

    def __init__(self, db_manager):
        self.db = db_manager

    @staticmethod
    def _jsonb(value: Optional[Dict[str, Any]]) -> Optional[str]:
        """Serialize a dict for a JSONB parameter.

        The pool registers no JSONB codec (see DatabaseManager._init_connection),
        so asyncpg expects JSON text rather than a Python dict for jsonb columns.
        """
        import json
        return None if value is None else json.dumps(value)

    @staticmethod
    def _vector_literal(embedding) -> Optional[str]:
        """Render an embedding as a pgvector text literal, e.g. '[0.1,0.2]'.

        Without pgvector's asyncpg codec a Python list cannot be bound to a
        vector column; the text form is parsed by the type's input function.
        """
        if embedding is None:
            return None
        values = embedding.tolist() if hasattr(embedding, "tolist") else list(embedding)
        return "[" + ",".join(str(float(v)) for v in values) + "]"

    async def create_document(self, document: Document) -> int:
        """Insert a document row and return its generated id."""
        query = """
        INSERT INTO documents (
            title, content, file_path, file_size, file_type, source,
            category, tags, language, embedding_model, metadata, created_by
        )
        VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12)
        RETURNING id
        """

        async with self.db.get_connection() as conn:
            row = await conn.fetchrow(
                query,
                document.title,
                document.content,
                document.file_path,
                document.file_size,
                document.file_type,
                document.source,
                document.category,
                document.tags or [],
                document.language,
                document.embedding_model,
                self._jsonb(document.metadata),  # dicts must be sent as JSON text
                document.created_by
            )
            return row['id']

    async def get_document_by_id(self, doc_id: int) -> Optional[Document]:
        """Fetch a non-deleted document by primary key, or None."""
        query = f"""
        SELECT {self._DOCUMENT_COLUMNS} FROM documents WHERE id = $1 AND status != 'deleted'
        """

        row = await self.db.execute_one(query, doc_id)
        if row:
            return Document(**dict(row))
        return None

    async def search_documents(
        self,
        query_text: Optional[str] = None,
        category: Optional[str] = None,
        tags: Optional[List[str]] = None,
        offset: int = 0,
        limit: int = 20
    ) -> List[Document]:
        """Filtered document listing (full-text, category, tag overlap).

        NOTE(review): the full-text predicate uses the 'english' config even
        though documents default to language 'zh' — confirm the corpus.
        """
        conditions = ["status = 'active'"]
        params: List[Any] = []

        if query_text:
            params.append(query_text)
            # coalesce keeps rows with NULL content searchable by title; the
            # original bare concatenation made the whole tsvector NULL.
            conditions.append(
                f"to_tsvector('english', title || ' ' || coalesce(content, '')) "
                f"@@ plainto_tsquery('english', ${len(params)})"
            )

        if category:
            params.append(category)
            conditions.append(f"category = ${len(params)}")

        if tags:
            params.append(tags)
            conditions.append(f"tags && ${len(params)}")

        params.append(limit)
        limit_param = f"${len(params)}"
        params.append(offset)
        offset_param = f"${len(params)}"

        query = f"""
        SELECT {self._DOCUMENT_COLUMNS} FROM documents 
        WHERE {' AND '.join(conditions)}
        ORDER BY updated_at DESC 
        LIMIT {limit_param} OFFSET {offset_param}
        """

        rows = await self.db.execute_query(query, *params)
        return [Document(**dict(row)) for row in rows]

    async def create_document_chunk(self, chunk: DocumentChunk) -> int:
        """Insert one chunk (with its embedding) and return its id."""
        query = """
        INSERT INTO document_chunks (
            document_id, chunk_index, content, content_length,
            overlap_tokens, metadata, embedding
        )
        VALUES ($1, $2, $3, $4, $5, $6, $7)
        RETURNING id
        """

        async with self.db.get_connection() as conn:
            row = await conn.fetchrow(
                query,
                chunk.document_id,
                chunk.chunk_index,
                chunk.content,
                chunk.content_length,
                chunk.overlap_tokens,
                self._jsonb(chunk.metadata),
                self._vector_literal(chunk.embedding)
            )
            return row['id']

    async def vector_search(
        self,
        query_embedding: np.ndarray,
        similarity_threshold: float = 0.7,
        max_results: int = 10,
        category_filter: Optional[str] = None
    ) -> List[Dict[str, Any]]:
        """Cosine-similarity search delegated to search_similar_documents()."""
        query = """
        SELECT * FROM search_similar_documents($1, $2, $3, $4)
        """

        rows = await self.db.execute_query(
            query,
            self._vector_literal(query_embedding),
            similarity_threshold,
            max_results,
            category_filter
        )

        return [dict(row) for row in rows]

    async def hybrid_search(
        self,
        query_text: str,
        query_embedding: np.ndarray,
        similarity_threshold: float = 0.7,
        max_results: int = 10,
        keyword_weight: float = 0.3,
        vector_weight: float = 0.7
    ) -> List[Dict[str, Any]]:
        """Keyword + vector retrieval delegated to the hybrid_search() SQL function."""
        query = """
        SELECT * FROM hybrid_search($1, $2, $3, $4, $5, $6)
        """

        rows = await self.db.execute_query(
            query,
            query_text,
            self._vector_literal(query_embedding),
            similarity_threshold,
            max_results,
            keyword_weight,
            vector_weight
        )

        return [dict(row) for row in rows]

性能优化策略

1. 索引优化

-- Inspect the execution plan of a similarity search (replace the literal
-- with a real 1536-dimension vector).
EXPLAIN (ANALYZE, BUFFERS) 
SELECT * FROM search_similar_documents(
    '[0.1, 0.2, ...]'::vector, 
    0.7, 
    10, 
    'technical'
);

-- Rebuild the HNSW index with explicit parameters: m and ef_construction
-- trade recall against build time and memory.
DROP INDEX IF EXISTS idx_chunks_embedding;
CREATE INDEX idx_chunks_embedding ON document_chunks 
USING hnsw (embedding vector_cosine_ops)
WITH (m = 16, ef_construction = 64);

-- Partial index over chunks of active documents only.
-- PostgreSQL forbids subqueries in an index predicate, so the original
-- `WHERE document_id IN (SELECT ...)` form was invalid SQL. Denormalize
-- the flag onto document_chunks and use it directly; keep it in sync from
-- the application or a trigger whenever documents.status changes.
ALTER TABLE document_chunks ADD COLUMN IF NOT EXISTS is_active BOOLEAN NOT NULL DEFAULT true;
CREATE INDEX idx_active_documents_embedding ON document_chunks 
USING hnsw (embedding vector_cosine_ops)
WHERE is_active;

-- Composite indexes for common filter + sort patterns
CREATE INDEX idx_documents_category_status_updated ON documents(category, status, updated_at DESC);
CREATE INDEX idx_chunks_document_length ON document_chunks(document_id, content_length);

2. 查询优化

-- Materialized view of the most-accessed active documents over the last 30
-- days. The access-time filter belongs in the JOIN's ON clause: placing it
-- in WHERE (as originally written) turned the LEFT JOIN into an INNER JOIN
-- and silently dropped documents with no recent accesses.
CREATE MATERIALIZED VIEW popular_documents AS
SELECT 
    d.id,
    d.title,
    d.category,
    COUNT(dal.id) as access_count,
    AVG(dal.relevance_score) as avg_relevance
FROM documents d
LEFT JOIN document_access_logs dal 
    ON d.id = dal.document_id
    AND dal.access_time >= CURRENT_DATE - INTERVAL '30 days'
WHERE d.status = 'active'
GROUP BY d.id, d.title, d.category
ORDER BY access_count DESC;

-- Unique index: required by REFRESH MATERIALIZED VIEW CONCURRENTLY
CREATE UNIQUE INDEX idx_popular_documents_id ON popular_documents(id);

-- Wrapper so the refresh can be scheduled
CREATE OR REPLACE FUNCTION refresh_popular_documents()
RETURNS void AS $$
BEGIN
    REFRESH MATERIALIZED VIEW CONCURRENTLY popular_documents;
END;
$$ LANGUAGE plpgsql;

-- Nightly refresh at 02:00 (requires the pg_cron extension)
SELECT cron.schedule('refresh-popular-docs', '0 2 * * *', 'SELECT refresh_popular_documents();');

3. 数据分区

-- Range-partitioned variant of the messages table, partitioned on created_at.
-- NOTE(review): a PRIMARY KEY on a partitioned table must include the
-- partition key; none is declared here, so id uniqueness is not enforced
-- across partitions — add PRIMARY KEY (id, created_at) if that matters.
CREATE TABLE messages_partitioned (
    id SERIAL,
    conversation_id UUID NOT NULL,
    role VARCHAR(10) NOT NULL,
    content TEXT NOT NULL,
    created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
) PARTITION BY RANGE (created_at);

-- Initial quarterly partitions (half-open ranges: FROM inclusive, TO exclusive)
CREATE TABLE messages_2026_q1 PARTITION OF messages_partitioned
    FOR VALUES FROM ('2026-01-01') TO ('2026-04-01');

CREATE TABLE messages_2026_q2 PARTITION OF messages_partitioned
    FOR VALUES FROM ('2026-04-01') TO ('2026-07-01');

-- Create one monthly partition of table_name covering [start_date, +1 month)
-- plus a created_at index on it. IF NOT EXISTS makes the function idempotent,
-- so a scheduler can call it repeatedly without erroring on existing months.
CREATE OR REPLACE FUNCTION create_monthly_partition(table_name text, start_date date)
RETURNS void AS $$
DECLARE
    partition_name text;
    end_date date;
BEGIN
    partition_name := table_name || '_' || to_char(start_date, 'YYYY_MM');
    end_date := start_date + INTERVAL '1 month';
    
    -- %I quotes identifiers, %L quotes literals — safe against injection
    EXECUTE format('CREATE TABLE IF NOT EXISTS %I PARTITION OF %I FOR VALUES FROM (%L) TO (%L)',
                   partition_name, table_name, start_date, end_date);
    
    EXECUTE format('CREATE INDEX IF NOT EXISTS %I ON %I (created_at)', 
                   partition_name || '_created_at_idx', partition_name);
END;
$$ LANGUAGE plpgsql;

在本篇文章中,我们详细设计了企业级RAG应用的数据库架构,包括完整的表结构、索引策略、查询优化和性能调优。在下一篇文章中,我们将介绍如何部署和配置VLLM推理引擎,以及模型微调的具体实现。

下一篇预告:模型服务与推理 - 深入介绍VLLM部署配置、模型微调流水线、推理优化和GPU资源管理。

版权声明: 如无特别声明,本文版权归 sshipanoo 所有,转载请注明本文链接。

(采用 CC BY-NC-SA 4.0 许可协议进行授权)

本文标题:企业级RAG应用系列(3):数据库与向量存储

本文链接:https://www.sshipanoo.com/blog/ai/企业级RAG应用系列-03-数据库设计/

本文距最后一次更新已有一段时间,文章中的某些内容可能已过时!