ID3、C4.5、CART

前言

决策树是最直观的机器学习算法之一,模拟人类决策过程:通过一系列问题将数据分类。它是很多集成方法(随机森林、XGBoost)的基础。


基本概念

决策树结构

  • 根节点:包含所有样本
  • 内部节点:对特征的测试
  • 分支:测试的结果
  • 叶节点:类别标签或值
import numpy as np
import matplotlib.pyplot as plt
from sklearn.datasets import load_iris
from sklearn.tree import DecisionTreeClassifier, plot_tree

# 加载数据
iris = load_iris()
X, y = iris.data[:, [2, 3]], iris.target  # 只用花瓣长度和宽度
feature_names = [iris.feature_names[2], iris.feature_names[3]]

# 训练决策树
dt = DecisionTreeClassifier(max_depth=3, random_state=42)
dt.fit(X, y)

# 可视化决策树
plt.figure(figsize=(15, 10))
plot_tree(dt, feature_names=feature_names, class_names=iris.target_names,
          filled=True, rounded=True, fontsize=10)
plt.title('鸢尾花决策树')
plt.tight_layout()
plt.show()

决策边界可视化

# 绘制决策边界
def plot_decision_boundary(clf, X, y, ax, title):
    x_min, x_max = X[:, 0].min() - 0.5, X[:, 0].max() + 0.5
    y_min, y_max = X[:, 1].min() - 0.5, X[:, 1].max() + 0.5
    
    xx, yy = np.meshgrid(np.linspace(x_min, x_max, 200),
                         np.linspace(y_min, y_max, 200))
    
    Z = clf.predict(np.c_[xx.ravel(), yy.ravel()])
    Z = Z.reshape(xx.shape)
    
    ax.contourf(xx, yy, Z, alpha=0.3, cmap='viridis')
    scatter = ax.scatter(X[:, 0], X[:, 1], c=y, cmap='viridis', edgecolors='k')
    ax.set_xlabel(feature_names[0])
    ax.set_ylabel(feature_names[1])
    ax.set_title(title)
    return scatter

fig, axes = plt.subplots(1, 3, figsize=(15, 5))

for ax, depth in zip(axes, [1, 3, None]):
    dt_temp = DecisionTreeClassifier(max_depth=depth, random_state=42)
    dt_temp.fit(X, y)
    plot_decision_boundary(dt_temp, X, y, ax, f'max_depth={depth}')

plt.tight_layout()
plt.show()

特征选择标准

信息增益(ID3)

熵:度量数据的不确定性

\[H(D) = -\sum_{k=1}^{K} p_k \log_2 p_k\]

条件熵:在特征A条件下的熵

\[H(D|A) = \sum_{v} \frac{|D_v|}{|D|} H(D_v)\]

信息增益:

\[\text{Gain}(D, A) = H(D) - H(D|A)\]
def entropy(y):
    """计算熵"""
    _, counts = np.unique(y, return_counts=True)
    probs = counts / len(y)
    return -np.sum(probs * np.log2(probs + 1e-10))

def information_gain(X, y, feature_idx, threshold=None):
    """计算信息增益"""
    parent_entropy = entropy(y)
    
    if threshold is None:
        # 离散特征
        values = np.unique(X[:, feature_idx])
        child_entropy = 0
        for v in values:
            mask = X[:, feature_idx] == v
            child_entropy += np.sum(mask) / len(y) * entropy(y[mask])
    else:
        # 连续特征,按阈值分割
        left_mask = X[:, feature_idx] <= threshold
        right_mask = ~left_mask
        n = len(y)
        child_entropy = (np.sum(left_mask) * entropy(y[left_mask]) + 
                        np.sum(right_mask) * entropy(y[right_mask])) / n
    
    return parent_entropy - child_entropy

# 示例
print(f"数据集熵: {entropy(y):.4f}")
print(f"花瓣长度信息增益: {information_gain(X, y, 0, threshold=2.5):.4f}")
print(f"花瓣宽度信息增益: {information_gain(X, y, 1, threshold=1.5):.4f}")

信息增益率(C4.5)

信息增益倾向于选择取值多的特征,信息增益率进行校正:

\[\text{GainRatio}(D, A) = \frac{\text{Gain}(D, A)}{H_A(D)}\] \[H_A(D) = -\sum_v \frac{|D_v|}{|D|} \log_2 \frac{|D_v|}{|D|}\]
def split_entropy(X, feature_idx, threshold=None):
    """计算分裂信息"""
    if threshold is None:
        values = np.unique(X[:, feature_idx])
        probs = [np.sum(X[:, feature_idx] == v) / len(X) for v in values]
    else:
        left_ratio = np.sum(X[:, feature_idx] <= threshold) / len(X)
        probs = [left_ratio, 1 - left_ratio]
    
    return -np.sum([p * np.log2(p + 1e-10) for p in probs])

