已经是最新一篇文章了!
已经是最后一篇文章了!
K-Means、DBSCAN、层次聚类
前言
聚类是无监督学习的核心任务,目标是将相似的数据点分到同一组。与分类不同,聚类没有预定义的标签,需要算法自动发现数据结构。
聚类基础
什么是聚类
- 组内相似度高(紧凑性)
- 组间相似度低(分离性)
应用场景
| 领域 | 应用 |
|---|---|
| 市场营销 | 客户细分 |
| 图像处理 | 图像分割 |
| 文本分析 | 文档聚类 |
| 异常检测 | 发现离群点 |
import numpy as np
import matplotlib.pyplot as plt
from sklearn.datasets import make_blobs, make_moons, make_circles
from sklearn.cluster import KMeans, DBSCAN, AgglomerativeClustering
from sklearn.preprocessing import StandardScaler
np.random.seed(42)
# Three toy datasets with very different cluster geometry — they motivate
# why no single clustering algorithm works everywhere.
sample_sets = [
    make_blobs(n_samples=300, centers=3, random_state=42),
    make_moons(n_samples=300, noise=0.1, random_state=42),
    make_circles(n_samples=300, noise=0.05, factor=0.5, random_state=42),
]
panel_titles = ['球形簇', '月牙形', '同心圆']
fig, axes = plt.subplots(1, 3, figsize=(15, 5))
for ax, (X, y), title in zip(axes, sample_sets, panel_titles):
    ax.scatter(X[:, 0], X[:, 1], c=y, cmap='viridis')
    ax.set_title(title)
    ax.set_xlabel('Feature 1')
    ax.set_ylabel('Feature 2')
plt.tight_layout()
plt.show()
K-Means
算法原理
- 随机初始化K个中心
- 将每个点分配到最近的中心
- 更新中心为簇内均值
- 重复上述"分配"与"更新"两步,直到收敛
# Fit K-Means (K=4) on a fresh blob dataset and show before/after.
X_blobs, y_blobs = make_blobs(n_samples=300, centers=4, random_state=42)
kmeans = KMeans(n_clusters=4, random_state=42, n_init=10)
labels = kmeans.fit_predict(X_blobs)

plt.figure(figsize=(12, 5))
# Left panel: the raw, unlabeled points.
plt.subplot(1, 2, 1)
plt.scatter(X_blobs[:, 0], X_blobs[:, 1], c='gray', alpha=0.5)
plt.title('原始数据')
# Right panel: cluster assignments plus the fitted centers.
plt.subplot(1, 2, 2)
plt.scatter(X_blobs[:, 0], X_blobs[:, 1], c=labels, cmap='viridis')
fitted_centers = kmeans.cluster_centers_
plt.scatter(fitted_centers[:, 0], fitted_centers[:, 1],
            c='red', marker='X', s=200, edgecolors='black', label='聚类中心')
plt.title(f'K-Means (K=4)\nInertia={kmeans.inertia_:.2f}')
plt.legend()
plt.tight_layout()
plt.show()
从零实现K-Means
class KMeansScratch:
    """From-scratch K-Means (Lloyd's algorithm), sklearn-like interface.

    Provides fit / predict / fit_predict and an ``inertia_`` property
    (within-cluster sum of squared distances on the training data).
    """

    def __init__(self, n_clusters=3, max_iters=100, tol=1e-4, random_state=None):
        self.n_clusters = n_clusters      # number of clusters K
        self.max_iters = max_iters        # hard cap on Lloyd iterations
        self.tol = tol                    # convergence threshold on total center shift
        self.random_state = random_state  # seed for reproducible initialization
        self.centers = None               # (K, n_features) centers after fit
        self.labels = None                # (n_samples,) labels from the last fit

    def fit(self, X):
        """Run Lloyd's algorithm on X until convergence or max_iters."""
        X = np.asarray(X)
        np.random.seed(self.random_state)
        n_samples = len(X)
        # Initialize centers by sampling K distinct data points.
        indices = np.random.choice(n_samples, self.n_clusters, replace=False)
        # Cast to float so integer inputs don't truncate the cluster means.
        self.centers = X[indices].astype(float)
        for _ in range(self.max_iters):
            old_centers = self.centers.copy()
            # Assignment step: nearest center for every point.
            self.labels = self._assign_labels(X)
            # Update step: each center moves to the mean of its cluster.
            for k in range(self.n_clusters):
                mask = self.labels == k
                if np.any(mask):  # keep the old center if the cluster emptied
                    self.centers[k] = X[mask].mean(axis=0)
            # Converged when the centers barely moved.
            center_shift = np.sqrt(np.sum((self.centers - old_centers) ** 2))
            if center_shift < self.tol:
                break
        # Remember the training data so inertia_ can be computed later.
        # Bug fix: the original inertia_ referenced the module global X_blobs,
        # giving a wrong metric for any other fitted dataset.
        self._X_fit = X
        return self

    def _assign_labels(self, X):
        """Return the index of the nearest center for each row of X."""
        X = np.asarray(X)
        distances = np.zeros((len(X), self.n_clusters))
        for k in range(self.n_clusters):
            distances[:, k] = np.sqrt(np.sum((X - self.centers[k]) ** 2, axis=1))
        return np.argmin(distances, axis=1)

    def predict(self, X):
        """Assign each sample in X to the nearest fitted center."""
        return self._assign_labels(X)

    def fit_predict(self, X):
        """Fit on X and return the resulting labels."""
        self.fit(X)
        return self.labels

    @property
    def inertia_(self):
        """Within-cluster sum of squared distances on the data seen in fit()."""
        inertia = 0.0
        for k in range(self.n_clusters):
            mask = self.labels == k
            inertia += np.sum((self._X_fit[mask] - self.centers[k]) ** 2)
        return inertia
