已经是最新一篇文章了!
已经是最后一篇文章了!
从数据到部署的完整机器学习流程
前言
本文将通过一个完整的实战项目,综合运用机器学习基础系列中学习的所有知识,涵盖从数据处理到模型部署的全流程。
项目概述
项目目标
构建一个房价预测系统,包括:
- 数据探索与预处理
- 特征工程
- 模型选择与训练
- 超参数调优
- 模型评估与解释
- 模型保存与部署
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import StandardScaler
from sklearn.metrics import mean_squared_error, r2_score
# Seed the global NumPy RNG so data generation and model training are reproducible.
np.random.seed(42)
# Generate simulated housing-price data
def generate_housing_data(n_samples=1000):
    """Create a synthetic housing-price dataset.

    Parameters
    ----------
    n_samples : int
        Number of rows to generate.

    Returns
    -------
    pd.DataFrame
        Eight feature columns plus a noisy linear ``price`` target.

    Note: draws from the global NumPy RNG, so results depend on any seed
    set by the caller.
    """
    # Draw each raw feature (same draw order as the original dict literal,
    # so a fixed seed yields identical data).
    area = np.random.uniform(50, 300, n_samples)             # floor area
    rooms = np.random.randint(1, 6, n_samples)               # number of rooms
    age = np.random.uniform(0, 50, n_samples)                # building age
    distance_center = np.random.uniform(1, 30, n_samples)    # distance to city centre
    floor = np.random.randint(1, 30, n_samples)              # floor number
    has_elevator = np.random.choice([0, 1], n_samples)       # elevator flag
    has_parking = np.random.choice([0, 1], n_samples)        # parking-spot flag
    school_rating = np.random.uniform(1, 10, n_samples)      # school-district score

    # Price is a linear combination of the features plus Gaussian noise.
    price = (
        area * 100
        + rooms * 5000
        + (50 - age) * 200
        + (30 - distance_center) * 1000
        + has_elevator * 3000
        + has_parking * 5000
        + school_rating * 2000
        + np.random.normal(0, 5000, n_samples)
    )

    return pd.DataFrame({
        'area': area,
        'rooms': rooms,
        'age': age,
        'distance_center': distance_center,
        'floor': floor,
        'has_elevator': has_elevator,
        'has_parking': has_parking,
        'school_rating': school_rating,
        'price': price,
    })
# Build the working dataset and print a quick overview of it.
df = generate_housing_data(1000)
print("数据集形状:", df.shape)
print("\n数据预览:")
print(df.head())
print("\n数据统计:")
print(df.describe())
数据探索分析
数据分布可视化
def explore_data(df):
    """Plot histograms for every feature column and for the price target."""
    # One panel per feature; all columns except the last ('price').
    fig, panel_grid = plt.subplots(2, 4, figsize=(16, 8))
    feature_cols = df.columns[:-1]
    for panel, col in zip(panel_grid.flatten(), feature_cols):
        panel.hist(df[col], bins=30, edgecolor='black', alpha=0.7)
        panel.set_title(f'{col}分布')
        panel.set_xlabel(col)
        panel.set_ylabel('频数')
    plt.tight_layout()
    plt.show()

    # Separate, larger histogram for the target variable.
    fig, panel = plt.subplots(figsize=(10, 6))
    panel.hist(df['price'], bins=50, edgecolor='black', alpha=0.7, color='coral')
    panel.set_title('房价分布')
    panel.set_xlabel('价格')
    panel.set_ylabel('频数')
    plt.show()
# Visualize feature and target distributions of the generated data.
explore_data(df)
相关性分析
def correlation_analysis(df):
    """Show a correlation heatmap and print each feature's correlation with price."""
    corr = df.corr()
    n = len(corr.columns)

    fig, ax = plt.subplots(figsize=(10, 8))
    heat = ax.imshow(corr, cmap='coolwarm', aspect='auto')
    ticks = range(n)
    ax.set_xticks(ticks)
    ax.set_yticks(ticks)
    ax.set_xticklabels(corr.columns, rotation=45, ha='right')
    ax.set_yticklabels(corr.columns)

    # Annotate every cell with its coefficient; white text on strong colors.
    for row in range(n):
        for col in range(n):
            value = corr.iloc[row, col]
            label_color = 'white' if abs(value) > 0.5 else 'black'
            ax.text(col, row, f'{value:.2f}', ha='center', va='center',
                    color=label_color)

    plt.colorbar(heat)
    plt.title('特征相关性热力图')
    plt.tight_layout()
    plt.show()

    # Rank the features by their correlation with the target.
    price_corr = corr['price'].drop('price').sort_values(ascending=False)
    print("与价格的相关性:")
    print(price_corr)