def gain_ratio(X, y, feature_idx, threshold=None):
    """计算信息增益率"""
    gain = information_gain(X, y, feature_idx, threshold)
    split_info = split_entropy(X, feature_idx, threshold)
    return gain / (split_info + 1e-10)

基尼指数(CART)

\[\text{Gini}(D) = 1 - \sum_{k=1}^{K} p_k^2\]

选择使基尼指数下降最多的分割:

\[\text{GiniGain}(D, A) = \text{Gini}(D) - \sum_v \frac{|D_v|}{|D|} \text{Gini}(D_v)\]
def gini(y):
    """计算基尼指数"""
    _, counts = np.unique(y, return_counts=True)
    probs = counts / len(y)
    return 1 - np.sum(probs ** 2)

def gini_impurity(X, y, feature_idx, threshold):
    """计算分割后的加权基尼指数"""
    left_mask = X[:, feature_idx] <= threshold
    right_mask = ~left_mask
    
    n = len(y)
    left_gini = gini(y[left_mask]) if np.sum(left_mask) > 0 else 0
    right_gini = gini(y[right_mask]) if np.sum(right_mask) > 0 else 0
    
    weighted_gini = (np.sum(left_mask) * left_gini + 
                    np.sum(right_mask) * right_gini) / n
    
    return weighted_gini

# 比较不同阈值
thresholds = np.linspace(X[:, 0].min(), X[:, 0].max(), 50)
gini_values = [gini_impurity(X, y, 0, t) for t in thresholds]

plt.figure(figsize=(10, 5))
plt.plot(thresholds, gini_values, 'b-', linewidth=2)
plt.xlabel('分割阈值(花瓣长度)')
plt.ylabel('加权基尼指数')
plt.title('不同分割点的基尼指数')
plt.axvline(x=thresholds[np.argmin(gini_values)], color='r', linestyle='--',
            label=f'最优阈值={thresholds[np.argmin(gini_values)]:.2f}')
plt.legend()
plt.grid(True, alpha=0.3)
plt.show()

从零实现决策树

class DecisionTreeNode:
    def __init__(self, feature_idx=None, threshold=None, left=None, right=None, value=None):
        self.feature_idx = feature_idx  # 分割特征索引
        self.threshold = threshold       # 分割阈值
        self.left = left                 # 左子树
        self.right = right               # 右子树
        self.value = value               # 叶节点的预测值

class DecisionTreeClassifierScratch:
    def __init__(self, max_depth=None, min_samples_split=2, criterion='gini'):
        self.max_depth = max_depth
        self.min_samples_split = min_samples_split
        self.criterion = criterion
        self.root = None
        self.n_classes = None
    
    def _gini(self, y):
        """计算基尼指数"""
        _, counts = np.unique(y, return_counts=True)
        probs = counts / len(y)
        return 1 - np.sum(probs ** 2)
    
    def _entropy(self, y):
        """计算熵"""
        _, counts = np.unique(y, return_counts=True)
        probs = counts / len(y)
        return -np.sum(probs * np.log2(probs + 1e-10))
    
    def _impurity(self, y):
        if self.criterion == 'gini':
            return self._gini(y)
        else:
            return self._entropy(y)
    
    def _find_best_split(self, X, y):
        """找到最佳分割点"""
        best_gain = -1
        best_feature = None
        best_threshold = None
        
        n_features = X.shape[1]
        current_impurity = self._impurity(y)
        
        for feature_idx in range(n_features):
            thresholds = np.unique(X[:, feature_idx])
            
            for threshold in thresholds:
                left_mask = X[:, feature_idx] <= threshold
                right_mask = ~left_mask
                
                if np.sum(left_mask) == 0 or np.sum(right_mask) == 0:
                    continue
                
                # 计算加权不纯度
                n = len(y)
                left_impurity = self._impurity(y[left_mask])
                right_impurity = self._impurity(y[right_mask])
                weighted_impurity = (np.sum(left_mask) * left_impurity + 
                                    np.sum(right_mask) * right_impurity) / n
                
                gain = current_impurity - weighted_impurity
                
                if gain > best_gain:
                    best_gain = gain
                    best_feature = feature_idx
                    best_threshold = threshold
        
        return best_feature, best_threshold
    
    def _build_tree(self, X, y, depth=0):
        """递归构建决策树"""
        n_samples, n_features = X.shape
        n_classes = len(np.unique(y))
        
        # 终止条件
        if (self.max_depth is not None and depth >= self.max_depth) or \
           n_samples < self.min_samples_split or \
           n_classes == 1:
            # 返回叶节点
            leaf_value = np.bincount(y).argmax()
            return DecisionTreeNode(value=leaf_value)
        
        # 找最佳分割
        best_feature, best_threshold = self._find_best_split(X, y)
        
        if best_feature is None:
            leaf_value = np.bincount(y).argmax()
            return DecisionTreeNode(value=leaf_value)
        
        # 分割数据
        left_mask = X[:, best_feature] <= best_threshold
        right_mask = ~left_mask
        
        # 递归构建子树
        left_subtree = self._build_tree(X[left_mask], y[left_mask], depth + 1)
        right_subtree = self._build_tree(X[right_mask], y[right_mask], depth + 1)
        
        return DecisionTreeNode(
            feature_idx=best_feature,
            threshold=best_threshold,
            left=left_subtree,
            right=right_subtree
        )
    
    def fit(self, X, y):
        self.n_classes = len(np.unique(y))
        self.root = self._build_tree(np.array(X), np.array(y))
        return self
    
    def _predict_single(self, x, node):
        """预测单个样本"""
        if node.value is not None:
            return node.value
        
        if x[node.feature_idx] <= node.threshold:
            return self._predict_single(x, node.left)
        else:
            return self._predict_single(x, node.right)
    
    def predict(self, X):
        return np.array([self._predict_single(x, self.root) for x in X])
    
    def score(self, X, y):
        return np.mean(self.predict(X) == y)