# Sanity check: the scratch implementation should land close to sklearn.
scratch_model = KMeansScratch(n_clusters=4, random_state=42)
scratch_model.fit(X_blobs)
print(f"自实现K-Means惯性: {scratch_model.inertia_:.2f}")
print(f"sklearn K-Means惯性: {kmeans.inertia_:.2f}")
K-Means迭代过程可视化
# Visualize how K-Means assignments evolve. Each panel restarts from the
# SAME initial centers and runs exactly `it` Lloyd iterations, so the
# panel titles are accurate.
# Bug fix: the original carried centers over between panels (resetting
# only after it == 0), so iteration counts compounded — the "迭代 2"
# panel actually showed 3 cumulative iterations, "迭代 3" showed 6, etc.
np.random.seed(42)
X_demo, _ = make_blobs(n_samples=150, centers=3, random_state=42)
fig, axes = plt.subplots(2, 3, figsize=(15, 10))
# Initialize centers from 3 random data points.
indices = np.random.choice(len(X_demo), 3, replace=False)
init_centers = X_demo[indices].copy()
iterations = [0, 1, 2, 3, 5, 10]

def _run_kmeans_steps(X, centers, n_steps):
    """Run n_steps Lloyd iterations from `centers`; return (centers, labels)."""
    centers = centers.copy()
    for _ in range(n_steps):
        dists = np.array([np.sqrt(np.sum((X - c) ** 2, axis=1)) for c in centers]).T
        labels = np.argmin(dists, axis=1)
        for k in range(len(centers)):
            if np.sum(labels == k) > 0:
                centers[k] = X[labels == k].mean(axis=0)
    # Final assignment against the (possibly updated) centers.
    dists = np.array([np.sqrt(np.sum((X - c) ** 2, axis=1)) for c in centers]).T
    return centers, np.argmin(dists, axis=1)

for ax, it in zip(axes.ravel(), iterations):
    cur_centers, cur_labels = _run_kmeans_steps(X_demo, init_centers, it)
    ax.scatter(X_demo[:, 0], X_demo[:, 1], c=cur_labels, cmap='viridis', alpha=0.6)
    ax.scatter(cur_centers[:, 0], cur_centers[:, 1],
               c='red', marker='X', s=200, edgecolors='black')
    ax.set_title(f'迭代 {it}')
plt.tight_layout()
plt.show()
选择K值
肘部法则
# Elbow method: inertia always decreases as K grows; look for the bend.
K_range = range(1, 11)
inertias = [
    KMeans(n_clusters=k, random_state=42, n_init=10).fit(X_blobs).inertia_
    for k in K_range
]
plt.figure(figsize=(10, 5))
plt.plot(K_range, inertias, 'bo-')
plt.xlabel('K')
plt.ylabel('惯性 (Inertia)')
plt.title('肘部法则')
plt.axvline(x=4, color='r', linestyle='--', label='最优K=4')
plt.legend()
plt.grid(True, alpha=0.3)
plt.show()
轮廓系数
from sklearn.metrics import silhouette_score, silhouette_samples
# Silhouette analysis: higher is better; only defined for K >= 2.
k_candidates = range(2, 11)
silhouette_scores = []
for k in k_candidates:
    pred = KMeans(n_clusters=k, random_state=42, n_init=10).fit_predict(X_blobs)
    s = silhouette_score(X_blobs, pred)
    silhouette_scores.append(s)
    print(f"K={k}: 轮廓系数={s:.4f}")
