Bagging、Boosting与Stacking详解

前言

集成学习通过组合多个模型来获得比单个模型更好的预测性能。本文详细介绍三种主要的集成方法:Bagging、Boosting和Stacking。


集成学习概述

为什么集成有效

import numpy as np
import matplotlib.pyplot as plt
from sklearn.datasets import make_classification, make_moons
from sklearn.model_selection import train_test_split
from sklearn.tree import DecisionTreeClassifier
from sklearn.metrics import accuracy_score

# Seed NumPy's global RNG so the np.random.choice bootstrap draws below are reproducible.
np.random.seed(42)

# 演示集成的优势
def demonstrate_ensemble_benefit():
    """Compare a single decision tree with a vote over 50 bootstrapped trees.

    Generates a noisy two-moons dataset, fits one depth-5 tree as a baseline,
    then fits 50 depth-5 trees, each on a bootstrap resample of the training
    split, and combines their test predictions by voting. The three
    accuracies are printed.

    Returns:
        A tuple ``(individual_accs, ensemble_acc)``: the list of per-tree test
        accuracies and the accuracy of the voted prediction.
    """
    features, labels = make_moons(n_samples=500, noise=0.3, random_state=42)
    X_tr, X_te, y_tr, y_te = train_test_split(
        features, labels, test_size=0.3, random_state=42
    )

    # Baseline: one tree trained on the whole training split.
    baseline = DecisionTreeClassifier(max_depth=5, random_state=42)
    baseline.fit(X_tr, y_tr)
    single_acc = accuracy_score(y_te, baseline.predict(X_te))

    # Ensemble members: every tree gets its own seed and its own bootstrap
    # sample (drawn with replacement from the global NumPy RNG).
    n_trees = 50
    n_train = len(X_tr)
    votes = []
    for seed in range(n_trees):
        member = DecisionTreeClassifier(max_depth=5, random_state=seed)
        rows = np.random.choice(n_train, n_train, replace=True)
        member.fit(X_tr[rows], y_tr[rows])
        votes.append(member.predict(X_te))
    individual_accs = [accuracy_score(y_te, vote) for vote in votes]

    # Rounding the mean vote acts as a majority vote when labels are 0/1;
    # an exact 0.5 tie rounds to 0 (NumPy rounds half to even).
    ensemble_pred = np.round(np.mean(votes, axis=0))
    ensemble_acc = accuracy_score(y_te, ensemble_pred)

    print(f"单个决策树准确率: {single_acc:.4f}")
    print(f"单个树平均准确率: {np.mean(individual_accs):.4f}")
    print(f"集成投票准确率: {ensemble_acc:.4f}")

    return individual_accs, ensemble_acc

# Run the demo once at import time; keeps the per-tree accuracies and the ensemble accuracy.
individual_accs, ensemble_acc = demonstrate_ensemble_benefit()

集成方法分类

| 方法 | 基学习器关系 | 组合方式 | 代表算法 |
| --- | --- | --- | --- |
| Bagging | 并行独立 | 投票/平均 | Random Forest |
| Boosting | 串行依赖 | 加权求和 | AdaBoost, GBDT |
| Stacking | 两层结构 | 元学习器 | Stacking |

Bagging

Bootstrap Aggregating原理

