已经是最新一篇文章了!
已经是最后一篇文章了!
ID3、C4.5、CART
前言
决策树是最直观的机器学习算法之一,模拟人类决策过程:通过一系列问题将数据分类。它是很多集成方法(随机森林、XGBoost)的基础。
基本概念
决策树结构
- 根节点:包含所有样本
- 内部节点:对特征的测试
- 分支:测试的结果
- 叶节点:类别标签或值
import numpy as np
import matplotlib.pyplot as plt
from sklearn.datasets import load_iris
from sklearn.tree import DecisionTreeClassifier, plot_tree
# Load the iris data; keep only petal length and petal width so the fitted
# tree and its decision boundary stay easy to visualise in 2-D.
iris = load_iris()
X, y = iris.data[:, [2, 3]], iris.target  # petal length and petal width only
feature_names = [iris.feature_names[2], iris.feature_names[3]]
# Fit a shallow tree (depth <= 3) so the rendered diagram stays readable.
dt = DecisionTreeClassifier(max_depth=3, random_state=42)
dt.fit(X, y)
# Render the fitted tree structure.
plt.figure(figsize=(15, 10))
plot_tree(dt, feature_names=feature_names, class_names=iris.target_names,
          filled=True, rounded=True, fontsize=10)
plt.title('鸢尾花决策树')
plt.tight_layout()
plt.show()
决策边界可视化
# Visualise the regions a fitted classifier assigns to each class.
def plot_decision_boundary(clf, X, y, ax, title):
    """Shade the classifier's decision regions on `ax` and overlay the data.

    `clf` must already be fitted on the two columns of `X`; returns the
    scatter artist so callers can attach a legend if they wish.
    """
    pad = 0.5
    x_lo, x_hi = X[:, 0].min() - pad, X[:, 0].max() + pad
    y_lo, y_hi = X[:, 1].min() - pad, X[:, 1].max() + pad
    grid_x, grid_y = np.meshgrid(np.linspace(x_lo, x_hi, 200),
                                 np.linspace(y_lo, y_hi, 200))
    # Predict every grid point, then reshape back to the grid for contourf.
    grid_pred = clf.predict(np.c_[grid_x.ravel(), grid_y.ravel()])
    grid_pred = grid_pred.reshape(grid_x.shape)
    ax.contourf(grid_x, grid_y, grid_pred, alpha=0.3, cmap='viridis')
    scatter = ax.scatter(X[:, 0], X[:, 1], c=y, cmap='viridis', edgecolors='k')
    ax.set_xlabel(feature_names[0])
    ax.set_ylabel(feature_names[1])
    ax.set_title(title)
    return scatter
# Compare decision boundaries at increasing depth (None = grow fully).
fig, axes = plt.subplots(1, 3, figsize=(15, 5))
for ax, depth in zip(axes, [1, 3, None]):
    dt_temp = DecisionTreeClassifier(max_depth=depth, random_state=42)
    dt_temp.fit(X, y)
    plot_decision_boundary(dt_temp, X, y, ax, f'max_depth={depth}')
plt.tight_layout()
plt.show()
特征选择标准
信息增益(ID3)
熵:度量数据的不确定性
\[H(D) = -\sum_{k=1}^{K} p_k \log_2 p_k\]
条件熵:在特征A条件下的熵
\[H(D|A) = \sum_{v} \frac{|D_v|}{|D|} H(D_v)\]
信息增益:
\[\text{Gain}(D, A) = H(D) - H(D|A)\]
def entropy(y):
    """Shannon entropy (base 2) of the class labels in `y`."""
    _, counts = np.unique(y, return_counts=True)
    probs = counts / len(y)
    # 1e-10 guards against log2(0) for zero-probability classes.
    return -np.sum(probs * np.log2(probs + 1e-10))
def information_gain(X, y, feature_idx, threshold=None):
    """ID3-style information gain of splitting on one feature.

    With `threshold=None` the feature is treated as categorical (one branch
    per distinct value); otherwise a binary split at `x <= threshold` is
    evaluated. Returns parent entropy minus the weighted child entropy.
    """
    parent = entropy(y)
    n = len(y)
    if threshold is None:
        # Categorical: weight each value's subset entropy by its share.
        children = 0.0
        for value in np.unique(X[:, feature_idx]):
            subset = y[X[:, feature_idx] == value]
            children += len(subset) / n * entropy(subset)
    else:
        # Continuous: binary partition at the threshold.
        go_left = X[:, feature_idx] <= threshold
        children = (np.sum(go_left) * entropy(y[go_left]) +
                    np.sum(~go_left) * entropy(y[~go_left])) / n
    return parent - children
