预训练模型、微调与领域适应

前言

迁移学习允许我们利用在大规模数据上预训练的模型,通过微调快速适应新任务,大大减少了对标注数据和计算资源的需求。


为什么需要迁移学习

传统深度学习的挑战

import numpy as np
import matplotlib.pyplot as plt

np.random.seed(42)

# 可视化数据需求
def visualize_data_requirement():
    # 模拟不同方法的性能曲线
    data_sizes = np.array([100, 500, 1000, 5000, 10000, 50000, 100000])
    
    # 从头训练
    scratch_perf = 0.9 * (1 - np.exp(-data_sizes / 30000))
    
    # 迁移学习
    transfer_perf = 0.5 + 0.45 * (1 - np.exp(-data_sizes / 2000))
    
    fig, ax = plt.subplots(figsize=(10, 6))
    
    ax.plot(data_sizes, scratch_perf, 'b-o', label='从头训练', linewidth=2)
    ax.plot(data_sizes, transfer_perf, 'r-s', label='迁移学习', linewidth=2)
    
    ax.set_xlabel('训练样本数')
    ax.set_ylabel('模型性能')
    ax.set_title('迁移学习 vs 从头训练')
    ax.legend()
    ax.set_xscale('log')
    ax.grid(True, alpha=0.3)
    ax.axhline(y=0.9, color='gray', linestyle='--', alpha=0.5)
    
    plt.tight_layout()
    plt.show()

visualize_data_requirement()

迁移学习的优势

优势 说明
更少的数据 利用预训练知识
更快的训练 从好的起点开始
更好的性能 学习到通用特征

迁移学习类型

特征提取

冻结预训练模型,只训练新的分类头。

class FeatureExtractor:
    """特征提取方式的迁移学习"""
    
    def __init__(self, pretrained_features, num_classes):
        self.features = pretrained_features  # 冻结
        self.classifier = np.random.randn(pretrained_features.shape[-1], num_classes) * 0.01
    
    def forward(self, x):
        # 特征提取(不更新)
        features = self.extract_features(x)
        # 分类(可更新)
        return features @ self.classifier
    
    def extract_features(self, x):
        # 模拟预训练模型的特征提取
        return np.tanh(x @ self.features)

# 示例
pretrained = np.random.randn(100, 512) * 0.1  # 模拟预训练权重
model = FeatureExtractor(pretrained, num_classes=10)

x = np.random.randn(32, 100)
output = model.forward(x)
print(f"输入: {x.shape}")
print(f"输出: {output.shape}")

微调(Fine-tuning)

解冻部分或全部层,用较小学习率训练。

class FineTuning:
    """微调方式的迁移学习"""
    
    def __init__(self, pretrained_model, num_classes, freeze_layers=None):
        self.layers = pretrained_model.copy()
        self.freeze_layers = freeze_layers or []
        
        # 添加新的分类头
        self.classifier = np.random.randn(512, num_classes) * 0.01
    
    def forward(self, x):
        for i, layer in enumerate(self.layers):
            x = np.maximum(0, x @ layer)  # ReLU
        return x @ self.classifier
    
    def get_trainable_params(self):
        """获取可训练的参数"""
        trainable = []
        for i, layer in enumerate(self.layers):
            if i not in self.freeze_layers:
                trainable.append(layer)
        trainable.append(self.classifier)
        return trainable

# 微调策略示例
print("微调策略:")
print("1. 冻结所有层,只训练分类头")
print("2. 冻结浅层,微调深层")
print("3. 全部微调(使用更小学习率)")

微调策略可视化

def visualize_finetuning_strategies():
    fig, axes = plt.subplots(1, 3, figsize=(15, 5))
    
    layers = ['Conv1', 'Conv2', 'Conv3', 'Conv4', 'Conv5', 'FC1', 'FC2']
    n_layers = len(layers)
    
    strategies = [
        ('特征提取', [0]*6 + [1]),  # 只训练最后一层
        ('部分微调', [0]*3 + [1]*4),  # 训练后半部分
        ('全部微调', [1]*7)  # 全部训练
    ]
    
    for ax, (name, trainable) in zip(axes, strategies):
        colors = ['coral' if t else 'lightblue' for t in trainable]
        bars = ax.barh(layers, [1]*n_layers, color=colors)
        ax.set_title(name)
        ax.set_xlim(0, 1.5)
        ax.set_xlabel('')
        
        # 图例
        if ax == axes[0]:
            ax.barh([], [], color='coral', label='可训练')
            ax.barh([], [], color='lightblue', label='冻结')
            ax.legend(loc='lower right')
    
    plt.tight_layout()
    plt.show()