class SimpleBagging:
    """Minimal bagging (bootstrap aggregating) classifier.

    Every ensemble member is an independent copy of ``base_estimator`` fitted
    on a bootstrap resample (sampling with replacement) of the training data.
    Class predictions are combined by majority vote, probabilities by
    averaging.
    """

    def __init__(self, base_estimator, n_estimators=10):
        """Store the ensemble configuration.

        Args:
            base_estimator: Unfitted estimator exposing ``fit(X, y)`` and
                ``predict(X)`` (and ``predict_proba(X)`` if
                :meth:`predict_proba` is used). It is copied for every
                member and never fitted itself.
            n_estimators: Number of ensemble members to train.
        """
        self.base_estimator = base_estimator
        self.n_estimators = n_estimators
        self.estimators = []

    def _check_fitted(self):
        """Raise ``ValueError`` if no ensemble member has been trained."""
        if not self.estimators:
            raise ValueError("SimpleBagging is not fitted yet; call fit() first.")

    def fit(self, X, y):
        """Train ``n_estimators`` copies of the base estimator.

        Args:
            X: Training samples, indexable along the first axis.
            y: Training labels, one per row of ``X``.

        Returns:
            ``self``, to allow call chaining.
        """
        import copy  # function-scope import keeps this block self-contained

        X = np.asarray(X)
        y = np.asarray(y)
        n_samples = X.shape[0]
        self.estimators = []

        for i in range(self.n_estimators):
            # Bootstrap sample: n_samples row indices drawn with replacement.
            indices = np.random.choice(n_samples, n_samples, replace=True)

            # Fresh copy so members do not share state with each other or
            # with the caller's estimator.
            estimator = copy.deepcopy(self.base_estimator)

            # Give each member its own seed when the estimator supports one.
            get_params = getattr(estimator, "get_params", None)
            if callable(get_params) and "random_state" in get_params():
                estimator.set_params(random_state=i)

            estimator.fit(X[indices], y[indices])
            self.estimators.append(estimator)

        return self

    def predict(self, X):
        """Return the majority-vote label for each row of ``X``.

        Works for arbitrary label values. Ties go to the smallest label,
        which for 0/1 labels means a tie yields 0.

        Raises:
            ValueError: If called before :meth:`fit`.
        """
        self._check_fitted()
        # Shape (n_estimators, n_samples): one row of labels per member.
        predictions = np.array([est.predict(X) for est in self.estimators])

        # Encode labels as 0..K-1 so votes can be counted per class.
        classes, encoded = np.unique(predictions, return_inverse=True)
        encoded = encoded.reshape(predictions.shape)
        counts = np.stack(
            [(encoded == k).sum(axis=0) for k in range(len(classes))]
        )
        # argmax returns the first maximum, i.e. the smallest label on ties.
        return classes[np.argmax(counts, axis=0)]

    def predict_proba(self, X):
        """Return the members' class probabilities averaged per sample.

        NOTE(review): assumes every member saw the same set of classes in its
        bootstrap sample, so all ``predict_proba`` outputs have the same
        column layout — confirm for small or imbalanced data.

        Raises:
            ValueError: If called before :meth:`fit`.
        """
        self._check_fitted()
        probas = np.array([est.predict_proba(X) for est in self.estimators])
        return np.mean(probas, axis=0)

# Smoke test: fit SimpleBagging on a noisy two-moons split and print its test accuracy.
# NOTE: X, y and the train/test split are module-level names reused by later sections.
X, y = make_moons(n_samples=500, noise=0.3, random_state=42)
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.3, random_state=42)

bagging = SimpleBagging(DecisionTreeClassifier(), n_estimators=50)
bagging.fit(X_train, y_train)
bagging_acc = accuracy_score(y_test, bagging.predict(X_test))
print(f"Bagging准确率: {bagging_acc:.4f}")

Out-of-Bag评估