# Example: dataset entropy plus the gain of two hand-picked candidate splits.
print(f"数据集熵: {entropy(y):.4f}")
print(f"花瓣长度信息增益: {information_gain(X, y, 0, threshold=2.5):.4f}")
print(f"花瓣宽度信息增益: {information_gain(X, y, 1, threshold=1.5):.4f}")
信息增益率(C4.5)
信息增益倾向于选择取值多的特征,信息增益率进行校正:
\[\text{GainRatio}(D, A) = \frac{\text{Gain}(D, A)}{H_A(D)}\] \[H_A(D) = -\sum_v \frac{|D_v|}{|D|} \log_2 \frac{|D_v|}{|D|}\]
def split_entropy(X, feature_idx, threshold=None):
    """Split information H_A(D): the entropy of the partition sizes themselves."""
    if threshold is None:
        # Categorical feature: one branch per distinct value.
        values = np.unique(X[:, feature_idx])
        probs = [np.sum(X[:, feature_idx] == v) / len(X) for v in values]
    else:
        # Continuous feature: binary split at the threshold.
        left_ratio = np.sum(X[:, feature_idx] <= threshold) / len(X)
        probs = [left_ratio, 1 - left_ratio]
    # 1e-10 guards against log2(0) for empty branches.
    return -np.sum([p * np.log2(p + 1e-10) for p in probs])
def gain_ratio(X, y, feature_idx, threshold=None):
    """C4.5 gain ratio: information gain normalised by the split information.

    The normalisation corrects information gain's bias towards features
    with many distinct values.
    """
    # Epsilon avoids division by zero when the split information is 0.
    return information_gain(X, y, feature_idx, threshold) / (
        split_entropy(X, feature_idx, threshold) + 1e-10)
基尼指数(CART)
\[\text{Gini}(D) = 1 - \sum_{k=1}^{K} p_k^2\]
选择使基尼指数下降最多的分割:
\[\text{GiniGain}(D, A) = \text{Gini}(D) - \sum_v \frac{|D_v|}{|D|} \text{Gini}(D_v)\]
def gini(y):
    """Gini impurity of the class labels in `y`: 1 - sum_k p_k^2."""
    _, counts = np.unique(y, return_counts=True)
    probs = counts / len(y)
    return 1 - np.sum(probs ** 2)
def gini_impurity(X, y, feature_idx, threshold):
    """Weighted Gini impurity of the binary split `x <= threshold` (CART)."""
    in_left = X[:, feature_idx] <= threshold
    in_right = ~in_left
    n_left, n_right = np.sum(in_left), np.sum(in_right)
    # Empty sides contribute zero impurity (guards gini() on an empty slice).
    impurity_left = gini(y[in_left]) if n_left > 0 else 0
    impurity_right = gini(y[in_right]) if n_right > 0 else 0
    return (n_left * impurity_left + n_right * impurity_right) / len(y)
# Sweep candidate thresholds on feature 0 (petal length) and plot the
# weighted Gini impurity of each binary split; the minimum marks the best
# split point CART would choose for this feature.
thresholds = np.linspace(X[:, 0].min(), X[:, 0].max(), 50)
gini_values = [gini_impurity(X, y, 0, t) for t in thresholds]
plt.figure(figsize=(10, 5))
plt.plot(thresholds, gini_values, 'b-', linewidth=2)
plt.xlabel('分割阈值(花瓣长度)')
plt.ylabel('加权基尼指数')
plt.title('不同分割点的基尼指数')
plt.axvline(x=thresholds[np.argmin(gini_values)], color='r', linestyle='--',
            label=f'最优阈值={thresholds[np.argmin(gini_values)]:.2f}')
plt.legend()
plt.grid(True, alpha=0.3)
plt.show()
从零实现决策树
class DecisionTreeNode:
    """A single tree node.

    Internal nodes carry a split test (feature_idx, threshold) plus left/right
    subtrees; leaves carry `value`, the majority class of the samples that
    reached them.
    """
    def __init__(self, feature_idx=None, threshold=None, left=None, right=None, value=None):
        self.feature_idx = feature_idx  # index of the split feature (internal nodes)
        self.threshold = threshold      # samples with x[feature_idx] <= threshold go left
        self.left = left                # left subtree
        self.right = right              # right subtree
        self.value = value              # predicted class label (leaf nodes only)