visualize_finetuning_strategies()

计算机视觉中的迁移学习

使用预训练CNN

try:
    import torch
    import torch.nn as nn
    import torchvision.models as models
    
    # 加载预训练ResNet
    resnet = models.resnet50(weights='IMAGENET1K_V1')
    
    # 方法1: 特征提取
    for param in resnet.parameters():
        param.requires_grad = False
    
    # 替换最后的全连接层
    num_features = resnet.fc.in_features
    resnet.fc = nn.Linear(num_features, 10)  # 10类分类
    
    print("特征提取模式:")
    trainable = sum(p.numel() for p in resnet.parameters() if p.requires_grad)
    total = sum(p.numel() for p in resnet.parameters())
    print(f"  可训练参数: {trainable:,}")
    print(f"  总参数: {total:,}")
    print(f"  可训练比例: {trainable/total:.2%}")
    
    # 方法2: 微调最后几层
    resnet2 = models.resnet50(weights='IMAGENET1K_V1')
    
    # 冻结前面的层
    for name, param in resnet2.named_parameters():
        if 'layer4' not in name and 'fc' not in name:
            param.requires_grad = False
    
    resnet2.fc = nn.Linear(num_features, 10)
    
    print("\n微调模式 (layer4 + fc):")
    trainable2 = sum(p.numel() for p in resnet2.parameters() if p.requires_grad)
    print(f"  可训练参数: {trainable2:,}")
    print(f"  可训练比例: {trainable2/total:.2%}")
    
except ImportError:
    print("PyTorch或torchvision未安装")

数据增强

try:
    from torchvision import transforms
    
    # 训练时的数据增强
    train_transform = transforms.Compose([
        transforms.RandomResizedCrop(224),
        transforms.RandomHorizontalFlip(),
        transforms.RandomRotation(15),
        transforms.ColorJitter(brightness=0.2, contrast=0.2, saturation=0.2),
        transforms.ToTensor(),
        transforms.Normalize(mean=[0.485, 0.456, 0.406], 
                           std=[0.229, 0.224, 0.225])
    ])
    
    # 验证时的变换
    val_transform = transforms.Compose([
        transforms.Resize(256),
        transforms.CenterCrop(224),
        transforms.ToTensor(),
        transforms.Normalize(mean=[0.485, 0.456, 0.406],
                           std=[0.229, 0.224, 0.225])
    ])
    
    print("数据增强配置已创建")
    
except ImportError:
    print("torchvision未安装")

NLP中的迁移学习

预训练语言模型

# 常见预训练模型
pretrained_models = {
    'Word2Vec': {
        '年份': 2013,
        '类型': '词向量',
        '任务': '词预测'
    },
    'GloVe': {
        '年份': 2014,
        '类型': '词向量',
        '任务': '共现矩阵分解'
    },
    'ELMo': {
        '年份': 2018,
        '类型': '上下文词向量',
        '任务': '语言建模'
    },
    'BERT': {
        '年份': 2018,
        '类型': 'Transformer Encoder',
        '任务': 'MLM + NSP'
    },
    'GPT-2/3': {
        '年份': '2019/2020',
        '类型': 'Transformer Decoder',
        '任务': '语言建模'
    }
}

import pandas as pd
df = pd.DataFrame(pretrained_models).T
print("预训练语言模型发展:")
print(df)

使用Hugging Face