plt.figure(figsize=(10, 5))
plt.plot(k_candidates, silhouette_scores, 'go-')
plt.xlabel('K')
plt.ylabel('轮廓系数')
plt.title('轮廓系数法')
plt.grid(True, alpha=0.3)
plt.show()
K-Means++初始化
# Compare K-Means++ vs random initialization over 20 seeds (n_init=1 so
# a single initialization is measured each run).
# Bug fix: the original pre-built two estimators before the loop that were
# immediately overwritten on the first iteration — dead code, removed.
plus_inertias = []
random_inertias = []
for seed in range(20):
    km_plus = KMeans(n_clusters=4, init='k-means++', random_state=seed, n_init=1)
    km_rand = KMeans(n_clusters=4, init='random', random_state=seed, n_init=1)
    km_plus.fit(X_blobs)
    km_rand.fit(X_blobs)
    plus_inertias.append(km_plus.inertia_)
    random_inertias.append(km_rand.inertia_)
print(f"K-Means++ 惯性: {np.mean(plus_inertias):.2f} ± {np.std(plus_inertias):.2f}")
print(f"Random 惯性: {np.mean(random_inertias):.2f} ± {np.std(random_inertias):.2f}")
DBSCAN
原理
Density-Based Spatial Clustering of Applications with Noise
- 核心点:eps邻域内至少有min_samples个点
- 边界点:在核心点的eps邻域内,但自己不是核心点
- 噪声点:既不是核心点也不是边界点
# K-Means vs DBSCAN on the non-convex moons dataset.
X_moons, _ = make_moons(n_samples=300, noise=0.1, random_state=42)
fig, axes = plt.subplots(1, 3, figsize=(15, 5))
# K-Means fails here: it can only carve out convex (Voronoi) regions.
kmeans_moons = KMeans(n_clusters=2, random_state=42, n_init=10)
labels_km = kmeans_moons.fit_predict(X_moons)
axes[0].scatter(X_moons[:, 0], X_moons[:, 1], c=labels_km, cmap='viridis')
axes[0].set_title('K-Means (效果差)')
# DBSCAN follows density, so it can trace each crescent.
dbscan = DBSCAN(eps=0.2, min_samples=5)
labels_db = dbscan.fit_predict(X_moons)
# Noise points are labeled -1; only subtract it from the cluster count
# when it is actually present (the original unconditionally subtracted 1,
# undercounting clusters whenever there was no noise).
n_clusters_db = len(set(labels_db)) - (1 if -1 in labels_db else 0)
axes[1].scatter(X_moons[:, 0], X_moons[:, 1], c=labels_db, cmap='viridis')
axes[1].set_title(f'DBSCAN (eps=0.2)\n簇数={n_clusters_db}, 噪声={sum(labels_db==-1)}')
# Highlight core points vs border/noise points.
core_mask = np.zeros(len(X_moons), dtype=bool)
core_mask[dbscan.core_sample_indices_] = True
axes[2].scatter(X_moons[~core_mask, 0], X_moons[~core_mask, 1], c='gray', alpha=0.5, label='边界/噪声')
axes[2].scatter(X_moons[core_mask, 0], X_moons[core_mask, 1], c=labels_db[core_mask],
                cmap='viridis', label='核心点')
axes[2].set_title('核心点可视化')
axes[2].legend()
plt.tight_layout()
plt.show()
eps和min_samples的影响
# Sweep eps × min_samples on the moons data, one panel per combination.
# Bug fix: the original drew 3×3 = 9 combinations on a 2×3 grid — the
# `axes[i, j] if i < 2 else axes[1, j]` fallback silently overwrote row 1
# for eps=0.3 — and a trailing loop labeled every row "min_samples=3".
eps_values = [0.1, 0.2, 0.3]
min_samples_values = [3, 5, 10]
fig, axes = plt.subplots(len(eps_values), len(min_samples_values), figsize=(15, 15))
for i, eps in enumerate(eps_values):
    for j, min_samples in enumerate(min_samples_values):
        ax = axes[i, j]
        dbscan_temp = DBSCAN(eps=eps, min_samples=min_samples)
        labels_temp = dbscan_temp.fit_predict(X_moons)
        # Exclude the noise label (-1) from the cluster count.
        n_clusters = len(set(labels_temp)) - (1 if -1 in labels_temp else 0)
        n_noise = int(np.sum(labels_temp == -1))
        ax.scatter(X_moons[:, 0], X_moons[:, 1], c=labels_temp, cmap='viridis')
        ax.set_title(f'eps={eps}, min_samples={min_samples}\n簇数={n_clusters}, 噪声={n_noise}')
