K-Means、DBSCAN、层次聚类

前言

聚类是无监督学习的核心任务,目标是将相似的数据点分到同一组。与分类不同,聚类没有预定义的标签,需要算法自动发现数据结构。


聚类基础

什么是聚类

  • 组内相似度高(紧凑性)
  • 组间相似度低(分离性)

应用场景

领域 应用
市场营销 客户细分
图像处理 图像分割
文本分析 文档分类
异常检测 发现离群点
import numpy as np
import matplotlib.pyplot as plt
from sklearn.datasets import make_blobs, make_moons, make_circles
from sklearn.cluster import KMeans, DBSCAN, AgglomerativeClustering
from sklearn.preprocessing import StandardScaler

np.random.seed(42)

# 生成不同类型的数据
fig, axes = plt.subplots(1, 3, figsize=(15, 5))

datasets = [
    make_blobs(n_samples=300, centers=3, random_state=42),
    make_moons(n_samples=300, noise=0.1, random_state=42),
    make_circles(n_samples=300, noise=0.05, factor=0.5, random_state=42)
]
titles = ['球形簇', '月牙形', '同心圆']

for ax, (X, y), title in zip(axes, datasets, titles):
    ax.scatter(X[:, 0], X[:, 1], c=y, cmap='viridis')
    ax.set_title(title)
    ax.set_xlabel('Feature 1')
    ax.set_ylabel('Feature 2')

plt.tight_layout()
plt.show()

K-Means

算法原理

  1. 随机初始化K个中心
  2. 将每个点分配到最近的中心
  3. 更新中心为簇内均值
  4. 重复2-3直到收敛
# 生成数据
X_blobs, y_blobs = make_blobs(n_samples=300, centers=4, random_state=42)

# K-Means聚类
kmeans = KMeans(n_clusters=4, random_state=42, n_init=10)
labels = kmeans.fit_predict(X_blobs)

# 可视化
plt.figure(figsize=(12, 5))

plt.subplot(1, 2, 1)
plt.scatter(X_blobs[:, 0], X_blobs[:, 1], c='gray', alpha=0.5)
plt.title('原始数据')

plt.subplot(1, 2, 2)
plt.scatter(X_blobs[:, 0], X_blobs[:, 1], c=labels, cmap='viridis')
plt.scatter(kmeans.cluster_centers_[:, 0], kmeans.cluster_centers_[:, 1],
            c='red', marker='X', s=200, edgecolors='black', label='聚类中心')
plt.title(f'K-Means (K=4)\nInertia={kmeans.inertia_:.2f}')
plt.legend()

plt.tight_layout()
plt.show()

从零实现K-Means

class KMeansScratch:
    def __init__(self, n_clusters=3, max_iters=100, tol=1e-4, random_state=None):
        self.n_clusters = n_clusters
        self.max_iters = max_iters
        self.tol = tol
        self.random_state = random_state
        self.centers = None
        self.labels = None
    
    def fit(self, X):
        np.random.seed(self.random_state)
        n_samples = len(X)
        
        # 随机初始化中心(从数据点中选择)
        indices = np.random.choice(n_samples, self.n_clusters, replace=False)
        self.centers = X[indices].copy()
        
        for iteration in range(self.max_iters):
            old_centers = self.centers.copy()
            
            # 分配标签
            self.labels = self._assign_labels(X)
            
            # 更新中心
            for k in range(self.n_clusters):
                mask = self.labels == k
                if np.sum(mask) > 0:
                    self.centers[k] = X[mask].mean(axis=0)
            
            # 检查收敛
            center_shift = np.sqrt(np.sum((self.centers - old_centers) ** 2))
            if center_shift < self.tol:
                break
        
        return self
    
    def _assign_labels(self, X):
        """将每个点分配到最近的中心"""
        distances = np.zeros((len(X), self.n_clusters))
        for k in range(self.n_clusters):
            distances[:, k] = np.sqrt(np.sum((X - self.centers[k]) ** 2, axis=1))
        return np.argmin(distances, axis=1)
    
    def predict(self, X):
        return self._assign_labels(X)
    
    def fit_predict(self, X):
        self.fit(X)
        return self.labels
    
    @property
    def inertia_(self):
        """计算簇内平方和"""
        inertia = 0
        for k in range(self.n_clusters):
            mask = self.labels == k
            inertia += np.sum((X_blobs[mask] - self.centers[k]) ** 2)
        return inertia

# 测试
kmeans_scratch = KMeansScratch(n_clusters=4, random_state=42)
labels_scratch = kmeans_scratch.fit_predict(X_blobs)

print(f"自实现K-Means惯性: {kmeans_scratch.inertia_:.2f}")
print(f"sklearn K-Means惯性: {kmeans.inertia_:.2f}")

K-Means迭代过程可视化