class DecisionTreeClassifierScratch:
    """CART-style binary decision-tree classifier implemented from scratch.

    Parameters
    ----------
    max_depth : int or None
        Maximum tree depth; None grows until the other stopping rules fire.
    min_samples_split : int
        Minimum number of samples a node needs before it may be split.
    criterion : str
        'gini' uses Gini impurity; any other value falls back to entropy
        (this permissive behaviour is kept for backward compatibility).
    min_samples_leaf : int
        Minimum number of samples each child of a split must retain.
        Default 1 preserves the original behaviour (non-empty children only).
    """
    def __init__(self, max_depth=None, min_samples_split=2, criterion='gini',
                 min_samples_leaf=1):
        self.max_depth = max_depth
        self.min_samples_split = min_samples_split
        self.criterion = criterion
        self.min_samples_leaf = min_samples_leaf
        self.root = None
        self.n_classes = None

    def _gini(self, y):
        """Gini impurity: 1 - sum_k p_k^2."""
        _, counts = np.unique(y, return_counts=True)
        probs = counts / len(y)
        return 1 - np.sum(probs ** 2)

    def _entropy(self, y):
        """Shannon entropy; 1e-10 guards against log2(0)."""
        _, counts = np.unique(y, return_counts=True)
        probs = counts / len(y)
        return -np.sum(probs * np.log2(probs + 1e-10))

    def _impurity(self, y):
        """Dispatch to the configured impurity measure."""
        # 'gini' -> Gini; anything else -> entropy (original semantics kept).
        if self.criterion == 'gini':
            return self._gini(y)
        return self._entropy(y)

    def _find_best_split(self, X, y):
        """Exhaustively search (feature, threshold) for maximum impurity decrease.

        Returns (feature_idx, threshold), or (None, None) when no valid split
        exists (all values identical, or min_samples_leaf blocks every candidate).
        """
        best_gain = -1
        best_feature = None
        best_threshold = None
        current_impurity = self._impurity(y)
        n = len(y)
        for feature_idx in range(X.shape[1]):
            for threshold in np.unique(X[:, feature_idx]):
                left_mask = X[:, feature_idx] <= threshold
                n_left = np.sum(left_mask)
                n_right = n - n_left
                # Reject splits whose children are empty or too small.
                if n_left < self.min_samples_leaf or n_right < self.min_samples_leaf:
                    continue
                weighted_impurity = (n_left * self._impurity(y[left_mask]) +
                                     n_right * self._impurity(y[~left_mask])) / n
                gain = current_impurity - weighted_impurity
                if gain > best_gain:
                    best_gain = gain
                    best_feature = feature_idx
                    best_threshold = threshold
        return best_feature, best_threshold

    def _leaf(self, y):
        """Build a leaf predicting the majority class of `y`."""
        return DecisionTreeNode(value=np.bincount(y).argmax())

    def _build_tree(self, X, y, depth=0):
        """Recursively grow the tree from (X, y)."""
        n_samples = X.shape[0]
        # Stop when the depth cap is hit, the node is too small, or it is pure.
        if (self.max_depth is not None and depth >= self.max_depth) or \
           n_samples < self.min_samples_split or \
           len(np.unique(y)) == 1:
            return self._leaf(y)
        best_feature, best_threshold = self._find_best_split(X, y)
        if best_feature is None:  # no valid split could be found
            return self._leaf(y)
        left_mask = X[:, best_feature] <= best_threshold
        right_mask = ~left_mask
        return DecisionTreeNode(
            feature_idx=best_feature,
            threshold=best_threshold,
            left=self._build_tree(X[left_mask], y[left_mask], depth + 1),
            right=self._build_tree(X[right_mask], y[right_mask], depth + 1)
        )

    def fit(self, X, y):
        """Fit the tree. Labels must be non-negative integers (np.bincount)."""
        self.n_classes = len(np.unique(y))
        self.root = self._build_tree(np.array(X), np.array(y))
        return self

    def _predict_single(self, x, node):
        """Route one sample down the tree until a leaf is reached."""
        if node.value is not None:
            return node.value
        if x[node.feature_idx] <= node.threshold:
            return self._predict_single(x, node.left)
        return self._predict_single(x, node.right)

    def predict(self, X):
        """Predict a class label for each row of X."""
        return np.array([self._predict_single(x, self.root) for x in X])

    def score(self, X, y):
        """Mean accuracy of predictions on (X, y)."""
        return np.mean(self.predict(X) == y)