def oob_score(X, y, n_estimators=50):
    """Estimate bagging accuracy from out-of-bag (OOB) samples.

    For each of ``n_estimators`` depth-5 trees, a bootstrap sample is drawn
    and the tree is evaluated only on the rows that were NOT drawn. The
    class probabilities of those rows are accumulated, and each row's final
    prediction is the class with the largest accumulated probability.

    Args:
        X: Samples, indexable along the first axis.
        y: Labels, one per row of ``X``; any label values are accepted.
        n_estimators: Number of bootstrap rounds / trees.

    Returns:
        Accuracy over the rows that were out-of-bag at least once. The OOB
        coverage and the accuracy are also printed.
    """
    X = np.asarray(X)
    y = np.asarray(y)
    n_samples = X.shape[0]
    classes = np.unique(y)  # sorted label values; column k <-> classes[k]
    oob_predictions = np.zeros((n_samples, len(classes)))  # summed probabilities
    oob_counts = np.zeros(n_samples)  # how often each row was out-of-bag

    for i in range(n_estimators):
        # Bootstrap sample and its complement (the out-of-bag rows).
        indices = np.random.choice(n_samples, n_samples, replace=True)
        oob_indices = np.setdiff1d(np.arange(n_samples), indices)

        tree = DecisionTreeClassifier(max_depth=5, random_state=i)
        tree.fit(X[indices], y[indices])

        if len(oob_indices) > 0:
            # A bootstrap sample may miss a class; predict_proba then has
            # fewer columns, so route each column via tree.classes_.
            columns = np.searchsorted(classes, tree.classes_)
            oob_pred = tree.predict_proba(X[oob_indices])
            oob_predictions[np.ix_(oob_indices, columns)] += oob_pred
            oob_counts[oob_indices] += 1

    # Score only rows that received at least one OOB prediction.
    valid_mask = oob_counts > 0
    # Map the winning column index back to the actual label value.
    final_pred = classes[np.argmax(oob_predictions[valid_mask], axis=1)]
    oob_accuracy = accuracy_score(y[valid_mask], final_pred)

    print(f"OOB样本比例: {np.mean(oob_counts > 0):.2%}")
    print(f"OOB准确率: {oob_accuracy:.4f}")

    return oob_accuracy

# Run the OOB estimate on the full two-moons dataset bound to the module-level X, y.
oob_score(X, y)

Boosting

AdaBoost原理

class SimpleAdaBoost:
    """Minimal binary AdaBoost using weighted resampling.

    Labels must be encoded as 0 and 1. Each round draws a training set
    according to the current sample weights, fits a weak learner on it, and
    re-weights the samples so that misclassified ones become more likely to
    be drawn in the next round.
    """

    def __init__(self, n_estimators=50, base_estimator=None):
        """Store the ensemble configuration.

        Args:
            n_estimators: Number of boosting rounds.
            base_estimator: Optional unfitted weak learner exposing
                ``fit(X, y)`` and ``predict(X)`` returning 0/1 labels. It is
                copied for every round. ``None`` (the default) uses a
                depth-1 decision tree (a decision stump).
        """
        self.n_estimators = n_estimators
        self.base_estimator = base_estimator
        self.estimators = []
        self.alphas = []

    def _make_estimator(self):
        """Return a fresh, unfitted weak learner for one boosting round."""
        import copy  # function-scope import keeps this block self-contained

        if self.base_estimator is None:
            return DecisionTreeClassifier(max_depth=1)  # decision stump
        return copy.deepcopy(self.base_estimator)

    def fit(self, X, y):
        """Run ``n_estimators`` boosting rounds.

        Args:
            X: Training samples, indexable along the first axis.
            y: Training labels encoded as 0/1, one per row of ``X``.

        Returns:
            ``self``, to allow call chaining.

        Raises:
            ValueError: If ``y`` contains values other than 0 and 1.
        """
        X = np.asarray(X)
        y = np.asarray(y)
        if not np.isin(y, (0, 1)).all():
            raise ValueError(
                "SimpleAdaBoost expects binary labels encoded as 0 and 1."
            )

        n_samples = X.shape[0]
        # The exponential weight update needs labels in {-1, +1}. Using the
        # raw 0/1 labels would leave every class-0 sample's weight unchanged.
        y_signed = 2 * y.astype(int) - 1
        # Start from uniform sample weights.
        weights = np.ones(n_samples) / n_samples

        self.estimators = []
        self.alphas = []

        for _ in range(self.n_estimators):
            estimator = self._make_estimator()

            # Resample according to the weights instead of passing weights
            # to the learner.
            indices = np.random.choice(n_samples, n_samples, replace=True, p=weights)
            estimator.fit(X[indices], y[indices])

            predictions = estimator.predict(X)

            # Weighted error rate on the full training set.
            incorrect = predictions != y
            error = np.sum(weights * incorrect) / np.sum(weights)
            # Keep the log below finite when the error is exactly 0 or 1.
            error = np.clip(error, 1e-10, 1 - 1e-10)

            # Vote strength of this learner.
            alpha = 0.5 * np.log((1 - error) / error)

            # y_signed * h is +1 for correct and -1 for wrong predictions, so
            # (for alpha > 0) correct samples shrink and wrong ones grow.
            weights *= np.exp(-alpha * y_signed * (2 * predictions - 1))
            weights /= np.sum(weights)

            self.estimators.append(estimator)
            self.alphas.append(alpha)

        return self

    def predict(self, X):
        """Return 0/1 labels from the alpha-weighted vote of all learners."""
        scores = np.zeros(X.shape[0])
        for alpha, estimator in zip(self.alphas, self.estimators):
            # Map each 0/1 prediction to -1/+1 before weighting.
            scores += alpha * (2 * estimator.predict(X) - 1)
        return (scores > 0).astype(int)