# 可视化迭代过程
np.random.seed(42)
X_demo, _ = make_blobs(n_samples=150, centers=3, random_state=42)

fig, axes = plt.subplots(2, 3, figsize=(15, 10))

# 初始化中心
indices = np.random.choice(len(X_demo), 3, replace=False)
centers = X_demo[indices].copy()

iterations = [0, 1, 2, 3, 5, 10]

current_centers = centers.copy()
for ax, it in zip(axes.ravel(), iterations):
    for _ in range(it):
        # 分配标签
        distances = np.array([np.sqrt(np.sum((X_demo - c) ** 2, axis=1)) for c in current_centers]).T
        labels = np.argmin(distances, axis=1)
        # 更新中心
        for k in range(3):
            if np.sum(labels == k) > 0:
                current_centers[k] = X_demo[labels == k].mean(axis=0)
    
    distances = np.array([np.sqrt(np.sum((X_demo - c) ** 2, axis=1)) for c in current_centers]).T
    labels = np.argmin(distances, axis=1)
    
    ax.scatter(X_demo[:, 0], X_demo[:, 1], c=labels, cmap='viridis', alpha=0.6)
    ax.scatter(current_centers[:, 0], current_centers[:, 1], 
               c='red', marker='X', s=200, edgecolors='black')
    ax.set_title(f'迭代 {it}')
    
    # 重置中心用于下一轮从头开始
    if it == 0:
        current_centers = centers.copy()

plt.tight_layout()
plt.show()

选择K值

肘部法则

# 肘部法则
inertias = []
K_range = range(1, 11)

for k in K_range:
    kmeans_temp = KMeans(n_clusters=k, random_state=42, n_init=10)
    kmeans_temp.fit(X_blobs)
    inertias.append(kmeans_temp.inertia_)

plt.figure(figsize=(10, 5))
plt.plot(K_range, inertias, 'bo-')
plt.xlabel('K')
plt.ylabel('惯性 (Inertia)')
plt.title('肘部法则')
plt.axvline(x=4, color='r', linestyle='--', label='最优K=4')
plt.legend()
plt.grid(True, alpha=0.3)
plt.show()

轮廓系数

from sklearn.metrics import silhouette_score, silhouette_samples

# 轮廓系数
silhouette_scores = []

for k in range(2, 11):
    kmeans_temp = KMeans(n_clusters=k, random_state=42, n_init=10)
    labels_temp = kmeans_temp.fit_predict(X_blobs)
    score = silhouette_score(X_blobs, labels_temp)
    silhouette_scores.append(score)
    print(f"K={k}: 轮廓系数={score:.4f}")

plt.figure(figsize=(10, 5))
plt.plot(range(2, 11), silhouette_scores, 'go-')
plt.xlabel('K')
plt.ylabel('轮廓系数')
plt.title('轮廓系数法')
plt.grid(True, alpha=0.3)
plt.show()

K-Means++初始化

# K-Means++改进初始化
kmeans_plus = KMeans(n_clusters=4, init='k-means++', random_state=42, n_init=10)
kmeans_random = KMeans(n_clusters=4, init='random', random_state=42, n_init=1)

# 多次运行比较
plus_inertias = []
random_inertias = []

for i in range(20):
    kmeans_plus = KMeans(n_clusters=4, init='k-means++', random_state=i, n_init=1)
    kmeans_random = KMeans(n_clusters=4, init='random', random_state=i, n_init=1)
    
    kmeans_plus.fit(X_blobs)
    kmeans_random.fit(X_blobs)
    
    plus_inertias.append(kmeans_plus.inertia_)
    random_inertias.append(kmeans_random.inertia_)

print(f"K-Means++ 惯性: {np.mean(plus_inertias):.2f} ± {np.std(plus_inertias):.2f}")
print(f"Random 惯性: {np.mean(random_inertias):.2f} ± {np.std(random_inertias):.2f}")

DBSCAN

原理

Density-Based Spatial Clustering of Applications with Noise

  • 核心点:eps邻域内至少有min_samples个点
  • 边界点:在核心点的eps邻域内,但自己不是核心点
  • 噪声点:既不是核心点也不是边界点
X_moons, _ = make_moons(n_samples=300, noise=0.1, random_state=42)

fig, axes = plt.subplots(1, 3, figsize=(15, 5))

# K-Means(失败案例)
kmeans_moons = KMeans(n_clusters=2, random_state=42, n_init=10)
labels_km = kmeans_moons.fit_predict(X_moons)

axes[0].scatter(X_moons[:, 0], X_moons[:, 1], c=labels_km, cmap='viridis')
axes[0].set_title('K-Means (效果差)')

# DBSCAN
dbscan = DBSCAN(eps=0.2, min_samples=5)
labels_db = dbscan.fit_predict(X_moons)

