Intelligent Scheduling: Balancing Cost and Performance

Introduction

Different LLMs trade off cost, speed, and capability in different ways. With intelligent routing and multi-model collaboration, you can reduce cost and response time while preserving output quality. This article covers how to design and implement multi-model routing.


Overview of Multi-Model Routing

Why Multi-Model Routing?

┌─────────────────────────────────────────────────────────────────┐
│                 Value of Multi-Model Routing                     │
├─────────────────────────────────────────────────────────────────┤
│                                                                  │
│  Cost optimization ─────────────────────────────────────────    │
│  │ • Route simple tasks to cheap models                          │
│  │ • Save 70%+ on spend                                          │
│                                                                  │
│  Performance optimization ──────────────────────────────────    │
│  │ • Latency-sensitive tasks go to fast models                   │
│  │ • Complex tasks go to powerful models                         │
│                                                                  │
│  Availability ──────────────────────────────────────────────    │
│  │ • Automatic failover when the primary model is down           │
│  │ • Load balancing                                              │
│                                                                  │
└─────────────────────────────────────────────────────────────────┘

Mainstream model comparison

| Model | Speed | Capability | Cost (input / output, per 1M tokens) | Best for |
|---|---|---|---|---|
| GPT-4o | Medium | Strong | $2.5 / $10 | Complex reasoning |
| GPT-4o-mini | Fast | Medium+ | $0.15 / $0.6 | General tasks |
| GPT-3.5-turbo | Fast | Basic | $0.5 / $1.5 | Simple tasks |
| Claude 3.5 Sonnet | Medium | Strong | $3 / $15 | Long documents, code |
| Claude 3 Haiku | Fast | Medium | $0.25 / $1.25 | Fast responses |
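
To make the trade-off concrete, here is a quick back-of-the-envelope cost estimate based on the list prices above. The request volume, token counts, and traffic split are illustrative assumptions, not measurements:

```python
# Rough cost estimate: route 80% of traffic to gpt-4o-mini, 20% to gpt-4o,
# versus sending everything to gpt-4o. Prices are per 1M tokens (from the table above).
PRICES = {
    "gpt-4o":      {"in": 2.50, "out": 10.00},
    "gpt-4o-mini": {"in": 0.15, "out": 0.60},
}

def cost(model: str, requests: int, in_tok: int, out_tok: int) -> float:
    p = PRICES[model]
    return requests * (in_tok * p["in"] + out_tok * p["out"]) / 1_000_000

# Assumed workload: 1M requests, ~500 input / 300 output tokens each
requests, in_tok, out_tok = 1_000_000, 500, 300

all_gpt4o = cost("gpt-4o", requests, in_tok, out_tok)
routed = (cost("gpt-4o-mini", int(requests * 0.8), in_tok, out_tok)
          + cost("gpt-4o", int(requests * 0.2), in_tok, out_tok))

print(f"all gpt-4o: ${all_gpt4o:,.0f}")  # $4,250
print(f"routed:     ${routed:,.0f}")      # ~$1,054, roughly 75% cheaper
```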

Rule-Based Routing

Simple rule-based routing

from openai import OpenAI
from enum import Enum
from typing import Optional

class ModelTier(Enum):
    FAST = "fast"      # 快速便宜
    STANDARD = "standard"  # 标准
    POWERFUL = "powerful"  # 强大

class RuleBasedRouter:
    """基于规则的模型路由器"""
    
    MODEL_CONFIG = {
        ModelTier.FAST: {
            "model": "gpt-4o-mini",
            "max_tokens": 1000,
            "cost_per_1k": 0.00015
        },
        ModelTier.STANDARD: {
            "model": "gpt-4o-mini",
            "max_tokens": 2000,
            "cost_per_1k": 0.00015
        },
        ModelTier.POWERFUL: {
            "model": "gpt-4o",
            "max_tokens": 4000,
            "cost_per_1k": 0.0025
        }
    }
    
    def __init__(self):
        self.client = OpenAI()
    
    def route(self, prompt: str, context: dict = None) -> ModelTier:
        """根据规则选择模型层级"""
        context = context or {}
        
        # 规则 1:基于任务类型
        task_type = context.get("task_type", "general")
        if task_type in ["code_generation", "complex_reasoning", "analysis"]:
            return ModelTier.POWERFUL
        
        # 规则 2:基于关键词
        powerful_keywords = ["分析", "推理", "代码", "复杂", "详细解释", "代码重构", "架构设计"]
        if any(kw in prompt for kw in powerful_keywords):
            return ModelTier.POWERFUL
        
        # 规则 3:基于 prompt 长度
        if len(prompt) > 10000:
            return ModelTier.POWERFUL
        if len(prompt) < 100:
            return ModelTier.FAST
        
        # 规则 4:基于用户等级
        user_tier = context.get("user_tier", "free")
        if user_tier == "premium":
            return ModelTier.STANDARD
        
        return ModelTier.FAST

    def call(self, prompt: str, context: dict = None) -> dict:
        """路由并调用模型"""
        tier = self.route(prompt, context)
        config = self.MODEL_CONFIG[tier]
        
        response = self.client.chat.completions.create(
            model=config["model"],
            messages=[{"role": "user", "content": prompt}],
            max_tokens=config["max_tokens"]
        )
        
        return {
            "content": response.choices[0].message.content,
            "model": config["model"],
            "tier": tier.value,
            "usage": response.usage.model_dump()
        }

Usage

router = RuleBasedRouter()

# 简单问题 -> 快速模型
result = router.call("今天天气怎么样?")
print(f"模型: {result['model']}")

# 复杂问题 -> 强大模型
result = router.call("请详细分析深度学习中的注意力机制原理")
print(f"模型: {result['model']}")

---

#### Advanced: Semantic Routing

Rule-based routing is rigid. Semantic routing embeds the prompt as a vector and matches it against predefined intent categories, which makes dispatch decisions considerably smarter.

#### Using the Semantic Router library

```python
from semantic_router import Route
from semantic_router.encoders import OpenAIEncoder
from semantic_router.layer import RouteLayer

# 1. 定义路由意图
chitchat = Route(
    name="chitchat",
    utterances=[
        "你好", "今天天气怎么样", "讲个笑话", "你是谁"
    ],
)

complex_task = Route(
    name="complex_task",
    utterances=[
        "帮我写一个分布式锁的实现",
        "分析这段复杂的 SQL 性能瓶颈",
        "解释量子纠缠的原理"
    ],
)

# 2. 创建路由层
encoder = OpenAIEncoder()
rl = RouteLayer(encoder=encoder, routes=[chitchat, complex_task])

# 3. 执行路由
def smart_route(text: str):
    route = rl(text)
    if route.name == "chitchat":
        return "gpt-4o-mini"  # 闲聊用便宜模型
    elif route.name == "complex_task":
        return "gpt-4o"       # 复杂任务用强模型
    else:
        return "gpt-4o-mini"  # 默认
```

Cascading Strategy (Fallback)

The core idea of cascading: try the cheap model first, and only switch to the expensive one when it falls short.

Implementation logic

  1. Try: generate an initial answer with a small model (e.g. GPT-4o-mini).
  2. Evaluate: use a lightweight check (or another model) to judge whether the answer meets the bar.
  3. Fallback: if the check fails, regenerate with a large model (e.g. GPT-4o).

def cascading_generation(prompt: str):
    # call_model / is_quality_sufficient 为示意用的辅助函数,需按实际情况实现
    # 第一步:尝试小模型
    response = call_model("gpt-4o-mini", prompt)
    
    # 第二步:快速评估(例如检查是否包含特定格式,或长度是否达标)
    if is_quality_sufficient(response):
        return response
        
    # 第三步:回退到大模型
    print("小模型回答质量不足,正在切换到大模型...")
    return call_model("gpt-4o", prompt)

Model Collaboration: Mixture of Agents (MoA)

MoA is a technique in which several models contribute to a single answer. Research suggests that a group of mid-sized models working together can often beat a single top-tier model.

MoA architecture

  • Proposers: several different models (e.g. Llama 3, Qwen, Claude Haiku) each answer the same question independently.
  • Aggregator: a strong model (e.g. GPT-4o) receives all the proposals, then synthesizes, corrects, and polishes them into the final answer.

def mixture_of_agents(prompt: str):
    # parallel_call / call_model 为示意用的辅助函数,需按实际情况实现
    # 1. 并行获取多个模型的建议
    proposals = parallel_call([
        "llama-3-70b", 
        "qwen-max", 
        "claude-3-haiku"
    ], prompt)
    
    # 2. 聚合
    aggregation_prompt = f"""以下是多个 AI 对同一个问题的回答,请你参考这些回答,
提取它们的优点,纠正错误,并给出一个最完美的最终答案。

问题:{prompt}
回答列表:
{proposals}
"""
    return call_model("gpt-4o", aggregation_prompt)

Industry Frontier: RouteLLM

RouteLLM is an open-source framework that trains a dedicated "router model". For each prompt, the router predicts whether GPT-4o-mini can reach roughly 90% of GPT-4o's quality; if it can, the request goes to mini. This is more principled than plain semantic matching and has been reported to cut costs by more than 50% with almost no loss in quality.

Interim recap

Multi-model routing is a necessary step on the way to running LLM applications at commercial scale.

  • Basic: rule-based routing (length, keywords).
  • Intermediate: semantic routing + cascading fallback.
  • Advanced: MoA collaboration + a purpose-trained router (RouteLLM).

With this "hybrid drive" approach, developers can deliver an excellent user experience on a limited budget.


#### Task Classification Routing

```python
from pydantic import BaseModel
from typing import Literal
from openai import OpenAI
import instructor

class TaskClassification(BaseModel):
    """任务分类"""
    task_type: Literal[
        "simple_qa",      # 简单问答
        "translation",    # 翻译
        "summarization",  # 摘要
        "code_generation", # 代码生成
        "analysis",       # 分析
        "creative_writing", # 创意写作
        "complex_reasoning" # 复杂推理
    ]
    complexity: Literal["low", "medium", "high"]
    confidence: float

class ClassificationRouter:
    """基于分类的路由器"""
    
    TASK_MODEL_MAP = {
        ("simple_qa", "low"): "gpt-4o-mini",
        ("simple_qa", "medium"): "gpt-4o-mini",
        ("simple_qa", "high"): "gpt-4o",
        ("translation", "low"): "gpt-4o-mini",
        ("translation", "medium"): "gpt-4o-mini",
        ("translation", "high"): "gpt-4o",
        ("summarization", "low"): "gpt-4o-mini",
        ("summarization", "medium"): "gpt-4o-mini",
        ("summarization", "high"): "gpt-4o",
        ("code_generation", "low"): "gpt-4o-mini",
        ("code_generation", "medium"): "gpt-4o",
        ("code_generation", "high"): "gpt-4o",
        ("analysis", "low"): "gpt-4o-mini",
        ("analysis", "medium"): "gpt-4o",
        ("analysis", "high"): "gpt-4o",
        ("creative_writing", "low"): "gpt-4o-mini",
        ("creative_writing", "medium"): "gpt-4o",
        ("creative_writing", "high"): "gpt-4o",
        ("complex_reasoning", "low"): "gpt-4o",
        ("complex_reasoning", "medium"): "gpt-4o",
        ("complex_reasoning", "high"): "gpt-4o",
    }
    
    def __init__(self):
        self.client = instructor.from_openai(OpenAI())
    
    def classify(self, prompt: str) -> TaskClassification:
        """分类任务"""
        return self.client.chat.completions.create(
            model="gpt-4o-mini",  # 用轻量模型分类
            response_model=TaskClassification,
            messages=[
                {
                    "role": "system",
                    "content": "分析用户请求的任务类型和复杂度"
                },
                {"role": "user", "content": prompt}
            ]
        )
    
    def route(self, prompt: str) -> str:
        """路由到合适的模型"""
        classification = self.classify(prompt)
        key = (classification.task_type, classification.complexity)
        return self.TASK_MODEL_MAP.get(key, "gpt-4o-mini")
    
    def call(self, prompt: str) -> dict:
        """分类、路由并调用"""
        classification = self.classify(prompt)
        model = self.TASK_MODEL_MAP.get(
            (classification.task_type, classification.complexity),
            "gpt-4o-mini"
        )
        
        response = OpenAI().chat.completions.create(
            model=model,
            messages=[{"role": "user", "content": prompt}]
        )
        
        return {
            "content": response.choices[0].message.content,
            "model": model,
            "classification": classification.model_dump()
        }
```

Intelligent Routing

Embedding-based semantic routing

import numpy as np
from typing import List, Dict
from openai import OpenAI

class SemanticRouter:
    """语义路由器 - 基于嵌入相似度"""
    
    def __init__(self):
        self.client = OpenAI()
        self.routes: List[Dict] = []
        self.route_embeddings: List[List[float]] = []
    
    def add_route(
        self,
        name: str,
        model: str,
        examples: List[str],
        description: str = ""
    ):
        """添加路由"""
        # 计算示例的平均嵌入
        embeddings = []
        for example in examples:
            emb = self._get_embedding(example)
            embeddings.append(emb)
        
        avg_embedding = np.mean(embeddings, axis=0).tolist()
        
        self.routes.append({
            "name": name,
            "model": model,
            "description": description,
            "examples": examples
        })
        self.route_embeddings.append(avg_embedding)
    
    def _get_embedding(self, text: str) -> List[float]:
        """获取嵌入"""
        response = self.client.embeddings.create(
            model="text-embedding-3-small",
            input=text
        )
        return response.data[0].embedding
    
    def _cosine_similarity(self, a: List[float], b: List[float]) -> float:
        """余弦相似度"""
        a = np.array(a)
        b = np.array(b)
        return np.dot(a, b) / (np.linalg.norm(a) * np.linalg.norm(b))
    
    def route(self, prompt: str) -> Dict:
        """路由到最匹配的模型"""
        prompt_embedding = self._get_embedding(prompt)
        
        similarities = [
            self._cosine_similarity(prompt_embedding, route_emb)
            for route_emb in self.route_embeddings
        ]
        
        best_idx = np.argmax(similarities)
        return {
            **self.routes[best_idx],
            "confidence": similarities[best_idx]
        }
    
    def call(self, prompt: str) -> dict:
        """路由并调用"""
        route = self.route(prompt)
        
        response = self.client.chat.completions.create(
            model=route["model"],
            messages=[{"role": "user", "content": prompt}]
        )
        
        return {
            "content": response.choices[0].message.content,
            "route": route["name"],
            "model": route["model"],
            "confidence": route["confidence"]
        }

# 使用
router = SemanticRouter()

# 添加代码生成路由
router.add_route(
    name="code",
    model="gpt-4o",
    examples=[
        "写一个 Python 函数计算斐波那契数列",
        "实现一个二叉树的遍历",
        "帮我写一个 API 接口",
        "如何用 JavaScript 实现防抖",
        "写一个排序算法"
    ]
)

# 添加简单问答路由
router.add_route(
    name="simple_qa",
    model="gpt-4o-mini",
    examples=[
        "今天天气怎么样",
        "北京的人口是多少",
        "苹果公司是哪年成立的",
        "珠穆朗玛峰有多高",
        "水的沸点是多少"
    ]
)

# 添加分析路由
router.add_route(
    name="analysis",
    model="gpt-4o",
    examples=[
        "分析这篇文章的论点",
        "评估这个商业计划的可行性",
        "比较两种技术方案的优劣",
        "解释这个现象背后的原因",
        "总结这份报告的要点"
    ]
)

# 测试路由
result = router.call("帮我写一个快速排序的实现")
print(f"路由: {result['route']}, 模型: {result['model']}")

LLM-Driven Routing

from pydantic import BaseModel
from typing import List
from openai import OpenAI
import instructor

class RoutingDecision(BaseModel):
    """路由决策"""
    selected_model: str
    reasoning: str
    confidence: float
    estimated_tokens: int

class LLMRouter:
    """LLM 驱动的智能路由器"""
    
    AVAILABLE_MODELS = {
        "gpt-4o-mini": {
            "description": "快速、便宜,适合简单任务",
            "cost": "低",
            "capability": "中等",
            "speed": "快"
        },
        "gpt-4o": {
            "description": "强大、全面,适合复杂任务",
            "cost": "高",
            "capability": "强",
            "speed": "中等"
        },
        "claude-3-haiku": {
            "description": "快速响应,适合简单对话",
            "cost": "低",
            "capability": "中等",
            "speed": "很快"
        },
        "claude-3.5-sonnet": {
            "description": "长上下文,适合文档处理和代码",
            "cost": "中高",
            "capability": "强",
            "speed": "中等"
        }
    }
    
    def __init__(self):
        self.client = instructor.from_openai(OpenAI())
    
    def route(self, prompt: str, constraints: dict = None) -> RoutingDecision:
        """智能路由决策"""
        constraints = constraints or {}
        
        models_info = "\n".join([
            f"- {name}: {info['description']} (成本:{info['cost']}, 能力:{info['capability']}, 速度:{info['speed']})"
            for name, info in self.AVAILABLE_MODELS.items()
        ])
        
        routing_prompt = f"""作为模型路由专家,选择最适合处理以下请求的模型。

可用模型:
{models_info}

用户请求:
{prompt}

约束条件:
- 最大成本偏好: {constraints.get('max_cost', '无限制')}
- 速度要求: {constraints.get('speed', '无特殊要求')}
- 质量要求: {constraints.get('quality', '标准')}

请选择最佳模型并说明理由。"""

        return self.client.chat.completions.create(
            model="gpt-4o-mini",  # 用轻量模型做路由决策
            response_model=RoutingDecision,
            messages=[{"role": "user", "content": routing_prompt}]
        )

# 使用
router = LLMRouter()

decision = router.route(
    "分析这份50页的技术文档并总结关键点",
    constraints={"quality": "高", "speed": "不急"}
)

print(f"选择模型: {decision.selected_model}")
print(f"原因: {decision.reasoning}")

Cascading Calls

Cascading from simple to complex

class CascadingRouter:
    """级联路由 - 先用简单模型,不行再用复杂模型"""
    
    def __init__(
        self,
        fast_model: str = "gpt-4o-mini",
        powerful_model: str = "gpt-4o",
        confidence_threshold: float = 0.7
    ):
        self.client = OpenAI()
        self.fast_model = fast_model
        self.powerful_model = powerful_model
        self.threshold = confidence_threshold
    
    def evaluate_response(self, prompt: str, response: str) -> float:
        """评估响应质量"""
        eval_prompt = f"""评估以下回答的质量(0-1分):

问题: {prompt}
回答: {response}

评估标准:
- 是否完整回答了问题
- 信息是否准确
- 是否有明显错误或遗漏

只返回一个 0-1 之间的数字。"""

        eval_response = self.client.chat.completions.create(
            model="gpt-4o-mini",
            messages=[{"role": "user", "content": eval_prompt}],
            max_tokens=10
        )
        
        try:
            return float(eval_response.choices[0].message.content.strip())
        except (TypeError, ValueError):
            return 0.5
    
    def call(self, prompt: str) -> dict:
        """级联调用"""
        # 1. 先用快速模型
        fast_response = self.client.chat.completions.create(
            model=self.fast_model,
            messages=[{"role": "user", "content": prompt}]
        )
        fast_content = fast_response.choices[0].message.content
        
        # 2. 评估质量
        confidence = self.evaluate_response(prompt, fast_content)
        
        # 3. 如果质量不够,用强大模型
        if confidence < self.threshold:
            powerful_response = self.client.chat.completions.create(
                model=self.powerful_model,
                messages=[{"role": "user", "content": prompt}]
            )
            return {
                "content": powerful_response.choices[0].message.content,
                "model": self.powerful_model,
                "cascaded": True,
                "fast_confidence": confidence
            }
        
        return {
            "content": fast_content,
            "model": self.fast_model,
            "cascaded": False,
            "confidence": confidence
        }

# 使用
cascade = CascadingRouter(confidence_threshold=0.7)

# 简单问题,快速模型就能处理
result = cascade.call("Python 中如何定义一个列表?")
print(f"模型: {result['model']}, 级联: {result['cascaded']}")

# 复杂问题,可能需要级联
result = cascade.call("解释 Transformer 架构中自注意力机制的数学原理")
print(f"模型: {result['model']}, 级联: {result['cascaded']}")

Parallel Validation

import asyncio
from typing import List
from openai import AsyncOpenAI

class ParallelValidationRouter:
    """并行验证路由 - 同时调用,选择最佳结果"""
    
    def __init__(self, models: List[str]):
        self.client = AsyncOpenAI()
        self.models = models
    
    async def call_model(self, model: str, prompt: str) -> dict:
        """调用单个模型"""
        response = await self.client.chat.completions.create(
            model=model,
            messages=[{"role": "user", "content": prompt}]
        )
        return {
            "model": model,
            "content": response.choices[0].message.content
        }
    
    async def evaluate_all(self, prompt: str, responses: List[dict]) -> dict:
        """评估所有响应,选择最佳"""
        eval_prompt = f"""从以下回答中选择最佳的一个。

问题: {prompt}

回答列表:
{chr(10).join(f"[{r['model']}]: {r['content'][:500]}..." for r in responses)}

返回最佳回答的模型名称。"""

        response = await self.client.chat.completions.create(
            model="gpt-4o-mini",
            messages=[{"role": "user", "content": eval_prompt}],
            max_tokens=50
        )
        
        best_model = response.choices[0].message.content.strip()
        
        for r in responses:
            if r["model"] in best_model:
                return r
        
        return responses[0]  # 默认返回第一个
    
    async def call(self, prompt: str) -> dict:
        """并行调用所有模型"""
        # 并行调用
        tasks = [self.call_model(m, prompt) for m in self.models]
        responses = await asyncio.gather(*tasks)
        
        # 评估选择最佳
        best = await self.evaluate_all(prompt, responses)
        
        return {
            **best,
            "all_responses": responses
        }

# 使用
async def main():
    router = ParallelValidationRouter(["gpt-4o-mini", "gpt-4o"])
    result = await router.call("解释什么是量子纠缠")
    print(f"最佳模型: {result['model']}")

asyncio.run(main())

Multi-Agent Collaboration

Expert-agent system

from typing import Dict, List
from openai import OpenAI

class ExpertAgent:
    """专家 Agent"""
    
    def __init__(
        self,
        name: str,
        expertise: str,
        model: str,
        system_prompt: str
    ):
        self.name = name
        self.expertise = expertise
        self.model = model
        self.system_prompt = system_prompt
        self.client = OpenAI()
    
    def respond(self, query: str, context: str = "") -> str:
        """生成响应"""
        messages = [
            {"role": "system", "content": self.system_prompt}
        ]
        
        if context:
            messages.append({
                "role": "system",
                "content": f"上下文信息:{context}"
            })
        
        messages.append({"role": "user", "content": query})
        
        response = self.client.chat.completions.create(
            model=self.model,
            messages=messages
        )
        
        return response.choices[0].message.content

class MultiAgentOrchestrator:
    """多 Agent 协调器"""
    
    def __init__(self):
        self.agents: Dict[str, ExpertAgent] = {}
        self.client = OpenAI()
    
    def add_agent(self, agent: ExpertAgent):
        """添加 Agent"""
        self.agents[agent.name] = agent
    
    def select_agent(self, query: str) -> str:
        """选择合适的 Agent"""
        agents_info = "\n".join([
            f"- {name}: {agent.expertise}"
            for name, agent in self.agents.items()
        ])
        
        prompt = f"""根据用户问题选择最合适的专家。

可用专家:
{agents_info}

用户问题: {query}

返回最合适的专家名称。"""

        response = self.client.chat.completions.create(
            model="gpt-4o-mini",
            messages=[{"role": "user", "content": prompt}],
            max_tokens=50
        )
        
        selected = response.choices[0].message.content.strip()
        
        # 匹配 Agent
        for name in self.agents:
            if name.lower() in selected.lower():
                return name
        
        return list(self.agents.keys())[0]  # 默认第一个
    
    def route_and_respond(self, query: str) -> dict:
        """路由到 Agent 并获取响应"""
        agent_name = self.select_agent(query)
        agent = self.agents[agent_name]
        response = agent.respond(query)
        
        return {
            "agent": agent_name,
            "expertise": agent.expertise,
            "response": response
        }
    
    def collaborative_response(self, query: str) -> dict:
        """多 Agent 协作响应"""
        # 收集所有相关 Agent 的输入
        insights = []
        for name, agent in self.agents.items():
            insight = agent.respond(
                f"从你的专业角度简要分析:{query}",
            )
            insights.append({
                "agent": name,
                "expertise": agent.expertise,
                "insight": insight
            })
        
        # 综合所有洞察
        synthesis_prompt = f"""综合以下专家意见,给出全面的回答。

问题: {query}

专家意见:
{chr(10).join(f"[{i['agent']}({i['expertise']})]: {i['insight']}" for i in insights)}

请综合所有专家意见,给出结构化的完整回答。"""

        response = self.client.chat.completions.create(
            model="gpt-4o",
            messages=[{"role": "user", "content": synthesis_prompt}]
        )
        
        return {
            "final_response": response.choices[0].message.content,
            "expert_insights": insights
        }

# 使用
orchestrator = MultiAgentOrchestrator()

# 添加专家 Agent
orchestrator.add_agent(ExpertAgent(
    name="技术专家",
    expertise="软件开发、系统架构、技术选型",
    model="gpt-4o",
    system_prompt="你是一位资深技术专家,擅长软件开发和系统架构设计。"
))

orchestrator.add_agent(ExpertAgent(
    name="商业分析师",
    expertise="商业模式、市场分析、ROI 评估",
    model="gpt-4o",
    system_prompt="你是一位商业分析师,擅长商业模式分析和市场评估。"
))

orchestrator.add_agent(ExpertAgent(
    name="用户体验专家",
    expertise="用户研究、交互设计、可用性",
    model="gpt-4o-mini",
    system_prompt="你是一位用户体验专家,擅长用户研究和交互设计。"
))

# 单 Agent 响应
result = orchestrator.route_and_respond("如何设计一个高并发的系统?")
print(f"选择专家: {result['agent']}")
print(f"回答: {result['response']}")

# 多 Agent 协作
result = orchestrator.collaborative_response("如何设计一个成功的 SaaS 产品?")
print(f"综合回答: {result['final_response']}")

Failover and Load Balancing

Automatic failover

import time
from typing import List, Optional
from dataclasses import dataclass
from openai import OpenAI

@dataclass
class ModelEndpoint:
    """模型端点"""
    name: str
    model: str
    priority: int = 0
    is_healthy: bool = True
    failure_count: int = 0
    last_failure: float = 0

class FailoverRouter:
    """故障转移路由器"""
    
    def __init__(
        self,
        endpoints: List[ModelEndpoint],
        max_failures: int = 3,
        recovery_time: int = 60
    ):
        self.endpoints = sorted(endpoints, key=lambda x: x.priority)
        self.max_failures = max_failures
        self.recovery_time = recovery_time
        self.client = OpenAI()
    
    def get_healthy_endpoint(self) -> Optional[ModelEndpoint]:
        """获取健康的端点"""
        current_time = time.time()
        
        for endpoint in self.endpoints:
            # 检查是否可以恢复
            if not endpoint.is_healthy:
                if current_time - endpoint.last_failure > self.recovery_time:
                    endpoint.is_healthy = True
                    endpoint.failure_count = 0
            
            if endpoint.is_healthy:
                return endpoint
        
        return None
    
    def mark_failure(self, endpoint: ModelEndpoint):
        """标记失败"""
        endpoint.failure_count += 1
        endpoint.last_failure = time.time()
        
        if endpoint.failure_count >= self.max_failures:
            endpoint.is_healthy = False
    
    def mark_success(self, endpoint: ModelEndpoint):
        """标记成功"""
        endpoint.failure_count = 0
    
    def call(self, prompt: str) -> dict:
        """带故障转移的调用"""
        tried_endpoints = []
        
        while True:
            endpoint = self.get_healthy_endpoint()
            
            if not endpoint:
                raise Exception("所有端点都不可用")
            
            if endpoint in tried_endpoints:
                raise Exception("所有健康端点都已尝试")
            
            tried_endpoints.append(endpoint)
            
            try:
                response = self.client.chat.completions.create(
                    model=endpoint.model,
                    messages=[{"role": "user", "content": prompt}],
                    timeout=30
                )
                
                self.mark_success(endpoint)
                
                return {
                    "content": response.choices[0].message.content,
                    "endpoint": endpoint.name,
                    "model": endpoint.model
                }
                
            except Exception as e:
                self.mark_failure(endpoint)
                print(f"端点 {endpoint.name} 失败: {e}")
                continue

# 使用
endpoints = [
    ModelEndpoint(name="primary", model="gpt-4o", priority=0),
    ModelEndpoint(name="secondary", model="gpt-4o-mini", priority=1),
    ModelEndpoint(name="backup", model="gpt-3.5-turbo", priority=2),
]

router = FailoverRouter(endpoints)
result = router.call("你好")
print(f"使用端点: {result['endpoint']}")

Best Practices

Choosing a routing strategy

| Scenario | Recommended strategy | Notes |
|---|---|---|
| Cost-sensitive | Rules + cascading | Try cheap models first |
| Quality-first | Semantic routing | Match the task precisely |
| High availability | Failover | Automatic switching |
| Complex tasks | Multi-agent | Expert collaboration |
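
If the strategy choice needs to live in code rather than in a table, a small factory keyed on the scenario can tie it to the router classes defined earlier. This is a sketch: the scenario names are assumptions, and the returned routers still need their routes, agents, or endpoints registered before use:

```python
# Hypothetical factory mapping the scenarios above to the router classes from this article.
ROUTER_FACTORIES = {
    "cost_sensitive": lambda: CascadingRouter(confidence_threshold=0.7),
    "quality_first": lambda: SemanticRouter(),          # remember to add_route(...) afterwards
    "high_availability": lambda: FailoverRouter(endpoints),
    "complex_tasks": lambda: MultiAgentOrchestrator(),  # remember to add_agent(...) afterwards
}

def build_router(scenario: str):
    """Instantiate the router recommended for a scenario (defaults to cost-sensitive)."""
    return ROUTER_FACTORIES.get(scenario, ROUTER_FACTORIES["cost_sensitive"])()

router = build_router("cost_sensitive")
```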

Monitoring metrics

import time
import numpy as np

class RouterMetrics:
    """路由器监控指标"""
    
    def __init__(self):
        self.calls = []
    
    def record(self, model: str, latency: float, cost: float, success: bool):
        self.calls.append({
            "model": model,
            "latency": latency,
            "cost": cost,
            "success": success,
            "timestamp": time.time()
        })
    
    def get_stats(self) -> dict:
        by_model = {}
        for call in self.calls:
            model = call["model"]
            if model not in by_model:
                by_model[model] = {"count": 0, "latency": [], "cost": 0, "errors": 0}
            
            by_model[model]["count"] += 1
            by_model[model]["latency"].append(call["latency"])
            by_model[model]["cost"] += call["cost"]
            if not call["success"]:
                by_model[model]["errors"] += 1
        
        return {
            model: {
                "count": stats["count"],
                "avg_latency": np.mean(stats["latency"]),
                "total_cost": stats["cost"],
                "error_rate": stats["errors"] / stats["count"]
            }
            for model, stats in by_model.items()
        }
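
A short usage sketch for RouterMetrics (my addition); the latencies and per-call costs below are made-up numbers purely for illustration:

```python
import random

metrics = RouterMetrics()

# Simulate a few routed calls with assumed latency/cost figures
for model, call_cost in [("gpt-4o-mini", 0.0002), ("gpt-4o", 0.004), ("gpt-4o-mini", 0.0003)]:
    metrics.record(
        model=model,
        latency=random.uniform(0.3, 2.0),
        cost=call_cost,
        success=True,
    )

print(metrics.get_stats())
# e.g. {'gpt-4o-mini': {'count': 2, 'avg_latency': ..., 'total_cost': 0.0005, 'error_rate': 0.0}, ...}
```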

Summary

Multi-model routing is a key technique for optimizing the cost and performance of LLM applications:

| Strategy | Complexity | Cost savings | Best for |
|---|---|---|---|
| Rule-based routing | Low | 30-50% | Simple classification |
| Semantic routing | Medium | 40-60% | Diverse tasks |
| Cascading calls | Medium | 50-70% | High quality requirements |
| Multi-agent | High | Depends | Complex collaboration |

Implementation advice:

  1. Start with simple rules.
  2. Collect data to refine the routing.
  3. Monitor cost and quality.
  4. Iterate continuously.

Copyright: unless otherwise stated, this article is copyright sshipanoo; please credit the original when reposting. (Licensed under CC BY-NC-SA 4.0.)

Original title: LLM Application Development: Multi-Model Routing and Collaboration