try:
    from transformers import BertTokenizer, BertForSequenceClassification
    import torch
    
    # 加载预训练BERT
    tokenizer = BertTokenizer.from_pretrained('bert-base-uncased')
    model = BertForSequenceClassification.from_pretrained(
        'bert-base-uncased',
        num_labels=2  # 二分类
    )
    
    # 冻结BERT层,只训练分类头
    for name, param in model.named_parameters():
        if 'classifier' not in name:
            param.requires_grad = False
    
    # 测试
    text = "This movie is great!"
    inputs = tokenizer(text, return_tensors='pt', padding=True, truncation=True)
    
    with torch.no_grad():
        outputs = model(**inputs)
    
    print("BERT微调示例:")
    print(f"  输入文本: {text}")
    print(f"  输出logits: {outputs.logits}")
    
    # 统计参数
    trainable = sum(p.numel() for p in model.parameters() if p.requires_grad)
    total = sum(p.numel() for p in model.parameters())
    print(f"\n  可训练参数: {trainable:,}")
    print(f"  总参数: {total:,}")
    
except ImportError:
    print("transformers库未安装")
    print("安装命令: pip install transformers")

微调技巧

学习率设置

def visualize_lr_strategies():
    """可视化不同学习率策略"""
    
    epochs = np.arange(50)
    
    # 固定学习率
    lr_fixed = np.ones(50) * 1e-4
    
    # 学习率衰减
    lr_decay = 1e-4 * (0.95 ** epochs)
    
    # 差分学习率(不同层不同学习率)
    base_lr = 1e-4
    lr_layers = {
        '分类头': base_lr,
        '深层': base_lr * 0.1,
        '浅层': base_lr * 0.01
    }
    
    # Warmup + 衰减
    warmup_epochs = 5
    lr_warmup = np.where(
        epochs < warmup_epochs,
        1e-4 * epochs / warmup_epochs,
        1e-4 * (0.95 ** (epochs - warmup_epochs))
    )
    
    fig, axes = plt.subplots(1, 2, figsize=(14, 5))
    
    ax = axes[0]
    ax.plot(epochs, lr_fixed, label='固定学习率')
    ax.plot(epochs, lr_decay, label='学习率衰减')
    ax.plot(epochs, lr_warmup, label='Warmup + 衰减')
    ax.set_xlabel('Epoch')
    ax.set_ylabel('Learning Rate')
    ax.set_title('学习率调度策略')
    ax.legend()
    ax.grid(True, alpha=0.3)
    
    ax = axes[1]
    layers = list(lr_layers.keys())
    lrs = list(lr_layers.values())
    ax.barh(layers, lrs)
    ax.set_xlabel('Learning Rate')
    ax.set_title('差分学习率')
    ax.set_xscale('log')
    for i, lr in enumerate(lrs):
        ax.text(lr*1.2, i, f'{lr:.0e}', va='center')
    
    plt.tight_layout()
    plt.show()

visualize_lr_strategies()

渐进式解冻