axes[1].scatter(X_moons[:, 0], X_moons[:, 1], c=labels_db, cmap='viridis')
axes[1].set_title(f'DBSCAN (eps=0.2)\n簇数={len(set(labels_db))-1}, 噪声={sum(labels_db==-1)}')

# 显示核心点
core_mask = np.zeros(len(X_moons), dtype=bool)
core_mask[dbscan.core_sample_indices_] = True

axes[2].scatter(X_moons[~core_mask, 0], X_moons[~core_mask, 1], c='gray', alpha=0.5, label='边界/噪声')
axes[2].scatter(X_moons[core_mask, 0], X_moons[core_mask, 1], c=labels_db[core_mask], 
                cmap='viridis', label='核心点')
axes[2].set_title('核心点可视化')
axes[2].legend()

plt.tight_layout()
plt.show()

eps和min_samples的影响

fig, axes = plt.subplots(2, 3, figsize=(15, 10))

eps_values = [0.1, 0.2, 0.3]
min_samples_values = [3, 5, 10]

for i, eps in enumerate(eps_values):
    for j, min_samples in enumerate(min_samples_values):
        ax = axes[i, j] if i < 2 else axes[1, j]
        
        dbscan_temp = DBSCAN(eps=eps, min_samples=min_samples)
        labels_temp = dbscan_temp.fit_predict(X_moons)
        
        n_clusters = len(set(labels_temp)) - (1 if -1 in labels_temp else 0)
        n_noise = sum(labels_temp == -1)
        
        ax.scatter(X_moons[:, 0], X_moons[:, 1], c=labels_temp, cmap='viridis')
        ax.set_title(f'eps={eps}, min_samples={min_samples}\n簇数={n_clusters}, 噪声={n_noise}')

for i, eps in enumerate(eps_values):
    axes[0, i].set_ylabel('min_samples=3')
    
plt.tight_layout()
plt.show()

层次聚类

凝聚层次聚类

自底向上:

  1. 每个点是一个簇
  2. 合并最近的两个簇
  3. 重复直到达到指定簇数
from scipy.cluster.hierarchy import dendrogram, linkage

# 生成数据
X_hier, _ = make_blobs(n_samples=50, centers=3, random_state=42)

# 计算层次结构
linkage_matrix = linkage(X_hier, method='ward')

# 绘制树状图
plt.figure(figsize=(15, 5))

plt.subplot(1, 2, 1)
dendrogram(linkage_matrix, truncate_mode='level', p=5)
plt.xlabel('样本')
plt.ylabel('距离')
plt.title('层次聚类树状图')

# 聚类结果
plt.subplot(1, 2, 2)
agg = AgglomerativeClustering(n_clusters=3, linkage='ward')
labels_agg = agg.fit_predict(X_hier)
plt.scatter(X_hier[:, 0], X_hier[:, 1], c=labels_agg, cmap='viridis')
plt.title('层次聚类结果 (K=3)')

plt.tight_layout()
plt.show()

不同链接方法

方法 距离计算
single 最近点距离
complete 最远点距离
average 平均距离
ward 最小化方差
fig, axes = plt.subplots(1, 4, figsize=(16, 4))

methods = ['single', 'complete', 'average', 'ward']

for ax, method in zip(axes, methods):
    agg_temp = AgglomerativeClustering(n_clusters=3, linkage=method)
    labels_temp = agg_temp.fit_predict(X_hier)
    ax.scatter(X_hier[:, 0], X_hier[:, 1], c=labels_temp, cmap='viridis')
    ax.set_title(f'{method} linkage')

plt.tight_layout()
plt.show()

聚类算法对比

from sklearn.cluster import SpectralClustering, MeanShift

# 准备不同类型的数据集
datasets = {
    'blobs': make_blobs(n_samples=300, centers=3, random_state=42),
    'moons': make_moons(n_samples=300, noise=0.1, random_state=42),
    'circles': make_circles(n_samples=300, noise=0.05, factor=0.5, random_state=42)
}

algorithms = {
    'K-Means': lambda: KMeans(n_clusters=3, random_state=42, n_init=10),
    'DBSCAN': lambda: DBSCAN(eps=0.3, min_samples=5),
    'Hierarchical': lambda: AgglomerativeClustering(n_clusters=3),
    'Spectral': lambda: SpectralClustering(n_clusters=3, random_state=42)
}

fig, axes = plt.subplots(len(datasets), len(algorithms), figsize=(16, 12))

for i, (data_name, (X, _)) in enumerate(datasets.items()):
    X_scaled = StandardScaler().fit_transform(X)
    
    for j, (algo_name, algo_func) in enumerate(algorithms.items()):
        ax = axes[i, j]
        
        try:
            algo = algo_func()
            labels = algo.fit_predict(X_scaled)
            ax.scatter(X_scaled[:, 0], X_scaled[:, 1], c=labels, cmap='viridis', s=10)
        except:
            ax.scatter(X_scaled[:, 0], X_scaled[:, 1], c='gray', s=10)
        
        if i == 0:
            ax.set_title(algo_name)
        if j == 0:
            ax.set_ylabel(data_name)