# Smoke test for SimpleAdaBoost on the module-level train/test split defined earlier.
adaboost = SimpleAdaBoost(n_estimators=50)
adaboost.fit(X_train, y_train)
ada_acc = accuracy_score(y_test, adaboost.predict(X_test))
print(f"AdaBoost准确率: {ada_acc:.4f}")

Gradient Boosting原理

class SimpleGradientBoosting:
    """Minimal gradient boosting for regression with squared-error loss.

    The model starts from the mean of the targets and repeatedly fits a small
    regression tree to the current residuals (the negative gradient of the
    squared error), adding ``learning_rate`` times the tree's output to the
    running prediction.
    """

    def __init__(self, n_estimators=100, learning_rate=0.1, max_depth=3):
        """Store the hyper-parameters.

        Args:
            n_estimators: Number of boosting rounds (trees).
            learning_rate: Shrinkage applied to every tree's contribution.
            max_depth: Maximum depth of each regression tree.
        """
        self.n_estimators = n_estimators
        self.learning_rate = learning_rate
        self.max_depth = max_depth
        self.estimators = []
        self.init_pred = None

    def fit(self, X, y):
        """Fit the boosted ensemble.

        Args:
            X: Training samples.
            y: Numeric regression targets, one per row of ``X``.

        Returns:
            ``self``, to allow call chaining.
        """
        # Function-scope import: the file's top-level imports only bring in
        # DecisionTreeClassifier from sklearn.tree.
        from sklearn.tree import DecisionTreeRegressor

        y = np.asarray(y, dtype=float)

        # The constant model that minimises squared error is the mean.
        self.init_pred = np.mean(y)
        current_pred = np.full(len(y), self.init_pred)

        self.estimators = []

        for i in range(self.n_estimators):
            # Negative gradient of 1/2 * (y - f)^2 with respect to f.
            residuals = y - current_pred

            # Fit the residual values themselves so each step is scaled by
            # how far the current prediction is from the target.
            tree = DecisionTreeRegressor(max_depth=self.max_depth, random_state=i)
            tree.fit(X, residuals)

            current_pred += self.learning_rate * tree.predict(X)
            self.estimators.append(tree)

        return self

    def predict(self, X):
        """Return predictions: initial mean plus all shrunken tree outputs.

        Raises:
            ValueError: If called before :meth:`fit`.
        """
        if self.init_pred is None:
            raise ValueError(
                "SimpleGradientBoosting is not fitted yet; call fit() first."
            )
        X = np.asarray(X)
        pred = np.full(X.shape[0], self.init_pred)
        for tree in self.estimators:
            pred += self.learning_rate * tree.predict(X)
        return pred

# NOTE: SimpleGradientBoosting is only defined above; it is not instantiated or fitted before this message.
print("Gradient Boosting演示完成")

Stacking

Stacking原理与实现

from sklearn.linear_model import LogisticRegression
from sklearn.svm import SVC
from sklearn.neighbors import KNeighborsClassifier
from sklearn.model_selection import cross_val_predict