# Inspect pairwise correlations and each feature's relationship with price.
correlation_analysis(df)
特征工程
特征创建与转换
def feature_engineering(df):
    """Derive additional features from the raw housing data.

    Adds ratio/interaction features, an age-bucket categorical, and
    one-hot encodes the bucket. Returns a new DataFrame; the input
    frame is not modified.
    """
    df_new = df.copy()

    # Ratio / interaction features.
    df_new['area_per_room'] = df_new['area'] / df_new['rooms']      # area per room
    df_new['price_factor'] = df_new['area'] * (50 - df_new['age'])  # area x remaining life
    df_new['convenience_score'] = (
        df_new['has_elevator'] +
        df_new['has_parking'] +
        (10 - df_new['distance_center']) / 10
    )  # convenience score (can go negative for distant homes)

    # Bucket age into four groups. include_lowest=True keeps rows with
    # age == 0 in the first bucket — pd.cut intervals are open on the left
    # by default, so those rows would otherwise become NaN and get no
    # age-group dummy at all.
    df_new['age_group'] = pd.cut(df_new['age'], bins=[0, 10, 20, 30, 50],
                                 labels=['新房', '较新', '一般', '老旧'],
                                 include_lowest=True)
    # One-hot encode the bucket.
    df_new = pd.get_dummies(df_new, columns=['age_group'], prefix='age')

    print("新增特征:")
    print(df_new[['area_per_room', 'price_factor', 'convenience_score']].head())
    return df_new
# Run feature engineering on the raw data and report the expanded shape.
df_featured = feature_engineering(df)
print("\n特征工程后数据形状:", df_featured.shape)
数据预处理
数据分割与标准化
def preprocess_data(df):
    """Split into train/test sets and standardize the features.

    Returns the scaled train/test feature arrays, the target arrays,
    the fitted scaler, and the feature-column index.
    """
    # Separate the features from the target column.
    features = df.drop('price', axis=1)
    target = df['price']

    # Hold out 20% of the rows for evaluation.
    X_train, X_test, y_train, y_test = train_test_split(
        features, target, test_size=0.2, random_state=42
    )

    # Fit the scaler on the training split only (avoids data leakage),
    # then apply the same transform to the test split.
    scaler = StandardScaler()
    X_train_scaled = scaler.fit_transform(X_train)
    X_test_scaled = scaler.transform(X_test)

    print(f"训练集: {X_train_scaled.shape}")
    print(f"测试集: {X_test_scaled.shape}")
    return X_train_scaled, X_test_scaled, y_train.values, y_test.values, scaler, features.columns
# Split and scale the engineered dataset for modeling.
X_train, X_test, y_train, y_test, scaler, feature_names = preprocess_data(df_featured)
模型选择与训练
基准模型对比
from sklearn.linear_model import LinearRegression, Ridge, Lasso, ElasticNet
from sklearn.tree import DecisionTreeRegressor
from sklearn.ensemble import RandomForestRegressor, GradientBoostingRegressor
from sklearn.svm import SVR
from sklearn.neighbors import KNeighborsRegressor
def train_and_evaluate_models(X_train, X_test, y_train, y_test):
    """Fit a set of baseline regressors and report RMSE / R2 on the test split.

    Returns a results DataFrame (one row per model) and the dict of
    fitted models keyed by display name.
    """
    candidates = {
        'Linear Regression': LinearRegression(),
        'Ridge': Ridge(alpha=1.0),
        'Lasso': Lasso(alpha=1.0),
        'ElasticNet': ElasticNet(alpha=1.0, l1_ratio=0.5),
        'Decision Tree': DecisionTreeRegressor(max_depth=10, random_state=42),
        'Random Forest': RandomForestRegressor(n_estimators=100, random_state=42),
        'Gradient Boosting': GradientBoostingRegressor(n_estimators=100, random_state=42),
        'KNN': KNeighborsRegressor(n_neighbors=5),
    }

    rows = []
    for name, estimator in candidates.items():
        # Fit on the training split and score on the held-out split.
        predictions = estimator.fit(X_train, y_train).predict(X_test)
        rmse = np.sqrt(mean_squared_error(y_test, predictions))
        r2 = r2_score(y_test, predictions)
        rows.append({'Model': name, 'RMSE': rmse, 'R2': r2})
        print(f"{name}:")
        print(f" RMSE: {rmse:.2f}")
        print(f" R2: {r2:.4f}")
        print()
    return pd.DataFrame(rows), candidates
