从数据到部署的完整机器学习流程

前言

本文将通过一个完整的实战项目,综合运用机器学习基础系列中学习的所有知识,涵盖从数据处理到模型部署的全流程。


项目概述

项目目标

构建一个房价预测系统,包括:

  • 数据探索与预处理
  • 特征工程
  • 模型选择与训练
  • 超参数调优
  • 模型评估与解释
  • 模型保存与部署
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import StandardScaler
from sklearn.metrics import mean_squared_error, r2_score

np.random.seed(42)

# Synthesize a housing dataset whose price follows a known linear model.
def generate_housing_data(n_samples=1000):
    """Create a synthetic housing dataset with a linear price ground truth.

    Draws ``n_samples`` rows of property features from simple uniform /
    integer / binary distributions, then derives ``price`` as a fixed
    linear combination of those features plus N(0, 5000) noise.

    Parameters
    ----------
    n_samples : int
        Number of rows to generate.

    Returns
    -------
    pd.DataFrame
        Feature columns plus the ``price`` target column.
    """
    rng = np.random  # global RNG, seeded above for reproducibility

    # Draw order matters for reproducibility under a fixed seed, so it
    # mirrors the original column order exactly.
    features = {}
    features['area'] = rng.uniform(50, 300, n_samples)           # floor area
    features['rooms'] = rng.randint(1, 6, n_samples)             # room count
    features['age'] = rng.uniform(0, 50, n_samples)              # building age
    features['distance_center'] = rng.uniform(1, 30, n_samples)  # km to center
    features['floor'] = rng.randint(1, 30, n_samples)            # floor number
    features['has_elevator'] = rng.choice([0, 1], n_samples)     # elevator flag
    features['has_parking'] = rng.choice([0, 1], n_samples)      # parking flag
    features['school_rating'] = rng.uniform(1, 10, n_samples)    # school score

    # Ground-truth price: linear in the features with Gaussian noise.
    noise = rng.normal(0, 5000, n_samples)
    features['price'] = (
        100 * features['area']
        + 5000 * features['rooms']
        + 200 * (50 - features['age'])
        + 1000 * (30 - features['distance_center'])
        + 3000 * features['has_elevator']
        + 5000 * features['has_parking']
        + 2000 * features['school_rating']
        + noise
    )

    return pd.DataFrame(features)

# Materialize the working dataset and print a quick overview of it.
df = generate_housing_data(1000)
print("数据集形状:", df.shape)
for header, frame in (("\n数据预览:", df.head()), ("\n数据统计:", df.describe())):
    print(header)
    print(frame)

数据探索分析

数据分布可视化

def explore_data(df):
    """Exploratory data analysis: histogram every feature, then the target.

    Renders a 2x4 grid of feature histograms followed by a separate,
    larger histogram of ``price``. Display-only; returns nothing.
    """
    feature_cols = df.columns[:-1]  # last column is the price target

    fig, axes = plt.subplots(2, 4, figsize=(16, 8))
    for ax, col in zip(axes.flatten(), feature_cols):
        ax.hist(df[col], bins=30, edgecolor='black', alpha=0.7)
        ax.set_title(f'{col}分布')
        ax.set_xlabel(col)
        ax.set_ylabel('频数')
    plt.tight_layout()
    plt.show()

    # The target distribution gets its own, wider axis.
    fig, ax = plt.subplots(figsize=(10, 6))
    ax.hist(df['price'], bins=50, edgecolor='black', alpha=0.7, color='coral')
    ax.set_title('房价分布')
    ax.set_xlabel('价格')
    ax.set_ylabel('频数')
    plt.show()


explore_data(df)

相关性分析

def correlation_analysis(df):
    """Show a correlation heatmap and print each feature's correlation with price.

    Display/print-only; returns nothing.
    """
    corr = df.corr()
    n = len(corr.columns)

    fig, ax = plt.subplots(figsize=(10, 8))
    im = ax.imshow(corr, cmap='coolwarm', aspect='auto')

    ax.set_xticks(range(n))
    ax.set_yticks(range(n))
    ax.set_xticklabels(corr.columns, rotation=45, ha='right')
    ax.set_yticklabels(corr.columns)

    # Annotate every cell; flip to white text on strongly colored cells
    # so the number stays readable.
    for row in range(n):
        for col in range(n):
            value = corr.iloc[row, col]
            text_color = 'white' if abs(value) > 0.5 else 'black'
            ax.text(col, row, f'{value:.2f}', ha='center', va='center',
                    color=text_color)

    plt.colorbar(im)
    plt.title('特征相关性热力图')
    plt.tight_layout()
    plt.show()

    # Rank the features by their correlation with the target.
    price_corr = corr['price'].drop('price').sort_values(ascending=False)
    print("与价格的相关性:")
    print(price_corr)