# 测试
from sklearn.model_selection import train_test_split
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=42)

dt_scratch = DecisionTreeClassifierScratch(max_depth=3, criterion='gini')
dt_scratch.fit(X_train, y_train)

print(f"自实现 - 训练准确率: {dt_scratch.score(X_train, y_train):.4f}")
print(f"自实现 - 测试准确率: {dt_scratch.score(X_test, y_test):.4f}")

# 与sklearn对比
dt_sklearn = DecisionTreeClassifier(max_depth=3, criterion='gini', random_state=42)
dt_sklearn.fit(X_train, y_train)
print(f"sklearn - 测试准确率: {dt_sklearn.score(X_test, y_test):.4f}")

剪枝

为什么需要剪枝

完全生长的决策树容易过拟合,需要剪枝来提高泛化能力。

预剪枝

在树生成过程中提前停止:

  • max_depth:最大深度
  • min_samples_split:节点分裂所需最小样本数
  • min_samples_leaf:叶节点最小样本数
  • max_leaf_nodes:最大叶节点数
# 不同剪枝参数的影响
fig, axes = plt.subplots(2, 2, figsize=(14, 12))

params = [
    {'max_depth': None},
    {'max_depth': 3},
    {'min_samples_split': 10},
    {'min_samples_leaf': 5}
]

for ax, param in zip(axes.ravel(), params):
    dt_temp = DecisionTreeClassifier(**param, random_state=42)
    dt_temp.fit(X, y)
    
    plot_decision_boundary(dt_temp, X, y, ax, str(param))
    train_score = dt_temp.score(X, y)
    ax.set_title(f"{param}\nTrain Acc: {train_score:.3f}")

plt.tight_layout()
plt.show()

后剪枝(代价复杂度剪枝)

\[R_\alpha(T) = R(T) + \alpha |T|\]
  • $R(T)$:树的训练误差
  • $ T $:叶节点数量
  • $\alpha$:复杂度参数
# 代价复杂度剪枝路径
path = dt.cost_complexity_pruning_path(X_train, y_train)
ccp_alphas = path.ccp_alphas
impurities = path.impurities

# 不同alpha值的树
trees = []
for ccp_alpha in ccp_alphas:
    dt_temp = DecisionTreeClassifier(ccp_alpha=ccp_alpha, random_state=42)
    dt_temp.fit(X_train, y_train)
    trees.append(dt_temp)

# 评估
train_scores = [tree.score(X_train, y_train) for tree in trees]
test_scores = [tree.score(X_test, y_test) for tree in trees]
n_leaves = [tree.get_n_leaves() for tree in trees]

fig, axes = plt.subplots(1, 3, figsize=(15, 4))

axes[0].plot(ccp_alphas, impurities, 'b-o', markersize=3)
axes[0].set_xlabel('alpha')
axes[0].set_ylabel('不纯度')
axes[0].set_title('剪枝路径')