# Train all baseline models and collect their test-set metrics.
results_df, trained_models = train_and_evaluate_models(X_train, X_test, y_train, y_test)
模型对比可视化
def visualize_model_comparison(results_df):
    """Draw side-by-side horizontal bar charts of RMSE and R2 per model."""
    fig, (rmse_ax, r2_ax) = plt.subplots(1, 2, figsize=(14, 6))

    # Left panel: RMSE (lower is better).
    rmse_ax.barh(results_df['Model'], results_df['RMSE'], color='steelblue')
    rmse_ax.set_xlabel('RMSE')
    rmse_ax.set_title('模型RMSE对比 (越低越好)')
    rmse_ax.invert_yaxis()

    # Right panel: R2 (higher is better), axis clipped to [0, 1].
    r2_ax.barh(results_df['Model'], results_df['R2'], color='coral')
    r2_ax.set_xlabel('R2 Score')
    r2_ax.set_title('模型R2对比 (越高越好)')
    r2_ax.invert_yaxis()
    r2_ax.set_xlim(0, 1)

    plt.tight_layout()
    plt.show()
# Compare all baseline models visually.
visualize_model_comparison(results_df)
超参数调优
使用交叉验证网格搜索
from sklearn.model_selection import GridSearchCV, cross_val_score
def hyperparameter_tuning(X_train, y_train, param_grid=None, cv=5):
    """Grid-search RandomForestRegressor hyperparameters with cross-validation.

    Parameters
    ----------
    X_train, y_train : array-like
        Training features and target.
    param_grid : dict, optional
        Grid to search. Defaults to the original hand-picked grid, so
        existing callers are unaffected; pass a smaller grid for quick runs.
    cv : int
        Number of cross-validation folds (default 5, as before).

    Returns
    -------
    The best estimator, refitted on the full training data.
    """
    if param_grid is None:
        # Default grid for the best-performing baseline (Random Forest).
        param_grid = {
            'n_estimators': [50, 100, 200],
            'max_depth': [5, 10, 15, None],
            'min_samples_split': [2, 5, 10],
            'min_samples_leaf': [1, 2, 4]
        }
    rf = RandomForestRegressor(random_state=42)
    grid_search = GridSearchCV(
        rf, param_grid, cv=cv,
        scoring='neg_root_mean_squared_error',
        n_jobs=-1, verbose=1
    )
    grid_search.fit(X_train, y_train)
    print("\n最佳参数:")
    print(grid_search.best_params_)
    # best_score_ is a *negative* RMSE, so flip the sign for display.
    print(f"\n最佳交叉验证RMSE: {-grid_search.best_score_:.2f}")
    return grid_search.best_estimator_