# Evaluate on a held-out split: 80/20 train/test.
from sklearn.model_selection import train_test_split
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=42)
dt_scratch = DecisionTreeClassifierScratch(max_depth=3, criterion='gini')
dt_scratch.fit(X_train, y_train)
print(f"自实现 - 训练准确率: {dt_scratch.score(X_train, y_train):.4f}")
print(f"自实现 - 测试准确率: {dt_scratch.score(X_test, y_test):.4f}")
# Compare against sklearn's implementation under the same settings.
dt_sklearn = DecisionTreeClassifier(max_depth=3, criterion='gini', random_state=42)
dt_sklearn.fit(X_train, y_train)
print(f"sklearn - 测试准确率: {dt_sklearn.score(X_test, y_test):.4f}")
剪枝
为什么需要剪枝
完全生长的决策树容易过拟合,需要剪枝来提高泛化能力。
预剪枝
在树生成过程中提前停止:
- max_depth:最大深度
- min_samples_split:节点分裂所需最小样本数
- min_samples_leaf:叶节点最小样本数
- max_leaf_nodes:最大叶节点数
# Effect of different pre-pruning parameters on the decision boundary;
# each panel shows the boundary and training accuracy under one constraint.
fig, axes = plt.subplots(2, 2, figsize=(14, 12))
params = [
    {'max_depth': None},        # fully grown tree, no pruning
    {'max_depth': 3},
    {'min_samples_split': 10},
    {'min_samples_leaf': 5}
]
for ax, param in zip(axes.ravel(), params):
    dt_temp = DecisionTreeClassifier(**param, random_state=42)
    dt_temp.fit(X, y)
    plot_decision_boundary(dt_temp, X, y, ax, str(param))
    train_score = dt_temp.score(X, y)
    ax.set_title(f"{param}\nTrain Acc: {train_score:.3f}")
plt.tight_layout()
plt.show()
后剪枝(代价复杂度剪枝)
\[R_\alpha(T) = R(T) + \alpha |T|\]
- $R(T)$:树的训练误差
- $|T|$:叶节点数量
- $\alpha$:复杂度参数
# Cost-complexity (post-)pruning path: the effective alpha values at which
# subtrees get collapsed, with the resulting total leaf impurities.
path = dt.cost_complexity_pruning_path(X_train, y_train)
ccp_alphas = path.ccp_alphas
impurities = path.impurities
# Fit one tree per alpha along the path.
trees = []
for ccp_alpha in ccp_alphas:
    dt_temp = DecisionTreeClassifier(ccp_alpha=ccp_alpha, random_state=42)
    dt_temp.fit(X_train, y_train)
    trees.append(dt_temp)