class SimpleStacking:
    """Two-level stacking: base estimators feed a meta-estimator.

    The meta-estimator is trained on one column of predictions per base
    estimator and then maps base predictions to the final output.
    """

    def __init__(self, base_estimators, meta_estimator):
        """Keep references to the base estimators and the meta-estimator.

        Args:
            base_estimators: Sequence of level-0 estimators.
            meta_estimator: Level-1 estimator trained on base predictions.
        """
        self.base_estimators = base_estimators
        self.meta_estimator = meta_estimator
        self.fitted_base = []

    def fit(self, X, y):
        """Build the meta-feature matrix and fit both levels.

        The given base estimators and the meta-estimator are fitted in
        place; ``fitted_base`` holds the same objects as ``base_estimators``.

        Returns:
            ``self``, to allow call chaining.
        """
        n_rows = X.shape[0]
        n_models = len(self.base_estimators)
        level_one_inputs = np.zeros((n_rows, n_models))

        self.fitted_base = []
        for column, model in enumerate(self.base_estimators):
            # 5-fold cross-validated predictions for the meta-feature column,
            # taken before the model is fitted on all rows.
            cv_predictions = cross_val_predict(model, X, y, cv=5)
            level_one_inputs[:, column] = cv_predictions

            # Fit on every row; this fitted model is what predict() uses.
            model.fit(X, y)
            self.fitted_base.append(model)

        self.meta_estimator.fit(level_one_inputs, y)
        return self

    def predict(self, X):
        """Predict by feeding base-estimator predictions to the meta-estimator."""
        n_models = len(self.fitted_base)
        level_one_inputs = np.zeros((X.shape[0], n_models))
        for column, model in enumerate(self.fitted_base):
            level_one_inputs[:, column] = model.predict(X)

        return self.meta_estimator.predict(level_one_inputs)

# Smoke test for SimpleStacking: three base classifiers and a logistic-regression meta-learner.
base_estimators = [
    DecisionTreeClassifier(max_depth=5, random_state=42),
    KNeighborsClassifier(n_neighbors=5),
    SVC(kernel='rbf', random_state=42)
]
meta_estimator = LogisticRegression(random_state=42)

# Fit and score on the module-level train/test split defined earlier.
stacking = SimpleStacking(base_estimators, meta_estimator)
stacking.fit(X_train, y_train)
stacking_acc = accuracy_score(y_test, stacking.predict(X_test))
print(f"Stacking准确率: {stacking_acc:.4f}")

Sklearn集成方法

使用sklearn实现

from sklearn.ensemble import (RandomForestClassifier, AdaBoostClassifier,
                              GradientBoostingClassifier, VotingClassifier,
                              StackingClassifier, BaggingClassifier)

# Prepare data: a synthetic 20-feature classification problem with an 80/20 train/test split.
# NOTE: this rebinds the module-level X, y, X_train, X_test, y_train, y_test used by earlier sections.
X, y = make_classification(n_samples=1000, n_features=20, n_informative=10,
                          random_state=42)
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=42)

# The scikit-learn ensemble models to compare, keyed by display name.
models = {
    'Random Forest': RandomForestClassifier(n_estimators=100, random_state=42),
    'AdaBoost': AdaBoostClassifier(n_estimators=100, random_state=42),
    'Gradient Boosting': GradientBoostingClassifier(n_estimators=100, random_state=42),
    'Bagging': BaggingClassifier(n_estimators=100, random_state=42),
}

# Fit each model, print its test accuracy, and collect it in `results`.
results = []
for name, model in models.items():
    model.fit(X_train, y_train)
    acc = accuracy_score(y_test, model.predict(X_test))
    results.append({'Model': name, 'Accuracy': acc})
    print(f"{name}: {acc:.4f}")

# Voting ensemble over three sub-ensembles, configured with voting='soft'.
voting = VotingClassifier(
    estimators=[
        ('rf', RandomForestClassifier(n_estimators=50, random_state=42)),
        ('gb', GradientBoostingClassifier(n_estimators=50, random_state=42)),
        ('ada', AdaBoostClassifier(n_estimators=50, random_state=42))
    ],
    voting='soft'
)
voting.fit(X_train, y_train)
voting_acc = accuracy_score(y_test, voting.predict(X_test))
print(f"Voting: {voting_acc:.4f}")