# Tune the Random Forest; this grid search can take a while.
best_model = hyperparameter_tuning(X_train, y_train)
模型评估
详细评估最佳模型
def detailed_evaluation(model, X_test, y_test):
    """Score a fitted model on the test split and plot diagnostics.

    Prints MSE / RMSE / MAE / R2 / MAPE, then shows predicted-vs-actual,
    residual-histogram, and residual-vs-predicted panels.
    Returns the metrics as a dict.

    Note: MAPE divides by y_test, so it assumes no zero targets.
    """
    y_pred = model.predict(X_test)
    residuals = y_test - y_pred

    # Scalar metrics.
    mse = mean_squared_error(y_test, y_pred)
    rmse = np.sqrt(mse)
    mae = np.mean(np.abs(residuals))
    r2 = r2_score(y_test, y_pred)
    mape = np.mean(np.abs(residuals / y_test)) * 100

    print("模型评估结果:")
    print(f" MSE: {mse:.2f}")
    print(f" RMSE: {rmse:.2f}")
    print(f" MAE: {mae:.2f}")
    print(f" R2: {r2:.4f}")
    print(f" MAPE: {mape:.2f}%")

    fig, (fit_ax, hist_ax, resid_ax) = plt.subplots(1, 3, figsize=(15, 5))

    # Predicted vs actual, with the ideal y = x reference line.
    fit_ax.scatter(y_test, y_pred, alpha=0.5)
    lo, hi = y_test.min(), y_test.max()
    fit_ax.plot([lo, hi], [lo, hi], 'r--', lw=2)
    fit_ax.set_xlabel('实际值')
    fit_ax.set_ylabel('预测值')
    fit_ax.set_title('预测值 vs 实际值')

    # Residual histogram, centered on zero.
    hist_ax.hist(residuals, bins=30, edgecolor='black', alpha=0.7)
    hist_ax.axvline(x=0, color='r', linestyle='--')
    hist_ax.set_xlabel('残差')
    hist_ax.set_ylabel('频数')
    hist_ax.set_title('残差分布')

    # Residuals against predictions (should show no obvious structure).
    resid_ax.scatter(y_pred, residuals, alpha=0.5)
    resid_ax.axhline(y=0, color='r', linestyle='--')
    resid_ax.set_xlabel('预测值')
    resid_ax.set_ylabel('残差')
    resid_ax.set_title('残差 vs 预测值')

    plt.tight_layout()
    plt.show()
    return {'mse': mse, 'rmse': rmse, 'mae': mae, 'r2': r2, 'mape': mape}
# Evaluate the tuned model in depth on the held-out test set.
metrics = detailed_evaluation(best_model, X_test, y_test)
特征重要性分析
可视化特征重要性
def feature_importance_analysis(model, feature_names):
    """Plot and print the top feature importances of a fitted tree ensemble."""
    importance = model.feature_importances_
    # Feature indices sorted from most to least important; keep the top 15.
    order = np.argsort(importance)[::-1]
    top_n = min(15, len(feature_names))
    top_order = order[:top_n]

    fig, ax = plt.subplots(figsize=(12, 8))
    ax.barh(range(top_n), importance[top_order], color='steelblue')
    ax.set_yticks(range(top_n))
    ax.set_yticklabels([feature_names[idx] for idx in top_order])
    ax.set_xlabel('重要性')
    ax.set_title('特征重要性排名')
    ax.invert_yaxis()
    plt.tight_layout()
    plt.show()

    print("\n特征重要性排名:")
    for idx in top_order:
        print(f" {feature_names[idx]}: {importance[idx]:.4f}")
# Inspect which features drive the tuned model's predictions.
feature_importance_analysis(best_model, feature_names)
模型保存与加载
使用joblib保存模型
import joblib
import os
def save_model(model, scaler, feature_names, save_dir='models'):
    """Persist the model, scaler, and feature-name list to save_dir.

    Parameters
    ----------
    model : fitted estimator to persist.
    scaler : fitted preprocessing transformer.
    feature_names : iterable of str
        Feature order expected by the model (pandas Index, NumPy array,
        or plain list all work).
    save_dir : str
        Target directory, created if missing.
    """
    # exist_ok avoids the race between the existence check and creation.
    os.makedirs(save_dir, exist_ok=True)
    # Save the model.
    joblib.dump(model, os.path.join(save_dir, 'best_model.pkl'))
    # Save the scaler.
    joblib.dump(scaler, os.path.join(save_dir, 'scaler.pkl'))
    # list() accepts any iterable, unlike .tolist(), which required a
    # pandas Index / NumPy array and crashed on a plain list.
    joblib.dump(list(feature_names), os.path.join(save_dir, 'feature_names.pkl'))
    print(f"模型已保存到 {save_dir}/")
def load_model(save_dir='models'):
    """Load the persisted model, scaler, and feature names from save_dir.

    Counterpart to save_model(); returns (model, scaler, feature_names).
    """
    artifacts = []
    for filename in ('best_model.pkl', 'scaler.pkl', 'feature_names.pkl'):
        artifacts.append(joblib.load(os.path.join(save_dir, filename)))
    return tuple(artifacts)