correlation_analysis(df)

特征工程

特征创建与转换

def feature_engineering(df):
    """Derive additional predictive features from the raw housing columns.

    Adds three numeric features (area per room, an area/age interaction,
    and a convenience score), bins ``age`` into four labelled groups, and
    one-hot encodes the resulting group column.

    Parameters
    ----------
    df : pd.DataFrame
        Raw housing data; must contain area, rooms, age, distance_center,
        has_elevator and has_parking columns.

    Returns
    -------
    pd.DataFrame
        A copy of ``df`` with the engineered columns appended.
    """
    out = df.copy()

    # Derived numeric features.
    out['area_per_room'] = out['area'] / out['rooms']
    out['price_factor'] = out['area'] * (50 - out['age'])
    # NOTE(review): distance_center can exceed 10, so the last term may go
    # negative — presumably an intentional penalty; confirm.
    out['convenience_score'] = (
        out['has_elevator']
        + out['has_parking']
        + (10 - out['distance_center']) / 10
    )

    # Bin building age into coarse groups, then expand into indicators.
    age_bins = [0, 10, 20, 30, 50]
    age_labels = ['新房', '较新', '一般', '老旧']
    out['age_group'] = pd.cut(out['age'], bins=age_bins, labels=age_labels)
    out = pd.get_dummies(out, columns=['age_group'], prefix='age')

    print("新增特征:")
    print(out[['area_per_room', 'price_factor', 'convenience_score']].head())

    return out

# Apply the feature-engineering step to the raw dataset.
df_featured = feature_engineering(df)
print("\n特征工程后数据形状:", df_featured.shape)

数据预处理

数据分割与标准化

def preprocess_data(df):
    """Split features/target, make a train/test split, and standardize.

    The scaler is fitted on the training split only, so no test-set
    statistics leak into training.

    Returns
    -------
    tuple
        (X_train_scaled, X_test_scaled, y_train, y_test, fitted scaler,
        feature-name Index); targets are returned as NumPy arrays.
    """
    features = df.drop('price', axis=1)
    target = df['price']

    X_train, X_test, y_train, y_test = train_test_split(
        features, target, test_size=0.2, random_state=42
    )

    # Fit on train only; apply the same transform to test.
    scaler = StandardScaler()
    X_train_scaled = scaler.fit_transform(X_train)
    X_test_scaled = scaler.transform(X_test)

    print(f"训练集: {X_train_scaled.shape}")
    print(f"测试集: {X_test_scaled.shape}")

    return (X_train_scaled, X_test_scaled,
            y_train.values, y_test.values, scaler, features.columns)


X_train, X_test, y_train, y_test, scaler, feature_names = preprocess_data(df_featured)

模型选择与训练

基准模型对比

from sklearn.linear_model import LinearRegression, Ridge, Lasso, ElasticNet
from sklearn.tree import DecisionTreeRegressor
from sklearn.ensemble import RandomForestRegressor, GradientBoostingRegressor
from sklearn.svm import SVR
from sklearn.neighbors import KNeighborsRegressor

def train_and_evaluate_models(X_train, X_test, y_train, y_test):
    """Fit a suite of baseline regressors and report RMSE/R2 on the test set.

    Returns
    -------
    (pd.DataFrame, dict)
        A results table with Model/RMSE/R2 columns, and the dict of
        fitted model instances keyed by display name.
    """
    models = {
        'Linear Regression': LinearRegression(),
        'Ridge': Ridge(alpha=1.0),
        'Lasso': Lasso(alpha=1.0),
        'ElasticNet': ElasticNet(alpha=1.0, l1_ratio=0.5),
        'Decision Tree': DecisionTreeRegressor(max_depth=10, random_state=42),
        'Random Forest': RandomForestRegressor(n_estimators=100, random_state=42),
        'Gradient Boosting': GradientBoostingRegressor(n_estimators=100, random_state=42),
        'KNN': KNeighborsRegressor(n_neighbors=5)
    }

    rows = []
    for name, model in models.items():
        # Fit on the training split, score on the held-out split.
        model.fit(X_train, y_train)
        predictions = model.predict(X_test)

        rmse = np.sqrt(mean_squared_error(y_test, predictions))
        r2 = r2_score(y_test, predictions)
        rows.append({'Model': name, 'RMSE': rmse, 'R2': r2})

        print(f"{name}:")
        print(f"  RMSE: {rmse:.2f}")
        print(f"  R2: {r2:.4f}")
        print()

    return pd.DataFrame(rows), models