plt.tight_layout()
plt.show()

聚类评估

外部指标(有真实标签)

from sklearn.metrics import adjusted_rand_score, normalized_mutual_info_score, v_measure_score

X_eval, y_true = make_blobs(n_samples=300, centers=3, random_state=42)

# 不同算法的聚类结果
results = {}
for name, algo in [
    ('K-Means', KMeans(n_clusters=3, random_state=42, n_init=10)),
    ('DBSCAN', DBSCAN(eps=0.5, min_samples=5)),
    ('Hierarchical', AgglomerativeClustering(n_clusters=3))
]:
    labels_pred = algo.fit_predict(X_eval)
    results[name] = {
        'ARI': adjusted_rand_score(y_true, labels_pred),
        'NMI': normalized_mutual_info_score(y_true, labels_pred),
        'V-measure': v_measure_score(y_true, labels_pred)
    }

import pandas as pd
df_metrics = pd.DataFrame(results).T
print(df_metrics)

内部指标(无真实标签)

from sklearn.metrics import silhouette_score, calinski_harabasz_score, davies_bouldin_score

print("\n内部评估指标:")
for name, algo in [
    ('K-Means', KMeans(n_clusters=3, random_state=42, n_init=10)),
    ('Hierarchical', AgglomerativeClustering(n_clusters=3))
]:
    labels_pred = algo.fit_predict(X_eval)
    
    sil = silhouette_score(X_eval, labels_pred)
    ch = calinski_harabasz_score(X_eval, labels_pred)
    db = davies_bouldin_score(X_eval, labels_pred)
    
    print(f"{name}: Silhouette={sil:.3f}, CH={ch:.1f}, DB={db:.3f}")

实战:图像压缩

from sklearn.datasets import load_sample_image

# 加载示例图像
try:
    china = load_sample_image("china.jpg")
except:
    # 如果没有示例图像,生成一个
    china = np.random.randint(0, 256, (100, 100, 3), dtype=np.uint8)

# 原始图像
print(f"原始图像形状: {china.shape}")

# 将图像reshape为像素列表
X_img = china.reshape(-1, 3).astype(np.float64)
print(f"像素数量: {len(X_img)}")

# 不同K值的压缩
fig, axes = plt.subplots(1, 4, figsize=(16, 4))

K_values = [2, 8, 32, 128]

axes[0].imshow(china)
axes[0].set_title(f'原始 (16M色)')
axes[0].axis('off')

for ax, K in zip(axes[1:], K_values[1:]):
    # K-Means聚类
    kmeans_img = KMeans(n_clusters=K, random_state=42, n_init=3)
    kmeans_img.fit(X_img)
    
    # 用聚类中心替换像素
    X_compressed = kmeans_img.cluster_centers_[kmeans_img.labels_]
    compressed_img = X_compressed.reshape(china.shape).astype(np.uint8)
    
    ax.imshow(compressed_img)
    ax.set_title(f'K={K} ({K}色)')
    ax.axis('off')

plt.tight_layout()
plt.show()

常见问题

Q1: K-Means的局限性?

  • 需要预先指定K
  • 假设簇是球形
  • 对初始化敏感
  • 对噪声和离群点敏感

Q2: DBSCAN如何选择eps?

  • K距离图法
  • 根据数据分布估计
  • 领域知识

Q3: 何时用哪种算法?

场景 推荐算法
球形簇 K-Means
任意形状 DBSCAN, 谱聚类
有噪声 DBSCAN
需要层次结构 层次聚类
大规模数据 Mini-Batch K-Means

Q4: 如何处理高维数据聚类?

  • 先降维(PCA, t-SNE)
  • 使用适合高维的算法

总结

算法 原理 优点 缺点
K-Means 最小化簇内方差 简单高效 需预设K
DBSCAN 基于密度 发现任意形状 参数敏感
层次聚类 逐步合并/分裂 可视化好 计算量大

参考资料

  • 《机器学习》周志华 第9章
  • scikit-learn 文档:Clustering
  • MacQueen, J. (1967). “Some methods for classification and analysis of multivariate observations”

版权声明: 如无特别声明,本文版权归 sshipanoo 所有,转载请注明本文链接。

(采用 CC BY-NC-SA 4.0 许可协议进行授权)

本文标题:《 机器学习基础系列——聚类算法 》

本文链接:http://localhost:3015/ai/%E8%81%9A%E7%B1%BB%E7%AE%97%E6%B3%95.html

本文最后一次更新为 天前,文章中的某些内容可能已过时!