plt.tight_layout()
plt.show()
层次聚类
凝聚层次聚类
自底向上:
- 每个点是一个簇
- 合并最近的两个簇
- 重复直到达到指定簇数
from scipy.cluster.hierarchy import dendrogram, linkage
# A small dataset keeps the dendrogram readable.
X_hier, _ = make_blobs(n_samples=50, centers=3, random_state=42)
# Ward linkage merges the pair of clusters that least increases variance.
linkage_matrix = linkage(X_hier, method='ward')

plt.figure(figsize=(15, 5))
# Left: the merge tree, truncated to the top 5 levels.
plt.subplot(1, 2, 1)
dendrogram(linkage_matrix, truncate_mode='level', p=5)
plt.xlabel('样本')
plt.ylabel('距离')
plt.title('层次聚类树状图')
# Right: the flat clustering obtained by cutting the tree at K=3.
plt.subplot(1, 2, 2)
labels_agg = AgglomerativeClustering(n_clusters=3, linkage='ward').fit_predict(X_hier)
plt.scatter(X_hier[:, 0], X_hier[:, 1], c=labels_agg, cmap='viridis')
plt.title('层次聚类结果 (K=3)')
plt.tight_layout()
plt.show()
不同链接方法
| 方法 | 距离计算 |
|---|---|
| single | 最近点距离 |
| complete | 最远点距离 |
| average | 平均距离 |
| ward | 最小化方差 |
# Same data, four linkage criteria — resulting clusters can differ a lot.
methods = ('single', 'complete', 'average', 'ward')
fig, axes = plt.subplots(1, len(methods), figsize=(16, 4))
for ax, method in zip(axes, methods):
    result = AgglomerativeClustering(n_clusters=3, linkage=method).fit_predict(X_hier)
    ax.scatter(X_hier[:, 0], X_hier[:, 1], c=result, cmap='viridis')
    ax.set_title(f'{method} linkage')
plt.tight_layout()
plt.show()
聚类算法对比
from sklearn.cluster import SpectralClustering, MeanShift
# Run four algorithms across three dataset geometries. Data is
# standardized first, since DBSCAN's eps is scale-sensitive.
datasets = {
    'blobs': make_blobs(n_samples=300, centers=3, random_state=42),
    'moons': make_moons(n_samples=300, noise=0.1, random_state=42),
    'circles': make_circles(n_samples=300, noise=0.05, factor=0.5, random_state=42)
}
# Factories so each grid cell gets a fresh, unfitted estimator.
algorithms = {
    'K-Means': lambda: KMeans(n_clusters=3, random_state=42, n_init=10),
    'DBSCAN': lambda: DBSCAN(eps=0.3, min_samples=5),
    'Hierarchical': lambda: AgglomerativeClustering(n_clusters=3),
    'Spectral': lambda: SpectralClustering(n_clusters=3, random_state=42)
}
fig, axes = plt.subplots(len(datasets), len(algorithms), figsize=(16, 12))
for i, (data_name, (X, _)) in enumerate(datasets.items()):
    X_scaled = StandardScaler().fit_transform(X)
    for j, (algo_name, algo_func) in enumerate(algorithms.items()):
        ax = axes[i, j]
        try:
            algo = algo_func()
            labels = algo.fit_predict(X_scaled)
            ax.scatter(X_scaled[:, 0], X_scaled[:, 1], c=labels, cmap='viridis', s=10)
        except Exception:
            # Bug fix: was a bare `except:`, which also swallows
            # KeyboardInterrupt/SystemExit. A failed fit now renders an
            # uncolored panel instead of crashing the whole grid.
            ax.scatter(X_scaled[:, 0], X_scaled[:, 1], c='gray', s=10)
        if i == 0:
            ax.set_title(algo_name)
        if j == 0:
            ax.set_ylabel(data_name)
plt.tight_layout()
plt.show()
聚类评估
外部指标(有真实标签)
from sklearn.metrics import adjusted_rand_score, normalized_mutual_info_score, v_measure_score
# External metrics compare predicted labels against the ground truth y_true.
X_eval, y_true = make_blobs(n_samples=300, centers=3, random_state=42)
candidates = [
    ('K-Means', KMeans(n_clusters=3, random_state=42, n_init=10)),
    ('DBSCAN', DBSCAN(eps=0.5, min_samples=5)),
    ('Hierarchical', AgglomerativeClustering(n_clusters=3)),
]
results = {}
for name, model in candidates:
    pred = model.fit_predict(X_eval)
    results[name] = {
        'ARI': adjusted_rand_score(y_true, pred),
        'NMI': normalized_mutual_info_score(y_true, pred),
        'V-measure': v_measure_score(y_true, pred),
    }