results_df, trained_models = train_and_evaluate_models(X_train, X_test, y_train, y_test)

模型对比可视化

def visualize_model_comparison(results_df):
    """Render side-by-side horizontal bar charts of model RMSE and R2."""
    fig, (ax_rmse, ax_r2) = plt.subplots(1, 2, figsize=(14, 6))

    # Lower RMSE is better.
    ax_rmse.barh(results_df['Model'], results_df['RMSE'], color='steelblue')
    ax_rmse.set_xlabel('RMSE')
    ax_rmse.set_title('模型RMSE对比 (越低越好)')
    ax_rmse.invert_yaxis()

    # Higher R2 is better; pin the axis to the [0, 1] range.
    ax_r2.barh(results_df['Model'], results_df['R2'], color='coral')
    ax_r2.set_xlabel('R2 Score')
    ax_r2.set_title('模型R2对比 (越高越好)')
    ax_r2.invert_yaxis()
    ax_r2.set_xlim(0, 1)

    plt.tight_layout()
    plt.show()


visualize_model_comparison(results_df)

超参数调优

使用交叉验证网格搜索

from sklearn.model_selection import GridSearchCV, cross_val_score

def hyperparameter_tuning(X_train, y_train, param_grid=None, cv=5):
    """Grid-search a RandomForestRegressor and return the best estimator.

    Generalized from the original: the search space and fold count are
    now parameters (with the original values as defaults), so callers can
    run a smaller/faster search without editing this function.

    Parameters
    ----------
    X_train, y_train : array-like
        Training features and targets.
    param_grid : dict | None
        Grid passed to ``GridSearchCV``; ``None`` selects the original
        108-combination forest grid.
    cv : int
        Number of cross-validation folds (default 5, as before).

    Returns
    -------
    RandomForestRegressor
        The refitted best estimator found by the search.
    """
    if param_grid is None:
        # Original default search space (3 * 4 * 3 * 3 = 108 combinations).
        param_grid = {
            'n_estimators': [50, 100, 200],
            'max_depth': [5, 10, 15, None],
            'min_samples_split': [2, 5, 10],
            'min_samples_leaf': [1, 2, 4]
        }

    rf = RandomForestRegressor(random_state=42)

    grid_search = GridSearchCV(
        rf, param_grid, cv=cv,
        scoring='neg_root_mean_squared_error',
        n_jobs=-1, verbose=1
    )
    grid_search.fit(X_train, y_train)

    print("\n最佳参数:")
    print(grid_search.best_params_)
    # best_score_ is a negated RMSE (sklearn maximizes); negate for display.
    print(f"\n最佳交叉验证RMSE: {-grid_search.best_score_:.2f}")

    return grid_search.best_estimator_


best_model = hyperparameter_tuning(X_train, y_train)

模型评估

详细评估最佳模型