# Evaluate every pruned tree on both splits and record its size.
train_scores = [tree.score(X_train, y_train) for tree in trees]
test_scores = [tree.score(X_test, y_test) for tree in trees]
n_leaves = [tree.get_n_leaves() for tree in trees]
fig, axes = plt.subplots(1, 3, figsize=(15, 4))
axes[0].plot(ccp_alphas, impurities, 'b-o', markersize=3)
axes[0].set_xlabel('alpha')
axes[0].set_ylabel('不纯度')
axes[0].set_title('剪枝路径')
axes[1].plot(ccp_alphas, n_leaves, 'g-o', markersize=3)
axes[1].set_xlabel('alpha')
axes[1].set_ylabel('叶节点数')
axes[1].set_title('树大小')
axes[2].plot(ccp_alphas, train_scores, 'b-', label='训练集')
axes[2].plot(ccp_alphas, test_scores, 'r-', label='测试集')
axes[2].set_xlabel('alpha')
axes[2].set_ylabel('准确率')
axes[2].set_title('准确率 vs alpha')
axes[2].legend()
plt.tight_layout()
plt.show()
# Pick the alpha with the best held-out accuracy.
best_idx = np.argmax(test_scores)
print(f"最优alpha: {ccp_alphas[best_idx]:.6f}")
print(f"最优测试准确率: {test_scores[best_idx]:.4f}")
print(f"叶节点数: {n_leaves[best_idx]}")
决策树回归
from sklearn.tree import DecisionTreeRegressor
# Noisy sine-wave regression data on [0, 10].
np.random.seed(42)
X_reg = np.sort(np.random.rand(200) * 10).reshape(-1, 1)
y_reg = np.sin(X_reg).ravel() + np.random.randn(200) * 0.3
# Regression trees of increasing depth (None = fully grown, fits the noise).
plt.figure(figsize=(15, 5))
X_plot = np.linspace(0, 10, 100).reshape(-1, 1)
for i, depth in enumerate([2, 5, None]):
    plt.subplot(1, 3, i+1)
    dt_reg = DecisionTreeRegressor(max_depth=depth, random_state=42)
    dt_reg.fit(X_reg, y_reg)
    y_pred = dt_reg.predict(X_plot)
    plt.scatter(X_reg, y_reg, c='blue', alpha=0.3, label='数据')
    plt.plot(X_plot, y_pred, 'r-', linewidth=2, label='预测')
    plt.plot(X_plot, np.sin(X_plot), 'g--', linewidth=2, label='真实')
    r2 = dt_reg.score(X_reg, y_reg)
    plt.title(f'max_depth={depth}, R²={r2:.3f}')
    plt.legend()
plt.tight_layout()
plt.show()
特征重要性
# Fit on all four iris features and rank them by impurity-based importance.
X_full = iris.data
dt_full = DecisionTreeClassifier(max_depth=4, random_state=42)
dt_full.fit(X_full, y)
# Impurity-based feature importances from the fitted tree.
importances = dt_full.feature_importances_
plt.figure(figsize=(10, 6))
indices = np.argsort(importances)[::-1]  # sort in descending importance
plt.bar(range(len(importances)), importances[indices], align='center')
plt.xticks(range(len(importances)), np.array(iris.feature_names)[indices], rotation=45)
plt.xlabel('特征')
plt.ylabel('重要性')
plt.title('决策树特征重要性')
plt.tight_layout()
plt.show()
for name, imp in zip(np.array(iris.feature_names)[indices], importances[indices]):
    print(f"{name}: {imp:.4f}")
ID3, C4.5, CART对比
| 算法 | 特征选择 | 树类型 | 缺失值处理 | 剪枝 |
|---|---|---|---|---|
| ID3 | 信息增益 | 多叉树 | 不支持 | 无 |
| C4.5 | 信息增益率 | 多叉树 | 支持 | 后剪枝 |
| CART | 基尼指数 | 二叉树 | 支持 | 后剪枝 |
常见问题
Q1: 决策树的优缺点?
| 优点 | 缺点 |
|---|---|
| 可解释性强 | 容易过拟合 |
| 无需特征缩放 | 不稳定(数据小变化导致树大变化) |
| 处理非线性关系 | 难以拟合复杂边界 |
| 处理缺失值 | 偏向多值特征 |
Q2: 什么时候用基尼,什么时候用熵?
- 通常差别不大
- 基尼计算更快(无对数运算)
- 熵对类别分布更敏感
Q3: 如何处理连续特征?
- 将连续值排序
- 遍历相邻值中点作为候选分割点
- 选择最优分割点
Q4: 如何处理缺失值?
- C4.5:使用概率分配到各分支
- CART:代理分裂(surrogate split)
总结
| 概念 | 说明 |
|---|---|
| 核心思想 | 递归分割特征空间 |
| ID3 | 信息增益,多叉树 |
| C4.5 | 信息增益率,改进ID3 |
| CART | 基尼指数,二叉树 |
| 过拟合 | 使用剪枝控制 |
参考资料
- 《机器学习》周志华 第4章
- 《统计学习方法》李航 第5章
- scikit-learn 文档:Decision Trees
版权声明: 如无特别声明,本文版权归 sshipanoo 所有,转载请注明本文链接。
(采用 CC BY-NC-SA 4.0 许可协议进行授权)
本文标题:《 机器学习基础系列——决策树 》
本文链接:http://localhost:3015/ai/%E5%86%B3%E7%AD%96%E6%A0%91.html
本文最后一次更新为 天前,文章中的某些内容可能已过时!