class ProgressiveUnfreezing:
    """渐进式解冻"""
    
    def __init__(self, model_layers):
        self.layers = model_layers
        self.n_layers = len(model_layers)
    
    def unfreeze_schedule(self, epoch, unfreeze_every=2):
        """每隔几个epoch解冻一层"""
        layers_to_unfreeze = min(epoch // unfreeze_every + 1, self.n_layers)
        
        frozen = self.n_layers - layers_to_unfreeze
        unfrozen = layers_to_unfreeze
        
        return {
            'epoch': epoch,
            'frozen_layers': frozen,
            'unfrozen_layers': unfrozen,
            'trainable_layers': list(range(frozen, self.n_layers))
        }

# 示例
layers = ['embed', 'layer1', 'layer2', 'layer3', 'layer4', 'classifier']
progressive = ProgressiveUnfreezing(layers)

print("渐进式解冻计划:")
for epoch in [0, 2, 4, 6, 8, 10]:
    schedule = progressive.unfreeze_schedule(epoch)
    trainable = [layers[i] for i in schedule['trainable_layers']]
    print(f"  Epoch {epoch}: 可训练层 = {trainable}")

领域适应

Domain Shift问题

def visualize_domain_shift():
    """可视化领域偏移"""
    
    np.random.seed(42)
    
    # 源域数据
    source_data = np.random.randn(100, 2) + np.array([2, 2])
    
    # 目标域数据(有偏移)
    target_data = np.random.randn(100, 2) * 1.5 + np.array([5, 3])
    
    fig, ax = plt.subplots(figsize=(10, 8))
    
    ax.scatter(source_data[:, 0], source_data[:, 1], c='blue', label='源域', alpha=0.6)
    ax.scatter(target_data[:, 0], target_data[:, 1], c='red', label='目标域', alpha=0.6)
    
    # 标注中心
    ax.scatter(*source_data.mean(axis=0), c='blue', s=200, marker='*', edgecolors='black')
    ax.scatter(*target_data.mean(axis=0), c='red', s=200, marker='*', edgecolors='black')
    
    ax.set_xlabel('特征1')
    ax.set_ylabel('特征2')
    ax.set_title('领域偏移示意图')
    ax.legend()
    ax.grid(True, alpha=0.3)
    
    plt.show()

visualize_domain_shift()

领域适应方法

方法 描述
Fine-tuning 在目标域数据上微调
Feature Alignment 对齐源域和目标域特征分布
Domain Adversarial 使用判别器进行对抗训练
Self-training 用伪标签进行自训练

实战示例

图像分类微调

try:
    import torch
    import torch.nn as nn
    import torch.optim as optim
    from torchvision import models
    
    def create_transfer_model(num_classes, freeze_backbone=True):
        """创建迁移学习模型"""
        
        # 加载预训练模型
        model = models.resnet18(weights='IMAGENET1K_V1')
        
        # 冻结backbone
        if freeze_backbone:
            for param in model.parameters():
                param.requires_grad = False
        
        # 替换分类头
        num_features = model.fc.in_features
        model.fc = nn.Sequential(
            nn.Dropout(0.5),
            nn.Linear(num_features, 256),
            nn.ReLU(),
            nn.Dropout(0.3),
            nn.Linear(256, num_classes)
        )
        
        return model
    
    def get_optimizer(model, base_lr=1e-3, backbone_lr=1e-5):
        """差分学习率优化器"""
        
        # 分离backbone和分类头参数
        backbone_params = []
        classifier_params = []
        
        for name, param in model.named_parameters():
            if 'fc' in name:
                classifier_params.append(param)
            else:
                backbone_params.append(param)
        
        optimizer = optim.Adam([
            {'params': backbone_params, 'lr': backbone_lr},
            {'params': classifier_params, 'lr': base_lr}
        ])
        
        return optimizer
    
    # 创建模型
    model = create_transfer_model(num_classes=5, freeze_backbone=False)
    optimizer = get_optimizer(model)
    
    print("迁移学习模型已创建")
    print(f"参数组数: {len(optimizer.param_groups)}")
    for i, group in enumerate(optimizer.param_groups):
        print(f"{i+1} 学习率: {group['lr']}")
    
except ImportError:
    print("PyTorch未安装")

常见问题

Q1: 何时使用特征提取 vs 微调?

情况 推荐方法
数据很少 特征提取
数据中等 冻结浅层,微调深层
数据很多 全部微调
任务相似 微调
任务差异大 特征提取或重新训练

Q2: 如何选择预训练模型?

  • 任务相关性
  • 模型规模 vs 计算资源
  • 预训练数据的领域

Q3: 微调时学习率如何设置?

通常比从头训练小1-2个数量级,如1e-4到1e-5。

Q4: 如何避免灾难性遗忘?

  • 使用较小学习率
  • 渐进式解冻
  • Elastic Weight Consolidation
  • 数据混合训练

总结

概念 描述
特征提取 冻结预训练模型,只训练分类头
微调 解冻部分/全部层,小学习率训练
领域适应 处理源域和目标域的分布差异
差分学习率 不同层使用不同学习率

参考资料

  • Yosinski, J. et al. (2014). “How transferable are features in deep neural networks?”
  • Howard, J. & Ruder, S. (2018). “Universal Language Model Fine-tuning for Text Classification”
  • Devlin, J. et al. (2018). “BERT: Pre-training of Deep Bidirectional Transformers”
  • Hugging Face Transformers文档

版权声明: 如无特别声明,本文版权归 sshipanoo 所有,转载请注明本文链接。

(采用 CC BY-NC-SA 4.0 许可协议进行授权)

本文标题:《 机器学习基础系列——迁移学习 》

本文链接:http://localhost:3015/ai/%E8%BF%81%E7%A7%BB%E5%AD%A6%E4%B9%A0.html

本文最后一次更新为 天前,文章中的某些内容可能已过时!