def detailed_evaluation(model, X_test, y_test):
    """Score ``model`` on the test set and plot diagnostic charts.

    Prints MSE/RMSE/MAE/R2/MAPE, then shows predicted-vs-actual, a
    residual histogram, and residual-vs-prediction plots.

    Returns
    -------
    dict
        The five metrics keyed 'mse', 'rmse', 'mae', 'r2', 'mape'.
    """
    y_pred = model.predict(X_test)
    residuals = y_test - y_pred

    mse = mean_squared_error(y_test, y_pred)
    rmse = np.sqrt(mse)
    mae = np.mean(np.abs(residuals))
    r2 = r2_score(y_test, y_pred)
    # NOTE(review): MAPE assumes y_test contains no zeros — true for prices here.
    mape = np.mean(np.abs(residuals / y_test)) * 100

    print("模型评估结果:")
    print(f"  MSE:  {mse:.2f}")
    print(f"  RMSE: {rmse:.2f}")
    print(f"  MAE:  {mae:.2f}")
    print(f"  R2:   {r2:.4f}")
    print(f"  MAPE: {mape:.2f}%")

    fig, (ax_fit, ax_hist, ax_res) = plt.subplots(1, 3, figsize=(15, 5))

    # Predicted vs actual, with the ideal y = x line for reference.
    ax_fit.scatter(y_test, y_pred, alpha=0.5)
    ax_fit.plot([y_test.min(), y_test.max()], [y_test.min(), y_test.max()],
                'r--', lw=2)
    ax_fit.set_xlabel('实际值')
    ax_fit.set_ylabel('预测值')
    ax_fit.set_title('预测值 vs 实际值')

    # Residual histogram, centered on zero.
    ax_hist.hist(residuals, bins=30, edgecolor='black', alpha=0.7)
    ax_hist.axvline(x=0, color='r', linestyle='--')
    ax_hist.set_xlabel('残差')
    ax_hist.set_ylabel('频数')
    ax_hist.set_title('残差分布')

    # Residuals against predictions — a pattern here suggests bias
    # or heteroscedasticity.
    ax_res.scatter(y_pred, residuals, alpha=0.5)
    ax_res.axhline(y=0, color='r', linestyle='--')
    ax_res.set_xlabel('预测值')
    ax_res.set_ylabel('残差')
    ax_res.set_title('残差 vs 预测值')

    plt.tight_layout()
    plt.show()

    return {'mse': mse, 'rmse': rmse, 'mae': mae, 'r2': r2, 'mape': mape}


metrics = detailed_evaluation(best_model, X_test, y_test)

特征重要性分析

可视化特征重要性

def feature_importance_analysis(model, feature_names):
    """Plot and print the top feature importances of a fitted tree model.

    Parameters
    ----------
    model : estimator
        Fitted model exposing ``feature_importances_``.
    feature_names : sequence of str
        Names aligned with the model's input columns.
    """
    importance = model.feature_importances_
    ranked = np.argsort(importance)[::-1]  # indices, descending importance
    top_n = min(15, len(feature_names))    # cap the chart at 15 bars
    top_idx = ranked[:top_n]

    fig, ax = plt.subplots(figsize=(12, 8))
    ax.barh(range(top_n), importance[top_idx], color='steelblue')
    ax.set_yticks(range(top_n))
    ax.set_yticklabels([feature_names[i] for i in top_idx])
    ax.set_xlabel('重要性')
    ax.set_title('特征重要性排名')
    ax.invert_yaxis()

    plt.tight_layout()
    plt.show()

    print("\n特征重要性排名:")
    for i in top_idx:
        print(f"  {feature_names[i]}: {importance[i]:.4f}")


feature_importance_analysis(best_model, feature_names)

模型保存与加载

使用joblib保存模型

import joblib
import os

def save_model(model, scaler, feature_names, save_dir='models'):
    """Persist the fitted model, scaler and feature-name list to ``save_dir``.

    Parameters
    ----------
    model : estimator
        Fitted model, serialized as ``best_model.pkl``.
    scaler : StandardScaler
        Fitted preprocessor, serialized as ``scaler.pkl``.
    feature_names : iterable of str
        Column order the model expects, serialized as ``feature_names.pkl``.
        Accepts any iterable (pandas Index, list, tuple) — the original
        required a pandas Index because it called ``.tolist()``.
    save_dir : str
        Target directory, created if missing.
    """
    # exist_ok avoids the race between the existence check and creation
    # that the original's `if not os.path.exists(...)` pattern allowed.
    os.makedirs(save_dir, exist_ok=True)

    joblib.dump(model, os.path.join(save_dir, 'best_model.pkl'))
    joblib.dump(scaler, os.path.join(save_dir, 'scaler.pkl'))

    # list() works for any iterable, not only objects with .tolist().
    joblib.dump(list(feature_names), os.path.join(save_dir, 'feature_names.pkl'))

    print(f"模型已保存到 {save_dir}/")

def load_model(save_dir='models'):
    """Load the persisted model, scaler and feature-name list from ``save_dir``.

    Returns
    -------
    tuple
        (model, scaler, feature_names), in that order.
    """
    model, scaler, feature_names = (
        joblib.load(os.path.join(save_dir, fname))
        for fname in ('best_model.pkl', 'scaler.pkl', 'feature_names.pkl')
    )
    return model, scaler, feature_names


# Persist the tuned model together with its preprocessing artifacts.
save_model(best_model, scaler, feature_names)

模型部署预测

创建预测函数