# Persist the tuned model plus its preprocessing artifacts.
save_model(best_model, scaler, feature_names)
模型部署预测
创建预测函数
class HousePricePredictor:
    """Thin inference wrapper around the persisted model and scaler."""

    def __init__(self, model_dir='models'):
        # Restore the artifacts written by save_model().
        self.model, self.scaler, self.feature_names = load_model(model_dir)

    def predict(self, features_dict):
        """Predict the price for a single house.

        Parameters
        ----------
        features_dict : dict
            Feature-name -> value mapping. Features absent from the dict
            default to 0, so callers must supply every engineered feature
            themselves.

        Returns
        -------
        The predicted price (scalar).
        """
        # Assemble the vector in the exact order the model was trained on.
        row = [features_dict.get(name, 0) for name in self.feature_names]
        row = np.array(row).reshape(1, -1)
        scaled = self.scaler.transform(row)
        return self.model.predict(scaled)[0]

    def predict_batch(self, df):
        """Predict prices for every row of df (must contain all feature columns)."""
        # Reorder the columns to match the training layout before scaling.
        ordered = df[self.feature_names].values
        return self.model.predict(self.scaler.transform(ordered))
# Usage example: predict the price of one house.
predictor = HousePricePredictor()

# Raw attributes of the sample house.
sample_house = {
    'area': 120,
    'rooms': 3,
    'age': 5,
    'distance_center': 10,
    'floor': 15,
    'has_elevator': 1,
    'has_parking': 1,
    'school_rating': 8
}

# Recreate the engineered features exactly as feature_engineering() does.
sample_house['area_per_room'] = sample_house['area'] / sample_house['rooms']
sample_house['price_factor'] = sample_house['area'] * (50 - sample_house['age'])
sample_house['convenience_score'] = (
    sample_house['has_elevator'] +
    sample_house['has_parking'] +
    (10 - sample_house['distance_center']) / 10
)

# One-hot age-group features: derive the bucket from the same bins used in
# training. The previous code only ever set 'age_新房', so any house older
# than 10 years received no age bucket at all.
age_label = pd.cut([sample_house['age']], bins=[0, 10, 20, 30, 50],
                   labels=['新房', '较新', '一般', '老旧'],
                   include_lowest=True)[0]
for col in ['age_新房', 'age_较新', 'age_一般', 'age_老旧']:
    if col in predictor.feature_names:
        sample_house[col] = 1 if col == f'age_{age_label}' else 0

predicted_price = predictor.predict(sample_house)
print(f"\n预测房价: {predicted_price:.2f}")
项目总结
完整流程回顾
| 阶段 | 内容 | 要点 |
|---|---|---|
| 数据探索 | EDA、可视化 | 了解数据分布和特征 |
| 特征工程 | 创建新特征 | 领域知识很重要 |
| 数据预处理 | 分割、标准化 | 防止数据泄露 |
| 模型选择 | 多模型对比 | 从简单到复杂 |
| 超参数调优 | GridSearch | 使用交叉验证 |
| 模型评估 | 多指标评估 | 关注业务指标 |
| 模型部署 | 保存与加载 | 考虑线上环境 |
常见问题
Q1: 特征工程如何做?
- 领域知识驱动
- 统计分析发现
- 自动化特征生成
Q2: 如何选择评估指标?
根据业务需求:
- 回归:RMSE、MAE、R2
- 分类:准确率、AUC、F1
Q3: 模型过拟合怎么办?
- 增加正则化
- 减少模型复杂度
- 增加训练数据
- 使用集成方法
Q4: 线上部署需要注意什么?
- 特征一致性
- 模型版本管理
- 监控预测质量
- 处理异常输入
总结
| 概念 | 描述 |
|---|---|
| 端到端流程 | 从数据到部署的完整流程 |
| 特征工程 | 创造有意义的特征 |
| 模型选择 | 对比多个模型选择最佳 |
| 评估指标 | 使用合适的业务指标 |
参考资料
- Géron, A. (2019). “Hands-On Machine Learning with Scikit-Learn, Keras, and TensorFlow”
- Scikit-learn官方文档
- Kaggle竞赛经验总结
- MLOps最佳实践指南
版权声明: 如无特别声明,本文版权归 sshipanoo 所有,转载请注明本文链接。
(采用 CC BY-NC-SA 4.0 许可协议进行授权)
本文标题:《 机器学习基础系列——实战项目 》
本文链接:http://localhost:3015/ai/%E5%AE%9E%E6%88%98%E9%A1%B9%E7%9B%AE.html
本文最后一次更新为若干天前,文章中的某些内容可能已过时!