可视化对比

决策边界对比

def plot_ensemble_comparison():
    """Plot the decision regions of four classifiers on a two-moons dataset.

    Fits a single decision tree, a random forest, AdaBoost and gradient
    boosting on the same 300-point dataset and draws each model's predicted
    regions, overlaid with the training points, in a 2x2 grid of subplots.
    The figure is shown with ``plt.show()``; nothing is returned.
    """
    points, labels = make_moons(n_samples=300, noise=0.25, random_state=42)

    classifiers = {
        'Decision Tree': DecisionTreeClassifier(max_depth=5, random_state=42),
        'Random Forest': RandomForestClassifier(n_estimators=50, random_state=42),
        'AdaBoost': AdaBoostClassifier(n_estimators=50, random_state=42),
        'Gradient Boosting': GradientBoostingClassifier(n_estimators=50, random_state=42)
    }

    fig, axes = plt.subplots(2, 2, figsize=(12, 10))
    panels = axes.flatten()

    # Evaluation grid: 100 x 100 points covering the data plus a 0.5 margin.
    margin = 0.5
    first, second = points[:, 0], points[:, 1]
    xx, yy = np.meshgrid(
        np.linspace(first.min() - margin, first.max() + margin, 100),
        np.linspace(second.min() - margin, second.max() + margin, 100),
    )
    grid_points = np.c_[xx.ravel(), yy.ravel()]

    for panel, (title, clf) in zip(panels, classifiers.items()):
        clf.fit(points, labels)
        # Predicted class at every grid point, reshaped back to the grid.
        regions = clf.predict(grid_points).reshape(xx.shape)

        panel.contourf(xx, yy, regions, alpha=0.3, cmap='coolwarm')
        panel.scatter(first, second, c=labels, cmap='coolwarm', edgecolors='black')
        panel.set_title(title)

    plt.tight_layout()
    plt.show()

# Render the 2x2 decision-boundary comparison figure.
plot_ensemble_comparison()

常见问题

Q1: Bagging和Boosting的区别?

| 特性 | Bagging | Boosting |
| --- | --- | --- |
| 训练方式 | 并行独立 | 串行依赖 |
| 样本权重 | 均匀 | 自适应 |
| 目标 | 降低方差 | 降低偏差 |
| 对异常值 | 鲁棒 | 敏感 |

Q2: 何时使用Stacking?

  • 基学习器互补性强
  • 有足够计算资源
  • 追求最高性能

Q3: 集成模型会过拟合吗?

会,但程度不同。Bagging 增加基学习器数量通常不会加剧过拟合(不过单个基学习器过于复杂时,整体仍可能过拟合);Boosting 迭代轮数过多时容易过拟合,需要控制迭代次数和学习率。

Q4: 基学习器数量如何选择?

  • Bagging: 越多越好,但边际收益递减
  • Boosting: 需要早停或交叉验证

总结

| 方法 | 优点 | 缺点 |
| --- | --- | --- |
| Bagging | 降低方差,易并行 | 不能显著降低偏差 |
| Boosting | 降低偏差,精度高 | 易过拟合,难并行 |
| Stacking | 灵活,性能好 | 复杂,计算量大 |

参考资料

  • Breiman, L. (1996). “Bagging Predictors”
  • Freund, Y. & Schapire, R. (1997). “A Decision-Theoretic Generalization of On-Line Learning and an Application to Boosting”
  • Friedman, J. (2001). “Greedy Function Approximation: A Gradient Boosting Machine”
  • Wolpert, D. (1992). “Stacked Generalization”

版权声明: 如无特别声明,本文版权归 sshipanoo 所有,转载请注明本文链接。

(采用 CC BY-NC-SA 4.0 许可协议进行授权)

本文标题:《 机器学习基础系列——集成学习 》

本文链接:http://localhost:3015/ai/%E9%9B%86%E6%88%90%E5%AD%A6%E4%B9%A0.html

本文距最后一次更新已有一段时间,文章中的某些内容可能已过时!