class HousePricePredictor:
    """Thin inference wrapper around the persisted model artifacts.

    Loads the model, scaler and expected feature order once at
    construction time, then offers single-sample and batch prediction.
    """

    def __init__(self, model_dir='models'):
        # Artifacts produced by save_model()/load_model().
        self.model, self.scaler, self.feature_names = load_model(model_dir)

    def predict(self, features_dict):
        """Predict the price of one house described by a feature dict.

        Parameters
        ----------
        features_dict : dict
            Maps feature names to values; features missing from the dict
            are filled with 0.

        Returns
        -------
        float
            Predicted price.
        """
        # Assemble the vector in the exact column order the model was
        # trained on; absent features default to 0.
        row = [features_dict.get(name, 0) for name in self.feature_names]
        vector = np.array(row).reshape(1, -1)

        scaled = self.scaler.transform(vector)
        return self.model.predict(scaled)[0]

    def predict_batch(self, df):
        """Predict prices for every row of ``df`` (columns may be in any order)."""
        # Reorder columns to match the training layout before scaling.
        matrix = df[self.feature_names].values
        return self.model.predict(self.scaler.transform(matrix))

# Usage example: load the persisted artifacts and price a single house.
predictor = HousePricePredictor()

# Raw features for one house.
sample_house = {
    'area': 120,
    'rooms': 3,
    'age': 5,
    'distance_center': 10,
    'floor': 15,
    'has_elevator': 1,
    'has_parking': 1,
    'school_rating': 8
}

# Re-apply the same feature engineering used at training time.
sample_house['area_per_room'] = sample_house['area'] / sample_house['rooms']
sample_house['price_factor'] = sample_house['area'] * (50 - sample_house['age'])
sample_house['convenience_score'] = (
    sample_house['has_elevator'] + 
    sample_house['has_parking'] + 
    (10 - sample_house['distance_center']) / 10
)

# One-hot age-group features. BUG FIX: the original loop could only ever
# set the 'age_新房' dummy, so a house with age > 10 got all-zero age
# dummies. Derive the active bucket from the same right-inclusive bins
# pd.cut used at training time: (0,10], (10,20], (20,30], (30,50].
age = sample_house['age']
if age <= 10:
    active_group = 'age_新房'
elif age <= 20:
    active_group = 'age_较新'
elif age <= 30:
    active_group = 'age_一般'
else:
    active_group = 'age_老旧'
for col in ['age_新房', 'age_较新', 'age_一般', 'age_老旧']:
    if col in predictor.feature_names:
        sample_house[col] = 1 if col == active_group else 0

predicted_price = predictor.predict(sample_house)
print(f"\n预测房价: {predicted_price:.2f}")

项目总结

完整流程回顾

阶段 内容 要点
数据探索 EDA、可视化 了解数据分布和特征
特征工程 创建新特征 领域知识很重要
数据预处理 分割、标准化 防止数据泄露
模型选择 多模型对比 从简单到复杂
超参数调优 GridSearch 使用交叉验证
模型评估 多指标评估 关注业务指标
模型部署 保存与加载 考虑线上环境

常见问题

Q1: 特征工程如何做?

  • 领域知识驱动
  • 统计分析发现
  • 自动化特征生成

Q2: 如何选择评估指标?

根据业务需求:

  • 回归:RMSE、MAE、R2
  • 分类:准确率、AUC、F1

Q3: 模型过拟合怎么办?

  • 增加正则化
  • 减少模型复杂度
  • 增加训练数据
  • 使用集成方法

Q4: 线上部署需要注意什么?

  • 特征一致性
  • 模型版本管理
  • 监控预测质量
  • 处理异常输入

总结

概念 描述
端到端流程 从数据到部署的完整流程
特征工程 创造有意义的特征
模型选择 对比多个模型选择最佳
评估指标 使用合适的业务指标

参考资料

  • Géron, A. (2019). “Hands-On Machine Learning with Scikit-Learn, Keras, and TensorFlow”
  • Scikit-learn官方文档
  • Kaggle竞赛经验总结
  • MLOps最佳实践指南

版权声明: 如无特别声明,本文版权归 sshipanoo 所有,转载请注明本文链接。

(采用 CC BY-NC-SA 4.0 许可协议进行授权)

本文标题:《 机器学习基础系列——实战项目 》

本文链接:http://localhost:3015/ai/%E5%AE%9E%E6%88%98%E9%A1%B9%E7%9B%AE.html

本文最后一次更新时间未能获取,文章中的某些内容可能已过时!