Q-learning and Policy Gradient Explained
Introduction
Reinforcement learning (RL) lets an agent learn an optimal policy by interacting with its environment. This post walks through the basic concepts of reinforcement learning, then Q-learning and Policy Gradient methods.
Reinforcement Learning Basics
Core Concepts
import numpy as np
import matplotlib.pyplot as plt

np.random.seed(42)

# Core elements of reinforcement learning
print("Core concepts of reinforcement learning:")
print("=" * 50)
print("• Agent: the entity that learns and makes decisions")
print("• Environment: the external world the agent interacts with")
print("• State: a description of the environment")
print("• Action: an operation the agent can perform")
print("• Reward: the environment's feedback to an action")
print("• Policy: a mapping from states to actions")
print("• Value function: the long-term value of a state/action")
Markov Decision Process (MDP)
class SimpleMDP:
    """A simple MDP environment"""
    def __init__(self):
        # A grid world with 5 states
        self.n_states = 5
        self.n_actions = 2  # left = 0, right = 1
        self.goal_state = 4
        self.state = 0
        # Transitions (simplified to be deterministic)
        self.transitions = {
            # (state, action): next_state
            (0, 0): 0, (0, 1): 1,
            (1, 0): 0, (1, 1): 2,
            (2, 0): 1, (2, 1): 3,
            (3, 0): 2, (3, 1): 4,
            (4, 0): 4, (4, 1): 4,  # terminal state
        }
        # Rewards
        self.rewards = {4: 10}  # only reaching the goal is rewarded

    def reset(self):
        self.state = 0
        return self.state

    def step(self, action):
        next_state = self.transitions[(self.state, action)]
        reward = self.rewards.get(next_state, -1)  # -1 penalty per step
        done = next_state == self.goal_state
        self.state = next_state
        return next_state, reward, done

    def render(self):
        grid = ['_'] * self.n_states
        grid[self.state] = 'A'
        grid[self.goal_state] = 'G' if self.state != self.goal_state else 'A'
        print(' '.join(grid))

# Try out the environment
env = SimpleMDP()
env.reset()
print("Initial state:")
env.render()
for action in [1, 1, 1, 1]:  # keep moving right
    state, reward, done = env.step(action)
    print(f"Action: {'right' if action else 'left'}, reward: {reward}")
    env.render()
    if done:
        print("Reached the goal!")
        break
Q-Learning
How It Works
The Q-learning update rule:
\[Q(s, a) \leftarrow Q(s, a) + \alpha [r + \gamma \max_{a'} Q(s', a') - Q(s, a)]\]

class QLearningAgent:
    """Q-learning agent"""
    def __init__(self, n_states, n_actions, learning_rate=0.1,
                 discount_factor=0.99, epsilon=0.1):
        self.n_states = n_states
        self.n_actions = n_actions
        self.lr = learning_rate
        self.gamma = discount_factor
        self.epsilon = epsilon
        # Initialize the Q-table
        self.Q = np.zeros((n_states, n_actions))

    def choose_action(self, state):
        """ε-greedy action selection"""
        if np.random.random() < self.epsilon:
            return np.random.randint(self.n_actions)
        return np.argmax(self.Q[state])

    def update(self, state, action, reward, next_state, done):
        """Update the Q-value"""
        if done:
            target = reward
        else:
            target = reward + self.gamma * np.max(self.Q[next_state])
        self.Q[state, action] += self.lr * (target - self.Q[state, action])

    def get_policy(self):
        """Return the greedy (learned) policy"""
        return np.argmax(self.Q, axis=1)
def train_q_learning(env, agent, episodes=500):
    """Train with Q-learning"""
    rewards_history = []
    for episode in range(episodes):
        state = env.reset()
        total_reward = 0
        for step in range(100):  # maximum number of steps
            action = agent.choose_action(state)
            next_state, reward, done = env.step(action)
            agent.update(state, action, reward, next_state, done)
            total_reward += reward
            state = next_state
            if done:
                break
        rewards_history.append(total_reward)
        if (episode + 1) % 100 == 0:
            avg_reward = np.mean(rewards_history[-100:])
            print(f"Episode {episode+1}, average reward: {avg_reward:.2f}")
    return rewards_history

# Train
env = SimpleMDP()
agent = QLearningAgent(n_states=5, n_actions=2)
rewards = train_q_learning(env, agent)

print("\nQ-table:")
print(agent.Q)
print("\nOptimal policy (0 = left, 1 = right):")
print(agent.get_policy())
Visualizing the Training Process
def plot_training(rewards, window=50):
    """Plot the training curve"""
    fig, ax = plt.subplots(figsize=(10, 5))
    # Raw per-episode rewards
    ax.plot(rewards, alpha=0.3, label='Reward per episode')
    # Moving average
    if len(rewards) >= window:
        moving_avg = np.convolve(rewards, np.ones(window)/window, mode='valid')
        ax.plot(range(window-1, len(rewards)), moving_avg,
                label=f'{window}-episode moving average')
    ax.set_xlabel('Episode')
    ax.set_ylabel('Cumulative reward')
    ax.set_title('Q-Learning Training Curve')
    ax.legend()
    ax.grid(True, alpha=0.3)
    plt.tight_layout()
    plt.show()

plot_training(rewards)
SARSA
Comparison with Q-Learning
class SARSAAgent:
    """SARSA agent (on-policy)"""
    def __init__(self, n_states, n_actions, learning_rate=0.1,
                 discount_factor=0.99, epsilon=0.1):
        self.n_states = n_states
        self.n_actions = n_actions
        self.lr = learning_rate
        self.gamma = discount_factor
        self.epsilon = epsilon
        self.Q = np.zeros((n_states, n_actions))

    def choose_action(self, state):
        if np.random.random() < self.epsilon:
            return np.random.randint(self.n_actions)
        return np.argmax(self.Q[state])

    def update(self, state, action, reward, next_state, next_action, done):
        """SARSA update: uses the next action actually taken"""
        if done:
            target = reward
        else:
            # Difference from Q-learning: use next_action rather than the max
            target = reward + self.gamma * self.Q[next_state, next_action]
        self.Q[state, action] += self.lr * (target - self.Q[state, action])
# Q-learning vs SARSA
print("Q-Learning vs SARSA:")
print("=" * 50)
print("Q-Learning (Off-policy):")
print("  • Updates with max Q(s', a')")
print("  • Learns the optimal policy")
print("  • Can be more aggressive")
print()
print("SARSA (On-policy):")
print("  • Updates with Q(s', a') where a' is the action actually taken")
print("  • Learns the value of the current policy")
print("  • More conservative; accounts for exploration")
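The SARSAAgent above only defines the update rule and is never trained. Below is a minimal training-loop sketch (an addition, not from the original post) that highlights the on-policy difference: the next action is chosen by the behaviour policy before the update and is then actually executed.

```python
# Minimal SARSA training loop on SimpleMDP (sketch).
# Note: next_action is selected *before* the update and then actually executed.
def train_sarsa(env, agent, episodes=500, max_steps=100):
    rewards_history = []
    for episode in range(episodes):
        state = env.reset()
        action = agent.choose_action(state)
        total_reward = 0
        for _ in range(max_steps):
            next_state, reward, done = env.step(action)
            next_action = agent.choose_action(next_state)
            agent.update(state, action, reward, next_state, next_action, done)
            total_reward += reward
            state, action = next_state, next_action
            if done:
                break
        rewards_history.append(total_reward)
    return rewards_history

sarsa_agent = SARSAAgent(n_states=5, n_actions=2)
train_sarsa(SimpleMDP(), sarsa_agent)
print(np.argmax(sarsa_agent.Q, axis=1))  # greedy policy learned by SARSA
```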
Deep Q-Network (DQN)
DQN Implementation
try:
    import torch
    import torch.nn as nn
    import torch.optim as optim
    from collections import deque
    import random

    class DQN(nn.Module):
        """Deep Q-network"""
        def __init__(self, state_dim, action_dim, hidden_dim=64):
            super().__init__()
            self.network = nn.Sequential(
                nn.Linear(state_dim, hidden_dim),
                nn.ReLU(),
                nn.Linear(hidden_dim, hidden_dim),
                nn.ReLU(),
                nn.Linear(hidden_dim, action_dim)
            )

        def forward(self, x):
            return self.network(x)

    class ReplayBuffer:
        """Experience replay buffer"""
        def __init__(self, capacity=10000):
            self.buffer = deque(maxlen=capacity)

        def push(self, state, action, reward, next_state, done):
            self.buffer.append((state, action, reward, next_state, done))

        def sample(self, batch_size):
            batch = random.sample(self.buffer, batch_size)
            states, actions, rewards, next_states, dones = zip(*batch)
            return (np.array(states), np.array(actions), np.array(rewards),
                    np.array(next_states), np.array(dones))

        def __len__(self):
            return len(self.buffer)

    class DQNAgent:
        """DQN agent"""
        def __init__(self, state_dim, action_dim, lr=1e-3, gamma=0.99,
                     epsilon=1.0, epsilon_decay=0.995, epsilon_min=0.01):
            self.action_dim = action_dim
            self.gamma = gamma
            self.epsilon = epsilon
            self.epsilon_decay = epsilon_decay
            self.epsilon_min = epsilon_min
            self.q_network = DQN(state_dim, action_dim)
            self.target_network = DQN(state_dim, action_dim)
            self.target_network.load_state_dict(self.q_network.state_dict())
            self.optimizer = optim.Adam(self.q_network.parameters(), lr=lr)
            self.buffer = ReplayBuffer()

        def choose_action(self, state):
            if np.random.random() < self.epsilon:
                return np.random.randint(self.action_dim)
            state_tensor = torch.FloatTensor(state).unsqueeze(0)
            with torch.no_grad():
                q_values = self.q_network(state_tensor)
            return q_values.argmax().item()

        def train_step(self, batch_size=32):
            if len(self.buffer) < batch_size:
                return
            states, actions, rewards, next_states, dones = self.buffer.sample(batch_size)
            states = torch.FloatTensor(states)
            actions = torch.LongTensor(actions)
            rewards = torch.FloatTensor(rewards)
            next_states = torch.FloatTensor(next_states)
            dones = torch.FloatTensor(dones)
            # Current Q-values
            current_q = self.q_network(states).gather(1, actions.unsqueeze(1))
            # Target Q-values
            with torch.no_grad():
                next_q = self.target_network(next_states).max(1)[0]
                target_q = rewards + self.gamma * next_q * (1 - dones)
            # Loss
            loss = nn.MSELoss()(current_q.squeeze(), target_q)
            self.optimizer.zero_grad()
            loss.backward()
            self.optimizer.step()
            # Decay epsilon
            self.epsilon = max(self.epsilon_min, self.epsilon * self.epsilon_decay)
            return loss.item()

        def update_target(self):
            self.target_network.load_state_dict(self.q_network.state_dict())

    print("DQN implementation complete")
    print("Key techniques:")
    print("  • Experience replay: breaks correlations in the data")
    print("  • Target network: stabilizes training")
    print("  • ε-greedy: balances exploration and exploitation")

except ImportError:
    print("PyTorch is not installed")
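The DQN classes above are never run against an environment. The sketch below is my addition and assumes PyTorch is installed; the one_hot helper and all hyperparameters are illustrative assumptions showing how the agent could be wired to the earlier SimpleMDP by one-hot encoding its integer state.

```python
# Sketch: training DQNAgent on SimpleMDP with one-hot encoded states.
def one_hot(state, n_states=5):
    v = np.zeros(n_states, dtype=np.float32)
    v[state] = 1.0
    return v

env = SimpleMDP()
dqn_agent = DQNAgent(state_dim=5, action_dim=2)

for episode in range(50):
    state = one_hot(env.reset())
    for _ in range(100):
        action = dqn_agent.choose_action(state)
        next_state, reward, done = env.step(action)
        next_state = one_hot(next_state)
        dqn_agent.buffer.push(state, action, reward, next_state, float(done))
        dqn_agent.train_step(batch_size=32)  # no-op until the buffer holds 32 samples
        state = next_state
        if done:
            break
    if (episode + 1) % 10 == 0:
        dqn_agent.update_target()  # periodically sync the target network
```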
Policy Gradient
The REINFORCE Algorithm
try:
    class PolicyNetwork(nn.Module):
        """Policy network"""
        def __init__(self, state_dim, action_dim, hidden_dim=64):
            super().__init__()
            self.network = nn.Sequential(
                nn.Linear(state_dim, hidden_dim),
                nn.ReLU(),
                nn.Linear(hidden_dim, action_dim),
                nn.Softmax(dim=-1)
            )

        def forward(self, x):
            return self.network(x)

    class REINFORCEAgent:
        """REINFORCE agent"""
        def __init__(self, state_dim, action_dim, lr=1e-3, gamma=0.99):
            self.gamma = gamma
            self.policy = PolicyNetwork(state_dim, action_dim)
            self.optimizer = optim.Adam(self.policy.parameters(), lr=lr)
            self.log_probs = []
            self.rewards = []

        def choose_action(self, state):
            state_tensor = torch.FloatTensor(state).unsqueeze(0)
            probs = self.policy(state_tensor)
            # Sample from the action distribution
            dist = torch.distributions.Categorical(probs)
            action = dist.sample()
            self.log_probs.append(dist.log_prob(action))
            return action.item()

        def store_reward(self, reward):
            self.rewards.append(reward)

        def update(self):
            # Compute discounted returns
            returns = []
            G = 0
            for r in reversed(self.rewards):
                G = r + self.gamma * G
                returns.insert(0, G)
            returns = torch.tensor(returns)
            returns = (returns - returns.mean()) / (returns.std() + 1e-8)
            # Compute the loss
            policy_loss = []
            for log_prob, G in zip(self.log_probs, returns):
                policy_loss.append(-log_prob * G)
            loss = torch.stack(policy_loss).sum()
            self.optimizer.zero_grad()
            loss.backward()
            self.optimizer.step()
            # Clear the episode buffers
            self.log_probs = []
            self.rewards = []
            return loss.item()

    print("The REINFORCE algorithm:")
    print("  Objective: maximize expected return")
    print("  Gradient: ∇J(θ) = E[∇log π(a|s) * G]")
    print("  Key point: no value function needed; optimizes the policy directly")

except NameError:
    print("PyTorch must be imported first")
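A rollout loop is likewise missing for REINFORCE. The sketch below is my addition, reusing SimpleMDP and the one_hot helper from the DQN sketch; it emphasizes the Monte-Carlo nature of the algorithm: a complete episode is collected before a single policy update.

```python
# Sketch: REINFORCE on SimpleMDP — collect a full episode, then update once.
reinforce_agent = REINFORCEAgent(state_dim=5, action_dim=2)
env = SimpleMDP()

for episode in range(200):
    state = one_hot(env.reset())
    for _ in range(100):
        action = reinforce_agent.choose_action(state)
        next_state, reward, done = env.step(action)
        reinforce_agent.store_reward(reward)
        state = one_hot(next_state)
        if done:
            break
    reinforce_agent.update()  # one Monte-Carlo policy-gradient update per episode
```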
Actor-Critic
try:
    class ActorCritic(nn.Module):
        """Actor-Critic network"""
        def __init__(self, state_dim, action_dim, hidden_dim=64):
            super().__init__()
            # Shared layers
            self.shared = nn.Sequential(
                nn.Linear(state_dim, hidden_dim),
                nn.ReLU()
            )
            # Actor (policy head)
            self.actor = nn.Sequential(
                nn.Linear(hidden_dim, action_dim),
                nn.Softmax(dim=-1)
            )
            # Critic (value head)
            self.critic = nn.Linear(hidden_dim, 1)

        def forward(self, x):
            shared = self.shared(x)
            policy = self.actor(shared)
            value = self.critic(shared)
            return policy, value

    print("Actor-Critic:")
    print("  • Actor: learns the policy π(a|s)")
    print("  • Critic: estimates the value V(s)")
    print("  • Advantage: using an advantage function reduces variance")
    print("  • A(s,a) = Q(s,a) - V(s) ≈ r + γV(s') - V(s)")

except NameError:
    print("PyTorch must be imported first")
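The network above defines both heads but not the update. Here is a minimal one-step update sketch, assuming the TD error r + γV(s') − V(s) is used as the advantage; the optimizer, loss weighting, and example tensors are my illustrative assumptions, not part of the original post.

```python
# Sketch of a one-step actor-critic update; the TD error serves as the advantage.
ac = ActorCritic(state_dim=5, action_dim=2)
ac_optimizer = optim.Adam(ac.parameters(), lr=1e-3)
gamma = 0.99

def actor_critic_step(state, action, reward, next_state, done):
    probs, value = ac(state)                       # π(a|s), V(s)
    with torch.no_grad():
        _, next_value = ac(next_state)             # V(s')
        target = reward + gamma * next_value * (1 - done)
    advantage = target - value                     # A ≈ r + γV(s') - V(s)
    dist = torch.distributions.Categorical(probs)
    actor_loss = -dist.log_prob(action) * advantage.detach()
    critic_loss = advantage.pow(2)                 # push V(s) towards the target
    loss = (actor_loss + critic_loss).mean()
    ac_optimizer.zero_grad()
    loss.backward()
    ac_optimizer.step()
    return loss.item()

# Example call: state/next_state as 1×5 float tensors, action as a LongTensor
# loss = actor_critic_step(torch.eye(5)[0:1], torch.tensor([1]), -1.0,
#                          torch.eye(5)[1:2], 0.0)
```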
Algorithm Comparison
| Algorithm | Type | Characteristics |
|---|---|---|
| Q-Learning | Value-based, off-policy | Simple, tabular |
| SARSA | Value-based, on-policy | Conservative, safer |
| DQN | Deep value-based | Handles high-dimensional states |
| REINFORCE | Policy gradient | Optimizes the policy directly |
| Actor-Critic | Hybrid | Lower variance, more efficient |
| PPO | Policy gradient | Stable, widely used |
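PPO appears in the table and in the recommendations below but is not implemented in this post. As a pointer, here is a minimal sketch of its clipped surrogate objective only; the function name and the clip range of 0.2 are my assumptions, and this is not a full PPO implementation.

```python
# Sketch of PPO's clipped surrogate loss (objective only, not a full algorithm).
# new_log_probs, old_log_probs, and advantages are assumed to be precomputed tensors.
def ppo_clip_loss(new_log_probs, old_log_probs, advantages, clip_eps=0.2):
    ratio = torch.exp(new_log_probs - old_log_probs)           # π_new(a|s) / π_old(a|s)
    unclipped = ratio * advantages
    clipped = torch.clamp(ratio, 1 - clip_eps, 1 + clip_eps) * advantages
    return -torch.min(unclipped, clipped).mean()               # negate to minimize
```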
FAQ
Q1: What is the difference between on-policy and off-policy?
- On-policy: collects data with the current policy and learns from it
- Off-policy: can learn from data generated by a different policy
Q2: Why do we need exploration?
To balance exploration (discovering new states) against exploitation (acting on what is already known to be best).
Q3: How can sparse rewards be addressed? (see the sketch after this list)
- Reward shaping
- Curriculum learning
- Intrinsic motivation
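For the first bullet, one standard technique is potential-based reward shaping: add γΦ(s') − Φ(s) to the environment reward, which leaves the optimal policy unchanged. Below is a minimal sketch on SimpleMDP, where the potential Φ(s) = s is purely an illustrative assumption of mine.

```python
# Sketch: potential-based reward shaping on SimpleMDP.
# Phi(s) = s is an illustrative potential (states closer to the goal score higher).
class ShapedMDP(SimpleMDP):
    def __init__(self, gamma=0.99):
        super().__init__()
        self.gamma = gamma

    def potential(self, state):
        return float(state)

    def step(self, action):
        phi_s = self.potential(self.state)
        next_state, reward, done = super().step(action)
        # The shaping term gamma * Phi(s') - Phi(s) is added to the original reward
        shaped = reward + self.gamma * self.potential(next_state) - phi_s
        return next_state, shaped, done
```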
Q4: How do I choose an RL algorithm?
- Discrete actions: DQN
- Continuous actions: PPO, SAC
- Sample efficiency matters: off-policy methods
- Stability matters: PPO
Summary
| Concept | Description |
|---|---|
| Value function | Estimates the long-term value of a state/action |
| Policy | A mapping from states to actions |
| TD learning | Temporal-difference learning; incremental, step-by-step updates |
| Policy gradient | Optimizes the policy parameters directly |
References
- Sutton, R. & Barto, A. (2018). "Reinforcement Learning: An Introduction"
- Mnih, V. et al. (2015). "Human-level control through deep reinforcement learning"
- Schulman, J. et al. (2017). "Proximal Policy Optimization Algorithms"
- OpenAI Spinning Up tutorial