Q-Learning and Policy Gradient Explained

Preface

Reinforcement learning (RL) lets an agent learn an optimal policy by interacting with its environment. This article covers the basic concepts of RL, Q-learning, and policy gradient methods.


Reinforcement Learning Basics

Core Concepts

import numpy as np
import matplotlib.pyplot as plt

np.random.seed(42)

# Core elements of reinforcement learning
print("Core concepts of reinforcement learning:")
print("=" * 50)
print("• Agent: the entity that learns and makes decisions")
print("• Environment: the external world the agent interacts with")
print("• State: a description of the environment")
print("• Action: an operation the agent can perform")
print("• Reward: the environment's feedback for an action")
print("• Policy: a mapping from states to actions")
print("• Value function: the long-term value of a state/action")

Markov Decision Processes (MDPs)

class SimpleMDP:
    """简单MDP环境"""
    
    def __init__(self):
        # A grid world with 5 states
        self.n_states = 5
        self.n_actions = 2  # left = 0, right = 1
        self.goal_state = 4
        self.state = 0
        
        # Transitions (deterministic for simplicity)
        self.transitions = {
            # (state, action): next_state
            (0, 0): 0, (0, 1): 1,
            (1, 0): 0, (1, 1): 2,
            (2, 0): 1, (2, 1): 3,
            (3, 0): 2, (3, 1): 4,
            (4, 0): 4, (4, 1): 4,  # terminal state
        }
        
        # Rewards: only reaching the goal yields a positive reward
        self.rewards = {4: 10}
    
    def reset(self):
        self.state = 0
        return self.state
    
    def step(self, action):
        next_state = self.transitions[(self.state, action)]
        reward = self.rewards.get(next_state, -1)  # -1 penalty per step
        done = next_state == self.goal_state
        self.state = next_state
        return next_state, reward, done
    
    def render(self):
        grid = ['_'] * self.n_states
        grid[self.state] = 'A'
        grid[self.goal_state] = 'G' if self.state != self.goal_state else 'A'
        print(' '.join(grid))

# Test the environment
env = SimpleMDP()
env.reset()
print("Initial state:")
env.render()

for action in [1, 1, 1, 1]:  # always move right
    state, reward, done = env.step(action)
    print(f"Action: {'right' if action else 'left'}, reward: {reward}")
    env.render()
    if done:
        print("Reached the goal!")
        break
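
As a sanity check on the environment, the optimal policy (always moving right) reaches the goal in four steps: three steps with reward -1 and a final step with reward +10, for an undiscounted return of 7. The short sketch below computes the discounted return of that trajectory with γ = 0.99:

# Discounted return of the "always right" trajectory from state 0 (sketch)
gamma = 0.99
trajectory_rewards = [-1, -1, -1, 10]  # the rewards printed above
G = sum(gamma**t * r for t, r in enumerate(trajectory_rewards))
print(f"Discounted return of the optimal trajectory: {G:.2f}")  # ≈ 6.73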

Q-Learning

How the Algorithm Works

The Q-learning update rule:

\[Q(s, a) \leftarrow Q(s, a) + \alpha [r + \gamma \max_{a'} Q(s', a') - Q(s, a)]\]
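
For intuition, here is one numeric update under assumed values α = 0.1, γ = 0.99, a current estimate Q(s, a) = 0, an observed reward r = -1, and max_{a'} Q(s', a') = 5:

\[Q(s, a) \leftarrow 0 + 0.1 \, [-1 + 0.99 \times 5 - 0] = 0.395\]

The implementation below applies exactly this rule.
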
class QLearningAgent:
    """Q-Learning智能体"""
    
    def __init__(self, n_states, n_actions, learning_rate=0.1, 
                 discount_factor=0.99, epsilon=0.1):
        self.n_states = n_states
        self.n_actions = n_actions
        self.lr = learning_rate
        self.gamma = discount_factor
        self.epsilon = epsilon
        
        # Initialize the Q-table
        self.Q = np.zeros((n_states, n_actions))
    
    def choose_action(self, state):
        """ε-贪婪策略"""
        if np.random.random() < self.epsilon:
            return np.random.randint(self.n_actions)
        return np.argmax(self.Q[state])
    
    def update(self, state, action, reward, next_state, done):
        """更新Q值"""
        if done:
            target = reward
        else:
            target = reward + self.gamma * np.max(self.Q[next_state])
        
        self.Q[state, action] += self.lr * (target - self.Q[state, action])
    
    def get_policy(self):
        """获取最优策略"""
        return np.argmax(self.Q, axis=1)


def train_q_learning(env, agent, episodes=500):
    """训练Q-learning"""
    
    rewards_history = []
    
    for episode in range(episodes):
        state = env.reset()
        total_reward = 0
        
        for step in range(100):  # maximum steps per episode
            action = agent.choose_action(state)
            next_state, reward, done = env.step(action)
            
            agent.update(state, action, reward, next_state, done)
            
            total_reward += reward
            state = next_state
            
            if done:
                break
        
        rewards_history.append(total_reward)
        
        if (episode + 1) % 100 == 0:
            avg_reward = np.mean(rewards_history[-100:])
            print(f"Episode {episode+1}, 平均奖励: {avg_reward:.2f}")
    
    return rewards_history

# Train the agent
env = SimpleMDP()
agent = QLearningAgent(n_states=5, n_actions=2)
rewards = train_q_learning(env, agent)

print("\nQ表:")
print(agent.Q)
print("\n最优策略 (0=左, 1=右):")
print(agent.get_policy())
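
To sanity-check the learned policy, the sketch below runs one greedy rollout (acting on argmax Q with no exploration), reusing the env and agent objects from the training step above:

# Greedy rollout with the learned Q-table (sketch, no exploration)
state = env.reset()
env.render()
for _ in range(10):
    action = int(np.argmax(agent.Q[state]))  # greedy action
    state, reward, done = env.step(action)
    env.render()
    if done:
        print("Goal reached with the learned policy")
        break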

Visualizing Training

def plot_training(rewards, window=50):
    """Plot the training curve."""
    
    fig, ax = plt.subplots(figsize=(10, 5))
    
    # Raw per-episode rewards
    ax.plot(rewards, alpha=0.3, label='Episode reward')
    
    # Moving average
    if len(rewards) >= window:
        moving_avg = np.convolve(rewards, np.ones(window)/window, mode='valid')
        ax.plot(range(window-1, len(rewards)), moving_avg, 
               label=f'{window}-episode moving average')
    
    ax.set_xlabel('Episode')
    ax.set_ylabel('Cumulative reward')
    ax.set_title('Q-Learning Training Curve')
    ax.legend()
    ax.grid(True, alpha=0.3)
    
    plt.tight_layout()
    plt.show()

plot_training(rewards)

SARSA

Comparison with Q-Learning

class SARSAAgent:
    """SARSA智能体(On-policy)"""
    
    def __init__(self, n_states, n_actions, learning_rate=0.1,
                 discount_factor=0.99, epsilon=0.1):
        self.n_states = n_states
        self.n_actions = n_actions
        self.lr = learning_rate
        self.gamma = discount_factor
        self.epsilon = epsilon
        self.Q = np.zeros((n_states, n_actions))
    
    def choose_action(self, state):
        if np.random.random() < self.epsilon:
            return np.random.randint(self.n_actions)
        return np.argmax(self.Q[state])
    
    def update(self, state, action, reward, next_state, next_action, done):
        """SARSA更新:使用实际执行的下一个动作"""
        if done:
            target = reward
        else:
            # Difference from Q-learning: use next_action instead of the max
            target = reward + self.gamma * self.Q[next_state, next_action]
        
        self.Q[state, action] += self.lr * (target - self.Q[state, action])

# Q-learning vs SARSA
print("Q-Learning vs SARSA:")
print("=" * 50)
print("Q-Learning (off-policy):")
print("  • Updates with max Q(s', a')")
print("  • Learns the optimal policy")
print("  • Can be more aggressive")
print()
print("SARSA (on-policy):")
print("  • Updates with Q(s', a') where a' is the action actually taken")
print("  • Learns the value of the current policy")
print("  • More conservative; accounts for exploration")

Deep Q-Network (DQN)

DQN Implementation

try:
    import torch
    import torch.nn as nn
    import torch.optim as optim
    from collections import deque
    import random
    
    class DQN(nn.Module):
        """深度Q网络"""
        
        def __init__(self, state_dim, action_dim, hidden_dim=64):
            super().__init__()
            self.network = nn.Sequential(
                nn.Linear(state_dim, hidden_dim),
                nn.ReLU(),
                nn.Linear(hidden_dim, hidden_dim),
                nn.ReLU(),
                nn.Linear(hidden_dim, action_dim)
            )
        
        def forward(self, x):
            return self.network(x)
    
    
    class ReplayBuffer:
        """经验回放缓冲区"""
        
        def __init__(self, capacity=10000):
            self.buffer = deque(maxlen=capacity)
        
        def push(self, state, action, reward, next_state, done):
            self.buffer.append((state, action, reward, next_state, done))
        
        def sample(self, batch_size):
            batch = random.sample(self.buffer, batch_size)
            states, actions, rewards, next_states, dones = zip(*batch)
            return (np.array(states), np.array(actions), np.array(rewards),
                   np.array(next_states), np.array(dones))
        
        def __len__(self):
            return len(self.buffer)
    
    
    class DQNAgent:
        """DQN智能体"""
        
        def __init__(self, state_dim, action_dim, lr=1e-3, gamma=0.99, 
                     epsilon=1.0, epsilon_decay=0.995, epsilon_min=0.01):
            self.action_dim = action_dim
            self.gamma = gamma
            self.epsilon = epsilon
            self.epsilon_decay = epsilon_decay
            self.epsilon_min = epsilon_min
            
            self.q_network = DQN(state_dim, action_dim)
            self.target_network = DQN(state_dim, action_dim)
            self.target_network.load_state_dict(self.q_network.state_dict())
            
            self.optimizer = optim.Adam(self.q_network.parameters(), lr=lr)
            self.buffer = ReplayBuffer()
        
        def choose_action(self, state):
            if np.random.random() < self.epsilon:
                return np.random.randint(self.action_dim)
            
            state_tensor = torch.FloatTensor(state).unsqueeze(0)
            with torch.no_grad():
                q_values = self.q_network(state_tensor)
            return q_values.argmax().item()
        
        def train_step(self, batch_size=32):
            if len(self.buffer) < batch_size:
                return
            
            states, actions, rewards, next_states, dones = self.buffer.sample(batch_size)
            
            states = torch.FloatTensor(states)
            actions = torch.LongTensor(actions)
            rewards = torch.FloatTensor(rewards)
            next_states = torch.FloatTensor(next_states)
            dones = torch.FloatTensor(dones)
            
            # Current Q-values
            current_q = self.q_network(states).gather(1, actions.unsqueeze(1))
            
            # Target Q-values
            with torch.no_grad():
                next_q = self.target_network(next_states).max(1)[0]
                target_q = rewards + self.gamma * next_q * (1 - dones)
            
            # Loss
            loss = nn.MSELoss()(current_q.squeeze(), target_q)
            
            self.optimizer.zero_grad()
            loss.backward()
            self.optimizer.step()
            
            # Decay epsilon
            self.epsilon = max(self.epsilon_min, self.epsilon * self.epsilon_decay)
            
            return loss.item()
        
        def update_target(self):
            self.target_network.load_state_dict(self.q_network.state_dict())
    
    print("DQN实现完成")
    print("关键技术:")
    print("  • 经验回放: 打破数据相关性")
    print("  • 目标网络: 稳定训练")
    print("  • ε-greedy: 探索与利用平衡")
    
except ImportError:
    print("PyTorch未安装")

Policy Gradient

The REINFORCE Algorithm

try:
    class PolicyNetwork(nn.Module):
        """策略网络"""
        
        def __init__(self, state_dim, action_dim, hidden_dim=64):
            super().__init__()
            self.network = nn.Sequential(
                nn.Linear(state_dim, hidden_dim),
                nn.ReLU(),
                nn.Linear(hidden_dim, action_dim),
                nn.Softmax(dim=-1)
            )
        
        def forward(self, x):
            return self.network(x)
    
    
    class REINFORCEAgent:
        """REINFORCE智能体"""
        
        def __init__(self, state_dim, action_dim, lr=1e-3, gamma=0.99):
            self.gamma = gamma
            self.policy = PolicyNetwork(state_dim, action_dim)
            self.optimizer = optim.Adam(self.policy.parameters(), lr=lr)
            
            self.log_probs = []
            self.rewards = []
        
        def choose_action(self, state):
            state_tensor = torch.FloatTensor(state).unsqueeze(0)
            probs = self.policy(state_tensor)
            
            # Sample from the categorical distribution
            dist = torch.distributions.Categorical(probs)
            action = dist.sample()
            
            self.log_probs.append(dist.log_prob(action))
            
            return action.item()
        
        def store_reward(self, reward):
            self.rewards.append(reward)
        
        def update(self):
            # Compute discounted returns
            returns = []
            G = 0
            for r in reversed(self.rewards):
                G = r + self.gamma * G
                returns.insert(0, G)
            
            returns = torch.tensor(returns)
            returns = (returns - returns.mean()) / (returns.std() + 1e-8)
            
            # Compute the policy-gradient loss
            policy_loss = []
            for log_prob, G in zip(self.log_probs, returns):
                policy_loss.append(-log_prob * G)
            
            loss = torch.stack(policy_loss).sum()
            
            self.optimizer.zero_grad()
            loss.backward()
            self.optimizer.step()
            
            # Clear the episode buffers
            self.log_probs = []
            self.rewards = []
            
            return loss.item()
    
    print("REINFORCE算法:")
    print("  目标: 最大化期望回报")
    print("  梯度: ∇J(θ) = E[∇log π(a|s) * G]")
    print("  特点: 无需值函数,直接优化策略")
    
except NameError:
    print("需要先导入PyTorch")

Actor-Critic

try:
    class ActorCritic(nn.Module):
        """Actor-Critic网络"""
        
        def __init__(self, state_dim, action_dim, hidden_dim=64):
            super().__init__()
            
            # Shared layers
            self.shared = nn.Sequential(
                nn.Linear(state_dim, hidden_dim),
                nn.ReLU()
            )
            
            # Actor (policy head)
            self.actor = nn.Sequential(
                nn.Linear(hidden_dim, action_dim),
                nn.Softmax(dim=-1)
            )
            
            # Critic (value head)
            self.critic = nn.Linear(hidden_dim, 1)
        
        def forward(self, x):
            shared = self.shared(x)
            policy = self.actor(shared)
            value = self.critic(shared)
            return policy, value
    
    print("Actor-Critic:")
    print("  • Actor: 学习策略 π(a|s)")
    print("  • Critic: 估计价值 V(s)")
    print("  • 优势: 使用优势函数减少方差")
    print("  • A(s,a) = Q(s,a) - V(s) ≈ r + γV(s') - V(s)")
    
except NameError:
    print("需要先导入PyTorch")

Algorithm Comparison

| Algorithm | Type | Characteristics |
| --- | --- | --- |
| Q-Learning | Value-based, off-policy | Simple, tabular |
| SARSA | Value-based, on-policy | Conservative, safe |
| DQN | Deep value-based | Handles high-dimensional states |
| REINFORCE | Policy gradient | Optimizes the policy directly |
| Actor-Critic | Hybrid | Lower variance, efficient |
| PPO | Policy gradient | Stable, widely used |

Frequently Asked Questions

Q1: What is the difference between on-policy and off-policy?

  • On-policy: learns from data collected by the current policy
  • Off-policy: can learn from data collected by a different policy

Q2: Why is exploration needed?

To balance exploration (discovering new states) against exploitation (using the best-known actions).

Q3: How can sparse rewards be handled?

  • Reward shaping
  • Curriculum learning
  • Intrinsic motivation

Q4: How do I choose an RL algorithm?

  • Discrete actions: DQN
  • Continuous actions: PPO, SAC
  • Sample efficiency matters: off-policy methods
  • Stability matters: PPO

Summary

| Concept | Description |
| --- | --- |
| Value function | Estimates the long-term value of states/actions |
| Policy | A mapping from states to actions |
| TD learning | Temporal-difference learning with incremental updates |
| Policy gradient | Directly optimizes the policy parameters |

References

  • Sutton, R. & Barto, A. (2018). Reinforcement Learning: An Introduction
  • Mnih, V. et al. (2015). Human-level control through deep reinforcement learning
  • Schulman, J. et al. (2017). Proximal Policy Optimization Algorithms
  • OpenAI Spinning Up tutorial