import pandas as pd
df_metrics = pd.DataFrame(results).T
print(df_metrics)
内部指标(无真实标签)
from sklearn.metrics import silhouette_score, calinski_harabasz_score, davies_bouldin_score
# Internal metrics judge cluster quality without any ground-truth labels.
print("\n内部评估指标:")
internal_models = (
    ('K-Means', KMeans(n_clusters=3, random_state=42, n_init=10)),
    ('Hierarchical', AgglomerativeClustering(n_clusters=3)),
)
for name, model in internal_models:
    pred = model.fit_predict(X_eval)
    # Silhouette/CH: higher is better; Davies-Bouldin: lower is better.
    sil = silhouette_score(X_eval, pred)
    ch = calinski_harabasz_score(X_eval, pred)
    db = davies_bouldin_score(X_eval, pred)
    print(f"{name}: Silhouette={sil:.3f}, CH={ch:.1f}, DB={db:.3f}")
实战:图像压缩
from sklearn.datasets import load_sample_image
# Color-quantization demo: compress an image by clustering its pixel colors.
try:
    china = load_sample_image("china.jpg")
except Exception:
    # Bug fix: was a bare `except:`. Any load failure (missing sample
    # data, download error) falls back to random noise so the demo runs.
    china = np.random.randint(0, 256, (100, 100, 3), dtype=np.uint8)
print(f"原始图像形状: {china.shape}")
# Treat each pixel as one 3-D (RGB) sample.
X_img = china.reshape(-1, 3).astype(np.float64)
print(f"像素数量: {len(X_img)}")
# Bug fix: the original defined K=2 but zipped axes[1:] with K_values[1:],
# so the K=2 panel was never drawn. The grid now has one panel per K plus
# the original image.
K_values = [2, 8, 32, 128]
fig, axes = plt.subplots(1, len(K_values) + 1, figsize=(4 * (len(K_values) + 1), 4))
axes[0].imshow(china)
axes[0].set_title(f'原始 (16M色)')
axes[0].axis('off')
for ax, K in zip(axes[1:], K_values):
    # Cluster the pixel colors, then snap every pixel to its cluster center.
    kmeans_img = KMeans(n_clusters=K, random_state=42, n_init=3)
    kmeans_img.fit(X_img)
    X_compressed = kmeans_img.cluster_centers_[kmeans_img.labels_]
    compressed_img = X_compressed.reshape(china.shape).astype(np.uint8)
    ax.imshow(compressed_img)
    ax.set_title(f'K={K} ({K}色)')
    ax.axis('off')
plt.tight_layout()
plt.show()
常见问题
Q1: K-Means的局限性?
- 需要预先指定K
- 假设簇是球形
- 对初始化敏感
- 对噪声和离群点敏感
Q2: DBSCAN如何选择eps?
- K距离图法
- 根据数据分布估计
- 领域知识
Q3: 何时用哪种算法?
| 场景 | 推荐算法 |
|---|---|
| 球形簇 | K-Means |
| 任意形状 | DBSCAN, 谱聚类 |
| 有噪声 | DBSCAN |
| 需要层次结构 | 层次聚类 |
| 大规模数据 | Mini-Batch K-Means |
Q4: 如何处理高维数据聚类?
- 先降维(PCA, t-SNE)
- 使用适合高维的算法
总结
| 算法 | 原理 | 优点 | 缺点 |
|---|---|---|---|
| K-Means | 最小化簇内方差 | 简单高效 | 需预设K |
| DBSCAN | 基于密度 | 发现任意形状 | 参数敏感 |
| 层次聚类 | 逐步合并/分裂 | 可视化好 | 计算量大 |
参考资料
- 《机器学习》周志华 第9章
- scikit-learn 文档:Clustering
- MacQueen, J. (1967). “Some methods for classification and analysis of multivariate observations”
版权声明: 如无特别声明,本文版权归 sshipanoo 所有,转载请注明本文链接。
(采用 CC BY-NC-SA 4.0 许可协议进行授权)
本文标题:《 机器学习基础系列——聚类算法 》
本文链接:http://localhost:3015/ai/%E8%81%9A%E7%B1%BB%E7%AE%97%E6%B3%95.html
本文距离最后一次更新已有一段时间,文章中的某些内容可能已过时!