axes[1].plot(ccp_alphas, n_leaves, 'g-o', markersize=3)
axes[1].set_xlabel('alpha')
axes[1].set_ylabel('叶节点数')
axes[1].set_title('树大小')

axes[2].plot(ccp_alphas, train_scores, 'b-', label='训练集')
axes[2].plot(ccp_alphas, test_scores, 'r-', label='测试集')
axes[2].set_xlabel('alpha')
axes[2].set_ylabel('准确率')
axes[2].set_title('准确率 vs alpha')
axes[2].legend()

plt.tight_layout()
plt.show()

# 找最优alpha
best_idx = np.argmax(test_scores)
print(f"最优alpha: {ccp_alphas[best_idx]:.6f}")
print(f"最优测试准确率: {test_scores[best_idx]:.4f}")
print(f"叶节点数: {n_leaves[best_idx]}")

决策树回归

from sklearn.tree import DecisionTreeRegressor

# 生成回归数据
np.random.seed(42)
X_reg = np.sort(np.random.rand(200) * 10).reshape(-1, 1)
y_reg = np.sin(X_reg).ravel() + np.random.randn(200) * 0.3

# 不同深度的回归树
plt.figure(figsize=(15, 5))

X_plot = np.linspace(0, 10, 100).reshape(-1, 1)

for i, depth in enumerate([2, 5, None]):
    plt.subplot(1, 3, i+1)
    
    dt_reg = DecisionTreeRegressor(max_depth=depth, random_state=42)
    dt_reg.fit(X_reg, y_reg)
    y_pred = dt_reg.predict(X_plot)
    
    plt.scatter(X_reg, y_reg, c='blue', alpha=0.3, label='数据')
    plt.plot(X_plot, y_pred, 'r-', linewidth=2, label='预测')
    plt.plot(X_plot, np.sin(X_plot), 'g--', linewidth=2, label='真实')
    
    r2 = dt_reg.score(X_reg, y_reg)
    plt.title(f'max_depth={depth}, R²={r2:.3f}')
    plt.legend()

plt.tight_layout()
plt.show()

特征重要性

# 使用完整的鸢尾花数据集
X_full = iris.data
dt_full = DecisionTreeClassifier(max_depth=4, random_state=42)
dt_full.fit(X_full, y)

# 特征重要性
importances = dt_full.feature_importances_

plt.figure(figsize=(10, 6))
indices = np.argsort(importances)[::-1]

plt.bar(range(len(importances)), importances[indices], align='center')
plt.xticks(range(len(importances)), np.array(iris.feature_names)[indices], rotation=45)
plt.xlabel('特征')
plt.ylabel('重要性')
plt.title('决策树特征重要性')
plt.tight_layout()
plt.show()

for name, imp in zip(np.array(iris.feature_names)[indices], importances[indices]):
    print(f"{name}: {imp:.4f}")

ID3, C4.5, CART对比

算法 特征选择 树类型 缺失值处理 剪枝
ID3 信息增益 多叉树 不支持
C4.5 信息增益率 多叉树 支持 后剪枝
CART 基尼指数 二叉树 支持 后剪枝

常见问题

Q1: 决策树的优缺点?

优点 缺点
可解释性强 容易过拟合
无需特征缩放 不稳定(数据小变化导致树大变化)
处理非线性关系 难以拟合复杂边界
处理缺失值 偏向多值特征

Q2: 什么时候用基尼,什么时候用熵?

  • 通常差别不大
  • 基尼计算更快(无对数运算)
  • 熵对类别分布更敏感

Q3: 如何处理连续特征?

  • 将连续值排序
  • 遍历相邻值中点作为候选分割点
  • 选择最优分割点

Q4: 如何处理缺失值?

  • C4.5:使用概率分配到各分支
  • CART:代理分裂(surrogate split)

总结

概念 说明
核心思想 递归分割特征空间
ID3 信息增益,多叉树
C4.5 信息增益率,改进ID3
CART 基尼指数,二叉树
过拟合 使用剪枝控制

参考资料

  • 《机器学习》周志华 第4章
  • 《统计学习方法》李航 第5章
  • scikit-learn 文档:Decision Trees

版权声明: 如无特别声明,本文版权归 sshipanoo 所有,转载请注明本文链接。

(采用 CC BY-NC-SA 4.0 许可协议进行授权)

本文标题:《 机器学习基础系列——决策树 》

本文链接:http://localhost:3015/ai/%E5%86%B3%E7%AD%96%E6%A0%91.html

本文最后一次更新为 天前,文章中的某些内容可能已过时!